diff --git a/pisa/builder.py b/pisa/builder.py index 39298dd..e9728cc 100644 --- a/pisa/builder.py +++ b/pisa/builder.py @@ -120,7 +120,6 @@ class Builder: set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index ) Builder.populate_block_queue(watcher.responder.block_queue, block_diff) - watcher.responder.awake() watcher.responder.block_queue.join() elif len(missed_blocks_watcher) > len(missed_blocks_responder): @@ -128,27 +127,12 @@ class Builder: set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index ) Builder.populate_block_queue(watcher.block_queue, block_diff) - watcher.awake() watcher.block_queue.join() - # Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is - # asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if - # appointments are passed trough from the Watcher. - if watcher.appointments and watcher.asleep: - watcher.awake() - - if watcher.responder.trackers and watcher.responder.asleep: - watcher.responder.awake() - + # Once they are at the same height, we update them one by one for block in missed_blocks_watcher: - if not watcher.asleep: - watcher.block_queue.put(block) - watcher.block_queue.join() + watcher.block_queue.put(block) + watcher.block_queue.join() - if not watcher.responder.asleep: - watcher.responder.block_queue.put(block) - watcher.responder.block_queue.join() - else: - # The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no - # trackers - watcher.responder.last_known_block = block + watcher.responder.block_queue.put(block) + watcher.responder.block_queue.join() diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 22ef377..0dad221 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -19,6 +19,10 @@ class ChainMonitor: The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified once per queue and the notification is triggered by the method that detects the block faster. + Args: + watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. + responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. + Attributes: best_tip (:obj:`str`): a block hash representing the current best tip. last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. @@ -30,11 +34,9 @@ class ChainMonitor: watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher `. responder_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Responder `. - watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not. - responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not. """ - def __init__(self): + def __init__(self, watcher_queue, responder_queue): self.best_tip = None self.last_tips = [] self.terminate = False @@ -48,53 +50,21 @@ class ChainMonitor: self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.watcher_queue = None - self.responder_queue = None - self.watcher_asleep = True - self.responder_asleep = True - - def attach_watcher(self, queue, asleep): - """ - Attaches a :obj:`Watcher ` to the :class:`ChainMonitor`. The ``Watcher`` and the - ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. - asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from - the ``Watcher`` when the state changes. - """ - - self.watcher_queue = queue - self.watcher_asleep = asleep - - def attach_responder(self, queue, asleep): - """ - Attaches a :obj:`Responder ` to the :class:`ChainMonitor`. The ``Responder`` and the - ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag. - - Args: - queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. - asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from - the ``Responder`` when the state changes. - """ - - self.responder_queue = queue - self.responder_asleep = asleep + self.watcher_queue = watcher_queue + self.responder_queue = responder_queue def notify_subscribers(self, block_hash): """ - Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so - by putting the hash in the corresponding queue(s). + Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in + the corresponding queue(s). Args: - block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. + block_hash (:obj:`str`): the new block hash to be sent to the subscribers. """ - if not self.watcher_asleep: - self.watcher_queue.put(block_hash) - - if not self.responder_asleep: - self.responder_queue.put(block_hash) + self.watcher_queue.put(block_hash) + self.responder_queue.put(block_hash) def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): """ diff --git a/pisa/responder.py b/pisa/responder.py index 6f18d19..4ae73ab 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -108,11 +108,6 @@ class Responder: the decrypted ``penalty_txs`` handed by the :obj:`Watcher ` and ensuring the they make it to the blockchain. - The :class:`Responder` can be in two states: - - - Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty). - - Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain). - Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. @@ -126,41 +121,29 @@ class Responder: unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx`` has missed. Used to trigger rebroadcast if needed. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It - is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. + is populated by the :obj:`ChainMonitor `. db_manager (:obj:`DBManager `): A ``DBManager`` instance to interact with the database. """ - def __init__(self, db_manager, chain_monitor): + def __init__(self, db_manager): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.db_manager = db_manager self.carrier = Carrier() self.last_known_block = db_manager.load_last_block_hash_responder() def awake(self): - self.asleep = False - self.chain_monitor.responder_asleep = False - responder_thread = Thread(target=self.do_watch, daemon=True).start() + responder_thread = Thread(target=self.do_watch, daemon=True) + responder_thread.start() return responder_thread - def sleep(self): - self.asleep = True - self.chain_monitor.responder_asleep = True - - logger.info("No more pending trackers, going back to sleep") - @staticmethod def on_sync(block_hash): """ @@ -212,9 +195,6 @@ class Responder: into the blockchain. """ - if self.asleep: - logger.info("Waking up") - receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) if receipt.delivered: @@ -239,8 +219,6 @@ class Responder: ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the database. - ``add_tracker`` awakes the :obj:`Responder` if it is asleep. - Args: uuid (:obj:`str`): a unique identifier for the appointment. locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded). @@ -278,9 +256,6 @@ class Responder: "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end ) - if self.asleep: - self.awake() - def do_watch(self): """ Monitors the blockchain whilst there are pending trackers. @@ -293,20 +268,17 @@ class Responder: if self.last_known_block is None: self.last_known_block = BlockProcessor.get_best_block_hash() - while len(self.trackers) > 0: - # We get notified for every new received block + while True: block_hash = self.block_queue.get() block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: - txs = block.get("tx") - - logger.info( - "New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs - ) + if len(self.trackers) > 0 and block is not None: + txids = block.get("tx") + logger.info("List of transactions", txids=txids) if self.last_known_block == block.get("previousblockhash"): - self.check_confirmations(txs) + self.check_confirmations(txids) height = block.get("height") completed_trackers = self.get_completed_trackers(height) @@ -328,16 +300,16 @@ class Responder: # ToDo: #24-properly-handle-reorgs self.handle_reorgs(block_hash) - # Register the last processed block for the responder - self.db_manager.store_last_block_hash_responder(block_hash) + # Clear the receipts issued in this block + self.carrier.issued_receipts = {} - self.last_known_block = block.get("hash") + if len(self.trackers) is 0: + logger.info("No more pending trackers") + # Register the last processed block for the responder + self.db_manager.store_last_block_hash_responder(block_hash) + self.last_known_block = block.get("hash") self.block_queue.task_done() - self.carrier.issued_receipts = {} - - # Go back to sleep if there are no more pending trackers - self.sleep() def check_confirmations(self, txs): """ diff --git a/pisa/watcher.py b/pisa/watcher.py index dc35aec..5b1860c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -10,7 +10,6 @@ from common.logger import Logger from pisa import LOG_PREFIX from pisa.cleaner import Cleaner -from pisa.responder import Responder from pisa.block_processor import BlockProcessor logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -33,13 +32,10 @@ class Watcher: Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. - responder (:obj:`Responder `): a ``Responder`` instance. If ``None`` is passed, a new - instance is created. Populated instances are useful when bootstrapping the system from backed-up data. + responder (:obj:`Responder `): a ``Responder`` instance. Attributes: @@ -48,11 +44,8 @@ class Watcher: It's populated trough ``add_appointment``. locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several appointments with the same ``locator``. - asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is populated by the :obj:`ChainMonitor `. - chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track - new blocks received by ``bitcoind``. config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. db_manager (:obj:`DBManager `): A db manager instance to interact with the database. @@ -63,41 +56,27 @@ class Watcher: """ - def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): + def __init__(self, db_manager, responder, sk_der, config): self.appointments = dict() self.locator_uuid_map = dict() - self.asleep = True self.block_queue = Queue() - self.chain_monitor = chain_monitor self.config = config self.db_manager = db_manager + self.responder = responder self.signing_key = Cryptographer.load_private_key_der(sk_der) - if not isinstance(responder, Responder): - self.responder = Responder(db_manager, chain_monitor) - def awake(self): - self.asleep = False - self.chain_monitor.watcher_asleep = False - watcher_thread = Thread(target=self.do_watch, daemon=True).start() - - logger.info("Waking up") + watcher_thread = Thread(target=self.do_watch, daemon=True) + watcher_thread.start() return watcher_thread - def sleep(self): - self.asleep = True - self.chain_monitor.watcher_asleep = True - - logger.info("No more pending appointments, going back to sleep") - def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. - ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher` - is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty. - It will go back to sleep once there are no more pending appointments. + ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring + the blockchain (``do_watch``) until ``appointments`` is empty. Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding :obj:`EncryptedBlob ` and pass the information to the @@ -132,9 +111,6 @@ class Watcher: else: self.locator_uuid_map[appointment.locator] = [uuid] - if self.asleep: - self.awake() - self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.create_append_locator_map(appointment.locator, uuid) @@ -159,15 +135,13 @@ class Watcher: :obj:`Responder ` upon detecting a breach. """ - while len(self.appointments) > 0: + while True: block_hash = self.block_queue.get() - logger.info("New block received", block_hash=block_hash) - block = BlockProcessor.get_block(block_hash) + logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if block is not None: + if len(self.appointments) > 0 and block is not None: txids = block.get("tx") - logger.info("List of transactions", txids=txids) expired_appointments = [ @@ -203,7 +177,7 @@ class Watcher: block_hash, ) - # FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. + # FIXME: Only necessary because of the triggered appointment approach. Fix if it changes. if receipt.delivered: Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) @@ -219,14 +193,13 @@ class Watcher: appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager ) - # Register the last processed block for the watcher - self.db_manager.store_last_block_hash_watcher(block_hash) + if len(self.appointments) is 0: + logger.info("No more pending appointments") + # Register the last processed block for the watcher + self.db_manager.store_last_block_hash_watcher(block_hash) self.block_queue.task_done() - # Go back to sleep if there are no more appointments - self.sleep() - def get_breaches(self, txids): """ Gets a list of channel breaches given the list of transaction ids.