From 32ff13a495e7ca70b11fe1ece7b94a0ae9425f51 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 31 Jan 2020 13:02:32 +0100 Subject: [PATCH] Fixes loading data from disk when both Watcher and Responder need to be brough up to date The previous approach was not correct, since both actors need to be brought up to date at the same time. --- pisa/pisad.py | 60 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index 0eca57b..0d361f3 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -60,10 +60,6 @@ def main(): if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") - # Set the current tip as the last known block for both on a fresh start - db_manager.store_last_block_hash_watcher(BlockProcessor.get_best_block_hash()) - db_manager.store_last_block_hash_responder(BlockProcessor.get_best_block_hash()) - else: logger.info("Bootstrapping from backed up data") block_processor = BlockProcessor() @@ -72,40 +68,58 @@ def main(): last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - missed_blocks_responder = None + # Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't + # perform the search twice. + last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( + last_block_watcher + ) + missed_blocks_watcher = block_processor.get_missed_blocks(last_common_ancestor_watcher) - # Build Responder with backed up data if found - if len(responder_trackers_data) != 0: + if last_block_watcher == last_block_responder: + dropped_txs_responder = dropped_txs_watcher + missed_blocks_responder = missed_blocks_watcher + + else: last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor( last_block_responder ) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) - watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( - responder_trackers_data - ) - Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) - watcher.responder.awake() - - # Build Watcher. If the blocks of both match we don't perform the search twice. + # Build and update the Watcher. if len(watcher_appointments_data) != 0: - if last_block_watcher == last_block_responder and missed_blocks_responder is not None: - missed_blocks_watcher = missed_blocks_responder - else: - last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( - last_block_watcher - ) - missed_blocks_watcher = block_processor.get_missed_blocks(last_common_ancestor_watcher) - watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( watcher_appointments_data ) + + # Build Responder with backed up data if found + if len(responder_trackers_data) != 0: + watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( + responder_trackers_data + ) + + # If only one of the instances needs to be updated, it can be done separately. + if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: + Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) + watcher.responder.awake() + watcher.responder.block_queue.join() + + elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) watcher.awake() + watcher.block_queue.join() + + # Otherwise the need to be updated at the same time, block by block + elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: + Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder) + + # Awake the Watcher/Responder if they ended up with pending work + if watcher.appointments and watcher.asleep: + watcher.awake() + if watcher.responder.trackers and watcher.responder.asleep: + watcher.responder.awake() # Fire the API API(watcher, config=config).start() - except Exception as e: logger.error("An error occurred: {}. Shutting down".format(e)) exit(1)