diff --git a/pisa/builder.py b/pisa/builder.py index 072b638..39298dd 100644 --- a/pisa/builder.py +++ b/pisa/builder.py @@ -1,9 +1,3 @@ -from queue import Queue - -from pisa.responder import TransactionTracker -from common.appointment import Appointment - - class Builder: """ The :class:`Builder` class is in charge or reconstructing data loaded from the database and build the data @@ -96,3 +90,65 @@ class Builder: for block in missed_blocks: block_queue.put(block) + + @staticmethod + def update_states(watcher, missed_blocks_watcher, missed_blocks_responder): + """ + Updates the states of both the :mod:`Watcher ` and the :mod:`Responder `. + If both have pending blocks to process they need to be updates at the same time, block by block. + + If only one instance has to be updated, ``populate_block_queue`` should be used. + + Args: + watcher (:obj:`Watcher `): a ``Watcher`` instance (including a ``Responder``). + missed_blocks_watcher (:obj:`list`): the list of block missed by the ``Watcher``. + missed_blocks_responder (:obj:`list`): the list of block missed by the ``Responder``. + + Raises: + ValueError: is one of the provided list is empty. + """ + + if len(missed_blocks_responder) == 0 or len(missed_blocks_watcher) == 0: + raise ValueError( + "Both the Watcher and the Responder must have missed blocks. Use ``populate_block_queue`` otherwise." + ) + + # If the missed blocks of the Watcher and the Responder are not the same, we need to bring one up to date with + # the other. + if len(missed_blocks_responder) > len(missed_blocks_watcher): + block_diff = sorted( + set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index + ) + Builder.populate_block_queue(watcher.responder.block_queue, block_diff) + watcher.responder.awake() + watcher.responder.block_queue.join() + + elif len(missed_blocks_watcher) > len(missed_blocks_responder): + block_diff = sorted( + set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index + ) + Builder.populate_block_queue(watcher.block_queue, block_diff) + watcher.awake() + watcher.block_queue.join() + + # Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is + # asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if + # appointments are passed trough from the Watcher. + if watcher.appointments and watcher.asleep: + watcher.awake() + + if watcher.responder.trackers and watcher.responder.asleep: + watcher.responder.awake() + + for block in missed_blocks_watcher: + if not watcher.asleep: + watcher.block_queue.put(block) + watcher.block_queue.join() + + if not watcher.responder.asleep: + watcher.responder.block_queue.put(block) + watcher.responder.block_queue.join() + else: + # The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no + # trackers + watcher.responder.last_known_block = block