user_pk/client_pk -> user_id and cli/client -> user (when it does not reffer to the software)

This commit is contained in:
Sergi Delgado Segura
2020-04-21 16:42:43 +02:00
parent 0e5a99b778
commit 5f7a909804
8 changed files with 168 additions and 196 deletions

View File

@@ -40,8 +40,8 @@ appointments = {}
locator_dispute_tx_map = {}
client_sk, client_pk = generate_keypair()
compressed_client_pk = hexlify(client_pk.format(compressed=True)).decode("utf-8")
user_sk, user_pk = generate_keypair()
user_id = hexlify(user_pk.format(compressed=True)).decode("utf-8")
@pytest.fixture()
@@ -86,12 +86,12 @@ def appointment():
return appointment
def add_appointment(client, appointment_data, user_pk):
def add_appointment(client, appointment_data, user_id):
r = client.post(add_appointment_endpoint, json=appointment_data)
if r.status_code == HTTP_OK:
locator = appointment_data.get("appointment").get("locator")
uuid = hash_160("{}{}".format(locator, user_pk))
uuid = hash_160("{}{}".format(locator, user_id))
appointments[uuid] = appointment_data["appointment"]
return r
@@ -99,10 +99,10 @@ def add_appointment(client, appointment_data, user_pk):
def test_register(client, api):
current_height = api.watcher.block_processor.get_block_count()
data = {"public_key": compressed_client_pk}
data = {"public_key": user_id}
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_OK
assert r.json.get("public_key") == compressed_client_pk
assert r.json.get("public_key") == user_id
assert r.json.get("available_slots") == config.get("DEFAULT_SLOTS")
assert r.json.get("subscription_expiry") == current_height + config.get("DEFAULT_SUBSCRIPTION_DURATION")
@@ -111,15 +111,15 @@ def test_register_top_up(client, api):
# Calling register more than once will give us DEFAULT_SLOTS * number_of_calls slots.
# It will also refresh the expiry.
temp_sk, tmp_pk = generate_keypair()
tmp_pk_hex = hexlify(tmp_pk.format(compressed=True)).decode("utf-8")
tmp_user_id = hexlify(tmp_pk.format(compressed=True)).decode("utf-8")
current_height = api.watcher.block_processor.get_block_count()
data = {"public_key": tmp_pk_hex}
data = {"public_key": tmp_user_id}
for i in range(10):
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_OK
assert r.json.get("public_key") == tmp_pk_hex
assert r.json.get("public_key") == tmp_user_id
assert r.json.get("available_slots") == config.get("DEFAULT_SLOTS") * (i + 1)
assert r.json.get("subscription_expiry") == current_height + config.get("DEFAULT_SUBSCRIPTION_DURATION")
@@ -131,7 +131,7 @@ def test_register_no_client_pk(client):
def test_register_wrong_client_pk(client):
data = {"public_key": compressed_client_pk + compressed_client_pk}
data = {"public_key": user_id + user_id}
r = client.post(register_endpoint, json=data)
assert r.status_code == HTTP_BAD_REQUEST
@@ -148,20 +148,18 @@ def test_register_json_no_inner_dict(client):
def test_add_appointment(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
# Properly formatted appointment
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK
assert r.json.get("available_slots") == 0
def test_add_appointment_no_json(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
# No JSON data
r = client.post(add_appointment_endpoint, data="random_message")
@@ -171,7 +169,7 @@ def test_add_appointment_no_json(api, client, appointment):
def test_add_appointment_json_no_inner_dict(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
# JSON data with no inner dict (invalid data foramat)
r = client.post(add_appointment_endpoint, json="random_message")
@@ -181,14 +179,12 @@ def test_add_appointment_json_no_inner_dict(api, client, appointment):
def test_add_appointment_wrong(api, client, appointment):
# Simulate the user registration (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
# Incorrect appointment (properly formatted, wrong data)
appointment.to_self_delay = 0
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_FIELD_TOO_SMALL) in r.json.get("error")
@@ -208,44 +204,38 @@ def test_add_appointment_not_registered(api, client, appointment):
def test_add_appointment_registered_no_free_slots(api, client, appointment):
# Empty the user slots (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=0, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=0, subscription_expiry=0)
# Properly formatted appointment, user has no available slots
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS) in r.json.get("error")
def test_add_appointment_registered_not_enough_free_slots(api, client, appointment):
# Give some slots to the user (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
# Properly formatted appointment, user has not enough slots
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
# Let's create a big blob
appointment.encrypted_blob = TWO_SLOTS_BLOTS
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_BAD_REQUEST
assert "Error {}:".format(errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS) in r.json.get("error")
def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MULTIPLE_APPOINTMENTS):
# Multiple appointments with the same locator should be valid and count as updates
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
# Simulate registering enough slots (end time does not matter here)
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=n, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=n, subscription_expiry=0)
for _ in range(n):
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK
assert r.json.get("available_slots") == n - 1
@@ -277,75 +267,61 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment
def test_add_appointment_update_same_size(api, client, appointment):
# Update an appointment by one of the same size and check that no additional slots are filled
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=1, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
# The user has no additional slots, but it should be able to update
# Let's just reverse the encrypted blob for example
appointment.encrypted_blob = appointment.encrypted_blob[::-1]
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
def test_add_appointment_update_bigger(api, client, appointment):
# Update an appointment by one bigger, and check additional slots are filled
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=2, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=2, subscription_expiry=0)
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1
# The user has one slot, so it should be able to update as long as it only takes 1 additional slot
appointment.encrypted_blob = TWO_SLOTS_BLOTS
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
# Check that it'll fail if no enough slots are available
# Double the size from before
appointment.encrypted_blob = TWO_SLOTS_BLOTS + TWO_SLOTS_BLOTS
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_BAD_REQUEST
def test_add_appointment_update_smaller(api, client, appointment):
# Update an appointment by one bigger, and check slots are freed
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=2, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=2, subscription_expiry=0)
# This should take 2 slots
appointment.encrypted_blob = TWO_SLOTS_BLOTS
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
# Let's update with one just small enough
appointment.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2)
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1
def test_add_too_many_appointment(api, client):
# Give slots to the user
api.watcher.gatekeeper.registered_users[compressed_client_pk] = UserInfo(available_slots=200, subscription_expiry=0)
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=200, subscription_expiry=0)
free_appointment_slots = MAX_APPOINTMENTS - len(api.watcher.appointments)
@@ -353,10 +329,8 @@ def test_add_too_many_appointment(api, client):
appointment, dispute_tx = generate_dummy_appointment()
locator_dispute_tx_map[appointment.locator] = dispute_tx
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
r = add_appointment(
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
)
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
if i < free_appointment_slots:
assert r.status_code == HTTP_OK
@@ -376,7 +350,7 @@ def test_get_appointment_json_no_inner_dict(api, client, appointment):
assert "Invalid request content" in r.json.get("error")
def test_get_random_appointment_registered_user(client, user_sk=client_sk):
def test_get_random_appointment_registered_user(client, user_sk=user_sk):
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
message = "get appointment {}".format(locator)
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
@@ -401,12 +375,12 @@ def test_get_appointment_not_registered_user(client):
def test_get_appointment_in_watcher(api, client, appointment):
# Mock the appointment in the Watcher
uuid = hash_160("{}{}".format(appointment.locator, compressed_client_pk))
uuid = hash_160("{}{}".format(appointment.locator, user_id))
api.watcher.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
# Next we can request it
message = "get appointment {}".format(appointment.locator)
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
data = {"locator": appointment.locator, "signature": signature}
r = client.post(get_appointment_endpoint, json=data)
assert r.status_code == HTTP_OK
@@ -432,13 +406,13 @@ def test_get_appointment_in_responder(api, client, appointment):
}
tx_tracker = TransactionTracker.from_dict(tracker_data)
uuid = hash_160("{}{}".format(appointment.locator, compressed_client_pk))
uuid = hash_160("{}{}".format(appointment.locator, user_id))
api.watcher.db_manager.create_triggered_appointment_flag(uuid)
api.watcher.responder.db_manager.store_responder_tracker(uuid, tx_tracker.to_dict())
# Request back the data
message = "get appointment {}".format(appointment.locator)
signature = Cryptographer.sign(message.encode("utf-8"), client_sk)
signature = Cryptographer.sign(message.encode("utf-8"), user_sk)
data = {"locator": appointment.locator, "signature": signature}
# Next we can request it

View File

@@ -28,17 +28,17 @@ def test_init(gatekeeper, run_bitcoind):
def test_add_update_user(gatekeeper):
# add_update_user adds DEFAULT_SLOTS to a given user as long as the identifier is {02, 03}| 32-byte hex str
# it also add DEFAULT_SUBSCRIPTION_DURATION + current_block_height to the user
user_pk = "02" + get_random_value_hex(32)
user_id = "02" + get_random_value_hex(32)
for _ in range(10):
user = gatekeeper.registered_users.get(user_pk)
user = gatekeeper.registered_users.get(user_id)
current_slots = user.available_slots if user is not None else 0
gatekeeper.add_update_user(user_pk)
gatekeeper.add_update_user(user_id)
assert gatekeeper.registered_users.get(user_pk).available_slots == current_slots + config.get("DEFAULT_SLOTS")
assert gatekeeper.registered_users.get(user_id).available_slots == current_slots + config.get("DEFAULT_SLOTS")
assert gatekeeper.registered_users[
user_pk
user_id
].subscription_expiry == gatekeeper.block_processor.get_block_count() + config.get(
"DEFAULT_SUBSCRIPTION_DURATION"
)
@@ -46,31 +46,31 @@ def test_add_update_user(gatekeeper):
# The same can be checked for multiple users
for _ in range(10):
# The user identifier is changed every call
user_pk = "03" + get_random_value_hex(32)
user_id = "03" + get_random_value_hex(32)
gatekeeper.add_update_user(user_pk)
assert gatekeeper.registered_users.get(user_pk).available_slots == config.get("DEFAULT_SLOTS")
gatekeeper.add_update_user(user_id)
assert gatekeeper.registered_users.get(user_id).available_slots == config.get("DEFAULT_SLOTS")
assert gatekeeper.registered_users[
user_pk
user_id
].subscription_expiry == gatekeeper.block_processor.get_block_count() + config.get(
"DEFAULT_SUBSCRIPTION_DURATION"
)
def test_add_update_user_wrong_pk(gatekeeper):
def test_add_update_user_wrong_id(gatekeeper):
# Passing a wrong pk defaults to the errors in check_user_pk. We can try with one.
wrong_pk = get_random_value_hex(32)
wrong_id = get_random_value_hex(32)
with pytest.raises(InvalidParameter):
gatekeeper.add_update_user(wrong_pk)
gatekeeper.add_update_user(wrong_id)
def test_add_update_user_wrong_pk_prefix(gatekeeper):
def test_add_update_user_wrong_id_prefix(gatekeeper):
# Prefixes must be 02 or 03, anything else should fail
wrong_pk = "04" + get_random_value_hex(32)
wrong_id = "04" + get_random_value_hex(32)
with pytest.raises(InvalidParameter):
gatekeeper.add_update_user(wrong_pk)
gatekeeper.add_update_user(wrong_id)
def test_identify_user(gatekeeper):
@@ -79,13 +79,13 @@ def test_identify_user(gatekeeper):
# Let's first register a user
sk, pk = generate_keypair()
compressed_pk = Cryptographer.get_compressed_pk(pk)
gatekeeper.add_update_user(compressed_pk)
user_id = Cryptographer.get_compressed_pk(pk)
gatekeeper.add_update_user(user_id)
message = "Hey, it's me"
signature = Cryptographer.sign(message.encode(), sk)
assert gatekeeper.authenticate_user(message.encode(), signature) == compressed_pk
assert gatekeeper.authenticate_user(message.encode(), signature) == user_id
def test_identify_user_non_registered(gatekeeper):
@@ -132,15 +132,15 @@ def test_update_available_slots(gatekeeper):
# update_available_slots should decrease the slot count if a new appointment is added
# let's add a new user
sk, pk = generate_keypair()
compressed_pk = Cryptographer.get_compressed_pk(pk)
gatekeeper.add_update_user(compressed_pk)
user_id = Cryptographer.get_compressed_pk(pk)
gatekeeper.add_update_user(user_id)
# And now update the slots given an appointment
appointment, _ = generate_dummy_appointment()
gatekeeper.update_available_slots(compressed_pk, appointment.get_summary())
gatekeeper.update_available_slots(user_id, appointment.get_summary())
# This is a standard size appointment, so it should have reduced the slots by one
assert gatekeeper.registered_users[compressed_pk].available_slots == config.get("DEFAULT_SLOTS") - 1
assert gatekeeper.registered_users[user_id].available_slots == config.get("DEFAULT_SLOTS") - 1
# Updates can leave the count as it, decrease it, or increase it, depending on the appointment size (modulo
# ENCRYPTED_BLOB_MAX_SIZE_HEX)
@@ -148,7 +148,7 @@ def test_update_available_slots(gatekeeper):
# Appointments of the same size leave it as is
appointment_same_size, _ = generate_dummy_appointment()
remaining_slots = gatekeeper.update_available_slots(
compressed_pk, appointment.get_summary(), appointment_same_size.get_summary()
user_id, appointment.get_summary(), appointment_same_size.get_summary()
)
assert remaining_slots == config.get("DEFAULT_SLOTS") - 1
@@ -156,20 +156,20 @@ def test_update_available_slots(gatekeeper):
appointment_x2_size = appointment_same_size
appointment_x2_size.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX + 1)
remaining_slots = gatekeeper.update_available_slots(
compressed_pk, appointment_x2_size.get_summary(), appointment.get_summary()
user_id, appointment_x2_size.get_summary(), appointment.get_summary()
)
assert remaining_slots == config.get("DEFAULT_SLOTS") - 2
# Smaller appointments increase it (using the same data but flipped)
remaining_slots = gatekeeper.update_available_slots(
compressed_pk, appointment.get_summary(), appointment_x2_size.get_summary()
user_id, appointment.get_summary(), appointment_x2_size.get_summary()
)
assert remaining_slots == config.get("DEFAULT_SLOTS") - 1
# If the appointment needs more slots than there's free, it should fail
gatekeeper.registered_users[compressed_pk].available_slots = 1
gatekeeper.registered_users[user_id].available_slots = 1
with pytest.raises(NotEnoughSlots):
gatekeeper.update_available_slots(compressed_pk, appointment_x2_size.get_summary())
gatekeeper.update_available_slots(user_id, appointment_x2_size.get_summary())
def test_get_expired_appointments(gatekeeper):

View File

@@ -19,27 +19,27 @@ def open_create_db(db_path):
def test_store_user(user_db_manager):
# Store user should work as long as the user_pk is properly formatted and data is a dictionary
user_pk = "02" + get_random_value_hex(32)
user_id = "02" + get_random_value_hex(32)
user_info = UserInfo(available_slots=42, subscription_expiry=100)
stored_users[user_pk] = user_info.to_dict()
assert user_db_manager.store_user(user_pk, user_info.to_dict()) is True
stored_users[user_id] = user_info.to_dict()
assert user_db_manager.store_user(user_id, user_info.to_dict()) is True
# Wrong pks should return False on adding
user_pk = "04" + get_random_value_hex(32)
user_id = "04" + get_random_value_hex(32)
user_info = UserInfo(available_slots=42, subscription_expiry=100)
assert user_db_manager.store_user(user_pk, user_info.to_dict()) is False
assert user_db_manager.store_user(user_id, user_info.to_dict()) is False
# Same for wrong types
assert user_db_manager.store_user(42, user_info.to_dict()) is False
# And for wrong type user data
assert user_db_manager.store_user(user_pk, 42) is False
assert user_db_manager.store_user(user_id, 42) is False
def test_load_user(user_db_manager):
# Loading a user we have stored should work
for user_pk, user_data in stored_users.items():
assert user_db_manager.load_user(user_pk) == user_data
for user_id, user_data in stored_users.items():
assert user_db_manager.load_user(user_id) == user_data
# Random keys should fail
assert user_db_manager.load_user(get_random_value_hex(33)) is None
@@ -50,11 +50,11 @@ def test_load_user(user_db_manager):
def test_delete_user(user_db_manager):
# Deleting an existing user should work
for user_pk, user_data in stored_users.items():
assert user_db_manager.delete_user(user_pk) is True
for user_id, user_data in stored_users.items():
assert user_db_manager.delete_user(user_id) is True
for user_pk, user_data in stored_users.items():
assert user_db_manager.load_user(user_pk) is None
for user_id, user_data in stored_users.items():
assert user_db_manager.load_user(user_id) is None
# But deleting a non existing one should not fail
assert user_db_manager.delete_user(get_random_value_hex(32)) is True
@@ -70,10 +70,10 @@ def test_load_all_users(user_db_manager):
# Adding some and checking we get them all
for i in range(10):
user_pk = "02" + get_random_value_hex(32)
user_id = "02" + get_random_value_hex(32)
user_info = UserInfo(available_slots=42, subscription_expiry=100)
user_db_manager.store_user(user_pk, user_info.to_dict())
stored_users[user_pk] = user_info.to_dict()
user_db_manager.store_user(user_id, user_info.to_dict())
stored_users[user_id] = user_info.to_dict()
all_users = user_db_manager.load_all_users()