From aa12fa2cf88c34c839d6409ded538425b73f37cb Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 10 Feb 2020 16:19:22 +0100 Subject: [PATCH 1/3] Removes sleep flag from Watcher and Responder The sleep flag was used to avoid doing useless work when no data was hold by the tower. However, from the implementation of the data persistence on, the Watcher and Responder should at least keep track of the last known block. The current apporach was making this harder. --- pisa/builder.py | 26 ++++-------------- pisa/chain_monitor.py | 56 +++++++++----------------------------- pisa/responder.py | 62 ++++++++++++------------------------------- pisa/watcher.py | 57 +++++++++++---------------------------- 4 files changed, 50 insertions(+), 151 deletions(-) diff --git a/pisa/builder.py b/pisa/builder.py index 39298dd..e9728cc 100644 --- a/pisa/builder.py +++ b/pisa/builder.py @@ -120,7 +120,6 @@ class Builder: set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index ) Builder.populate_block_queue(watcher.responder.block_queue, block_diff) - watcher.responder.awake() watcher.responder.block_queue.join() elif len(missed_blocks_watcher) > len(missed_blocks_responder): @@ -128,27 +127,12 @@ class Builder: set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index ) Builder.populate_block_queue(watcher.block_queue, block_diff) - watcher.awake() watcher.block_queue.join() - # Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is - # asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if - # appointments are passed trough from the Watcher. - if watcher.appointments and watcher.asleep: - watcher.awake() - - if watcher.responder.trackers and watcher.responder.asleep: - watcher.responder.awake() - + # Once they are at the same height, we update them one by one for block in missed_blocks_watcher: - if not watcher.asleep: - watcher.block_queue.put(block) - watcher.block_queue.join() + watcher.block_queue.put(block) + watcher.block_queue.join() - if not watcher.responder.asleep: - watcher.responder.block_queue.put(block) - watcher.responder.block_queue.join() - else: - # The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no - # trackers - watcher.responder.last_known_block = block + watcher.responder.block_queue.put(block) + watcher.responder.block_queue.join() diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 22ef377..0dad221 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -19,6 +19,10 @@ class ChainMonitor: The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified once per queue and the notification is triggered by the method that detects the block faster. + Args: + watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. + responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. + Attributes: best_tip (:obj:`str`): a block hash representing the current best tip. last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. @@ -30,11 +34,9 @@ class ChainMonitor: watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher `. responder_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Responder `. - watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not. - responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not. """ - def __init__(self): + def __init__(self, watcher_queue, responder_queue): self.best_tip = None self.last_tips = [] self.terminate = False @@ -48,53 +50,21 @@ class ChainMonitor: self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.watcher_queue = None - self.responder_queue = None - self.watcher_asleep = True - self.responder_asleep = True - - def attach_watcher(self, queue, asleep): - """ - Attaches a :obj:`Watcher ` to the :class:`ChainMonitor`. The ``Watcher`` and the - ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. - asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from - the ``Watcher`` when the state changes. - """ - - self.watcher_queue = queue - self.watcher_asleep = asleep - - def attach_responder(self, queue, asleep): - """ - Attaches a :obj:`Responder ` to the :class:`ChainMonitor`. The ``Responder`` and the - ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. - asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from - the ``Responder`` when the state changes. - """ - - self.responder_queue = queue - self.responder_asleep = asleep + self.watcher_queue = watcher_queue + self.responder_queue = responder_queue def notify_subscribers(self, block_hash): """ - Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so - by putting the hash in the corresponding queue(s). + Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in + the corresponding queue(s). Args: - block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. """ - if not self.watcher_asleep: - self.watcher_queue.put(block_hash) - - if not self.responder_asleep: - self.responder_queue.put(block_hash) + self.watcher_queue.put(block_hash) + self.responder_queue.put(block_hash) def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): """ diff --git a/pisa/responder.py b/pisa/responder.py index 6f18d19..4ae73ab 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -108,11 +108,6 @@ class Responder: the decrypted ``penalty_txs`` handed by the :obj:`Watcher ` and ensuring the they make it to the blockchain. - The :class:`Responder` can be in two states: - - - Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty). - - Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain). - Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. @@ -126,41 +121,29 @@ class Responder: unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx`` has missed. Used to trigger rebroadcast if needed. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It - is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. + is populated by the :obj:`ChainMonitor `. db_manager (:obj:`DBManager `): A ``DBManager`` instance to interact with the database. """ - def __init__(self, db_manager, chain_monitor): + def __init__(self, db_manager): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.db_manager = db_manager self.carrier = Carrier() self.last_known_block = db_manager.load_last_block_hash_responder() def awake(self): - self.asleep = False - self.chain_monitor.responder_asleep = False - responder_thread = Thread(target=self.do_watch, daemon=True).start() + responder_thread = Thread(target=self.do_watch, daemon=True) + responder_thread.start() return responder_thread - def sleep(self): - self.asleep = True - self.chain_monitor.responder_asleep = True - - logger.info("No more pending trackers, going back to sleep") - @staticmethod def on_sync(block_hash): """ @@ -212,9 +195,6 @@ class Responder: into the blockchain. """ - if self.asleep: - logger.info("Waking up") - receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) if receipt.delivered: @@ -239,8 +219,6 @@ class Responder: ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the database. - ``add_tracker`` awakes the :obj:`Responder` if it is asleep. - Args: uuid (:obj:`str`): a unique identifier for the appointment. locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded). @@ -278,9 +256,6 @@ class Responder: "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end ) - if self.asleep: - self.awake() - def do_watch(self): """ Monitors the blockchain whilst there are pending trackers. @@ -293,20 +268,17 @@ class Responder: if self.last_known_block is None: self.last_known_block = BlockProcessor.get_best_block_hash() - while len(self.trackers) > 0: - # We get notified for every new received block + while True: block_hash = self.block_queue.get() block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: - txs = block.get("tx") - - logger.info( - "New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs - ) + if len(self.trackers) > 0 and block is not None: + txids = block.get("tx") + logger.info("List of transactions", txids=txids) if self.last_known_block == block.get("previousblockhash"): - self.check_confirmations(txs) + self.check_confirmations(txids) height = block.get("height") completed_trackers = self.get_completed_trackers(height) @@ -328,16 +300,16 @@ class Responder: # ToDo: #24-properly-handle-reorgs self.handle_reorgs(block_hash) - # Register the last processed block for the responder - self.db_manager.store_last_block_hash_responder(block_hash) + # Clear the receipts issued in this block + self.carrier.issued_receipts = {} - self.last_known_block = block.get("hash") + if len(self.trackers) is 0: + logger.info("No more pending trackers") + # Register the last processed block for the responder + self.db_manager.store_last_block_hash_responder(block_hash) + self.last_known_block = block.get("hash") self.block_queue.task_done() - self.carrier.issued_receipts = {} - - # Go back to sleep if there are no more pending trackers - self.sleep() def check_confirmations(self, txs): """ diff --git a/pisa/watcher.py b/pisa/watcher.py index dc35aec..5b1860c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -10,7 +10,6 @@ from common.logger import Logger from pisa import LOG_PREFIX from pisa.cleaner import Cleaner -from pisa.responder import Responder from pisa.block_processor import BlockProcessor logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -33,13 +32,10 @@ class Watcher: Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. - responder (:obj:`Responder `): a ``Responder`` instance. If ``None`` is passed, a new - instance is created. Populated instances are useful when bootstrapping the system from backed-up data. + responder (:obj:`Responder `): a ``Responder`` instance. Attributes: @@ -48,11 +44,8 @@ class Watcher: It's populated trough ``add_appointment``. locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several appointments with the same ``locator``. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. db_manager (:obj:`DBManager `): A db manager instance to interact with the database. @@ -63,41 +56,27 @@ class Watcher: """ - def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): + def __init__(self, db_manager, responder, sk_der, config): self.appointments = dict() self.locator_uuid_map = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.config = config self.db_manager = db_manager + self.responder = responder self.signing_key = Cryptographer.load_private_key_der(sk_der) - if not isinstance(responder, Responder): - self.responder = Responder(db_manager, chain_monitor) - def awake(self): - self.asleep = False - self.chain_monitor.watcher_asleep = False - watcher_thread = Thread(target=self.do_watch, daemon=True).start() - - logger.info("Waking up") + watcher_thread = Thread(target=self.do_watch, daemon=True) + watcher_thread.start() return watcher_thread - def sleep(self): - self.asleep = True - self.chain_monitor.watcher_asleep = True - - logger.info("No more pending appointments, going back to sleep") - def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. - ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher` - is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty. - It will go back to sleep once there are no more pending appointments. + ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring + the blockchain (``do_watch``) until ``appointments`` is empty. Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding :obj:`EncryptedBlob ` and pass the information to the @@ -132,9 +111,6 @@ class Watcher: else: self.locator_uuid_map[appointment.locator] = [uuid] - if self.asleep: - self.awake() - self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.create_append_locator_map(appointment.locator, uuid) @@ -159,15 +135,13 @@ class Watcher: :obj:`Responder ` upon detecting a breach. """ - while len(self.appointments) > 0: + while True: block_hash = self.block_queue.get() - logger.info("New block received", block_hash=block_hash) - block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: + if len(self.appointments) > 0 and block is not None: txids = block.get("tx") - logger.info("List of transactions", txids=txids) expired_appointments = [ @@ -203,7 +177,7 @@ class Watcher: block_hash, ) - # FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. + # FIXME: Only necessary because of the triggered appointment approach. Fix if it changes. if receipt.delivered: Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) @@ -219,14 +193,13 @@ class Watcher: appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager ) - # Register the last processed block for the watcher - self.db_manager.store_last_block_hash_watcher(block_hash) + if len(self.appointments) is 0: + logger.info("No more pending appointments") + # Register the last processed block for the watcher + self.db_manager.store_last_block_hash_watcher(block_hash) self.block_queue.task_done() - # Go back to sleep if there are no more appointments - self.sleep() - def get_breaches(self, txids): """ Gets a list of channel breaches given the list of transaction ids. From a4f7548804a2af7138d97c0bdbbd59a668c09971 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 10 Feb 2020 16:21:05 +0100 Subject: [PATCH 2/3] Removes sleep flag and reorders code for redability --- pisa/pisad.py | 68 +++++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index 0d361f3..e7d771e 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -8,6 +8,7 @@ from pisa import config, LOG_PREFIX from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder +from pisa.responder import Responder from pisa.db_manager import DBManager from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor @@ -43,33 +44,50 @@ def main(): else: try: + with open(config.get("PISA_SECRET_KEY"), "rb") as key_file: + secret_key_der = key_file.read() + + watcher = Watcher(db_manager, Responder(db_manager), secret_key_der, config) + # Create the chain monitor and start monitoring the chain - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() - with open(config.get("PISA_SECRET_KEY"), "rb") as key_file: - secret_key_der = key_file.read() - - watcher = Watcher(db_manager, chain_monitor, secret_key_der, config) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) - if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") + watcher.awake() + watcher.responder.awake() + else: logger.info("Bootstrapping from backed up data") - block_processor = BlockProcessor() + + # Update the Watcher backed up data if found. + if len(watcher_appointments_data) != 0: + watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( + watcher_appointments_data + ) + + # Update the Responder with backed up data if found. + if len(responder_trackers_data) != 0: + watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( + responder_trackers_data + ) + + # Awaking components so the states can be updated. + watcher.awake() + watcher.responder.awake() last_block_watcher = db_manager.load_last_block_hash_watcher() last_block_responder = db_manager.load_last_block_hash_responder() + # Populate the block queues with data if they've missed some while offline. If the blocks of both match + # we don't perform the search twice. + block_processor = BlockProcessor() + # FIXME: 32-reorgs-offline dropped txs are not used at this point. - # Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't - # perform the search twice. last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( last_block_watcher ) @@ -85,40 +103,22 @@ def main(): ) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) - # Build and update the Watcher. - if len(watcher_appointments_data) != 0: - watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( - watcher_appointments_data - ) - - # Build Responder with backed up data if found - if len(responder_trackers_data) != 0: - watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( - responder_trackers_data - ) - # If only one of the instances needs to be updated, it can be done separately. if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) - watcher.responder.awake() watcher.responder.block_queue.join() elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) - watcher.awake() watcher.block_queue.join() - # Otherwise the need to be updated at the same time, block by block + # Otherwise they need to be updated at the same time, block by block elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder) - # Awake the Watcher/Responder if they ended up with pending work - if watcher.appointments and watcher.asleep: - watcher.awake() - if watcher.responder.trackers and watcher.responder.asleep: - watcher.responder.awake() - - # Fire the API + # Fire the API and the ChainMonitor + # FIXME: 92-block-data-during-bootstrap-db + chain_monitor.monitor_chain() API(watcher, config=config).start() except Exception as e: logger.error("An error occurred: {}. Shutting down".format(e)) From 6913d1cd18102c1f930eae353c1ce8733185baaa Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 10 Feb 2020 16:21:31 +0100 Subject: [PATCH 3/3] Update tests to remove the asleep flags --- test/pisa/unit/conftest.py | 13 +--- test/pisa/unit/test_api.py | 11 ++- test/pisa/unit/test_builder.py | 109 +++------------------------ test/pisa/unit/test_chain_monitor.py | 98 ++++-------------------- test/pisa/unit/test_responder.py | 68 +++++++---------- test/pisa/unit/test_watcher.py | 36 +++++---- 6 files changed, 85 insertions(+), 250 deletions(-) diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 0995539..2ffcb85 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -15,7 +15,6 @@ from apps.cli.blob import Blob from pisa.responder import TransactionTracker from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager -from pisa.chain_monitor import ChainMonitor from common.appointment import Appointment from common.tools import compute_locator @@ -46,23 +45,13 @@ def prng_seed(): def db_manager(): manager = DBManager("test_db") # Add last know block for the Responder in the db + yield manager manager.db.close() rmtree("test_db") -@pytest.fixture(scope="module") -def chain_monitor(): - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() - - yield chain_monitor - - chain_monitor.terminate = True - generate_block() - - def generate_keypair(): client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_pk = client_sk.public_key() diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index f94ab40..6561569 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -7,8 +7,10 @@ from cryptography.hazmat.primitives import serialization from pisa.api import API from pisa.watcher import Watcher +from pisa.responder import Responder from pisa.tools import bitcoin_cli from pisa import HOST, PORT +from pisa.chain_monitor import ChainMonitor from test.pisa.unit.conftest import ( generate_block, @@ -32,7 +34,7 @@ config = get_config() @pytest.fixture(scope="module") -def run_api(db_manager, chain_monitor): +def run_api(db_manager): sk, pk = generate_keypair() sk_der = sk.private_bytes( encoding=serialization.Encoding.DER, @@ -40,9 +42,10 @@ def run_api(db_manager, chain_monitor): encryption_algorithm=serialization.NoEncryption(), ) - watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) + watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) + watcher.awake() + chain_monitor.monitor_chain() api_thread = Thread(target=API(watcher, config).start) api_thread.daemon = True diff --git a/test/pisa/unit/test_builder.py b/test/pisa/unit/test_builder.py index 8fe09af..c0a2ace 100644 --- a/test/pisa/unit/test_builder.py +++ b/test/pisa/unit/test_builder.py @@ -4,6 +4,7 @@ from queue import Queue from pisa.builder import Builder from pisa.watcher import Watcher +from pisa.responder import Responder from test.pisa.unit.conftest import ( get_random_value_hex, generate_dummy_appointment, @@ -89,7 +90,7 @@ def test_populate_block_queue(): def test_update_states_empty_list(db_manager): - w = Watcher(db_manager=db_manager, chain_monitor=None, sk_der=None, config=None) + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=None) missed_blocks_watcher = [] missed_blocks_responder = [get_random_value_hex(32)] @@ -102,121 +103,35 @@ def test_update_states_empty_list(db_manager): Builder.update_states(w, missed_blocks_responder, missed_blocks_watcher) -def test_update_states_different_sizes(run_bitcoind, db_manager, chain_monitor): - w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(w.responder, True) - chain_monitor.attach_responder(w.responder, True) - - # For the states to be updated data needs to be present in the actors (either appointments or trackers). - # Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss. - w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200} +def test_update_states_responder_misses_more(run_bitcoind, db_manager): + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config()) blocks = [] for _ in range(5): generate_block() blocks.append(bitcoin_cli().getbestblockhash()) - # Updating the states should bring both to the same last known block. The Watcher's is stored in the db since it has - # gone over do_watch, whereas the Responders in only updated by update state. + # Updating the states should bring both to the same last known block. + w.awake() + w.responder.awake() Builder.update_states(w, blocks, blocks[1:]) assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert w.responder.last_known_block == blocks[-1] - # If both have work, both last known blocks are updated - w.sleep() - w.responder.sleep() - w.responder.trackers[uuid4().hex] = { - "penalty_txid": get_random_value_hex(32), - "locator": get_random_value_hex(16), - "appointment_end": 200, - } +def test_update_states_watcher_misses_more(run_bitcoind, db_manager): + # Same as before, but data is now in the Responder + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config()) blocks = [] for _ in range(5): generate_block() blocks.append(bitcoin_cli().getbestblockhash()) + w.awake() + w.responder.awake() Builder.update_states(w, blocks[1:], blocks) - assert db_manager.load_last_block_hash_watcher() == blocks[-1] - assert db_manager.load_last_block_hash_responder() == blocks[-1] - - # Let's try the opposite of the first test (Responder with data, Watcher without) - w.sleep() - w.responder.sleep() - - w.appointments = {} - last_block_prev = blocks[-1] - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - # The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't - # change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last - # known block. - Builder.update_states(w, blocks[1:], blocks) - assert db_manager.load_last_block_hash_watcher() == last_block_prev - assert db_manager.load_last_block_hash_responder() == blocks[-1] - - -def test_update_states_same_sizes(db_manager, chain_monitor): - # The exact same behaviour of the last test is expected here, since different sizes are even using - # populate_block_queue and then run with the same list size. - w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(w.responder, True) - chain_monitor.attach_responder(w.responder, True) - - # For the states to be updated data needs to be present in the actors (either appointments or trackers). - # Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss. - w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200} - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - Builder.update_states(w, blocks, blocks) - assert db_manager.load_last_block_hash_watcher() == blocks[-1] - assert w.responder.last_known_block == blocks[-1] - - # If both have work, both last known blocks are updated - w.sleep() - w.responder.sleep() - - w.responder.trackers[uuid4().hex] = { - "penalty_txid": get_random_value_hex(32), - "locator": get_random_value_hex(16), - "appointment_end": 200, - } - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - Builder.update_states(w, blocks, blocks) assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert db_manager.load_last_block_hash_responder() == blocks[-1] - - # Let's try the opposite of the first test (Responder with data, Watcher without) - w.sleep() - w.responder.sleep() - - w.appointments = {} - last_block_prev = blocks[-1] - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - # The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't - # change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last - # known block. - Builder.update_states(w, blocks, blocks) - assert db_manager.load_last_block_hash_watcher() == last_block_prev - assert db_manager.load_last_block_hash_responder() == blocks[-1] diff --git a/test/pisa/unit/test_chain_monitor.py b/test/pisa/unit/test_chain_monitor.py index 56c2b31..7098478 100644 --- a/test/pisa/unit/test_chain_monitor.py +++ b/test/pisa/unit/test_chain_monitor.py @@ -1,20 +1,19 @@ import zmq import time +from queue import Queue from threading import Thread, Event, Condition -from pisa.watcher import Watcher -from pisa.responder import Responder from pisa.block_processor import BlockProcessor from pisa.chain_monitor import ChainMonitor -from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config +from test.pisa.unit.conftest import get_random_value_hex, generate_block def test_init(run_bitcoind): # run_bitcoind is started here instead of later on to avoid race conditions while it initializes # Not much to test here, just sanity checks to make sure nothing goes south in the future - chain_monitor = ChainMonitor() + chain_monitor = ChainMonitor(Queue(), Queue()) assert chain_monitor.best_tip is None assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0 @@ -24,41 +23,12 @@ def test_init(run_bitcoind): assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket) # The Queues and asleep flags are initialized when attaching the corresponding subscriber - assert chain_monitor.watcher_queue is None - assert chain_monitor.responder_queue is None - assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep + assert isinstance(chain_monitor.watcher_queue, Queue) + assert isinstance(chain_monitor.responder_queue, Queue) -def test_attach_watcher(chain_monitor, db_manager): - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - - # booleans are not passed as reference in Python, so the flags need to be set separately - assert watcher.asleep == chain_monitor.watcher_asleep - watcher.asleep = False - assert chain_monitor.watcher_asleep != watcher.asleep - - # Test that the Queue work - r_hash = get_random_value_hex(32) - chain_monitor.watcher_queue.put(r_hash) - assert watcher.block_queue.get() == r_hash - - -def test_attach_responder(chain_monitor, db_manager): - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) - - # Same kind of testing as with the attach watcher - assert responder.asleep == chain_monitor.watcher_asleep - responder.asleep = False - assert chain_monitor.watcher_asleep != responder.asleep - - r_hash = get_random_value_hex(32) - chain_monitor.responder_queue.put(r_hash) - assert responder.block_queue.get() == r_hash - - -def test_notify_subscribers(chain_monitor): +def test_notify_subscribers(): + chain_monitor = ChainMonitor(Queue(), Queue()) # Subscribers are only notified as long as they are awake new_block = get_random_value_hex(32) @@ -66,27 +36,17 @@ def test_notify_subscribers(chain_monitor): assert chain_monitor.watcher_queue.empty() assert chain_monitor.responder_queue.empty() - chain_monitor.watcher_asleep = True - chain_monitor.responder_asleep = True - chain_monitor.notify_subscribers(new_block) - - # And remain empty afterwards since both subscribers were asleep - assert chain_monitor.watcher_queue.empty() - assert chain_monitor.responder_queue.empty() - - # Let's flag them as awake and try again - chain_monitor.watcher_asleep = False - chain_monitor.responder_asleep = False chain_monitor.notify_subscribers(new_block) assert chain_monitor.watcher_queue.get() == new_block assert chain_monitor.responder_queue.get() == new_block -def test_update_state(chain_monitor): +def test_update_state(): # The state is updated after receiving a new block (and only if the block is not already known). # Let's start by setting a best_tip and a couple of old tips new_block_hash = get_random_value_hex(32) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = new_block_hash chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)] @@ -105,12 +65,10 @@ def test_update_state(chain_monitor): def test_monitor_chain_polling(db_manager): # Try polling with the Watcher - chain_monitor = ChainMonitor() + wq = Queue() + chain_monitor = ChainMonitor(wq, Queue()) chain_monitor.best_tip = BlockProcessor.get_best_block_hash() - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) - # monitor_chain_polling runs until terminate if set polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True) polling_thread.start() @@ -131,13 +89,10 @@ def test_monitor_chain_polling(db_manager): def test_monitor_chain_zmq(db_manager): - # Try zmq with the Responder - chain_monitor = ChainMonitor() + rq = Queue() + chain_monitor = ChainMonitor(Queue(), rq) chain_monitor.best_tip = BlockProcessor.get_best_block_hash() - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True) zmq_thread.start() @@ -150,28 +105,10 @@ def test_monitor_chain_zmq(db_manager): chain_monitor.responder_queue.get() assert chain_monitor.responder_queue.empty() - # If we flag it to sleep no notification is sent - chain_monitor.responder_asleep = True - - for _ in range(3): - generate_block() - assert chain_monitor.responder_queue.empty() - - chain_monitor.terminate = True - # The zmq thread needs a block generation to release from the recv method. - generate_block() - - zmq_thread.join() - def test_monitor_chain(db_manager): # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate - chain_monitor = ChainMonitor() - - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = None chain_monitor.monitor_chain() @@ -196,12 +133,7 @@ def test_monitor_chain(db_manager): def test_monitor_chain_single_update(db_manager): # This test tests that if both threads try to add the same block to the queue, only the first one will make it - chain_monitor = ChainMonitor() - - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = None diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index a3a9e75..41af14f 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -1,6 +1,7 @@ import json import pytest import random +from queue import Queue from uuid import uuid4 from shutil import rmtree from copy import deepcopy @@ -18,17 +19,19 @@ from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_ @pytest.fixture(scope="module") -def responder(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def responder(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() return responder -@pytest.fixture() +@pytest.fixture(scope="session") def temp_db_manager(): db_name = get_random_value_hex(8) db_manager = DBManager(db_name) + yield db_manager db_manager.db.close() @@ -144,19 +147,17 @@ def test_tracker_from_dict_invalid_data(): assert True -def test_init_responder(responder): +def test_init_responder(temp_db_manager): + responder = Responder(temp_db_manager) assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0 assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0 assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0 assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0 - assert isinstance(responder.chain_monitor, ChainMonitor) assert responder.block_queue.empty() - assert responder.asleep is True -def test_handle_breach(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_handle_breach(db_manager): + responder = Responder(db_manager) uuid = uuid4().hex tracker = create_dummy_tracker() @@ -174,20 +175,11 @@ def test_handle_breach(db_manager, chain_monitor): assert receipt.delivered is True - # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes - # now. To do so we delete all the trackers, and generate a new block. - responder.trackers = dict() - generate_block() - -def test_add_bad_response(responder): +def test_handle_breach_bad_response(responder): uuid = uuid4().hex tracker = create_dummy_tracker() - # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting - # to awake. That will prevent the chain_monitor thread to be launched again. - responder.asleep = False - # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. tracker.penalty_rawtx = tracker.penalty_txid @@ -206,8 +198,6 @@ def test_add_bad_response(responder): def test_add_tracker(responder): - # Responder is asleep - for _ in range(20): uuid = uuid4().hex confirmations = 0 @@ -236,8 +226,6 @@ def test_add_tracker(responder): def test_add_tracker_same_penalty_txid(responder): - # Responder is asleep - confirmations = 0 locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) uuid_1 = uuid4().hex @@ -262,8 +250,6 @@ def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_already_confirmed(responder): - # Responder is asleep - for i in range(20): uuid = uuid4().hex confirmations = i + 1 @@ -276,10 +262,11 @@ def test_add_tracker_already_confirmed(responder): assert penalty_txid not in responder.unconfirmed_txs -def test_do_watch(temp_db_manager, chain_monitor): +def test_do_watch(temp_db_manager): # Create a fresh responder to simplify the test - responder = Responder(temp_db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, False) + responder = Responder(temp_db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() trackers = [create_dummy_tracker(penalty_rawtx=create_dummy_transaction().hex()) for _ in range(20)] @@ -332,12 +319,12 @@ def test_do_watch(temp_db_manager, chain_monitor): generate_blocks(6) assert len(responder.tx_tracker_map) == 0 - assert responder.asleep is True -def test_check_confirmations(temp_db_manager, chain_monitor): - responder = Responder(temp_db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_check_confirmations(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() # check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have # been confirmed. To test this we need to create a list of transactions and the state of the responder @@ -391,11 +378,12 @@ def test_get_txs_to_rebroadcast(responder): assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) -def test_get_completed_trackers(db_manager, chain_monitor): +def test_get_completed_trackers(db_manager): initial_height = bitcoin_cli().getblockcount() - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached @@ -450,10 +438,10 @@ def test_get_completed_trackers(db_manager, chain_monitor): assert set(completed_trackers_ids) == set(ended_trackers_keys) -def test_rebroadcast(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - responder.asleep = False - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_rebroadcast(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() txs_to_rebroadcast = [] diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 03c6f45..5a85bec 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -1,5 +1,6 @@ import pytest from uuid import uuid4 +from shutil import rmtree from threading import Thread from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import serialization @@ -8,6 +9,7 @@ from pisa.watcher import Watcher from pisa.responder import Responder from pisa.tools import bitcoin_cli from pisa.chain_monitor import ChainMonitor +from pisa.db_manager import DBManager from test.pisa.unit.conftest import ( generate_blocks, @@ -36,11 +38,22 @@ sk_der = signing_key.private_bytes( ) +@pytest.fixture(scope="session") +def temp_db_manager(): + db_name = get_random_value_hex(8) + db_manager = DBManager(db_name) + + yield db_manager + + db_manager.db.close() + rmtree(db_name) + + @pytest.fixture(scope="module") -def watcher(db_manager, chain_monitor): - watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) +def watcher(db_manager): + watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) + chain_monitor.monitor_chain() return watcher @@ -76,19 +89,13 @@ def create_appointments(n): def test_init(run_bitcoind, watcher): assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 - assert watcher.asleep is True assert watcher.block_queue.empty() - assert isinstance(watcher.chain_monitor, ChainMonitor) assert isinstance(watcher.config, dict) assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey) assert isinstance(watcher.responder, Responder) def test_add_appointment(watcher): - # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state) - # Avoid this by setting the state to awake. - watcher.asleep = False - # We should be able to add appointments up to the limit for _ in range(10): appointment, dispute_tx = generate_dummy_appointment( @@ -128,10 +135,11 @@ def test_add_too_many_appointments(watcher): assert sig is None -def test_do_watch(watcher): +def test_do_watch(watcher, temp_db_manager): + watcher.db_manager = temp_db_manager + # We will wipe all the previous data and add 5 appointments appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) - watcher.chain_monitor.watcher_asleep = False # Set the data into the Watcher and in the db watcher.locator_uuid_map = locator_uuid_map @@ -142,7 +150,8 @@ def test_do_watch(watcher): watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json()) watcher.db_manager.create_append_locator_map(appointment.locator, uuid) - Thread(target=watcher.do_watch, daemon=True).start() + do_watch_thread = Thread(target=watcher.do_watch, daemon=True) + do_watch_thread.start() # Broadcast the first two for dispute_tx in dispute_txs[:2]: @@ -158,7 +167,6 @@ def test_do_watch(watcher): generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET) assert len(watcher.appointments) == 0 - assert watcher.asleep is True def test_get_breaches(watcher, txids, locator_uuid_map):