Merge pull request #87 from sr-gi/fix-build-from-database

Fix build from database
This commit is contained in:
Sergi Delgado Segura
2020-01-24 13:36:13 +01:00
committed by GitHub
5 changed files with 65 additions and 46 deletions

View File

@@ -32,14 +32,13 @@ class Builder:
locator_uuid_map = {} locator_uuid_map = {}
for uuid, data in appointments_data.items(): for uuid, data in appointments_data.items():
appointment = Appointment.from_dict(data) appointments[uuid] = {"locator": data.get("locator"), "end_time": data.get("end_time")}
appointments[uuid] = appointment
if appointment.locator in locator_uuid_map: if data.get("locator") in locator_uuid_map:
locator_uuid_map[appointment.locator].append(uuid) locator_uuid_map[data.get("locator")].append(uuid)
else: else:
locator_uuid_map[appointment.locator] = [uuid] locator_uuid_map[data.get("locator")] = [uuid]
return appointments, locator_uuid_map return appointments, locator_uuid_map
@@ -67,33 +66,33 @@ class Builder:
tx_tracker_map = {} tx_tracker_map = {}
for uuid, data in tracker_data.items(): for uuid, data in tracker_data.items():
tracker = TransactionTracker.from_dict(data) trackers[uuid] = {
trackers[uuid] = tracker "penalty_txid": data.get("penalty_txid"),
"locator": data.get("locator"),
"appointment_end": data.get("appointment_end"),
}
if tracker.penalty_txid in tx_tracker_map: if data.get("penalty_txid") in tx_tracker_map:
tx_tracker_map[tracker.penalty_txid].append(uuid) tx_tracker_map[data.get("penalty_txid")].append(uuid)
else: else:
tx_tracker_map[tracker.penalty_txid] = [uuid] tx_tracker_map[data.get("penalty_txid")] = [uuid]
return trackers, tx_tracker_map return trackers, tx_tracker_map
@staticmethod @staticmethod
def build_block_queue(missed_blocks): def populate_block_queue(block_queue, missed_blocks):
""" """
Builds a ``Queue`` of block hashes to initialize the :mod:`Watcher <pisa.watcher.Watcher>` or the Populates a ``Queue`` of block hashes to initialize the :mod:`Watcher <pisa.watcher.Watcher>` or the
:mod:`Responder <pisa.responder.Responder>` using backed up data. :mod:`Responder <pisa.responder.Responder>` using backed up data.
Args: Args:
block_queue (:obj:`Queue`): a ``Queue``
missed_blocks (:obj:`list`): list of block hashes missed by the Watchtower (do to a crash or shutdown). missed_blocks (:obj:`list`): list of block hashes missed by the Watchtower (do to a crash or shutdown).
Returns: Returns:
:obj:`Queue`: A ``Queue`` containing all the missed blocks hashes. :obj:`Queue`: A ``Queue`` containing all the missed blocks hashes.
""" """
block_queue = Queue()
for block in missed_blocks: for block in missed_blocks:
block_queue.put(block) block_queue.put(block)
return block_queue

View File

@@ -60,6 +60,10 @@ def main():
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
logger.info("Fresh bootstrap") logger.info("Fresh bootstrap")
# Set the current tip as the last known block for both on a fresh start
db_manager.store_last_block_hash_watcher(BlockProcessor.get_best_block_hash())
db_manager.store_last_block_hash_responder(BlockProcessor.get_best_block_hash())
else: else:
logger.info("Bootstrapping from backed up data") logger.info("Bootstrapping from backed up data")
block_processor = BlockProcessor() block_processor = BlockProcessor()
@@ -68,11 +72,10 @@ def main():
last_block_responder = db_manager.load_last_block_hash_responder() last_block_responder = db_manager.load_last_block_hash_responder()
# FIXME: 32-reorgs-offline dropped txs are not used at this point. # FIXME: 32-reorgs-offline dropped txs are not used at this point.
last_common_ancestor_responder = None
missed_blocks_responder = None missed_blocks_responder = None
# Build Responder with backed up data if found # Build Responder with backed up data if found
if last_block_responder is not None: if len(responder_trackers_data) != 0:
last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor( last_common_ancestor_responder, dropped_txs_responder = block_processor.find_last_common_ancestor(
last_block_responder last_block_responder
) )
@@ -81,11 +84,12 @@ def main():
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data responder_trackers_data
) )
watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder) Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder)
watcher.responder.awake()
# Build Watcher. If the blocks of both match we don't perform the search twice. # Build Watcher. If the blocks of both match we don't perform the search twice.
if last_block_watcher is not None: if len(watcher_appointments_data) != 0:
if last_block_watcher == last_block_responder: if last_block_watcher == last_block_responder and missed_blocks_responder is not None:
missed_blocks_watcher = missed_blocks_responder missed_blocks_watcher = missed_blocks_responder
else: else:
last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor(
@@ -96,7 +100,8 @@ def main():
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments( watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data watcher_appointments_data
) )
watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher) Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher)
watcher.awake()
# Fire the API # Fire the API
API(watcher, config=config).start() API(watcher, config=config).start()

View File

@@ -146,6 +146,17 @@ class Responder:
self.chain_monitor = chain_monitor self.chain_monitor = chain_monitor
self.db_manager = db_manager self.db_manager = db_manager
def awake(self):
self.asleep = False
self.chain_monitor.responder_asleep = False
Thread(target=self.do_watch).start()
def sleep(self):
self.asleep = True
self.chain_monitor.responder_asleep = True
logger.info("No more pending trackers, going back to sleep")
@staticmethod @staticmethod
def on_sync(block_hash): def on_sync(block_hash):
""" """
@@ -265,9 +276,7 @@ class Responder:
) )
if self.asleep: if self.asleep:
self.asleep = False self.awake()
self.chain_monitor.responder_asleep = False
Thread(target=self.do_watch).start()
def do_watch(self): def do_watch(self):
""" """
@@ -321,10 +330,7 @@ class Responder:
prev_block_hash = block.get("hash") prev_block_hash = block.get("hash")
# Go back to sleep if there are no more pending trackers # Go back to sleep if there are no more pending trackers
self.asleep = True self.sleep()
self.chain_monitor.responder_asleep = True
logger.info("No more pending trackers, going back to sleep")
def check_confirmations(self, txs): def check_confirmations(self, txs):
""" """

View File

@@ -76,6 +76,19 @@ class Watcher:
if not isinstance(responder, Responder): if not isinstance(responder, Responder):
self.responder = Responder(db_manager, chain_monitor) self.responder = Responder(db_manager, chain_monitor)
def awake(self):
self.asleep = False
self.chain_monitor.watcher_asleep = False
Thread(target=self.do_watch).start()
logger.info("Waking up")
def sleep(self):
self.asleep = True
self.chain_monitor.watcher_asleep = True
logger.info("No more pending appointments, going back to sleep")
def add_appointment(self, appointment): def add_appointment(self, appointment):
""" """
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
@@ -118,11 +131,7 @@ class Watcher:
self.locator_uuid_map[appointment.locator] = [uuid] self.locator_uuid_map[appointment.locator] = [uuid]
if self.asleep: if self.asleep:
self.asleep = False self.awake()
self.chain_monitor.watcher_asleep = False
Thread(target=self.do_watch).start()
logger.info("Waking up")
self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
self.db_manager.create_append_locator_map(appointment.locator, uuid) self.db_manager.create_append_locator_map(appointment.locator, uuid)
@@ -208,10 +217,7 @@ class Watcher:
self.db_manager.store_last_block_hash_watcher(block_hash) self.db_manager.store_last_block_hash_watcher(block_hash)
# Go back to sleep if there are no more appointments # Go back to sleep if there are no more appointments
self.asleep = True self.sleep()
self.chain_monitor.watcher_asleep = True
logger.info("No more pending appointments, going back to sleep")
def get_breaches(self, txids): def get_breaches(self, txids):
""" """

View File

@@ -1,4 +1,5 @@
from uuid import uuid4 from uuid import uuid4
from queue import Queue
from pisa.builder import Builder from pisa.builder import Builder
from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_dummy_tracker from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_dummy_tracker
@@ -29,8 +30,9 @@ def test_build_appointments():
# Check that the created appointments match the data # Check that the created appointments match the data
for uuid, appointment in appointments.items(): for uuid, appointment in appointments.items():
assert uuid in appointments_data.keys() assert uuid in appointments_data.keys()
assert appointments_data[uuid] == appointment.to_dict() assert appointments_data[uuid].get("locator") == appointment.get("locator")
assert uuid in locator_uuid_map[appointment.locator] assert appointments_data[uuid].get("end_time") == appointment.get("end_time")
assert uuid in locator_uuid_map[appointment.get("locator")]
def test_build_trackers(): def test_build_trackers():
@@ -55,17 +57,18 @@ def test_build_trackers():
# Check that the built trackers match the data # Check that the built trackers match the data
for uuid, tracker in trackers.items(): for uuid, tracker in trackers.items():
assert uuid in trackers_data.keys() assert uuid in trackers_data.keys()
tracker_dict = tracker.to_dict()
# The locator is not part of the tracker_data found in the database (for now) assert tracker.get("penalty_txid") == trackers_data[uuid].get("penalty_txid")
assert trackers_data[uuid] == tracker_dict assert tracker.get("locator") == trackers_data[uuid].get("locator")
assert uuid in tx_tracker_map[tracker.penalty_txid] assert tracker.get("appointment_end") == trackers_data[uuid].get("appointment_end")
assert uuid in tx_tracker_map[tracker.get("penalty_txid")]
def test_build_block_queue(): def test_populate_block_queue():
# Create some random block hashes and construct the queue with them # Create some random block hashes and construct the queue with them
blocks = [get_random_value_hex(32) for _ in range(10)] blocks = [get_random_value_hex(32) for _ in range(10)]
queue = Builder.build_block_queue(blocks) queue = Queue()
Builder.populate_block_queue(queue, blocks)
# Make sure every block is in the queue and that there are not additional ones # Make sure every block is in the queue and that there are not additional ones
while not queue.empty(): while not queue.empty():