Adds batch update unit tests for DBManager

This commit is contained in:
Sergi Delgado Segura
2020-01-31 13:57:30 +01:00
parent 4f000298fa
commit 6c957b067d

View File

@@ -303,6 +303,29 @@ def test_delete_watcher_appointment(db_manager, watcher_appointments):
assert len(db_watcher_appointments) == 0
def test_batch_delete_watcher_appointments(db_manager, watcher_appointments):
# Let's start by adding a bunch of appointments
for uuid, appointment in watcher_appointments.items():
db_manager.store_watcher_appointment(uuid, appointment.to_json())
first_half = list(watcher_appointments.keys())[: len(watcher_appointments) // 2]
second_half = list(watcher_appointments.keys())[len(watcher_appointments) // 2 :]
# Let's now delete half of them in a batch update
db_manager.batch_delete_watcher_appointments(first_half)
db_watcher_appointments = db_manager.load_watcher_appointments()
assert not set(db_watcher_appointments.keys()).issuperset(first_half)
assert set(db_watcher_appointments.keys()).issuperset(second_half)
# Let's delete the rest
db_manager.batch_delete_watcher_appointments(second_half)
# Now there should be no appointments left
db_watcher_appointments = db_manager.load_watcher_appointments()
assert not db_watcher_appointments
def test_delete_responder_tracker(db_manager, responder_trackers):
# Same for the responder
db_responder_trackers = db_manager.load_responder_trackers()
@@ -315,6 +338,29 @@ def test_delete_responder_tracker(db_manager, responder_trackers):
assert len(db_responder_trackers) == 0
def test_batch_delete_responder_trackers(db_manager, responder_trackers):
# Let's start by adding a bunch of appointments
for uuid, value in responder_trackers.items():
db_manager.store_responder_tracker(uuid, json.dumps({"value": value}))
first_half = list(responder_trackers.keys())[: len(responder_trackers) // 2]
second_half = list(responder_trackers.keys())[len(responder_trackers) // 2 :]
# Let's now delete half of them in a batch update
db_manager.batch_delete_responder_trackers(first_half)
db_responder_trackers = db_manager.load_responder_trackers()
assert not set(db_responder_trackers.keys()).issuperset(first_half)
assert set(db_responder_trackers.keys()).issuperset(second_half)
# Let's delete the rest
db_manager.batch_delete_responder_trackers(second_half)
# Now there should be no trackers left
db_responder_trackers = db_manager.load_responder_trackers()
assert not db_responder_trackers
def test_store_load_last_block_hash_watcher(db_manager):
# Let's first create a made up block hash
local_last_block_hash = get_random_value_hex(32)
@@ -347,6 +393,20 @@ def test_create_triggered_appointment_flag(db_manager):
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + key).encode("utf-8")) is None
def test_batch_create_triggered_appointment_flag(db_manager):
# Test that flags are added in batch
keys = [get_random_value_hex(16) for _ in range(10)]
# Checked that non of the flags is already in the db
db_flags = db_manager.load_all_triggered_flags()
assert not set(db_flags).issuperset(keys)
# Make sure that they are now
db_manager.batch_create_triggered_appointment_flag(keys)
db_flags = db_manager.load_all_triggered_flags()
assert set(db_flags).issuperset(keys)
def test_load_all_triggered_flags(db_manager):
# There should be a some flags in the db from the previous tests. Let's load them
flags = db_manager.load_all_triggered_flags()
@@ -370,3 +430,22 @@ def test_delete_triggered_appointment_flag(db_manager):
# Try to load them back
for k in keys:
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + k).encode("utf-8")) is None
def test_batch_delete_triggered_appointment_flag(db_manager):
# Let's add some flags first
keys = [get_random_value_hex(16) for _ in range(10)]
db_manager.batch_create_triggered_appointment_flag(keys)
# And now let's delete in batch
first_half = keys[: len(keys) // 2]
second_half = keys[len(keys) // 2 :]
db_manager.batch_delete_triggered_appointment_flag(first_half)
db_falgs = db_manager.load_all_triggered_flags()
assert not set(db_falgs).issuperset(first_half)
assert set(db_falgs).issuperset(second_half)
# Delete the rest
db_manager.batch_delete_triggered_appointment_flag(second_half)
assert not db_manager.load_all_triggered_flags()