diff --git a/test/teos/unit/test_appointments_dbm.py b/test/teos/unit/test_appointments_dbm.py index b6cd69f..48928f6 100644 --- a/test/teos/unit/test_appointments_dbm.py +++ b/test/teos/unit/test_appointments_dbm.py @@ -169,15 +169,32 @@ def test_delete_locator_map(db_manager): assert len(locator_maps) != 0 for locator, uuids in locator_maps.items(): - db_manager.delete_locator_map(locator) + assert db_manager.delete_locator_map(locator) is True locator_maps = db_manager.load_appointments_db(prefix=LOCATOR_MAP_PREFIX) assert len(locator_maps) == 0 + # Keys of wrong type should fail + assert db_manager.delete_locator_map(42) is False + + +def test_store_watcher_appointment_wrong(db_manager, watcher_appointments): + # Wrong uuid types should fail + for _, appointment in watcher_appointments.items(): + assert db_manager.store_watcher_appointment(42, appointment.to_dict()) is False + + +def test_load_watcher_appointment_wrong(db_manager): + # Random keys should fail + assert db_manager.load_watcher_appointment(get_random_value_hex(16)) is None + + # Wrong format keys should also return None + assert db_manager.load_watcher_appointment(42) is None + def test_store_load_watcher_appointment(db_manager, watcher_appointments): for uuid, appointment in watcher_appointments.items(): - db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + assert db_manager.store_watcher_appointment(uuid, appointment.to_dict()) is True db_watcher_appointments = db_manager.load_watcher_appointments() @@ -200,7 +217,7 @@ def test_store_load_triggered_appointment(db_manager): # Create an appointment flagged as triggered triggered_appointment, _ = generate_dummy_appointment(real_height=False) uuid = uuid4().hex - db_manager.store_watcher_appointment(uuid, triggered_appointment.to_dict()) + assert db_manager.store_watcher_appointment(uuid, triggered_appointment.to_dict()) is True db_manager.create_triggered_appointment_flag(uuid) # The new appointment is grabbed only if we set include_triggered @@ -208,9 +225,23 @@ def test_store_load_triggered_appointment(db_manager): assert uuid in db_manager.load_watcher_appointments(include_triggered=True) +def test_store_responder_trackers_wrong(db_manager, responder_trackers): + # Wrong uuid types should fail + for _, tracker in responder_trackers.items(): + assert db_manager.store_responder_tracker(42, {"value": tracker}) is False + + +def test_load_responder_tracker_wrong(db_manager): + # Random keys should fail + assert db_manager.load_responder_tracker(get_random_value_hex(16)) is None + + # Wrong format keys should also return None + assert db_manager.load_responder_tracker(42) is None + + def test_store_load_responder_trackers(db_manager, responder_trackers): for key, value in responder_trackers.items(): - db_manager.store_responder_tracker(key, {"value": value}) + assert db_manager.store_responder_tracker(key, {"value": value}) is True db_responder_trackers = db_manager.load_responder_trackers() @@ -226,16 +257,19 @@ def test_delete_watcher_appointment(db_manager, watcher_appointments): assert len(db_watcher_appointments) != 0 for key in watcher_appointments.keys(): - db_manager.delete_watcher_appointment(key) + assert db_manager.delete_watcher_appointment(key) is True db_watcher_appointments = db_manager.load_watcher_appointments() assert len(db_watcher_appointments) == 0 + # Keys of wrong type should fail + assert db_manager.delete_watcher_appointment(42) is False + def test_batch_delete_watcher_appointments(db_manager, watcher_appointments): # Let's start by adding a bunch of appointments for uuid, appointment in watcher_appointments.items(): - db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + assert db_manager.store_watcher_appointment(uuid, appointment.to_dict()) is True first_half = list(watcher_appointments.keys())[: len(watcher_appointments) // 2] second_half = list(watcher_appointments.keys())[len(watcher_appointments) // 2 :] @@ -261,16 +295,19 @@ def test_delete_responder_tracker(db_manager, responder_trackers): assert len(db_responder_trackers) != 0 for key in responder_trackers.keys(): - db_manager.delete_responder_tracker(key) + assert db_manager.delete_responder_tracker(key) is True db_responder_trackers = db_manager.load_responder_trackers() assert len(db_responder_trackers) == 0 + # Keys of wrong type should fail + assert db_manager.delete_responder_tracker(42) is False + def test_batch_delete_responder_trackers(db_manager, responder_trackers): # Let's start by adding a bunch of appointments for uuid, value in responder_trackers.items(): - db_manager.store_responder_tracker(uuid, {"value": value}) + assert db_manager.store_responder_tracker(uuid, {"value": value}) is True first_half = list(responder_trackers.keys())[: len(responder_trackers) // 2] second_half = list(responder_trackers.keys())[len(responder_trackers) // 2 :] @@ -293,22 +330,28 @@ def test_batch_delete_responder_trackers(db_manager, responder_trackers): def test_store_load_last_block_hash_watcher(db_manager): # Let's first create a made up block hash local_last_block_hash = get_random_value_hex(32) - db_manager.store_last_block_hash_watcher(local_last_block_hash) + assert db_manager.store_last_block_hash_watcher(local_last_block_hash) is True db_last_block_hash = db_manager.load_last_block_hash_watcher() assert local_last_block_hash == db_last_block_hash + # Wrong types for last block should fail for both store and load + assert db_manager.store_last_block_hash_watcher(42) is False + def test_store_load_last_block_hash_responder(db_manager): # Same for the responder local_last_block_hash = get_random_value_hex(32) - db_manager.store_last_block_hash_responder(local_last_block_hash) + assert db_manager.store_last_block_hash_responder(local_last_block_hash) is True db_last_block_hash = db_manager.load_last_block_hash_responder() assert local_last_block_hash == db_last_block_hash + # Wrong types for last block should fail for both store and load + assert db_manager.store_last_block_hash_responder(42) is False + def test_create_triggered_appointment_flag(db_manager): # Test that flags are added @@ -354,12 +397,15 @@ def test_delete_triggered_appointment_flag(db_manager): # Delete all entries for k in keys: - db_manager.delete_triggered_appointment_flag(k) + assert db_manager.delete_triggered_appointment_flag(k) is True # Try to load them back for k in keys: assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + k).encode("utf-8")) is None + # Keys of wrong type should fail + assert db_manager.delete_triggered_appointment_flag(42) is False + def test_batch_delete_triggered_appointment_flag(db_manager): # Let's add some flags first diff --git a/test/teos/unit/test_users_dbm.py b/test/teos/unit/test_users_dbm.py new file mode 100644 index 0000000..5066561 --- /dev/null +++ b/test/teos/unit/test_users_dbm.py @@ -0,0 +1,82 @@ +from teos.appointments_dbm import AppointmentsDBM + +from test.teos.unit.conftest import get_random_value_hex + + +stored_users = {} + + +def open_create_db(db_path): + + try: + db_manager = AppointmentsDBM(db_path) + + return db_manager + + except ValueError: + return False + + +def test_store_user(user_db_manager): + # Store user should work as long as the user_pk is properly formatted and data is a dictionary + user_pk = "02" + get_random_value_hex(32) + user_data = {"available_slots": 42} + stored_users[user_pk] = user_data + assert user_db_manager.store_user(user_pk, user_data) is True + + # Wrong pks should return False on adding + user_pk = "04" + get_random_value_hex(32) + user_data = {"available_slots": 42} + assert user_db_manager.store_user(user_pk, user_data) is False + + # Same for wrong types + assert user_db_manager.store_user(42, user_data) is False + + # And for wrong type user data + assert user_db_manager.store_user(user_pk, 42) is False + + +def test_load_user(user_db_manager): + # Loading a user we have stored should work + for user_pk, user_data in stored_users.items(): + assert user_db_manager.load_user(user_pk) == user_data + + # Random keys should fail + assert user_db_manager.load_user(get_random_value_hex(33)) is None + + # Wrong format keys should also return None + assert user_db_manager.load_user(42) is None + + +def test_delete_user(user_db_manager): + # Deleting an existing user should work + for user_pk, user_data in stored_users.items(): + assert user_db_manager.delete_user(user_pk) is True + + for user_pk, user_data in stored_users.items(): + assert user_db_manager.load_user(user_pk) is None + + # But deleting a non existing one should not fail + assert user_db_manager.delete_user(get_random_value_hex(32)) is True + + # Keys of wrong type should fail + assert user_db_manager.delete_user(42) is False + + +def test_load_all_users(user_db_manager): + # There should be no users at the moment + assert user_db_manager.load_all_users() == {} + stored_users = {} + + # Adding some and checking we get them all + for i in range(10): + user_pk = "02" + get_random_value_hex(32) + user_data = {"available_slots": i} + user_db_manager.store_user(user_pk, user_data) + stored_users[user_pk] = user_data + + all_users = user_db_manager.load_all_users() + + assert set(all_users.keys()) == set(stored_users.keys()) + for k, v in all_users.items(): + assert stored_users[k] == v