diff --git a/pisa/builder.py b/pisa/builder.py index 39298dd..e9728cc 100644 --- a/pisa/builder.py +++ b/pisa/builder.py @@ -120,7 +120,6 @@ class Builder: set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index ) Builder.populate_block_queue(watcher.responder.block_queue, block_diff) - watcher.responder.awake() watcher.responder.block_queue.join() elif len(missed_blocks_watcher) > len(missed_blocks_responder): @@ -128,27 +127,12 @@ class Builder: set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index ) Builder.populate_block_queue(watcher.block_queue, block_diff) - watcher.awake() watcher.block_queue.join() - # Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is - # asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if - # appointments are passed trough from the Watcher. - if watcher.appointments and watcher.asleep: - watcher.awake() - - if watcher.responder.trackers and watcher.responder.asleep: - watcher.responder.awake() - + # Once they are at the same height, we update them one by one for block in missed_blocks_watcher: - if not watcher.asleep: - watcher.block_queue.put(block) - watcher.block_queue.join() + watcher.block_queue.put(block) + watcher.block_queue.join() - if not watcher.responder.asleep: - watcher.responder.block_queue.put(block) - watcher.responder.block_queue.join() - else: - # The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no - # trackers - watcher.responder.last_known_block = block + watcher.responder.block_queue.put(block) + watcher.responder.block_queue.join() diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 22ef377..0dad221 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -19,6 +19,10 @@ class ChainMonitor: The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified once per queue and the notification is triggered by the method that detects the block faster. + Args: + watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. + responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. + Attributes: best_tip (:obj:`str`): a block hash representing the current best tip. last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. @@ -30,11 +34,9 @@ class ChainMonitor: watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher `. responder_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Responder `. - watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not. - responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not. """ - def __init__(self): + def __init__(self, watcher_queue, responder_queue): self.best_tip = None self.last_tips = [] self.terminate = False @@ -48,53 +50,21 @@ class ChainMonitor: self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.watcher_queue = None - self.responder_queue = None - self.watcher_asleep = True - self.responder_asleep = True - - def attach_watcher(self, queue, asleep): - """ - Attaches a :obj:`Watcher ` to the :class:`ChainMonitor`. The ``Watcher`` and the - ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. - asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from - the ``Watcher`` when the state changes. - """ - - self.watcher_queue = queue - self.watcher_asleep = asleep - - def attach_responder(self, queue, asleep): - """ - Attaches a :obj:`Responder ` to the :class:`ChainMonitor`. The ``Responder`` and the - ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. - asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from - the ``Responder`` when the state changes. - """ - - self.responder_queue = queue - self.responder_asleep = asleep + self.watcher_queue = watcher_queue + self.responder_queue = responder_queue def notify_subscribers(self, block_hash): """ - Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so - by putting the hash in the corresponding queue(s). + Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in + the corresponding queue(s). Args: - block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. """ - if not self.watcher_asleep: - self.watcher_queue.put(block_hash) - - if not self.responder_asleep: - self.responder_queue.put(block_hash) + self.watcher_queue.put(block_hash) + self.responder_queue.put(block_hash) def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): """ diff --git a/pisa/pisad.py b/pisa/pisad.py index 0d361f3..e7d771e 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -8,6 +8,7 @@ from pisa import config, LOG_PREFIX from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder +from pisa.responder import Responder from pisa.db_manager import DBManager from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor @@ -43,33 +44,50 @@ def main(): else: try: + with open(config.get("PISA_SECRET_KEY"), "rb") as key_file: + secret_key_der = key_file.read() + + watcher = Watcher(db_manager, Responder(db_manager), secret_key_der, config) + # Create the chain monitor and start monitoring the chain - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() - with open(config.get("PISA_SECRET_KEY"), "rb") as key_file: - secret_key_der = key_file.read() - - watcher = Watcher(db_manager, chain_monitor, secret_key_der, config) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) - if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") + watcher.awake() + watcher.responder.awake() + else: logger.info("Bootstrapping from backed up data") - block_processor = BlockProcessor() + + # Update the Watcher backed up data if found. + if len(watcher_appointments_data) != 0: + watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( + watcher_appointments_data + ) + + # Update the Responder with backed up data if found. + if len(responder_trackers_data) != 0: + watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( + responder_trackers_data + ) + + # Awaking components so the states can be updated. + watcher.awake() + watcher.responder.awake() last_block_watcher = db_manager.load_last_block_hash_watcher() last_block_responder = db_manager.load_last_block_hash_responder() + # Populate the block queues with data if they've missed some while offline. If the blocks of both match + # we don't perform the search twice. + block_processor = BlockProcessor() + # FIXME: 32-reorgs-offline dropped txs are not used at this point. - # Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't - # perform the search twice. last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( last_block_watcher ) @@ -85,40 +103,22 @@ def main(): ) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) - # Build and update the Watcher. - if len(watcher_appointments_data) != 0: - watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( - watcher_appointments_data - ) - - # Build Responder with backed up data if found - if len(responder_trackers_data) != 0: - watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( - responder_trackers_data - ) - # If only one of the instances needs to be updated, it can be done separately. if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) - watcher.responder.awake() watcher.responder.block_queue.join() elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) - watcher.awake() watcher.block_queue.join() - # Otherwise the need to be updated at the same time, block by block + # Otherwise they need to be updated at the same time, block by block elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder) - # Awake the Watcher/Responder if they ended up with pending work - if watcher.appointments and watcher.asleep: - watcher.awake() - if watcher.responder.trackers and watcher.responder.asleep: - watcher.responder.awake() - - # Fire the API + # Fire the API and the ChainMonitor + # FIXME: 92-block-data-during-bootstrap-db + chain_monitor.monitor_chain() API(watcher, config=config).start() except Exception as e: logger.error("An error occurred: {}. Shutting down".format(e)) diff --git a/pisa/responder.py b/pisa/responder.py index 6f18d19..4ae73ab 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -108,11 +108,6 @@ class Responder: the decrypted ``penalty_txs`` handed by the :obj:`Watcher ` and ensuring the they make it to the blockchain. - The :class:`Responder` can be in two states: - - - Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty). - - Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain). - Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. @@ -126,41 +121,29 @@ class Responder: unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx`` has missed. Used to trigger rebroadcast if needed. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It - is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. + is populated by the :obj:`ChainMonitor `. db_manager (:obj:`DBManager `): A ``DBManager`` instance to interact with the database. """ - def __init__(self, db_manager, chain_monitor): + def __init__(self, db_manager): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.db_manager = db_manager self.carrier = Carrier() self.last_known_block = db_manager.load_last_block_hash_responder() def awake(self): - self.asleep = False - self.chain_monitor.responder_asleep = False - responder_thread = Thread(target=self.do_watch, daemon=True).start() + responder_thread = Thread(target=self.do_watch, daemon=True) + responder_thread.start() return responder_thread - def sleep(self): - self.asleep = True - self.chain_monitor.responder_asleep = True - - logger.info("No more pending trackers, going back to sleep") - @staticmethod def on_sync(block_hash): """ @@ -212,9 +195,6 @@ class Responder: into the blockchain. """ - if self.asleep: - logger.info("Waking up") - receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) if receipt.delivered: @@ -239,8 +219,6 @@ class Responder: ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the database. - ``add_tracker`` awakes the :obj:`Responder` if it is asleep. - Args: uuid (:obj:`str`): a unique identifier for the appointment. locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded). @@ -278,9 +256,6 @@ class Responder: "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end ) - if self.asleep: - self.awake() - def do_watch(self): """ Monitors the blockchain whilst there are pending trackers. @@ -293,20 +268,17 @@ class Responder: if self.last_known_block is None: self.last_known_block = BlockProcessor.get_best_block_hash() - while len(self.trackers) > 0: - # We get notified for every new received block + while True: block_hash = self.block_queue.get() block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: - txs = block.get("tx") - - logger.info( - "New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs - ) + if len(self.trackers) > 0 and block is not None: + txids = block.get("tx") + logger.info("List of transactions", txids=txids) if self.last_known_block == block.get("previousblockhash"): - self.check_confirmations(txs) + self.check_confirmations(txids) height = block.get("height") completed_trackers = self.get_completed_trackers(height) @@ -328,16 +300,16 @@ class Responder: # ToDo: #24-properly-handle-reorgs self.handle_reorgs(block_hash) - # Register the last processed block for the responder - self.db_manager.store_last_block_hash_responder(block_hash) + # Clear the receipts issued in this block + self.carrier.issued_receipts = {} - self.last_known_block = block.get("hash") + if len(self.trackers) is 0: + logger.info("No more pending trackers") + # Register the last processed block for the responder + self.db_manager.store_last_block_hash_responder(block_hash) + self.last_known_block = block.get("hash") self.block_queue.task_done() - self.carrier.issued_receipts = {} - - # Go back to sleep if there are no more pending trackers - self.sleep() def check_confirmations(self, txs): """ diff --git a/pisa/watcher.py b/pisa/watcher.py index dc35aec..5b1860c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -10,7 +10,6 @@ from common.logger import Logger from pisa import LOG_PREFIX from pisa.cleaner import Cleaner -from pisa.responder import Responder from pisa.block_processor import BlockProcessor logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -33,13 +32,10 @@ class Watcher: Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. - responder (:obj:`Responder `): a ``Responder`` instance. If ``None`` is passed, a new - instance is created. Populated instances are useful when bootstrapping the system from backed-up data. + responder (:obj:`Responder `): a ``Responder`` instance. Attributes: @@ -48,11 +44,8 @@ class Watcher: It's populated trough ``add_appointment``. locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several appointments with the same ``locator``. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. db_manager (:obj:`DBManager `): A db manager instance to interact with the database. @@ -63,41 +56,27 @@ class Watcher: """ - def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): + def __init__(self, db_manager, responder, sk_der, config): self.appointments = dict() self.locator_uuid_map = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.config = config self.db_manager = db_manager + self.responder = responder self.signing_key = Cryptographer.load_private_key_der(sk_der) - if not isinstance(responder, Responder): - self.responder = Responder(db_manager, chain_monitor) - def awake(self): - self.asleep = False - self.chain_monitor.watcher_asleep = False - watcher_thread = Thread(target=self.do_watch, daemon=True).start() - - logger.info("Waking up") + watcher_thread = Thread(target=self.do_watch, daemon=True) + watcher_thread.start() return watcher_thread - def sleep(self): - self.asleep = True - self.chain_monitor.watcher_asleep = True - - logger.info("No more pending appointments, going back to sleep") - def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. - ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher` - is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty. - It will go back to sleep once there are no more pending appointments. + ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring + the blockchain (``do_watch``) until ``appointments`` is empty. Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding :obj:`EncryptedBlob ` and pass the information to the @@ -132,9 +111,6 @@ class Watcher: else: self.locator_uuid_map[appointment.locator] = [uuid] - if self.asleep: - self.awake() - self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.create_append_locator_map(appointment.locator, uuid) @@ -159,15 +135,13 @@ class Watcher: :obj:`Responder ` upon detecting a breach. """ - while len(self.appointments) > 0: + while True: block_hash = self.block_queue.get() - logger.info("New block received", block_hash=block_hash) - block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: + if len(self.appointments) > 0 and block is not None: txids = block.get("tx") - logger.info("List of transactions", txids=txids) expired_appointments = [ @@ -203,7 +177,7 @@ class Watcher: block_hash, ) - # FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. + # FIXME: Only necessary because of the triggered appointment approach. Fix if it changes. if receipt.delivered: Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) @@ -219,14 +193,13 @@ class Watcher: appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager ) - # Register the last processed block for the watcher - self.db_manager.store_last_block_hash_watcher(block_hash) + if len(self.appointments) is 0: + logger.info("No more pending appointments") + # Register the last processed block for the watcher + self.db_manager.store_last_block_hash_watcher(block_hash) self.block_queue.task_done() - # Go back to sleep if there are no more appointments - self.sleep() - def get_breaches(self, txids): """ Gets a list of channel breaches given the list of transaction ids. diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 0995539..2ffcb85 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -15,7 +15,6 @@ from apps.cli.blob import Blob from pisa.responder import TransactionTracker from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager -from pisa.chain_monitor import ChainMonitor from common.appointment import Appointment from common.tools import compute_locator @@ -46,23 +45,13 @@ def prng_seed(): def db_manager(): manager = DBManager("test_db") # Add last know block for the Responder in the db + yield manager manager.db.close() rmtree("test_db") -@pytest.fixture(scope="module") -def chain_monitor(): - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() - - yield chain_monitor - - chain_monitor.terminate = True - generate_block() - - def generate_keypair(): client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_pk = client_sk.public_key() diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index f94ab40..6561569 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -7,8 +7,10 @@ from cryptography.hazmat.primitives import serialization from pisa.api import API from pisa.watcher import Watcher +from pisa.responder import Responder from pisa.tools import bitcoin_cli from pisa import HOST, PORT +from pisa.chain_monitor import ChainMonitor from test.pisa.unit.conftest import ( generate_block, @@ -32,7 +34,7 @@ config = get_config() @pytest.fixture(scope="module") -def run_api(db_manager, chain_monitor): +def run_api(db_manager): sk, pk = generate_keypair() sk_der = sk.private_bytes( encoding=serialization.Encoding.DER, @@ -40,9 +42,10 @@ def run_api(db_manager, chain_monitor): encryption_algorithm=serialization.NoEncryption(), ) - watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) + watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) + watcher.awake() + chain_monitor.monitor_chain() api_thread = Thread(target=API(watcher, config).start) api_thread.daemon = True diff --git a/test/pisa/unit/test_builder.py b/test/pisa/unit/test_builder.py index 8fe09af..c0a2ace 100644 --- a/test/pisa/unit/test_builder.py +++ b/test/pisa/unit/test_builder.py @@ -4,6 +4,7 @@ from queue import Queue from pisa.builder import Builder from pisa.watcher import Watcher +from pisa.responder import Responder from test.pisa.unit.conftest import ( get_random_value_hex, generate_dummy_appointment, @@ -89,7 +90,7 @@ def test_populate_block_queue(): def test_update_states_empty_list(db_manager): - w = Watcher(db_manager=db_manager, chain_monitor=None, sk_der=None, config=None) + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=None) missed_blocks_watcher = [] missed_blocks_responder = [get_random_value_hex(32)] @@ -102,121 +103,35 @@ def test_update_states_empty_list(db_manager): Builder.update_states(w, missed_blocks_responder, missed_blocks_watcher) -def test_update_states_different_sizes(run_bitcoind, db_manager, chain_monitor): - w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(w.responder, True) - chain_monitor.attach_responder(w.responder, True) - - # For the states to be updated data needs to be present in the actors (either appointments or trackers). - # Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss. - w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200} +def test_update_states_responder_misses_more(run_bitcoind, db_manager): + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config()) blocks = [] for _ in range(5): generate_block() blocks.append(bitcoin_cli().getbestblockhash()) - # Updating the states should bring both to the same last known block. The Watcher's is stored in the db since it has - # gone over do_watch, whereas the Responders in only updated by update state. + # Updating the states should bring both to the same last known block. + w.awake() + w.responder.awake() Builder.update_states(w, blocks, blocks[1:]) assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert w.responder.last_known_block == blocks[-1] - # If both have work, both last known blocks are updated - w.sleep() - w.responder.sleep() - w.responder.trackers[uuid4().hex] = { - "penalty_txid": get_random_value_hex(32), - "locator": get_random_value_hex(16), - "appointment_end": 200, - } +def test_update_states_watcher_misses_more(run_bitcoind, db_manager): + # Same as before, but data is now in the Responder + w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config()) blocks = [] for _ in range(5): generate_block() blocks.append(bitcoin_cli().getbestblockhash()) + w.awake() + w.responder.awake() Builder.update_states(w, blocks[1:], blocks) - assert db_manager.load_last_block_hash_watcher() == blocks[-1] - assert db_manager.load_last_block_hash_responder() == blocks[-1] - - # Let's try the opposite of the first test (Responder with data, Watcher without) - w.sleep() - w.responder.sleep() - - w.appointments = {} - last_block_prev = blocks[-1] - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - # The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't - # change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last - # known block. - Builder.update_states(w, blocks[1:], blocks) - assert db_manager.load_last_block_hash_watcher() == last_block_prev - assert db_manager.load_last_block_hash_responder() == blocks[-1] - - -def test_update_states_same_sizes(db_manager, chain_monitor): - # The exact same behaviour of the last test is expected here, since different sizes are even using - # populate_block_queue and then run with the same list size. - w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(w.responder, True) - chain_monitor.attach_responder(w.responder, True) - - # For the states to be updated data needs to be present in the actors (either appointments or trackers). - # Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss. - w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200} - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - Builder.update_states(w, blocks, blocks) - assert db_manager.load_last_block_hash_watcher() == blocks[-1] - assert w.responder.last_known_block == blocks[-1] - - # If both have work, both last known blocks are updated - w.sleep() - w.responder.sleep() - - w.responder.trackers[uuid4().hex] = { - "penalty_txid": get_random_value_hex(32), - "locator": get_random_value_hex(16), - "appointment_end": 200, - } - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - Builder.update_states(w, blocks, blocks) assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert db_manager.load_last_block_hash_responder() == blocks[-1] - - # Let's try the opposite of the first test (Responder with data, Watcher without) - w.sleep() - w.responder.sleep() - - w.appointments = {} - last_block_prev = blocks[-1] - - blocks = [] - for _ in range(5): - generate_block() - blocks.append(bitcoin_cli().getbestblockhash()) - - # The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't - # change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last - # known block. - Builder.update_states(w, blocks, blocks) - assert db_manager.load_last_block_hash_watcher() == last_block_prev - assert db_manager.load_last_block_hash_responder() == blocks[-1] diff --git a/test/pisa/unit/test_chain_monitor.py b/test/pisa/unit/test_chain_monitor.py index 56c2b31..7098478 100644 --- a/test/pisa/unit/test_chain_monitor.py +++ b/test/pisa/unit/test_chain_monitor.py @@ -1,20 +1,19 @@ import zmq import time +from queue import Queue from threading import Thread, Event, Condition -from pisa.watcher import Watcher -from pisa.responder import Responder from pisa.block_processor import BlockProcessor from pisa.chain_monitor import ChainMonitor -from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config +from test.pisa.unit.conftest import get_random_value_hex, generate_block def test_init(run_bitcoind): # run_bitcoind is started here instead of later on to avoid race conditions while it initializes # Not much to test here, just sanity checks to make sure nothing goes south in the future - chain_monitor = ChainMonitor() + chain_monitor = ChainMonitor(Queue(), Queue()) assert chain_monitor.best_tip is None assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0 @@ -24,41 +23,12 @@ def test_init(run_bitcoind): assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket) # The Queues and asleep flags are initialized when attaching the corresponding subscriber - assert chain_monitor.watcher_queue is None - assert chain_monitor.responder_queue is None - assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep + assert isinstance(chain_monitor.watcher_queue, Queue) + assert isinstance(chain_monitor.responder_queue, Queue) -def test_attach_watcher(chain_monitor, db_manager): - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - - # booleans are not passed as reference in Python, so the flags need to be set separately - assert watcher.asleep == chain_monitor.watcher_asleep - watcher.asleep = False - assert chain_monitor.watcher_asleep != watcher.asleep - - # Test that the Queue work - r_hash = get_random_value_hex(32) - chain_monitor.watcher_queue.put(r_hash) - assert watcher.block_queue.get() == r_hash - - -def test_attach_responder(chain_monitor, db_manager): - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) - - # Same kind of testing as with the attach watcher - assert responder.asleep == chain_monitor.watcher_asleep - responder.asleep = False - assert chain_monitor.watcher_asleep != responder.asleep - - r_hash = get_random_value_hex(32) - chain_monitor.responder_queue.put(r_hash) - assert responder.block_queue.get() == r_hash - - -def test_notify_subscribers(chain_monitor): +def test_notify_subscribers(): + chain_monitor = ChainMonitor(Queue(), Queue()) # Subscribers are only notified as long as they are awake new_block = get_random_value_hex(32) @@ -66,27 +36,17 @@ def test_notify_subscribers(chain_monitor): assert chain_monitor.watcher_queue.empty() assert chain_monitor.responder_queue.empty() - chain_monitor.watcher_asleep = True - chain_monitor.responder_asleep = True - chain_monitor.notify_subscribers(new_block) - - # And remain empty afterwards since both subscribers were asleep - assert chain_monitor.watcher_queue.empty() - assert chain_monitor.responder_queue.empty() - - # Let's flag them as awake and try again - chain_monitor.watcher_asleep = False - chain_monitor.responder_asleep = False chain_monitor.notify_subscribers(new_block) assert chain_monitor.watcher_queue.get() == new_block assert chain_monitor.responder_queue.get() == new_block -def test_update_state(chain_monitor): +def test_update_state(): # The state is updated after receiving a new block (and only if the block is not already known). # Let's start by setting a best_tip and a couple of old tips new_block_hash = get_random_value_hex(32) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = new_block_hash chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)] @@ -105,12 +65,10 @@ def test_update_state(chain_monitor): def test_monitor_chain_polling(db_manager): # Try polling with the Watcher - chain_monitor = ChainMonitor() + wq = Queue() + chain_monitor = ChainMonitor(wq, Queue()) chain_monitor.best_tip = BlockProcessor.get_best_block_hash() - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) - # monitor_chain_polling runs until terminate if set polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True) polling_thread.start() @@ -131,13 +89,10 @@ def test_monitor_chain_polling(db_manager): def test_monitor_chain_zmq(db_manager): - # Try zmq with the Responder - chain_monitor = ChainMonitor() + rq = Queue() + chain_monitor = ChainMonitor(Queue(), rq) chain_monitor.best_tip = BlockProcessor.get_best_block_hash() - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True) zmq_thread.start() @@ -150,28 +105,10 @@ def test_monitor_chain_zmq(db_manager): chain_monitor.responder_queue.get() assert chain_monitor.responder_queue.empty() - # If we flag it to sleep no notification is sent - chain_monitor.responder_asleep = True - - for _ in range(3): - generate_block() - assert chain_monitor.responder_queue.empty() - - chain_monitor.terminate = True - # The zmq thread needs a block generation to release from the recv method. - generate_block() - - zmq_thread.join() - def test_monitor_chain(db_manager): # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate - chain_monitor = ChainMonitor() - - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = None chain_monitor.monitor_chain() @@ -196,12 +133,7 @@ def test_monitor_chain(db_manager): def test_monitor_chain_single_update(db_manager): # This test tests that if both threads try to add the same block to the queue, only the first one will make it - chain_monitor = ChainMonitor() - - watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) - responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor) - chain_monitor.attach_responder(responder.block_queue, asleep=False) - chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + chain_monitor = ChainMonitor(Queue(), Queue()) chain_monitor.best_tip = None diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index a3a9e75..41af14f 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -1,6 +1,7 @@ import json import pytest import random +from queue import Queue from uuid import uuid4 from shutil import rmtree from copy import deepcopy @@ -18,17 +19,19 @@ from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_ @pytest.fixture(scope="module") -def responder(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def responder(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() return responder -@pytest.fixture() +@pytest.fixture(scope="session") def temp_db_manager(): db_name = get_random_value_hex(8) db_manager = DBManager(db_name) + yield db_manager db_manager.db.close() @@ -144,19 +147,17 @@ def test_tracker_from_dict_invalid_data(): assert True -def test_init_responder(responder): +def test_init_responder(temp_db_manager): + responder = Responder(temp_db_manager) assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0 assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0 assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0 assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0 - assert isinstance(responder.chain_monitor, ChainMonitor) assert responder.block_queue.empty() - assert responder.asleep is True -def test_handle_breach(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_handle_breach(db_manager): + responder = Responder(db_manager) uuid = uuid4().hex tracker = create_dummy_tracker() @@ -174,20 +175,11 @@ def test_handle_breach(db_manager, chain_monitor): assert receipt.delivered is True - # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes - # now. To do so we delete all the trackers, and generate a new block. - responder.trackers = dict() - generate_block() - -def test_add_bad_response(responder): +def test_handle_breach_bad_response(responder): uuid = uuid4().hex tracker = create_dummy_tracker() - # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting - # to awake. That will prevent the chain_monitor thread to be launched again. - responder.asleep = False - # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. tracker.penalty_rawtx = tracker.penalty_txid @@ -206,8 +198,6 @@ def test_add_bad_response(responder): def test_add_tracker(responder): - # Responder is asleep - for _ in range(20): uuid = uuid4().hex confirmations = 0 @@ -236,8 +226,6 @@ def test_add_tracker(responder): def test_add_tracker_same_penalty_txid(responder): - # Responder is asleep - confirmations = 0 locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) uuid_1 = uuid4().hex @@ -262,8 +250,6 @@ def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_already_confirmed(responder): - # Responder is asleep - for i in range(20): uuid = uuid4().hex confirmations = i + 1 @@ -276,10 +262,11 @@ def test_add_tracker_already_confirmed(responder): assert penalty_txid not in responder.unconfirmed_txs -def test_do_watch(temp_db_manager, chain_monitor): +def test_do_watch(temp_db_manager): # Create a fresh responder to simplify the test - responder = Responder(temp_db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, False) + responder = Responder(temp_db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() trackers = [create_dummy_tracker(penalty_rawtx=create_dummy_transaction().hex()) for _ in range(20)] @@ -332,12 +319,12 @@ def test_do_watch(temp_db_manager, chain_monitor): generate_blocks(6) assert len(responder.tx_tracker_map) == 0 - assert responder.asleep is True -def test_check_confirmations(temp_db_manager, chain_monitor): - responder = Responder(temp_db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_check_confirmations(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() # check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have # been confirmed. To test this we need to create a list of transactions and the state of the responder @@ -391,11 +378,12 @@ def test_get_txs_to_rebroadcast(responder): assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) -def test_get_completed_trackers(db_manager, chain_monitor): +def test_get_completed_trackers(db_manager): initial_height = bitcoin_cli().getblockcount() - responder = Responder(db_manager, chain_monitor) - chain_monitor.attach_responder(responder.block_queue, responder.asleep) + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached @@ -450,10 +438,10 @@ def test_get_completed_trackers(db_manager, chain_monitor): assert set(completed_trackers_ids) == set(ended_trackers_keys) -def test_rebroadcast(db_manager, chain_monitor): - responder = Responder(db_manager, chain_monitor) - responder.asleep = False - chain_monitor.attach_responder(responder.block_queue, responder.asleep) +def test_rebroadcast(db_manager): + responder = Responder(db_manager) + chain_monitor = ChainMonitor(Queue(), responder.block_queue) + chain_monitor.monitor_chain() txs_to_rebroadcast = [] diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 03c6f45..5a85bec 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -1,5 +1,6 @@ import pytest from uuid import uuid4 +from shutil import rmtree from threading import Thread from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import serialization @@ -8,6 +9,7 @@ from pisa.watcher import Watcher from pisa.responder import Responder from pisa.tools import bitcoin_cli from pisa.chain_monitor import ChainMonitor +from pisa.db_manager import DBManager from test.pisa.unit.conftest import ( generate_blocks, @@ -36,11 +38,22 @@ sk_der = signing_key.private_bytes( ) +@pytest.fixture(scope="session") +def temp_db_manager(): + db_name = get_random_value_hex(8) + db_manager = DBManager(db_name) + + yield db_manager + + db_manager.db.close() + rmtree(db_name) + + @pytest.fixture(scope="module") -def watcher(db_manager, chain_monitor): - watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) - chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) - chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) +def watcher(db_manager): + watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) + chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) + chain_monitor.monitor_chain() return watcher @@ -76,19 +89,13 @@ def create_appointments(n): def test_init(run_bitcoind, watcher): assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 - assert watcher.asleep is True assert watcher.block_queue.empty() - assert isinstance(watcher.chain_monitor, ChainMonitor) assert isinstance(watcher.config, dict) assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey) assert isinstance(watcher.responder, Responder) def test_add_appointment(watcher): - # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state) - # Avoid this by setting the state to awake. - watcher.asleep = False - # We should be able to add appointments up to the limit for _ in range(10): appointment, dispute_tx = generate_dummy_appointment( @@ -128,10 +135,11 @@ def test_add_too_many_appointments(watcher): assert sig is None -def test_do_watch(watcher): +def test_do_watch(watcher, temp_db_manager): + watcher.db_manager = temp_db_manager + # We will wipe all the previous data and add 5 appointments appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) - watcher.chain_monitor.watcher_asleep = False # Set the data into the Watcher and in the db watcher.locator_uuid_map = locator_uuid_map @@ -142,7 +150,8 @@ def test_do_watch(watcher): watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json()) watcher.db_manager.create_append_locator_map(appointment.locator, uuid) - Thread(target=watcher.do_watch, daemon=True).start() + do_watch_thread = Thread(target=watcher.do_watch, daemon=True) + do_watch_thread.start() # Broadcast the first two for dispute_tx in dispute_txs[:2]: @@ -158,7 +167,6 @@ def test_do_watch(watcher): generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET) assert len(watcher.appointments) == 0 - assert watcher.asleep is True def test_get_breaches(watcher, txids, locator_uuid_map):