Fixes loading data from disk when both Watcher and Responder need to be brough up to date

The previous approach was not correct, since both actors need to be brought up to date at the same time.
This commit is contained in:
Sergi Delgado Segura
2020-01-31 13:02:32 +01:00
parent 02bc88ed84
commit 32ff13a495

View File

@@ -60,10 +60,6 @@ def main():
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
logger.info("Fresh bootstrap") logger.info("Fresh bootstrap")
# Set the current tip as the last known block for both on a fresh start
db_manager.store_last_block_hash_watcher(BlockProcessor.get_best_block_hash())
db_manager.store_last_block_hash_responder(BlockProcessor.get_best_block_hash())
else: else:
logger.info("Bootstrapping from backed up data") logger.info("Bootstrapping from backed up data")
block_processor = BlockProcessor() block_processor = BlockProcessor()
@@ -72,40 +68,58 @@ def main():
last_block_responder = db_manager.load_last_block_hash_responder() last_block_responder = db_manager.load_last_block_hash_responder()
# FIXME: 32-reorgs-offline dropped txs are not used at this point. # FIXME: 32-reorgs-offline dropped txs are not used at this point.
missed_blocks_responder = None # Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't
# perform the search twice.
last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor(
last_block_watcher
)
missed_blocks_watcher = block_processor.get_missed_blocks(last_common_ancestor_watcher)
# Build Responder with backed up data if found if last_block_watcher == last_block_responder:
if len(responder_trackers_data) != 0: dropped_txs_responder = dropped_txs_watcher
missed_blocks_responder = missed_blocks_watcher
else:
last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor( last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor(
last_block_responder last_block_responder
) )
missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder)
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( # Build and update the Watcher.
responder_trackers_data
)
Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder)
watcher.responder.awake()
# Build Watcher. If the blocks of both match we don't perform the search twice.
if len(watcher_appointments_data) != 0: if len(watcher_appointments_data) != 0:
if last_block_watcher == last_block_responder and missed_blocks_responder is not None:
missed_blocks_watcher = missed_blocks_responder
else:
last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor(
last_block_watcher
)
missed_blocks_watcher = block_processor.get_missed_blocks(last_common_ancestor_watcher)
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data watcher_appointments_data
) )
# Build Responder with backed up data if found
if len(responder_trackers_data) != 0:
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data
)
# If only one of the instances needs to be updated, it can be done separately.
if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0:
Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder)
watcher.responder.awake()
watcher.responder.block_queue.join()
elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0:
Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher)
watcher.awake() watcher.awake()
watcher.block_queue.join()
# Otherwise the need to be updated at the same time, block by block
elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0:
Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder)
# Awake the Watcher/Responder if they ended up with pending work
if watcher.appointments and watcher.asleep:
watcher.awake()
if watcher.responder.trackers and watcher.responder.asleep:
watcher.responder.awake()
# Fire the API # Fire the API
API(watcher, config=config).start() API(watcher, config=config).start()
except Exception as e: except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e)) logger.error("An error occurred: {}. Shutting down".format(e))
exit(1) exit(1)