Improves API returns for 404 and extends tests to cover it

This commit is contained in:
Sergi Delgado Segura
2020-04-17 18:01:15 +02:00
parent eb8ffb4916
commit eea12a695e
2 changed files with 40 additions and 30 deletions

View File

@@ -97,18 +97,22 @@ def add_appointment(client, appointment_data, user_pk):
return r
def test_register(client):
def test_register(client, api):
current_height = api.watcher.block_processor.get_block_count()
data = {"public_key": compressed_client_pk}
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_OK
assert r.json.get("public_key") == compressed_client_pk
assert r.json.get("available_slots") == config.get("DEFAULT_SLOTS")
assert r.json.get("subscription_expiry") == current_height + config.get("DEFAULT_SUBSCRIPTION_DURATION")
def test_register_top_up(client):
# Calling register more than once will give us DEFAULT_SLOTS * number_of_calls slots
def test_register_top_up(client, api):
# Calling register more than once will give us DEFAULT_SLOTS * number_of_calls slots.
# It will also refresh the expiry.
temp_sk, tmp_pk = generate_keypair()
tmp_pk_hex = hexlify(tmp_pk.format(compressed=True)).decode("utf-8")
current_height = api.watcher.block_processor.get_block_count()
data = {"public_key": tmp_pk_hex}
@@ -117,16 +121,17 @@ def test_register_top_up(client):
assert r.status_code == HTTP_OK
assert r.json.get("public_key") == tmp_pk_hex
assert r.json.get("available_slots") == config.get("DEFAULT_SLOTS") * (i + 1)
assert r.json.get("subscription_expiry") == current_height + config.get("DEFAULT_SUBSCRIPTION_DURATION")
def test_register_no_client_pk(client):
data = {"public_key": compressed_client_pk + compressed_client_pk}
data = {}
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_BAD_REQUEST
def test_register_wrong_client_pk(client):
data = {}
data = {"public_key": compressed_client_pk + compressed_client_pk}
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_BAD_REQUEST
@@ -158,25 +163,27 @@ def test_add_appointment_no_json(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
# Properly formatted appointment
# No JSON data
r = client.post(add_appointment_endpoint, data="random_message")
assert r.status_code == HTTP_BAD_REQUEST
assert "Request is not json encoded" in r.json.get("error")
def test_add_appointment_json_no_inner_dict(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
# Properly formatted appointment
# JSON data with no inner dict (invalid data foramat)
r = client.post(add_appointment_endpoint, json="random_message")
assert r.status_code == HTTP_BAD_REQUEST
assert "Invalid request content" in r.json.get("error")
def test_add_appointment_wrong(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
# Incorrect appointment
# Incorrect appointment (properly formatted, wrong data)
appointment.to_self_delay = 0
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
@@ -187,7 +194,7 @@ def test_add_appointment_wrong(api, client, appointment):
def test_add_appointment_not_registered(api, client, appointment):
# Properly formatted appointment
# Properly formatted appointment, user is not registered
tmp_sk, tmp_pk = generate_keypair()
tmp_compressed_pk = hexlify(tmp_pk.format(compressed=True)).decode("utf-8")
@@ -203,7 +210,7 @@ def test_add_appointment_registered_no_free_slots(api, client, appointment):
# Empty the user slots (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=0, subscription_expiry=0)
# Properly formatted appointment
# Properly formatted appointment, user has no available slots
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
@@ -216,7 +223,7 @@ def test_add_appointment_registered_not_enough_free_slots(api, client, appointme
# Give some slots to the user (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
# Properly formatted appointment
# Properly formatted appointment, user has not enough slots
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Let's create a big blob
@@ -230,7 +237,7 @@ def test_add_appointment_registered_not_enough_free_slots(api, client, appointme
def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MULTIPLE_APPOINTMENTS):
# Multiple appointments with the same locator should be valid and counted as updates
# Multiple appointments with the same locator should be valid and count as updates
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Simulate registering enough slots (end time does not matter here)
@@ -247,6 +254,7 @@ def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MU
def test_add_appointment_multiple_times_different_users(api, client, appointment, n=MULTIPLE_APPOINTMENTS):
# If the same appointment comes from different users, all are kept
# Create user keys and appointment signatures
user_keys = [generate_keypair() for _ in range(n)]
signatures = [Cryptographer.sign(appointment.serialize(), key[0]) for key in user_keys]
@@ -272,7 +280,6 @@ def test_add_appointment_update_same_size(api, client, appointment):
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
# Since we will replace the appointment, we won't added to appointments
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
@@ -360,14 +367,16 @@ def test_add_too_many_appointment(api, client):
def test_get_appointment_no_json(api, client, appointment):
r = client.post(add_appointment_endpoint, data="random_message")
assert r.status_code == HTTP_BAD_REQUEST
assert "Request is not json encoded" in r.json.get("error")
def test_get_appointment_json_no_inner_dict(api, client, appointment):
r = client.post(add_appointment_endpoint, json="random_message")
assert r.status_code == HTTP_BAD_REQUEST
assert "Invalid request content" in r.json.get("error")
def test_request_random_appointment_registered_user(client, user_sk=client_sk):
def test_get_random_appointment_registered_user(client, user_sk=client_sk):
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
message = "get appointment {}".format(locator)
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
@@ -381,16 +390,16 @@ def test_request_random_appointment_registered_user(client, user_sk=client_sk):
assert received_appointment.get("status") == "not_found"
def test_request_appointment_not_registered_user(client):
def test_get_appointment_not_registered_user(client):
# Not registered users have no associated appointments, so this should fail
tmp_sk, tmp_pk = generate_keypair()
# The tower is designed so a not found appointment and a request from a non-registered user return the same error to
# prevent probing.
test_request_random_appointment_registered_user(client, tmp_sk)
test_get_random_appointment_registered_user(client, tmp_sk)
def test_request_appointment_in_watcher(api, client, appointment):
def test_get_appointment_in_watcher(api, client, appointment):
# Mock the appointment in the Watcher
uuid = hash_160("{}{}".format(appointment.locator, compressed_client_pk))
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
@@ -402,7 +411,7 @@ def test_request_appointment_in_watcher(api, client, appointment):
r = client.post(get_appointment_endpoint, json=data)
assert r.status_code == HTTP_OK
# Check that the appointment is on the watcher
# Check that the appointment is on the Watcher
assert r.json.get("status") == "being_watched"
# Check the the sent appointment matches the received one
@@ -412,7 +421,7 @@ def test_request_appointment_in_watcher(api, client, appointment):
assert appointment.to_dict() == r.json.get("appointment")
def test_request_appointment_in_responder(api, client, appointment):
def test_get_appointment_in_responder(api, client, appointment):
# Mock the appointment in the Responder
tracker_data = {
"locator": appointment.locator,
@@ -436,7 +445,7 @@ def test_request_appointment_in_responder(api, client, appointment):
r = client.post(get_appointment_endpoint, json=data)
assert r.status_code == HTTP_OK
# Check that the appointment is on the watcher
# Check that the appointment is on the Responder
assert r.json.get("status") == "dispute_responded"
# Check the the sent appointment matches the received one
@@ -474,7 +483,7 @@ def test_get_all_appointments_watcher(api, client, get_all_db_manager):
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
api.watcher.db_manager.create_triggered_appointment_flag(uuid)
# We should only get check the non-triggered appointments
# We should only get the non-triggered appointments
r = client.get(get_all_appointment_endpoint)
assert r.status_code == HTTP_OK