Removes sleep flag and reorders code for redability

This commit is contained in:
Sergi Delgado Segura
2020-02-10 16:21:05 +01:00
parent aa12fa2cf8
commit a4f7548804

View File

@@ -8,6 +8,7 @@ from pisa import config, LOG_PREFIX
from pisa.api import API from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.builder import Builder from pisa.builder import Builder
from pisa.responder import Responder
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.chain_monitor import ChainMonitor from pisa.chain_monitor import ChainMonitor
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
@@ -43,33 +44,50 @@ def main():
else: else:
try: try:
with open(config.get("PISA_SECRET_KEY"), "rb") as key_file:
secret_key_der = key_file.read()
watcher = Watcher(db_manager, Responder(db_manager), secret_key_der, config)
# Create the chain monitor and start monitoring the chain # Create the chain monitor and start monitoring the chain
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
chain_monitor.monitor_chain()
watcher_appointments_data = db_manager.load_watcher_appointments() watcher_appointments_data = db_manager.load_watcher_appointments()
responder_trackers_data = db_manager.load_responder_trackers() responder_trackers_data = db_manager.load_responder_trackers()
with open(config.get("PISA_SECRET_KEY"), "rb") as key_file:
secret_key_der = key_file.read()
watcher = Watcher(db_manager, chain_monitor, secret_key_der, config)
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
logger.info("Fresh bootstrap") logger.info("Fresh bootstrap")
watcher.awake()
watcher.responder.awake()
else: else:
logger.info("Bootstrapping from backed up data") logger.info("Bootstrapping from backed up data")
block_processor = BlockProcessor()
# Update the Watcher backed up data if found.
if len(watcher_appointments_data) != 0:
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data
)
# Update the Responder with backed up data if found.
if len(responder_trackers_data) != 0:
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data
)
# Awaking components so the states can be updated.
watcher.awake()
watcher.responder.awake()
last_block_watcher = db_manager.load_last_block_hash_watcher() last_block_watcher = db_manager.load_last_block_hash_watcher()
last_block_responder = db_manager.load_last_block_hash_responder() last_block_responder = db_manager.load_last_block_hash_responder()
# Populate the block queues with data if they've missed some while offline. If the blocks of both match
# we don't perform the search twice.
block_processor = BlockProcessor()
# FIXME: 32-reorgs-offline dropped txs are not used at this point. # FIXME: 32-reorgs-offline dropped txs are not used at this point.
# Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't
# perform the search twice.
last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor(
last_block_watcher last_block_watcher
) )
@@ -85,40 +103,22 @@ def main():
) )
missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder)
# Build and update the Watcher.
if len(watcher_appointments_data) != 0:
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data
)
# Build Responder with backed up data if found
if len(responder_trackers_data) != 0:
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data
)
# If only one of the instances needs to be updated, it can be done separately. # If only one of the instances needs to be updated, it can be done separately.
if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0:
Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder)
watcher.responder.awake()
watcher.responder.block_queue.join() watcher.responder.block_queue.join()
elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0:
Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher)
watcher.awake()
watcher.block_queue.join() watcher.block_queue.join()
# Otherwise the need to be updated at the same time, block by block # Otherwise they need to be updated at the same time, block by block
elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0:
Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder) Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder)
# Awake the Watcher/Responder if they ended up with pending work # Fire the API and the ChainMonitor
if watcher.appointments and watcher.asleep: # FIXME: 92-block-data-during-bootstrap-db
watcher.awake() chain_monitor.monitor_chain()
if watcher.responder.trackers and watcher.responder.asleep:
watcher.responder.awake()
# Fire the API
API(watcher, config=config).start() API(watcher, config=config).start()
except Exception as e: except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e)) logger.error("An error occurred: {}. Shutting down".format(e))