mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
The Watcher and Responder were not properly bootstrapped from db data if both were missing blocks to process. Since some appointments may need to pass from the Watcher to the Responder during this process, they need to be brought up to date at the same time, block after block.
155 lines
6.6 KiB
Python
155 lines
6.6 KiB
Python
class Builder:
|
|
"""
|
|
The :class:`Builder` class is in charge or reconstructing data loaded from the database and build the data
|
|
structures of the :obj:`Watcher <pisa.watcher.Watcher>` and the :obj:`Responder <pisa.responder.Responder>`.
|
|
"""
|
|
|
|
@staticmethod
|
|
def build_appointments(appointments_data):
|
|
"""
|
|
Builds an appointments dictionary (``uuid: Appointment``) and a locator_uuid_map (``locator: uuid``) given a
|
|
dictionary of appointments from the database.
|
|
|
|
Args:
|
|
appointments_data (:obj:`dict`): a dictionary of dictionaries representing all the
|
|
:obj:`Watcher <pisa.watcher.Watcher>` appointments stored in the database. The structure is as follows:
|
|
|
|
``{uuid: {locator: str, start_time: int, ...}, uuid: {locator:...}}``
|
|
|
|
Returns:
|
|
:obj:`tuple`: A tuple with two dictionaries. ``appointments`` containing the appointment information in
|
|
:obj:`Appointment <pisa.appointment.Appointment>` objects and ``locator_uuid_map`` containing a map of
|
|
appointment (``uuid:locator``).
|
|
"""
|
|
|
|
appointments = {}
|
|
locator_uuid_map = {}
|
|
|
|
for uuid, data in appointments_data.items():
|
|
appointments[uuid] = {"locator": data.get("locator"), "end_time": data.get("end_time")}
|
|
|
|
if data.get("locator") in locator_uuid_map:
|
|
locator_uuid_map[data.get("locator")].append(uuid)
|
|
|
|
else:
|
|
locator_uuid_map[data.get("locator")] = [uuid]
|
|
|
|
return appointments, locator_uuid_map
|
|
|
|
@staticmethod
|
|
def build_trackers(tracker_data):
|
|
"""
|
|
Builds a tracker dictionary (``uuid: TransactionTracker``) and a tx_tracker_map (``penalty_txid: uuid``) given
|
|
a dictionary of trackers from the database.
|
|
|
|
Args:
|
|
tracker_data (:obj:`dict`): a dictionary of dictionaries representing all the
|
|
:mod:`Responder <pisa.responder.Responder>` trackers stored in the database.
|
|
The structure is as follows:
|
|
|
|
``{uuid: {locator: str, dispute_txid: str, ...}, uuid: {locator:...}}``
|
|
|
|
Returns:
|
|
:obj:`tuple`: A tuple with two dictionaries. ``trackers`` containing the trackers' information in
|
|
:obj:`TransactionTracker <pisa.responder.TransactionTracker>` objects and a ``tx_tracker_map`` containing
|
|
the map of trackers (``penalty_txid: uuid``).
|
|
|
|
"""
|
|
|
|
trackers = {}
|
|
tx_tracker_map = {}
|
|
|
|
for uuid, data in tracker_data.items():
|
|
trackers[uuid] = {
|
|
"penalty_txid": data.get("penalty_txid"),
|
|
"locator": data.get("locator"),
|
|
"appointment_end": data.get("appointment_end"),
|
|
}
|
|
|
|
if data.get("penalty_txid") in tx_tracker_map:
|
|
tx_tracker_map[data.get("penalty_txid")].append(uuid)
|
|
|
|
else:
|
|
tx_tracker_map[data.get("penalty_txid")] = [uuid]
|
|
|
|
return trackers, tx_tracker_map
|
|
|
|
@staticmethod
|
|
def populate_block_queue(block_queue, missed_blocks):
|
|
"""
|
|
Populates a ``Queue`` of block hashes to initialize the :mod:`Watcher <pisa.watcher.Watcher>` or the
|
|
:mod:`Responder <pisa.responder.Responder>` using backed up data.
|
|
|
|
Args:
|
|
block_queue (:obj:`Queue`): a ``Queue``
|
|
missed_blocks (:obj:`list`): list of block hashes missed by the Watchtower (do to a crash or shutdown).
|
|
|
|
Returns:
|
|
:obj:`Queue`: A ``Queue`` containing all the missed blocks hashes.
|
|
"""
|
|
|
|
for block in missed_blocks:
|
|
block_queue.put(block)
|
|
|
|
@staticmethod
|
|
def update_states(watcher, missed_blocks_watcher, missed_blocks_responder):
|
|
"""
|
|
Updates the states of both the :mod:`Watcher <pisa.watcher.Watcher>` and the :mod:`Responder <pisa.responder.Responder>`.
|
|
If both have pending blocks to process they need to be updates at the same time, block by block.
|
|
|
|
If only one instance has to be updated, ``populate_block_queue`` should be used.
|
|
|
|
Args:
|
|
watcher (:obj:`Watcher <pisa.watcher.Watcher>`): a ``Watcher`` instance (including a ``Responder``).
|
|
missed_blocks_watcher (:obj:`list`): the list of block missed by the ``Watcher``.
|
|
missed_blocks_responder (:obj:`list`): the list of block missed by the ``Responder``.
|
|
|
|
Raises:
|
|
ValueError: is one of the provided list is empty.
|
|
"""
|
|
|
|
if len(missed_blocks_responder) == 0 or len(missed_blocks_watcher) == 0:
|
|
raise ValueError(
|
|
"Both the Watcher and the Responder must have missed blocks. Use ``populate_block_queue`` otherwise."
|
|
)
|
|
|
|
# If the missed blocks of the Watcher and the Responder are not the same, we need to bring one up to date with
|
|
# the other.
|
|
if len(missed_blocks_responder) > len(missed_blocks_watcher):
|
|
block_diff = sorted(
|
|
set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index
|
|
)
|
|
Builder.populate_block_queue(watcher.responder.block_queue, block_diff)
|
|
watcher.responder.awake()
|
|
watcher.responder.block_queue.join()
|
|
|
|
elif len(missed_blocks_watcher) > len(missed_blocks_responder):
|
|
block_diff = sorted(
|
|
set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index
|
|
)
|
|
Builder.populate_block_queue(watcher.block_queue, block_diff)
|
|
watcher.awake()
|
|
watcher.block_queue.join()
|
|
|
|
# Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is
|
|
# asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if
|
|
# appointments are passed trough from the Watcher.
|
|
if watcher.appointments and watcher.asleep:
|
|
watcher.awake()
|
|
|
|
if watcher.responder.trackers and watcher.responder.asleep:
|
|
watcher.responder.awake()
|
|
|
|
for block in missed_blocks_watcher:
|
|
if not watcher.asleep:
|
|
watcher.block_queue.put(block)
|
|
watcher.block_queue.join()
|
|
|
|
if not watcher.responder.asleep:
|
|
watcher.responder.block_queue.put(block)
|
|
watcher.responder.block_queue.join()
|
|
else:
|
|
# The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no
|
|
# trackers
|
|
watcher.responder.last_known_block = block
|