Adds tests for appointments updates of different sizes

This commit is contained in:
Sergi Delgado Segura
2020-03-30 17:57:47 +02:00
parent c9b3bb625f
commit 9ecf98e0c5

View File

@@ -28,7 +28,14 @@ from test.teos.unit.conftest import (
from common.blob import Blob from common.blob import Blob
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.constants import HTTP_OK, HTTP_NOT_FOUND, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, LOCATOR_LEN_BYTES from common.constants import (
HTTP_OK,
HTTP_NOT_FOUND,
HTTP_BAD_REQUEST,
HTTP_SERVICE_UNAVAILABLE,
LOCATOR_LEN_BYTES,
ENCRYPTED_BLOB_MAX_SIZE_HEX,
)
TEOS_API = "http://{}:{}".format(HOST, PORT) TEOS_API = "http://{}:{}".format(HOST, PORT)
@@ -201,7 +208,7 @@ def test_add_appointment_registered_no_free_slots(api, appointment):
def test_add_appointment_registered_not_enough_free_slots(api, appointment): def test_add_appointment_registered_not_enough_free_slots(api, appointment):
# Empty the user slots # Give some slots to the user
api.gatekeeper.registered_users[client_pk_hex] = 1 api.gatekeeper.registered_users[client_pk_hex] = 1
# Properly formatted appointment # Properly formatted appointment
@@ -217,7 +224,7 @@ def test_add_appointment_registered_not_enough_free_slots(api, appointment):
def test_add_appointment_multiple_times_same_user(api, appointment, n=MULTIPLE_APPOINTMENTS): def test_add_appointment_multiple_times_same_user(api, appointment, n=MULTIPLE_APPOINTMENTS):
# Multiple appointments with the same locator should be valid # Multiple appointments with the same locator should be valid and counted as updates
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk) appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Simulate registering enough slots # Simulate registering enough slots
@@ -251,6 +258,61 @@ def test_add_appointment_multiple_times_different_users(api, appointment, n=MULT
assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n
def test_add_appointment_update_same_size(api, appointment):
# Update an appointment by one of the same size and check that no additional slots are filled
api.gatekeeper.registered_users[client_pk_hex] = 1
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0
# The user has no additional slots, but it should be able to update
# Let's just reverse the encrypted blob for example
appointment.encrypted_blob.data = appointment.encrypted_blob.data[::-1]
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0
def test_add_appointment_update_bigger(api, appointment):
# Update an appointment by one bigger, and check additional slots are filled
api.gatekeeper.registered_users[client_pk_hex] = 2
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 1
# The user has one slot, so it should be able to update as long as it only takes 1 additional slot
appointment.encrypted_blob.data = "A" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA"
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0
# Check that it'll fail if no enough slots are available
# Double the size from before
appointment.encrypted_blob.data = "AA" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA"
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_BAD_REQUEST
def test_add_appointment_update_smaller(api, appointment):
# Update an appointment by one bigger, and check slots are freed
api.gatekeeper.registered_users[client_pk_hex] = 2
# This should take 2 slots
appointment.encrypted_blob.data = "A" * ENCRYPTED_BLOB_MAX_SIZE_HEX + "AA"
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 0
# Let's update with one just small enough
appointment.encrypted_blob.data = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2)
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment({"appointment": appointment.to_dict(), "signature": appointment_signature})
assert r.status_code == HTTP_OK and r.json().get("available_slots") == 1
def test_get_appointment_no_json(api, appointment): def test_get_appointment_no_json(api, appointment):
r = requests.post(url=add_appointment_endpoint, data="random_message", timeout=5) r = requests.post(url=add_appointment_endpoint, data="random_message", timeout=5)
assert r.status_code == HTTP_BAD_REQUEST assert r.status_code == HTTP_BAD_REQUEST