From ba5aa9f6512adaff56c738d2ba378aace139d2d4 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 24 Jan 2020 13:21:36 +0100 Subject: [PATCH 1/4] Updates builder with new minimal in-memory data from #83 - The builder was never modified when the in-memory data was reduced, so it was still trying to build data based on the past approach. - Renames create_block_queue to poplate_block_queue and repurposes the method When creating a block queue, a new Queue was created and populated. That was breaking the link between the Watcher/Responder and the ChainMonitor since the Queue is defined beforehand. --- pisa/builder.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/pisa/builder.py b/pisa/builder.py index 6f0f2bf..072b638 100644 --- a/pisa/builder.py +++ b/pisa/builder.py @@ -32,14 +32,13 @@ class Builder: locator_uuid_map = {} for uuid, data in appointments_data.items(): - appointment = Appointment.from_dict(data) - appointments[uuid] = appointment + appointments[uuid] = {"locator": data.get("locator"), "end_time": data.get("end_time")} - if appointment.locator in locator_uuid_map: - locator_uuid_map[appointment.locator].append(uuid) + if data.get("locator") in locator_uuid_map: + locator_uuid_map[data.get("locator")].append(uuid) else: - locator_uuid_map[appointment.locator] = [uuid] + locator_uuid_map[data.get("locator")] = [uuid] return appointments, locator_uuid_map @@ -67,33 +66,33 @@ class Builder: tx_tracker_map = {} for uuid, data in tracker_data.items(): - tracker = TransactionTracker.from_dict(data) - trackers[uuid] = tracker + trackers[uuid] = { + "penalty_txid": data.get("penalty_txid"), + "locator": data.get("locator"), + "appointment_end": data.get("appointment_end"), + } - if tracker.penalty_txid in tx_tracker_map: - tx_tracker_map[tracker.penalty_txid].append(uuid) + if data.get("penalty_txid") in tx_tracker_map: + tx_tracker_map[data.get("penalty_txid")].append(uuid) else: - tx_tracker_map[tracker.penalty_txid] = [uuid] + tx_tracker_map[data.get("penalty_txid")] = [uuid] return trackers, tx_tracker_map @staticmethod - def build_block_queue(missed_blocks): + def populate_block_queue(block_queue, missed_blocks): """ - Builds a ``Queue`` of block hashes to initialize the :mod:`Watcher ` or the + Populates a ``Queue`` of block hashes to initialize the :mod:`Watcher ` or the :mod:`Responder ` using backed up data. Args: + block_queue (:obj:`Queue`): a ``Queue`` missed_blocks (:obj:`list`): list of block hashes missed by the Watchtower (do to a crash or shutdown). Returns: :obj:`Queue`: A ``Queue`` containing all the missed blocks hashes. """ - block_queue = Queue() - for block in missed_blocks: block_queue.put(block) - - return block_queue From 7c4d4d0aad50d4cd282f66119cf0dcdf50946393 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 24 Jan 2020 13:24:12 +0100 Subject: [PATCH 2/4] Moves awaking / sleeping functionality to their own methods --- pisa/responder.py | 20 +++++++++++++------- pisa/watcher.py | 24 +++++++++++++++--------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pisa/responder.py b/pisa/responder.py index 1198553..d1c2aa3 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -146,6 +146,17 @@ class Responder: self.chain_monitor = chain_monitor self.db_manager = db_manager + def awake(self): + self.asleep = False + self.chain_monitor.responder_asleep = False + Thread(target=self.do_watch).start() + + def sleep(self): + self.asleep = True + self.chain_monitor.responder_asleep = True + + logger.info("No more pending trackers, going back to sleep") + @staticmethod def on_sync(block_hash): """ @@ -265,9 +276,7 @@ class Responder: ) if self.asleep: - self.asleep = False - self.chain_monitor.responder_asleep = False - Thread(target=self.do_watch).start() + self.awake() def do_watch(self): """ @@ -321,10 +330,7 @@ class Responder: prev_block_hash = block.get("hash") # Go back to sleep if there are no more pending trackers - self.asleep = True - self.chain_monitor.responder_asleep = True - - logger.info("No more pending trackers, going back to sleep") + self.sleep() def check_confirmations(self, txs): """ diff --git a/pisa/watcher.py b/pisa/watcher.py index 86a5c40..7dbc68b 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -76,6 +76,19 @@ class Watcher: if not isinstance(responder, Responder): self.responder = Responder(db_manager, chain_monitor) + def awake(self): + self.asleep = False + self.chain_monitor.watcher_asleep = False + Thread(target=self.do_watch).start() + + logger.info("Waking up") + + def sleep(self): + self.asleep = True + self.chain_monitor.watcher_asleep = True + + logger.info("No more pending appointments, going back to sleep") + def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. @@ -118,11 +131,7 @@ class Watcher: self.locator_uuid_map[appointment.locator] = [uuid] if self.asleep: - self.asleep = False - self.chain_monitor.watcher_asleep = False - Thread(target=self.do_watch).start() - - logger.info("Waking up") + self.awake() self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.create_append_locator_map(appointment.locator, uuid) @@ -208,10 +217,7 @@ class Watcher: self.db_manager.store_last_block_hash_watcher(block_hash) # Go back to sleep if there are no more appointments - self.asleep = True - self.chain_monitor.watcher_asleep = True - - logger.info("No more pending appointments, going back to sleep") + self.sleep() def get_breaches(self, txids): """ From 89181e6a7e09577548ee203a4609ceb118e5fe88 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 24 Jan 2020 13:25:02 +0100 Subject: [PATCH 3/4] Fixes handling data from db Data obtained from the database was not properly handled in some cases: - If some appointments are accepted and no block is received and the tower is restarted, the data was not loaded. The tower was checking that the Watcher/Responder had a last known block, which may not have been the case. Now best_tip is set as last_known_block on a fresh bootstrap. - The Watcher/ Responder were not being awaken when loading data from the database. - The block queues were not properly populated --- pisa/pisad.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index 0335832..0eca57b 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -60,6 +60,10 @@ def main(): if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") + # Set the current tip as the last known block for both on a fresh start + db_manager.store_last_block_hash_watcher(BlockProcessor.get_best_block_hash()) + db_manager.store_last_block_hash_responder(BlockProcessor.get_best_block_hash()) + else: logger.info("Bootstrapping from backed up data") block_processor = BlockProcessor() @@ -68,11 +72,10 @@ def main(): last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - last_common_ancestor_responder = None missed_blocks_responder = None # Build Responder with backed up data if found - if last_block_responder is not None: + if len(responder_trackers_data) != 0: last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor( last_block_responder ) @@ -81,11 +84,12 @@ def main(): watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( responder_trackers_data ) - watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder) + Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) + watcher.responder.awake() # Build Watcher. If the blocks of both match we don't perform the search twice. - if last_block_watcher is not None: - if last_block_watcher == last_block_responder: + if len(watcher_appointments_data) != 0: + if last_block_watcher == last_block_responder and missed_blocks_responder is not None: missed_blocks_watcher = missed_blocks_responder else: last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( @@ -96,7 +100,8 @@ def main(): watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( watcher_appointments_data ) - watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher) + Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) + watcher.awake() # Fire the API API(watcher, config=config).start() From a718a5c6ae1c2815517d5175ea6024ec657d4161 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 24 Jan 2020 13:32:20 +0100 Subject: [PATCH 4/4] Updates Builder unit tests --- test/pisa/unit/test_builder.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/test/pisa/unit/test_builder.py b/test/pisa/unit/test_builder.py index c45ef47..6cbf073 100644 --- a/test/pisa/unit/test_builder.py +++ b/test/pisa/unit/test_builder.py @@ -1,4 +1,5 @@ from uuid import uuid4 +from queue import Queue from pisa.builder import Builder from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_dummy_tracker @@ -29,8 +30,9 @@ def test_build_appointments(): # Check that the created appointments match the data for uuid, appointment in appointments.items(): assert uuid in appointments_data.keys() - assert appointments_data[uuid] == appointment.to_dict() - assert uuid in locator_uuid_map[appointment.locator] + assert appointments_data[uuid].get("locator") == appointment.get("locator") + assert appointments_data[uuid].get("end_time") == appointment.get("end_time") + assert uuid in locator_uuid_map[appointment.get("locator")] def test_build_trackers(): @@ -55,17 +57,18 @@ def test_build_trackers(): # Check that the built trackers match the data for uuid, tracker in trackers.items(): assert uuid in trackers_data.keys() - tracker_dict = tracker.to_dict() - # The locator is not part of the tracker_data found in the database (for now) - assert trackers_data[uuid] == tracker_dict - assert uuid in tx_tracker_map[tracker.penalty_txid] + assert tracker.get("penalty_txid") == trackers_data[uuid].get("penalty_txid") + assert tracker.get("locator") == trackers_data[uuid].get("locator") + assert tracker.get("appointment_end") == trackers_data[uuid].get("appointment_end") + assert uuid in tx_tracker_map[tracker.get("penalty_txid")] -def test_build_block_queue(): +def test_populate_block_queue(): # Create some random block hashes and construct the queue with them blocks = [get_random_value_hex(32) for _ in range(10)] - queue = Builder.build_block_queue(blocks) + queue = Queue() + Builder.populate_block_queue(queue, blocks) # Make sure every block is in the queue and that there are not additional ones while not queue.empty():