mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 06:04:21 +01:00
Adds register unit tests, missing add and get appointment tests and sets MAX_APPOINTMENTS to 100 for testing
This commit is contained in:
@@ -32,10 +32,14 @@ from common.constants import HTTP_OK, HTTP_NOT_FOUND, HTTP_BAD_REQUEST, HTTP_SER
|
||||
|
||||
|
||||
TEOS_API = "http://{}:{}".format(HOST, PORT)
|
||||
register_endpoint = "{}/register".format(TEOS_API)
|
||||
add_appointment_endpoint = "{}/add_appointment".format(TEOS_API)
|
||||
get_appointment_endpoint = "{}/get_appointment".format(TEOS_API)
|
||||
get_all_appointment_endpoint = "{}/get_all_appointments".format(TEOS_API)
|
||||
|
||||
# Reduce the maximum number of appointments to something we can test faster
|
||||
MAX_APPOINTMENTS = 100
|
||||
|
||||
MULTIPLE_APPOINTMENTS = 10
|
||||
|
||||
appointments = []
|
||||
@@ -50,12 +54,11 @@ client_pk_hex = hexlify(client_pk.format(compressed=True)).decode("utf-8")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def api(db_manager, carrier, block_processor):
|
||||
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
responder = Responder(db_manager, carrier, block_processor)
|
||||
watcher = Watcher(
|
||||
db_manager, block_processor, responder, sk.to_der(), config.get("MAX_APPOINTMENTS"), config.get("EXPIRY_DELTA")
|
||||
)
|
||||
watcher = Watcher(db_manager, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS, config.get("EXPIRY_DELTA"))
|
||||
|
||||
chain_monitor = ChainMonitor(
|
||||
watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params
|
||||
@@ -63,7 +66,8 @@ def api(db_manager, carrier, block_processor):
|
||||
watcher.awake()
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
api = API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, Gatekeeper())
|
||||
gatekeeper = Gatekeeper(config.get("DEFAULT_SLOTS"))
|
||||
api = API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, gatekeeper)
|
||||
api_thread = Thread(target=api.start)
|
||||
api_thread.daemon = True
|
||||
api_thread.start()
|
||||
@@ -91,7 +95,51 @@ def add_appointment(appointment_data):
|
||||
return r
|
||||
|
||||
|
||||
def test_add_appointment(api, run_bitcoind, appointment):
|
||||
def test_register(api, run_bitcoind):
|
||||
data = {"public_key": client_pk_hex}
|
||||
r = requests.post(url=register_endpoint, json=data, timeout=5)
|
||||
assert r.status_code == HTTP_OK
|
||||
assert r.json().get("public_key") == client_pk_hex
|
||||
assert r.json().get("available_slots") == config.get("DEFAULT_SLOTS")
|
||||
|
||||
|
||||
def test_register_top_up(run_bitcoind):
|
||||
# Calling register more than once will give us DEFAULT_SLOTS * number_of_calls slots
|
||||
temp_sk, tmp_pk = generate_keypair()
|
||||
tmp_pk_hex = hexlify(tmp_pk.format(compressed=True)).decode("utf-8")
|
||||
|
||||
data = {"public_key": tmp_pk_hex}
|
||||
|
||||
for i in range(10):
|
||||
r = requests.post(url=register_endpoint, json=data, timeout=5)
|
||||
assert r.status_code == HTTP_OK
|
||||
assert r.json().get("public_key") == tmp_pk_hex
|
||||
assert r.json().get("available_slots") == config.get("DEFAULT_SLOTS") * (i + 1)
|
||||
|
||||
|
||||
def test_register_no_client_pk(run_bitcoind):
|
||||
data = {"public_key": client_pk_hex + client_pk_hex}
|
||||
r = requests.post(url=register_endpoint, json=data, timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_register_wrong_client_pk(run_bitcoind):
|
||||
data = {}
|
||||
r = requests.post(url=register_endpoint, json=data, timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_register_no_json(api, appointment):
|
||||
r = requests.post(url=register_endpoint, data="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_register_json_no_inner_dict(api, appointment):
|
||||
r = requests.post(url=register_endpoint, json="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_add_appointment(api, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 1
|
||||
|
||||
@@ -101,6 +149,24 @@ def test_add_appointment(api, run_bitcoind, appointment):
|
||||
assert r.status_code == HTTP_OK
|
||||
|
||||
|
||||
def test_add_appointment_no_json(api, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 1
|
||||
|
||||
# Properly formatted appointment
|
||||
r = requests.post(url=add_appointment_endpoint, data="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_add_appointment_json_no_inner_dict(api, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 1
|
||||
|
||||
# Properly formatted appointment
|
||||
r = requests.post(url=add_appointment_endpoint, json="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_add_appointment_wrong(api, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 1
|
||||
@@ -182,10 +248,20 @@ def test_add_appointment_multiple_times_different_users(api, appointment, n=MULT
|
||||
assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n
|
||||
|
||||
|
||||
def test_get_appointment_no_json(api, appointment):
|
||||
r = requests.post(url=add_appointment_endpoint, data="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_get_appointment_json_no_inner_dict(api, appointment):
|
||||
r = requests.post(url=add_appointment_endpoint, json="random_message", timeout=5)
|
||||
assert r.status_code == HTTP_BAD_REQUEST
|
||||
|
||||
|
||||
def test_request_random_appointment_registered_user(user_sk=client_sk):
|
||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||
message = "get appointment {}".format(locator)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
|
||||
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
|
||||
|
||||
data = {"locator": locator, "signature": signature}
|
||||
r = requests.post(url=get_appointment_endpoint, json=data, timeout=5)
|
||||
@@ -312,16 +388,18 @@ def test_get_all_appointments_responder():
|
||||
|
||||
def test_add_too_many_appointment(api):
|
||||
# Give slots to the user
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 100
|
||||
api.gatekeeper.registered_users[client_pk_hex] = 200
|
||||
|
||||
for i in range(config.get("MAX_APPOINTMENTS") - len(appointments)):
|
||||
free_appointment_slots = MAX_APPOINTMENTS - len(api.watcher.appointments)
|
||||
|
||||
for i in range(free_appointment_slots + 1):
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
locator_dispute_tx_map[appointment.locator] = dispute_tx
|
||||
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
|
||||
|
||||
if i != config.get("MAX_APPOINTMENTS") - len(appointments):
|
||||
if i < free_appointment_slots:
|
||||
assert r.status_code == HTTP_OK
|
||||
else:
|
||||
assert r.status_code == HTTP_SERVICE_UNAVAILABLE
|
||||
|
||||
Reference in New Issue
Block a user