diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py index 46ba635..6d922cb 100644 --- a/test/teos/unit/test_api.py +++ b/test/teos/unit/test_api.py @@ -28,7 +28,14 @@ from test.teos.unit.conftest import ( from common.blob import Blob from common.cryptographer import Cryptographer -from common.constants import HTTP_OK, HTTP_NOT_FOUND, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, LOCATOR_LEN_BYTES +from common.constants import ( + HTTP_OK, + HTTP_NOT_FOUND, + HTTP_BAD_REQUEST, + HTTP_SERVICE_UNAVAILABLE, + LOCATOR_LEN_BYTES, + ENCRYPTED_BLOB_MAX_SIZE_HEX, +) TEOS_API = "http://{}:{}".format(HOST, PORT) @@ -201,7 +208,7 @@ def test_add_appointment_registered_no_free_slots(api, appointment): def test_add_appointment_registered_not_enough_free_slots(api, appointment): - # Empty the user slots + # Give some slots to the user api.gatekeeper.registered_users[client_pk_hex] = 1 # Properly formatted appointment @@ -217,7 +224,7 @@ def test_add_appointment_registered_not_enough_free_slots(api, appointment): def test_add_appointment_multiple_times_same_user(api, appointment, n=MULTIPLE_APPOINTMENTS): - # Multiple appointments with the same locator should be valid + # Multiple appointments with the same locator should be valid and counted as updates appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) # Simulate registering enough slots @@ -251,6 +258,61 @@ def test_add_appointment_multiple_times_different_users(api, appointment, n=MULT assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n +def test_add_appointment_update_same_size(api, appointment): + # Update an appointment by one of the same size and check that no additional slots are filled + api.gatekeeper.registered_users[client_pk_hex] = 1 + + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0 + + # The user has no additional slots, but it should be able to update + # Let's just reverse the encrypted blob for example + appointment.encrypted_blob.data = appointment.encrypted_blob.data[::-1] + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0 + + +def test_add_appointment_update_bigger(api, appointment): + # Update an appointment by one bigger, and check additional slots are filled + api.gatekeeper.registered_users[client_pk_hex] = 2 + + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 1 + + # The user has one slot, so it should be able to update as long as it only takes 1 additional slot + appointment.encrypted_blob.data = "A" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA" + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0 + + # Check that it'll fail if no enough slots are available + # Double the size from before + appointment.encrypted_blob.data = "AA" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA" + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_BAD_REQUEST + + +def test_add_appointment_update_smaller(api, appointment): + # Update an appointment by one bigger, and check slots are freed + api.gatekeeper.registered_users[client_pk_hex] = 2 + + # This should take 2 slots + appointment.encrypted_blob.data = "A" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA" + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0 + + # Let's update with one just small enough + appointment.encrypted_blob.data = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2) + appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) + r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature}) + assert r.status_code == HTTP_OK and r.json().get("available_slots") == 1 + + def test_get_appointment_no_json(api, appointment): r = requests.post(url=add_appointment_endpoint, data="random_message", timeout=5) assert r.status_code == HTTP_BAD_REQUEST