mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 06:04:21 +01:00
Improves API testing. Mocks data moving from Watcher/Responder instead of running them in the background. Closes #77.
This commit is contained in:
@@ -1,28 +1,25 @@
|
||||
import pytest
|
||||
from shutil import rmtree
|
||||
from binascii import hexlify
|
||||
|
||||
from teos.api import API
|
||||
from teos import HOST, PORT
|
||||
import teos.errors as errors
|
||||
from teos.watcher import Watcher
|
||||
from teos.tools import bitcoin_cli
|
||||
from teos.inspector import Inspector
|
||||
from teos.responder import Responder
|
||||
from teos.db_manager import DBManager
|
||||
from teos.gatekeeper import Gatekeeper
|
||||
from teos.chain_monitor import ChainMonitor
|
||||
from teos.responder import Responder, TransactionTracker
|
||||
|
||||
from test.teos.unit.conftest import (
|
||||
generate_block,
|
||||
generate_blocks,
|
||||
get_random_value_hex,
|
||||
generate_dummy_appointment,
|
||||
generate_keypair,
|
||||
get_config,
|
||||
bitcoind_connect_params,
|
||||
bitcoind_feed_params,
|
||||
)
|
||||
|
||||
from common.blob import Blob
|
||||
from common.cryptographer import Cryptographer, hash_160
|
||||
from common.constants import (
|
||||
HTTP_OK,
|
||||
@@ -56,20 +53,24 @@ client_sk, client_pk = generate_keypair()
|
||||
compressed_client_pk = hexlify(client_pk.format(compressed=True)).decode("utf-8")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def get_all_db_manager():
|
||||
manager = DBManager("get_all_tmp_db")
|
||||
# Add last know block for the Responder in the db
|
||||
|
||||
yield manager
|
||||
|
||||
manager.db.close()
|
||||
rmtree("get_all_tmp_db")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def api(db_manager, carrier, block_processor, run_bitcoind):
|
||||
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
responder = Responder(db_manager, carrier, block_processor)
|
||||
watcher = Watcher(db_manager, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS, config.get("EXPIRY_DELTA"))
|
||||
|
||||
chain_monitor = ChainMonitor(
|
||||
watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params
|
||||
)
|
||||
watcher.awake()
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
gatekeeper = Gatekeeper(config.get("DEFAULT_SLOTS"))
|
||||
api = API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, gatekeeper)
|
||||
|
||||
@@ -275,145 +276,6 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment
|
||||
assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n
|
||||
|
||||
|
||||
def test_get_appointment_no_json(api, client, appointment):
|
||||
r = client.post(add_appointment_endpoint, data="random_message")
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_get_appointment_json_no_inner_dict(api, client, appointment):
|
||||
r = client.post(add_appointment_endpoint, json="random_message")
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_request_random_appointment_registered_user(client, user_sk=client_sk):
|
||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||
message = "get appointment {}".format(locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
|
||||
|
||||
data = {"locator": locator, "signature": signature}
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
|
||||
# We should get a 404 not found since we are using a made up locator
|
||||
received_appointment = r.json
|
||||
assert r.status_code == HTTP_NOT_FOUND
|
||||
assert received_appointment.get("status") == "not_found"
|
||||
|
||||
|
||||
def test_request_appointment_not_registered_user(client):
|
||||
# Not registered users have no associated appointments, so this should fail
|
||||
tmp_sk, tmp_pk = generate_keypair()
|
||||
|
||||
# The tower is designed so a not found appointment and a request from a non-registered user return the same error to
|
||||
# prevent proving.
|
||||
test_request_random_appointment_registered_user(client, tmp_sk)
|
||||
|
||||
|
||||
def test_request_appointment_in_watcher(api, client, appointment):
|
||||
# Give slots to the user
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
|
||||
# Add an appointment
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
message = "get appointment {}".format(appointment.locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
|
||||
data = {"locator": appointment.locator, "signature": signature}
|
||||
|
||||
# Next we can request it
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
# Check that the appointment is on the watcher
|
||||
assert r.json.get("status") == "being_watched"
|
||||
|
||||
# Check the the sent appointment matches the received one
|
||||
assert r.json.get("locator") == appointment.locator
|
||||
assert appointment.to_dict() == r.json.get("appointment")
|
||||
|
||||
|
||||
def test_request_appointment_in_responder(api, client, appointment):
|
||||
# Give slots to the user
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
|
||||
# Let's do something similar to what we did with the watcher but now we'll send the dispute tx to the network
|
||||
dispute_tx = locator_dispute_tx_map.pop(appointment.locator)
|
||||
bitcoin_cli(bitcoind_connect_params).sendrawtransaction(dispute_tx)
|
||||
|
||||
# Add an appointment (avoid calling add_appointment to not add this one to the sent appointments list)
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = client.post(
|
||||
add_appointment_endpoint, json={"appointment": appointment.to_dict(), "signature": appointment_signature}
|
||||
)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
# Generate a block to trigger the watcher
|
||||
generate_block()
|
||||
|
||||
# Request back the data
|
||||
message = "get appointment {}".format(appointment.locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
|
||||
data = {"locator": appointment.locator, "signature": signature}
|
||||
|
||||
# Next we can request it
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
# Check that the appointment is on the watcher
|
||||
assert r.json.get("status") == "dispute_responded"
|
||||
|
||||
# Check the the sent appointment matches the received one
|
||||
assert appointment.locator == r.json.get("locator")
|
||||
assert appointment.encrypted_blob.data == Cryptographer.encrypt(
|
||||
Blob(r.json.get("appointment").get("penalty_rawtx")), r.json.get("appointment").get("dispute_txid")
|
||||
)
|
||||
|
||||
# Delete appointment so it does not mess up with future tests
|
||||
uuids = api.watcher.responder.tx_tracker_map.pop(r.json.get("appointment").get("penalty_txid"))
|
||||
api.watcher.responder.db_manager.delete_responder_tracker(uuids[0])
|
||||
|
||||
|
||||
def test_get_all_appointments_watcher(client):
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
received_appointments = r.json
|
||||
|
||||
# Make sure there all the locators re in the watcher
|
||||
watcher_locators = [v["locator"] for k, v in received_appointments["watcher_appointments"].items()]
|
||||
local_locators = [appointment["locator"] for uuid, appointment in appointments.items()]
|
||||
|
||||
assert set(watcher_locators) == set(local_locators)
|
||||
assert len(received_appointments["responder_trackers"]) == 0
|
||||
|
||||
|
||||
def test_get_all_appointments_responder(api, client):
|
||||
# Trigger all disputes
|
||||
local_locators = [appointment.get("locator") for uuids, appointment in appointments.items()]
|
||||
for locator, dispute_tx in locator_dispute_tx_map.items():
|
||||
if locator in local_locators:
|
||||
bitcoin_cli(bitcoind_connect_params).sendrawtransaction(dispute_tx)
|
||||
|
||||
# Confirm transactions
|
||||
generate_blocks(6)
|
||||
|
||||
# Get all appointments
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
received_appointments = r.json
|
||||
|
||||
# Make sure there is not pending locator in the watcher
|
||||
responder_trackers = [v["locator"] for k, v in received_appointments["responder_trackers"].items()]
|
||||
|
||||
assert set(responder_trackers) == set(local_locators)
|
||||
assert len(received_appointments["watcher_appointments"]) == 0
|
||||
|
||||
|
||||
# UPDATE TEST MUST BE AFTER get_all_appointments TESTS:
|
||||
# This tests send data to the Watcher and Responder that may not be passed along, so it's easier to have it here and
|
||||
# not keep track of what's being sent
|
||||
def test_add_appointment_update_same_size(api, client, appointment):
|
||||
# Update an appointment by one of the same size and check that no additional slots are filled
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
@@ -503,3 +365,165 @@ def test_add_too_many_appointment(api, client):
|
||||
assert r.status_code == HTTP_OK
|
||||
else:
|
||||
assert r.status_code == HTTP_SERVICE_UNAVAILABLE
|
||||
|
||||
|
||||
def test_get_appointment_no_json(api, client, appointment):
|
||||
r = client.post(add_appointment_endpoint, data="random_message")
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_get_appointment_json_no_inner_dict(api, client, appointment):
|
||||
r = client.post(add_appointment_endpoint, json="random_message")
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_request_random_appointment_registered_user(client, user_sk=client_sk):
|
||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||
message = "get appointment {}".format(locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
|
||||
|
||||
data = {"locator": locator, "signature": signature}
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
|
||||
# We should get a 404 not found since we are using a made up locator
|
||||
received_appointment = r.json
|
||||
assert r.status_code == HTTP_NOT_FOUND
|
||||
assert received_appointment.get("status") == "not_found"
|
||||
|
||||
|
||||
def test_request_appointment_not_registered_user(client):
|
||||
# Not registered users have no associated appointments, so this should fail
|
||||
tmp_sk, tmp_pk = generate_keypair()
|
||||
|
||||
# The tower is designed so a not found appointment and a request from a non-registered user return the same error to
|
||||
# prevent proving.
|
||||
test_request_random_appointment_registered_user(client, tmp_sk)
|
||||
|
||||
|
||||
def test_request_appointment_in_watcher(api, client, appointment):
|
||||
# Mock the appointment in the Watcher
|
||||
uuid = hash_160("{}{}".format(appointment.locator, compressed_client_pk))
|
||||
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
|
||||
# Next we can request it
|
||||
message = "get appointment {}".format(appointment.locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
|
||||
data = {"locator": appointment.locator, "signature": signature}
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
# Check that the appointment is on the watcher
|
||||
assert r.json.get("status") == "being_watched"
|
||||
|
||||
# Check the the sent appointment matches the received one
|
||||
assert r.json.get("locator") == appointment.locator
|
||||
assert appointment.to_dict() == r.json.get("appointment")
|
||||
|
||||
|
||||
def test_request_appointment_in_responder(api, client, appointment):
|
||||
# Mock the appointment in the Responder
|
||||
tracker_data = {
|
||||
"locator": appointment.locator,
|
||||
"dispute_txid": get_random_value_hex(32),
|
||||
"penalty_txid": get_random_value_hex(32),
|
||||
"penalty_rawtx": get_random_value_hex(250),
|
||||
"appointment_end": appointment.end_time,
|
||||
}
|
||||
tx_tracker = TransactionTracker.from_dict(tracker_data)
|
||||
|
||||
uuid = hash_160("{}{}".format(appointment.locator, compressed_client_pk))
|
||||
api.watcher.db_manager.create_triggered_appointment_flag(uuid)
|
||||
api.watcher.responder.db_manager.store_responder_tracker(uuid, tx_tracker.to_json())
|
||||
|
||||
# Request back the data
|
||||
message = "get appointment {}".format(appointment.locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
|
||||
data = {"locator": appointment.locator, "signature": signature}
|
||||
|
||||
# Next we can request it
|
||||
r = client.post(get_appointment_endpoint, json=data)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
# Check that the appointment is on the watcher
|
||||
assert r.json.get("status") == "dispute_responded"
|
||||
|
||||
# Check the the sent appointment matches the received one
|
||||
assert tx_tracker.locator == r.json.get("locator")
|
||||
assert tx_tracker.dispute_txid == r.json.get("appointment").get("dispute_txid")
|
||||
assert tx_tracker.penalty_txid == r.json.get("appointment").get("penalty_txid")
|
||||
assert tx_tracker.penalty_rawtx == r.json.get("appointment").get("penalty_rawtx")
|
||||
assert tx_tracker.appointment_end == r.json.get("appointment").get("appointment_end")
|
||||
|
||||
|
||||
def test_get_all_appointments_watcher(api, client, get_all_db_manager, appointment):
|
||||
# Let's reset the dbs so we can test this clean
|
||||
api.watcher.db_manager = get_all_db_manager
|
||||
api.watcher.responder.db_manager = get_all_db_manager
|
||||
|
||||
# Check that they are wiped clean
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
assert r.status_code == HTTP_OK
|
||||
assert len(r.json.get("watcher_appointments")) == 0 and len(r.json.get("responder_trackers")) == 0
|
||||
|
||||
# Add some appointments to the Watcher db
|
||||
non_triggered_appointments = {}
|
||||
for _ in range(10):
|
||||
uuid = get_random_value_hex(16)
|
||||
appointment.locator = get_random_value_hex(16)
|
||||
non_triggered_appointments[uuid] = appointment.to_dict()
|
||||
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
|
||||
triggered_appointments = {}
|
||||
for _ in range(10):
|
||||
uuid = get_random_value_hex(16)
|
||||
appointment.locator = get_random_value_hex(16)
|
||||
triggered_appointments[uuid] = appointment.to_dict()
|
||||
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
api.watcher.db_manager.create_triggered_appointment_flag(uuid)
|
||||
|
||||
# We should only get check the non-triggered appointments
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
watcher_locators = [v["locator"] for k, v in r.json["watcher_appointments"].items()]
|
||||
local_locators = [appointment["locator"] for uuid, appointment in non_triggered_appointments.items()]
|
||||
|
||||
assert set(watcher_locators) == set(local_locators)
|
||||
assert len(r.json["responder_trackers"]) == 0
|
||||
|
||||
|
||||
def test_get_all_appointments_responder(api, client, get_all_db_manager):
|
||||
# Let's reset the dbs so we can test this clean
|
||||
api.watcher.db_manager = get_all_db_manager
|
||||
api.watcher.responder.db_manager = get_all_db_manager
|
||||
|
||||
# Check that they are wiped clean
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
assert r.status_code == HTTP_OK
|
||||
assert len(r.json.get("watcher_appointments")) == 0 and len(r.json.get("responder_trackers")) == 0
|
||||
|
||||
# Add some trackers to the Responder db
|
||||
tx_trackers = {}
|
||||
for _ in range(10):
|
||||
uuid = get_random_value_hex(16)
|
||||
tracker_data = {
|
||||
"locator": get_random_value_hex(16),
|
||||
"dispute_txid": get_random_value_hex(32),
|
||||
"penalty_txid": get_random_value_hex(32),
|
||||
"penalty_rawtx": get_random_value_hex(250),
|
||||
"appointment_end": 20,
|
||||
}
|
||||
tracker = TransactionTracker.from_dict(tracker_data)
|
||||
tx_trackers[uuid] = tracker.to_dict()
|
||||
api.watcher.responder.db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||
api.watcher.db_manager.create_triggered_appointment_flag(uuid)
|
||||
|
||||
# Get all appointments
|
||||
r = client.get(get_all_appointment_endpoint)
|
||||
|
||||
# Make sure there is not pending locator in the watcher
|
||||
responder_trackers = [v["locator"] for k, v in r.json["responder_trackers"].items()]
|
||||
local_locators = [tracker["locator"] for uuid, tracker in tx_trackers.items()]
|
||||
|
||||
assert set(responder_trackers) == set(local_locators)
|
||||
assert len(r.json["watcher_appointments"]) == 0
|
||||
|
||||
Reference in New Issue
Block a user