From a385fa16a99aa7578eb18491d0f8352f25794f73 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 29 Oct 2019 14:51:59 -0700 Subject: [PATCH] Adds DBManager unit tests --- test/unit/conftest.py | 9 +- test/unit/test_db_manager.py | 240 +++++++++++++++++++++++++++++++++++ 2 files changed, 247 insertions(+), 2 deletions(-) create mode 100644 test/unit/test_db_manager.py diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 182e170..c461c5f 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -2,6 +2,7 @@ import pytest import random import requests from time import sleep +from shutil import rmtree from threading import Thread from pisa.conf import DB_PATH @@ -39,9 +40,13 @@ def prng_seed(): random.seed(0) -@pytest.fixture(scope='session') +@pytest.fixture(scope='module') def db_manager(): - return DBManager('test_db') + manager = DBManager('test_db') + yield manager + + manager.db.close() + rmtree('test_db') def get_random_value_hex(nbytes): diff --git a/test/unit/test_db_manager.py b/test/unit/test_db_manager.py new file mode 100644 index 0000000..0a2724e --- /dev/null +++ b/test/unit/test_db_manager.py @@ -0,0 +1,240 @@ +import os +import json +import pytest +import shutil +from uuid import uuid4 + +from pisa.db_manager import DBManager +from test.unit.conftest import get_random_value_hex +from pisa.conf import WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY + + +@pytest.fixture(scope='module') +def watcher_appointments(): + return {uuid4().hex: get_random_value_hex(32) for _ in range(10)} + + +@pytest.fixture(scope='module') +def responder_jobs(): + return {get_random_value_hex(32): get_random_value_hex(32) for _ in range(10)} + + +def open_create_db(db_path): + + try: + db_manager = DBManager(db_path) + + return db_manager + + except ValueError: + return False + + +def test_init(): + db_path = 'init_test_db' + + # First we check if the db exists, and if so we delete it + if os.path.isdir(db_path): + shutil.rmtree(db_path) + + # Check that the db can be created if it does not exist + db_manager = open_create_db(db_path) + assert isinstance(db_manager, DBManager) + print(type(db_manager)) + db_manager.db.close() + + # Check that we can open an already create db + db_manager = open_create_db(db_path) + assert isinstance(db_manager, DBManager) + db_manager.db.close() + + # Check we cannot create/open a db with an invalid parameter + assert open_create_db(0) is False + + # Removing test db + shutil.rmtree(db_path) + + +def test_load_appointments_db(db_manager): + # Let's made up a prefix and try to load data from the database using it + prefix = 'XX' + db_appointments = db_manager.load_appointments_db(prefix) + + assert len(db_appointments) == 0 + + # We can add a bunch of data to the db and try again (data is stored in json by the manager) + local_appointments = {} + for _ in range(10): + key = get_random_value_hex(32) + value = get_random_value_hex(32) + local_appointments[key] = value + + db_manager.db.put((prefix+key).encode('utf-8'), json.dumps({'value': value}).encode('utf-8')) + + db_appointments = db_manager.load_appointments_db(prefix) + + # Check that both keys and values are the same + assert db_appointments.keys() == local_appointments.keys() + + values = [appointment["value"] for appointment in db_appointments.values()] + assert set(values) == set(local_appointments.values()) and (len(values) == len(local_appointments)) + + +def test_get_last_known_block(db_manager): + # Trying to get any last block for either the watcher or the responder should return None for an empty db + for key in [WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY]: + assert db_manager.get_last_known_block(key) is None + + # After saving some block in the db we should get that exact value + for key in [WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY]: + block_hash = get_random_value_hex(32) + db_manager.db.put(key.encode('utf-8'), block_hash.encode('utf-8')) + assert db_manager.get_last_known_block(key) == block_hash + + +def test_create_entry(db_manager): + key = get_random_value_hex(32) + value = get_random_value_hex(32) + + # Adding a value with no prefix (create entry encodes values in utf-8 internally) + db_manager.create_entry(key, value) + + # We should be able to get it straightaway from the key + assert db_manager.db.get(key.encode('utf-8')).decode('utf-8') == value + + # If we prefix the key we should be able to get it if we add the prefix, but not otherwise + key = get_random_value_hex(32) + prefix = 'w' + db_manager.create_entry(key, value, prefix=prefix) + + assert db_manager.db.get((prefix+key).encode('utf-8')).decode('utf-8') == value + assert db_manager.db.get(key.encode('utf-8')) is None + + # Same if we try to use any other prefix + another_prefix = 'r' + assert db_manager.db.get((another_prefix+key).encode('utf-8')) is None + + +def test_delete_entry(db_manager): + # Let's first get the key all the things we've wrote so far in the db + data = [k.decode('utf-8') for k, v in db_manager.db.iterator()] + + # Let's empty the db now + for key in data: + db_manager.delete_entry(key) + + assert len([k for k, v in db_manager.db.iterator()]) == 0 + + # Let's check that the same works if a prefix is provided. + prefix = "r" + key = get_random_value_hex(32) + value = get_random_value_hex(32) + db_manager.create_entry(key, value, prefix) + + # Checks it's there + assert db_manager.db.get((prefix + key).encode('utf-8')).decode('utf-8') == value + + # And now it's gone + db_manager.delete_entry(key, prefix) + assert db_manager.db.get((prefix+key).encode('utf-8')) is None + + +def test_load_watcher_appointments_empty(db_manager): + assert len(db_manager.load_watcher_appointments()) == 0 + + +def test_load_responder_jobs_empty(db_manager): + assert len(db_manager.load_responder_jobs()) == 0 + + +def test_store_load_watcher_appointment(db_manager, watcher_appointments): + for key, value in watcher_appointments.items(): + db_manager.store_watcher_appointment(key, json.dumps({'value': value})) + + db_watcher_appointments = db_manager.load_watcher_appointments() + + values = [appointment["value"] for appointment in db_watcher_appointments.values()] + + assert watcher_appointments.keys() == db_watcher_appointments.keys() + assert set(watcher_appointments.values()) == set(values) and len(watcher_appointments) == len(values) + + +def test_store_load_appointment_jobs(db_manager, responder_jobs): + for key, value in responder_jobs.items(): + db_manager.store_responder_job(key, json.dumps({'value': value})) + + db_responder_jobs = db_manager.load_responder_jobs() + + values = [job["value"] for job in db_responder_jobs.values()] + + assert responder_jobs.keys() == db_responder_jobs.keys() + assert set(responder_jobs.values()) == set(values) and len(responder_jobs) == len(values) + + +def test_delete_watcher_appointment(db_manager, watcher_appointments): + # Let's delete all we added + db_watcher_appointments = db_manager.load_watcher_appointments() + assert len(db_watcher_appointments) != 0 + + for key in watcher_appointments.keys(): + db_manager.delete_watcher_appointment(key) + + db_watcher_appointments = db_manager.load_watcher_appointments() + assert len(db_watcher_appointments) == 0 + + +def test_delete_responder_job(db_manager, responder_jobs): + # Same for the responder + db_responder_jobs = db_manager.load_responder_jobs() + assert len(db_responder_jobs) != 0 + + for key in responder_jobs.keys(): + db_manager.delete_responder_job(key) + + db_responder_jobs = db_manager.load_responder_jobs() + assert len(db_responder_jobs) == 0 + + +def test_store_load_last_block_hash_watcher(db_manager): + # Let's first create a made up block hash + local_last_block_hash = get_random_value_hex(32) + db_manager.store_last_block_hash_watcher(local_last_block_hash) + + db_last_block_hash = db_manager.load_last_block_hash_watcher() + + assert local_last_block_hash == db_last_block_hash + + +def test_store_load_last_block_hash_responder(db_manager): + # Same for the responder + local_last_block_hash = get_random_value_hex(32) + db_manager.store_last_block_hash_responder(local_last_block_hash) + + db_last_block_hash = db_manager.load_last_block_hash_responder() + + assert local_last_block_hash == db_last_block_hash + + + + + + + + + + + + + + + + + + + + + + + + +