Updates API unit tests

This commit is contained in:
Sergi Delgado Segura
2020-03-27 12:07:55 +01:00
parent 9e798916d1
commit f9a3315cec
2 changed files with 203 additions and 100 deletions

View File

@@ -100,7 +100,7 @@ def fork(block_hash):
requests.post(fork_endpoint, json={"parent": block_hash}) requests.post(fork_endpoint, json={"parent": block_hash})
def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_time_offset=30): def generate_dummy_appointment(real_height=True, start_time_offset=5, end_time_offset=30):
if real_height: if real_height:
current_height = bitcoin_cli(bitcoind_connect_params).getblockcount() current_height = bitcoin_cli(bitcoind_connect_params).getblockcount()
@@ -119,9 +119,6 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
"to_self_delay": 20, "to_self_delay": 20,
} }
# dummy keys for this test
client_sk, client_pk = generate_keypair()
locator = compute_locator(dispute_txid) locator = compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx")) blob = Blob(dummy_appointment_data.get("tx"))
@@ -135,19 +132,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
"encrypted_blob": encrypted_blob, "encrypted_blob": encrypted_blob,
} }
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk) return Appointment.from_dict(appointment_data), dispute_tx.hex()
data = {"appointment": appointment_data, "signature": signature}
return data, dispute_tx.hex()
def generate_dummy_appointment(real_height=True, start_time_offset=5, end_time_offset=30):
appointment_data, dispute_tx = generate_dummy_appointment_data(
real_height=real_height, start_time_offset=start_time_offset, end_time_offset=end_time_offset
)
return Appointment.from_dict(appointment_data["appointment"]), dispute_tx
def generate_dummy_tracker(): def generate_dummy_tracker():

View File

@@ -2,10 +2,12 @@ import json
import pytest import pytest
import requests import requests
from time import sleep from time import sleep
from binascii import hexlify
from threading import Thread from threading import Thread
from teos.api import API from teos.api import API
from teos import HOST, PORT from teos import HOST, PORT
import teos.errors as errors
from teos.watcher import Watcher from teos.watcher import Watcher
from teos.tools import bitcoin_cli from teos.tools import bitcoin_cli
from teos.inspector import Inspector from teos.inspector import Inspector
@@ -17,15 +19,16 @@ from test.teos.unit.conftest import (
generate_block, generate_block,
generate_blocks, generate_blocks,
get_random_value_hex, get_random_value_hex,
generate_dummy_appointment_data, generate_dummy_appointment,
generate_keypair, generate_keypair,
get_config, get_config,
bitcoind_connect_params, bitcoind_connect_params,
bitcoind_feed_params, bitcoind_feed_params,
) )
from common.blob import Blob
from common.constants import LOCATOR_LEN_BYTES from common.cryptographer import Cryptographer
from common.constants import HTTP_OK, HTTP_NOT_FOUND, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, LOCATOR_LEN_BYTES
TEOS_API = "http://{}:{}".format(HOST, PORT) TEOS_API = "http://{}:{}".format(HOST, PORT)
@@ -41,8 +44,12 @@ locator_dispute_tx_map = {}
config = get_config() config = get_config()
client_sk, client_pk = generate_keypair()
client_pk_hex = hexlify(client_pk.format(compressed=True)).decode("utf-8")
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def run_api(db_manager, carrier, block_processor): def api(db_manager, carrier, block_processor):
sk, pk = generate_keypair() sk, pk = generate_keypair()
responder = Responder(db_manager, carrier, block_processor) responder = Responder(db_manager, carrier, block_processor)
@@ -56,82 +63,220 @@ def run_api(db_manager, carrier, block_processor):
watcher.awake() watcher.awake()
chain_monitor.monitor_chain() chain_monitor.monitor_chain()
api_thread = Thread( api = API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, Gatekeeper())
target=API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, Gatekeeper()).start api_thread = Thread(target=api.start)
)
api_thread.daemon = True api_thread.daemon = True
api_thread.start() api_thread.start()
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail) # It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1) sleep(0.1)
return api
@pytest.fixture @pytest.fixture
def new_appt_data(): def appointment():
appt_data, dispute_tx = generate_dummy_appointment_data() appointment, dispute_tx = generate_dummy_appointment()
locator_dispute_tx_map[appt_data["appointment"]["locator"]] = dispute_tx locator_dispute_tx_map[appointment.locator] = dispute_tx
return appt_data return appointment
def add_appointment(new_appt_data): def add_appointment(appointment_data):
r = requests.post(url=add_appointment_endpoint, json=new_appt_data, timeout=5) r = requests.post(url=add_appointment_endpoint, json=appointment_data, timeout=5)
if r.status_code == 200: if r.status_code == HTTP_OK:
appointments.append(new_appt_data["appointment"]) appointments.append(appointment_data["appointment"])
return r return r
def test_add_appointment(run_api, run_bitcoind, new_appt_data): def test_add_appointment(api, run_bitcoind, appointment):
# Simulate the user registration
api.gatekeeper.registered_users[client_pk_hex] = 1
# Properly formatted appointment # Properly formatted appointment
r = add_appointment(new_appt_data) appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
assert r.status_code == 200 r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK
def test_add_appointment_wrong(api, appointment):
# Simulate the user registration
api.gatekeeper.registered_users[client_pk_hex] = 1
# Incorrect appointment # Incorrect appointment
new_appt_data["appointment"]["to_self_delay"] = 0 appointment.to_self_delay = 0
r = add_appointment(new_appt_data) appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
assert r.status_code == 400 r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_FIELD_TOO_SMALL) in r.json().get("error")
def test_request_random_appointment(): def test_add_appointment_not_registered(api, appointment):
r = requests.get(url="{}?locator={}".format(get_appointment_endpoint, get_random_value_hex(LOCATOR_LEN_BYTES))) # Properly formatted appointment
assert r.status_code == 200 tmp_sk, tmp_pk = generate_keypair()
appointment_signature = Cryptographer.sign(appointment.serialize(), tmp_sk)
received_appointments = json.loads(r.content) r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
appointment_status = [appointment.pop("status") for appointment in received_appointments] assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS) in r.json().get("error")
assert all([status == "not_found" for status in appointment_status])
def test_add_appointment_multiple_times(new_appt_data, n=MULTIPLE_APPOINTMENTS): def test_add_appointment_registered_no_free_slots(api, appointment):
# Empty the user slots
api.gatekeeper.registered_users[client_pk_hex] = 0
# Properly formatted appointment
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS) in r.json().get("error")
def test_add_appointment_registered_not_enough_free_slots(api, appointment):
# Empty the user slots
api.gatekeeper.registered_users[client_pk_hex] = 1
# Properly formatted appointment
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Let's create a big blob
for _ in range(10):
appointment.encrypted_blob.data += appointment.encrypted_blob.data
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS) in r.json().get("error")
def test_add_appointment_multiple_times_same_user(api, appointment, n=MULTIPLE_APPOINTMENTS):
# Multiple appointments with the same locator should be valid # Multiple appointments with the same locator should be valid
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Simulate registering enough slots
api.gatekeeper.registered_users[client_pk_hex] = n
# DISCUSS: #34-store-identical-appointments # DISCUSS: #34-store-identical-appointments
for _ in range(n): for _ in range(n):
r = add_appointment(new_appt_data) r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == 200 assert r.status_code == HTTP_OK
# Since all updates came from the same user, only the last one is stored
assert len(api.watcher.locator_uuid_map[appointment.locator]) == 1
def test_request_multiple_appointments_same_locator(new_appt_data, n=MULTIPLE_APPOINTMENTS): def test_add_appointment_multiple_times_different_users(api, appointment, n=MULTIPLE_APPOINTMENTS):
for _ in range(n): # Create user keys and appointment signatures
r = add_appointment(new_appt_data) user_keys = [generate_keypair() for _ in range(n)]
assert r.status_code == 200 signatures = [Cryptographer.sign(appointment.serialize(), key[0]) for key in user_keys]
test_request_appointment_watcher(new_appt_data) # Add one slot per public key
for pair in user_keys:
api.gatekeeper.registered_users[hexlify(pair[1].format(compressed=True)).decode("utf-8")] = 1
# Send the appointments
for signature in signatures:
r = add_appointment({"appointment": appointment.to_dict(), "signature": signature})
assert r.status_code == HTTP_OK
# Check that all the appointments have been added and that there are no duplicates
assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n
def test_add_too_many_appointment(new_appt_data): def test_request_random_appointment_registered_user(user_sk=client_sk):
for _ in range(config.get("MAX_APPOINTMENTS") - len(appointments)): locator = get_random_value_hex(LOCATOR_LEN_BYTES)
r = add_appointment(new_appt_data) message = "get appointment {}".format(locator)
assert r.status_code == 200 signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
r = add_appointment(new_appt_data) data = {"locator": locator, "signature": signature}
assert r.status_code == 503 r = requests.post(url=get_appointment_endpoint, json=data, timeout=5)
# We should get a 404 not found since we are using a made up locator
received_appointment = r.json()
assert r.status_code == HTTP_NOT_FOUND
assert received_appointment.get("status") == "not_found"
def test_request_appointment_not_registered_user():
# Not registered users have no associated appointments, so this should fail
tmp_sk, tmp_pk = generate_keypair()
# The tower is designed so a not found appointment and a request from a non-registered user return the same error to
# prevent proving.
test_request_random_appointment_registered_user(tmp_sk)
def test_request_appointment_in_watcher(api, appointment):
# Give slots to the user
api.gatekeeper.registered_users[client_pk_hex] = 1
# Add an appointment
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK
message = "get appointment {}".format(appointment.locator)
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
data = {"locator": appointment.locator, "signature": signature}
# Next we can request it
r = requests.post(url=get_appointment_endpoint, json=data, timeout=5)
assert r.status_code == HTTP_OK
appointment_data = json.loads(r.content)
# Check that the appointment is on the watcher
status = appointment_data.pop("status")
assert status == "being_watched"
# Check the the sent appointment matches the received one
assert appointment.to_dict() == appointment_data
def test_request_appointment_in_responder(api, appointment):
# Give slots to the user
api.gatekeeper.registered_users[client_pk_hex] = 1
# Let's do something similar to what we did with the watcher but now we'll send the dispute tx to the network
dispute_tx = locator_dispute_tx_map.pop(appointment.locator)
bitcoin_cli(bitcoind_connect_params).sendrawtransaction(dispute_tx)
# Add an appointment
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK
# Generate a block to trigger the watcher
generate_block()
# Request back the data
message = "get appointment {}".format(appointment.locator)
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
data = {"locator": appointment.locator, "signature": signature}
# Next we can request it
r = requests.post(url=get_appointment_endpoint, json=data, timeout=5)
assert r.status_code == HTTP_OK
appointment_data = json.loads(r.content)
# Check that the appointment is on the watcher
status = appointment_data.pop("status")
assert status == "dispute_responded"
# Check the the sent appointment matches the received one
assert appointment.locator == appointment_data.get("locator")
assert appointment.encrypted_blob.data == Cryptographer.encrypt(
Blob(appointment_data.get("penalty_rawtx")), appointment_data.get("dispute_txid")
)
# Delete appointment so it does not mess up with future tests
appointments.pop()
uuids = api.watcher.responder.tx_tracker_map.pop(appointment_data.get("penalty_txid"))
api.watcher.responder.db_manager.delete_responder_tracker(uuids[0])
# api.watcher.responder.trackers.pop(uuids[0])
def test_get_all_appointments_watcher(): def test_get_all_appointments_watcher():
r = requests.get(url=get_all_appointment_endpoint) r = requests.get(url=get_all_appointment_endpoint)
assert r.status_code == 200 and r.reason == "OK" assert r.status_code == HTTP_OK
received_appointments = json.loads(r.content) received_appointments = json.loads(r.content)
@@ -165,45 +310,18 @@ def test_get_all_appointments_responder():
assert len(received_appointments["watcher_appointments"]) == 0 assert len(received_appointments["watcher_appointments"]) == 0
def test_request_appointment_watcher(new_appt_data): def test_add_too_many_appointment(api):
# First we need to add an appointment # Give slots to the user
r = add_appointment(new_appt_data) api.gatekeeper.registered_users[client_pk_hex] = 100
assert r.status_code == 200
# Next we can request it for i in range(config.get("MAX_APPOINTMENTS") - len(appointments)):
r = requests.get(url="{}?locator={}".format(get_appointment_endpoint, new_appt_data["appointment"]["locator"])) appointment, dispute_tx = generate_dummy_appointment()
assert r.status_code == 200 locator_dispute_tx_map[appointment.locator] = dispute_tx
# Each locator may point to multiple appointments, check them all appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
received_appointments = json.loads(r.content) r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
# Take the status out and leave the received appointments ready to compare if i != config.get("MAX_APPOINTMENTS") - len(appointments):
appointment_status = [appointment.pop("status") for appointment in received_appointments] assert r.status_code == HTTP_OK
else:
# Check that the appointment is within the received appointments assert r.status_code == HTTP_SERVICE_UNAVAILABLE
assert new_appt_data["appointment"] in received_appointments
# Check that all the appointments are being watched
assert all([status == "being_watched" for status in appointment_status])
def test_request_appointment_responder(new_appt_data):
# Let's do something similar to what we did with the watcher but now we'll send the dispute tx to the network
dispute_tx = locator_dispute_tx_map[new_appt_data["appointment"]["locator"]]
bitcoin_cli(bitcoind_connect_params).sendrawtransaction(dispute_tx)
r = add_appointment(new_appt_data)
assert r.status_code == 200
# Generate a block to trigger the watcher
generate_block()
r = requests.get(url="{}?locator={}".format(get_appointment_endpoint, new_appt_data["appointment"]["locator"]))
assert r.status_code == 200
received_appointments = json.loads(r.content)
appointment_status = [appointment.pop("status") for appointment in received_appointments]
appointment_locators = [appointment["locator"] for appointment in received_appointments]
assert new_appt_data["appointment"]["locator"] in appointment_locators and len(received_appointments) == 1
assert all([status == "dispute_responded" for status in appointment_status]) and len(appointment_status) == 1