mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Updates unit tests, adds tests to cover new methods and removes unused imports
This commit is contained in:
@@ -4,7 +4,6 @@ from uuid import uuid4
|
||||
from pisa.responder import TransactionTracker
|
||||
from pisa.cleaner import Cleaner
|
||||
from common.appointment import Appointment
|
||||
from pisa.db_manager import WATCHER_PREFIX, TRIGGERED_APPOINTMENTS_PREFIX
|
||||
|
||||
from test.pisa.unit.conftest import get_random_value_hex
|
||||
|
||||
@@ -16,7 +15,6 @@ MAX_ITEMS = 100
|
||||
ITERATIONS = 10
|
||||
|
||||
|
||||
# WIP: FIX CLEANER TESTS AFTER ADDING delete_complete_appointment
|
||||
def set_up_appointments(db_manager, total_appointments):
|
||||
appointments = dict()
|
||||
locator_uuid_map = dict()
|
||||
@@ -30,7 +28,7 @@ def set_up_appointments(db_manager, total_appointments):
|
||||
locator_uuid_map[locator] = [uuid]
|
||||
|
||||
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
db_manager.store_update_locator_map(locator, uuid)
|
||||
db_manager.create_append_locator_map(locator, uuid)
|
||||
|
||||
# Each locator can have more than one uuid assigned to it.
|
||||
if i % 2:
|
||||
@@ -40,7 +38,7 @@ def set_up_appointments(db_manager, total_appointments):
|
||||
locator_uuid_map[locator].append(uuid)
|
||||
|
||||
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
db_manager.store_update_locator_map(locator, uuid)
|
||||
db_manager.create_append_locator_map(locator, uuid)
|
||||
|
||||
return appointments, locator_uuid_map
|
||||
|
||||
@@ -63,7 +61,7 @@ def set_up_trackers(db_manager, total_trackers):
|
||||
tx_tracker_map[penalty_txid] = [uuid]
|
||||
|
||||
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||
db_manager.store_update_locator_map(tracker.locator, uuid)
|
||||
db_manager.create_append_locator_map(tracker.locator, uuid)
|
||||
|
||||
# Each penalty_txid can have more than one uuid assigned to it.
|
||||
if i % 2:
|
||||
@@ -73,34 +71,89 @@ def set_up_trackers(db_manager, total_trackers):
|
||||
tx_tracker_map[penalty_txid].append(uuid)
|
||||
|
||||
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||
db_manager.store_update_locator_map(tracker.locator, uuid)
|
||||
db_manager.create_append_locator_map(tracker.locator, uuid)
|
||||
|
||||
return trackers, tx_tracker_map
|
||||
|
||||
|
||||
def test_delete_appointment_from_memory(db_manager):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
|
||||
for uuid in list(appointments.keys()):
|
||||
Cleaner.delete_appointment_from_memory(uuid, appointments, locator_uuid_map)
|
||||
|
||||
# The appointment should have been deleted from memory, but not from the db
|
||||
assert uuid not in appointments
|
||||
assert db_manager.load_watcher_appointment(uuid) is not None
|
||||
|
||||
|
||||
def test_delete_appointment_from_db(db_manager):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
|
||||
for uuid in list(appointments.keys()):
|
||||
Cleaner.delete_appointment_from_db(uuid, db_manager)
|
||||
|
||||
# The appointment should have been deleted from memory, but not from the db
|
||||
assert uuid in appointments
|
||||
assert db_manager.load_watcher_appointment(uuid) is None
|
||||
|
||||
|
||||
def test_update_delete_db_locator_map(db_manager):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
|
||||
for uuid, appointment in appointments.items():
|
||||
locator = appointment.get("locator")
|
||||
locator_map_before = db_manager.load_locator_map(locator)
|
||||
Cleaner.update_delete_db_locator_map(uuid, locator, db_manager)
|
||||
locator_map_after = db_manager.load_locator_map(locator)
|
||||
|
||||
if locator_map_after is None:
|
||||
assert locator_map_before is not None
|
||||
|
||||
else:
|
||||
assert uuid in locator_map_before and uuid not in locator_map_after
|
||||
|
||||
|
||||
def test_delete_expired_appointment(db_manager):
|
||||
for _ in range(ITERATIONS):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
expired_appointments = random.sample(list(appointments.keys()), k=ITEMS)
|
||||
|
||||
Cleaner.delete_expired_appointment(expired_appointments, appointments, locator_uuid_map, db_manager)
|
||||
Cleaner.delete_expired_appointments(expired_appointments, appointments, locator_uuid_map, db_manager)
|
||||
|
||||
assert not set(expired_appointments).issubset(appointments.keys())
|
||||
|
||||
|
||||
def test_delete_completed_appointments(db_manager):
|
||||
for _ in range(ITERATIONS):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
uuids = list(appointments.keys())
|
||||
completed_appointments = random.sample(list(appointments.keys()), k=ITEMS)
|
||||
|
||||
for uuid in uuids:
|
||||
Cleaner.delete_completed_appointment(uuid, appointments, locator_uuid_map, db_manager)
|
||||
len_before_clean = len(appointments)
|
||||
Cleaner.delete_completed_appointments(completed_appointments, appointments, locator_uuid_map, db_manager)
|
||||
|
||||
# All appointments should have been deleted
|
||||
assert len(appointments) == 0
|
||||
# ITEMS appointments should have been deleted from memory
|
||||
assert len(appointments) == len_before_clean - ITEMS
|
||||
|
||||
# Make sure they are not in the db either
|
||||
db_appointments = db_manager.load_watcher_appointments(include_triggered=True)
|
||||
assert not set(completed_appointments).issubset(db_appointments)
|
||||
|
||||
|
||||
def test_flag_triggered_appointments(db_manager):
|
||||
for _ in range(ITERATIONS):
|
||||
appointments, locator_uuid_map = set_up_appointments(db_manager, MAX_ITEMS)
|
||||
triggered_appointments = random.sample(list(appointments.keys()), k=ITEMS)
|
||||
|
||||
len_before_clean = len(appointments)
|
||||
Cleaner.flag_triggered_appointments(triggered_appointments, appointments, locator_uuid_map, db_manager)
|
||||
|
||||
# ITEMS appointments should have been deleted from memory
|
||||
assert len(appointments) == len_before_clean - ITEMS
|
||||
|
||||
# Make sure that all appointments are flagged as triggered in the db
|
||||
for uuid in uuids:
|
||||
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8")) is not None
|
||||
db_appointments = db_manager.load_all_triggered_flags()
|
||||
assert set(triggered_appointments).issubset(db_appointments)
|
||||
|
||||
|
||||
def test_delete_completed_trackers_db_match(db_manager):
|
||||
|
||||
@@ -171,25 +171,70 @@ def test_load_locator_map_empty(db_manager):
|
||||
assert db_manager.load_locator_map(get_random_value_hex(LOCATOR_LEN_BYTES)) is None
|
||||
|
||||
|
||||
def test_store_update_locator_map_empty(db_manager):
|
||||
def test_create_append_locator_map(db_manager):
|
||||
uuid = uuid4().hex
|
||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||
db_manager.store_update_locator_map(locator, uuid)
|
||||
db_manager.create_append_locator_map(locator, uuid)
|
||||
|
||||
# Check that the locator map has been properly stored
|
||||
assert db_manager.load_locator_map(locator) == [uuid]
|
||||
|
||||
# If we try to add the same uuid again the list shouldn't change
|
||||
db_manager.store_update_locator_map(locator, uuid)
|
||||
db_manager.create_append_locator_map(locator, uuid)
|
||||
assert db_manager.load_locator_map(locator) == [uuid]
|
||||
|
||||
# Add another uuid to the same locator and check that it also works
|
||||
uuid2 = uuid4().hex
|
||||
db_manager.store_update_locator_map(locator, uuid2)
|
||||
db_manager.create_append_locator_map(locator, uuid2)
|
||||
|
||||
assert set(db_manager.load_locator_map(locator)) == set([uuid, uuid2])
|
||||
|
||||
|
||||
def test_update_locator_map(db_manager):
|
||||
# Let's create a couple of appointments with the same locator
|
||||
locator = get_random_value_hex(32)
|
||||
uuid1 = uuid4().hex
|
||||
uuid2 = uuid4().hex
|
||||
db_manager.create_append_locator_map(locator, uuid1)
|
||||
db_manager.create_append_locator_map(locator, uuid2)
|
||||
|
||||
locator_map = db_manager.load_locator_map(locator)
|
||||
assert uuid1 in locator_map
|
||||
|
||||
locator_map.remove(uuid1)
|
||||
db_manager.update_locator_map(locator, locator_map)
|
||||
|
||||
locator_map_after = db_manager.load_locator_map(locator)
|
||||
assert uuid1 not in locator_map_after and uuid2 in locator_map_after and len(locator_map_after) == 1
|
||||
|
||||
|
||||
def test_update_locator_map_wong_data(db_manager):
|
||||
# Let's try to update the locator map with a different list of uuids
|
||||
locator = get_random_value_hex(32)
|
||||
db_manager.create_append_locator_map(locator, uuid4().hex)
|
||||
db_manager.create_append_locator_map(locator, uuid4().hex)
|
||||
|
||||
locator_map = db_manager.load_locator_map(locator)
|
||||
wrong_map_update = [uuid4().hex]
|
||||
db_manager.update_locator_map(locator, wrong_map_update)
|
||||
locator_map_after = db_manager.load_locator_map(locator)
|
||||
|
||||
assert locator_map_after == locator_map
|
||||
|
||||
|
||||
def test_update_locator_map_empty(db_manager):
|
||||
# We shouldn't be able to update a map with an empty list
|
||||
locator = get_random_value_hex(32)
|
||||
db_manager.create_append_locator_map(locator, uuid4().hex)
|
||||
db_manager.create_append_locator_map(locator, uuid4().hex)
|
||||
|
||||
locator_map = db_manager.load_locator_map(locator)
|
||||
db_manager.update_locator_map(locator, [])
|
||||
locator_map_after = db_manager.load_locator_map(locator)
|
||||
|
||||
assert locator_map_after == locator_map
|
||||
|
||||
|
||||
def test_delete_locator_map(db_manager):
|
||||
locator_maps = db_manager.load_appointments_db(prefix=LOCATOR_MAP_PREFIX)
|
||||
assert len(locator_maps) != 0
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import importlib
|
||||
import os
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from shutil import copyfile
|
||||
|
||||
from pisa.pisad import load_config
|
||||
|
||||
@@ -140,7 +140,7 @@ def test_do_watch(watcher):
|
||||
for uuid, appointment in appointments.items():
|
||||
watcher.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
|
||||
watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
watcher.db_manager.store_update_locator_map(appointment.locator, uuid)
|
||||
watcher.db_manager.create_append_locator_map(appointment.locator, uuid)
|
||||
|
||||
Thread(target=watcher.do_watch, daemon=True).start()
|
||||
|
||||
@@ -190,7 +190,7 @@ def test_filter_valid_breaches_random_data(watcher):
|
||||
uuid = uuid4().hex
|
||||
appointments[uuid] = {"locator": dummy_appointment.locator, "end_time": dummy_appointment.end_time}
|
||||
watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_json())
|
||||
watcher.db_manager.store_update_locator_map(dummy_appointment.locator, uuid)
|
||||
watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid)
|
||||
|
||||
locator_uuid_map[dummy_appointment.locator] = [uuid]
|
||||
|
||||
@@ -201,9 +201,10 @@ def test_filter_valid_breaches_random_data(watcher):
|
||||
watcher.locator_uuid_map = locator_uuid_map
|
||||
watcher.appointments = appointments
|
||||
|
||||
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
||||
valid_breaches, invalid_breaches = watcher.filter_valid_breaches(breaches)
|
||||
|
||||
assert not any([fil_breach["valid_breach"] for uuid, fil_breach in filtered_valid_breaches.items()])
|
||||
# We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid.
|
||||
assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2
|
||||
|
||||
|
||||
def test_filter_valid_breaches(watcher):
|
||||
@@ -229,10 +230,11 @@ def test_filter_valid_breaches(watcher):
|
||||
for uuid, appointment in appointments.items():
|
||||
watcher.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
|
||||
watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_json())
|
||||
watcher.db_manager.store_update_locator_map(dummy_appointment.locator, uuid)
|
||||
watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid)
|
||||
|
||||
watcher.locator_uuid_map = locator_uuid_map
|
||||
|
||||
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
||||
valid_breaches, invalid_breaches = watcher.filter_valid_breaches(breaches)
|
||||
|
||||
assert all([fil_breach["valid_breach"] for uuid, fil_breach in filtered_valid_breaches.items()])
|
||||
# We have "triggered" a single breach and it was valid.
|
||||
assert len(invalid_breaches) == 0 and len(valid_breaches) == 1
|
||||
|
||||
Reference in New Issue
Block a user