Removes sleep flag from Watcher and Responder

The sleep flag was used to avoid doing useless work when no data was hold by the tower. However, from the implementation of the data persistence on, the Watcher and Responder should at least keep track of the last known block. The current apporach was making this harder.
This commit is contained in:
Sergi Delgado Segura
2020-02-10 16:19:22 +01:00
parent 6c957b067d
commit aa12fa2cf8
4 changed files with 50 additions and 151 deletions

View File

@@ -120,7 +120,6 @@ class Builder:
set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index
) )
Builder.populate_block_queue(watcher.responder.block_queue, block_diff) Builder.populate_block_queue(watcher.responder.block_queue, block_diff)
watcher.responder.awake()
watcher.responder.block_queue.join() watcher.responder.block_queue.join()
elif len(missed_blocks_watcher) > len(missed_blocks_responder): elif len(missed_blocks_watcher) > len(missed_blocks_responder):
@@ -128,27 +127,12 @@ class Builder:
set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index
) )
Builder.populate_block_queue(watcher.block_queue, block_diff) Builder.populate_block_queue(watcher.block_queue, block_diff)
watcher.awake()
watcher.block_queue.join() watcher.block_queue.join()
# Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is # Once they are at the same height, we update them one by one
# asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if
# appointments are passed trough from the Watcher.
if watcher.appointments and watcher.asleep:
watcher.awake()
if watcher.responder.trackers and watcher.responder.asleep:
watcher.responder.awake()
for block in missed_blocks_watcher: for block in missed_blocks_watcher:
if not watcher.asleep: watcher.block_queue.put(block)
watcher.block_queue.put(block) watcher.block_queue.join()
watcher.block_queue.join()
if not watcher.responder.asleep: watcher.responder.block_queue.put(block)
watcher.responder.block_queue.put(block) watcher.responder.block_queue.join()
watcher.responder.block_queue.join()
else:
# The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no
# trackers
watcher.responder.last_known_block = block

View File

@@ -19,6 +19,10 @@ class ChainMonitor:
The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified
once per queue and the notification is triggered by the method that detects the block faster. once per queue and the notification is triggered by the method that detects the block faster.
Args:
watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
Attributes: Attributes:
best_tip (:obj:`str`): a block hash representing the current best tip. best_tip (:obj:`str`): a block hash representing the current best tip.
last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips.
@@ -30,11 +34,9 @@ class ChainMonitor:
watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`. watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`.
responder_queue (:obj:`Queue`): a queue to send new best tips to the responder_queue (:obj:`Queue`): a queue to send new best tips to the
:obj:`Responder <pisa.responder.Responder>`. :obj:`Responder <pisa.responder.Responder>`.
watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not.
responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not.
""" """
def __init__(self): def __init__(self, watcher_queue, responder_queue):
self.best_tip = None self.best_tip = None
self.last_tips = [] self.last_tips = []
self.terminate = False self.terminate = False
@@ -48,53 +50,21 @@ class ChainMonitor:
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
self.watcher_queue = None self.watcher_queue = watcher_queue
self.responder_queue = None self.responder_queue = responder_queue
self.watcher_asleep = True
self.responder_asleep = True
def attach_watcher(self, queue, asleep):
"""
Attaches a :obj:`Watcher <pisa.watcher.Watcher>` to the :class:`ChainMonitor`. The ``Watcher`` and the
``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag.
Args:
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from
the ``Watcher`` when the state changes.
"""
self.watcher_queue = queue
self.watcher_asleep = asleep
def attach_responder(self, queue, asleep):
"""
Attaches a :obj:`Responder <pisa.responder.Responder>` to the :class:`ChainMonitor`. The ``Responder`` and the
``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag.
Args:
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from
the ``Responder`` when the state changes.
"""
self.responder_queue = queue
self.responder_asleep = asleep
def notify_subscribers(self, block_hash): def notify_subscribers(self, block_hash):
""" """
Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in
by putting the hash in the corresponding queue(s). the corresponding queue(s).
Args: Args:
block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
""" """
if not self.watcher_asleep: self.watcher_queue.put(block_hash)
self.watcher_queue.put(block_hash) self.responder_queue.put(block_hash)
if not self.responder_asleep:
self.responder_queue.put(block_hash)
def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE):
""" """

View File

@@ -108,11 +108,6 @@ class Responder:
the decrypted ``penalty_txs`` handed by the :obj:`Watcher <pisa.watcher.Watcher>` and ensuring the they make it to the decrypted ``penalty_txs`` handed by the :obj:`Watcher <pisa.watcher.Watcher>` and ensuring the they make it to
the blockchain. the blockchain.
The :class:`Responder` can be in two states:
- Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty).
- Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain).
Args: Args:
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): a ``DBManager`` instance to interact with the db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database. database.
@@ -126,41 +121,29 @@ class Responder:
unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``.
missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx`` missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx``
has missed. Used to trigger rebroadcast if needed. has missed. Used to trigger rebroadcast if needed.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It
is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`. is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the
database. database.
""" """
def __init__(self, db_manager, chain_monitor): def __init__(self, db_manager):
self.trackers = dict() self.trackers = dict()
self.tx_tracker_map = dict() self.tx_tracker_map = dict()
self.unconfirmed_txs = [] self.unconfirmed_txs = []
self.missed_confirmations = dict() self.missed_confirmations = dict()
self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.chain_monitor = chain_monitor
self.db_manager = db_manager self.db_manager = db_manager
self.carrier = Carrier() self.carrier = Carrier()
self.last_known_block = db_manager.load_last_block_hash_responder() self.last_known_block = db_manager.load_last_block_hash_responder()
def awake(self): def awake(self):
self.asleep = False responder_thread = Thread(target=self.do_watch, daemon=True)
self.chain_monitor.responder_asleep = False responder_thread.start()
responder_thread = Thread(target=self.do_watch, daemon=True).start()
return responder_thread return responder_thread
def sleep(self):
self.asleep = True
self.chain_monitor.responder_asleep = True
logger.info("No more pending trackers, going back to sleep")
@staticmethod @staticmethod
def on_sync(block_hash): def on_sync(block_hash):
""" """
@@ -212,9 +195,6 @@ class Responder:
into the blockchain. into the blockchain.
""" """
if self.asleep:
logger.info("Waking up")
receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid)
if receipt.delivered: if receipt.delivered:
@@ -239,8 +219,6 @@ class Responder:
``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
database. database.
``add_tracker`` awakes the :obj:`Responder` if it is asleep.
Args: Args:
uuid (:obj:`str`): a unique identifier for the appointment. uuid (:obj:`str`): a unique identifier for the appointment.
locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded). locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded).
@@ -278,9 +256,6 @@ class Responder:
"New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end
) )
if self.asleep:
self.awake()
def do_watch(self): def do_watch(self):
""" """
Monitors the blockchain whilst there are pending trackers. Monitors the blockchain whilst there are pending trackers.
@@ -293,20 +268,17 @@ class Responder:
if self.last_known_block is None: if self.last_known_block is None:
self.last_known_block = BlockProcessor.get_best_block_hash() self.last_known_block = BlockProcessor.get_best_block_hash()
while len(self.trackers) > 0: while True:
# We get notified for every new received block
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
block = BlockProcessor.get_block(block_hash) block = BlockProcessor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if block is not None: if len(self.trackers) > 0 and block is not None:
txs = block.get("tx") txids = block.get("tx")
logger.info("List of transactions", txids=txids)
logger.info(
"New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs
)
if self.last_known_block == block.get("previousblockhash"): if self.last_known_block == block.get("previousblockhash"):
self.check_confirmations(txs) self.check_confirmations(txids)
height = block.get("height") height = block.get("height")
completed_trackers = self.get_completed_trackers(height) completed_trackers = self.get_completed_trackers(height)
@@ -328,16 +300,16 @@ class Responder:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs
self.handle_reorgs(block_hash) self.handle_reorgs(block_hash)
# Register the last processed block for the responder # Clear the receipts issued in this block
self.db_manager.store_last_block_hash_responder(block_hash) self.carrier.issued_receipts = {}
self.last_known_block = block.get("hash") if len(self.trackers) is 0:
logger.info("No more pending trackers")
# Register the last processed block for the responder
self.db_manager.store_last_block_hash_responder(block_hash)
self.last_known_block = block.get("hash")
self.block_queue.task_done() self.block_queue.task_done()
self.carrier.issued_receipts = {}
# Go back to sleep if there are no more pending trackers
self.sleep()
def check_confirmations(self, txs): def check_confirmations(self, txs):
""" """

View File

@@ -10,7 +10,6 @@ from common.logger import Logger
from pisa import LOG_PREFIX from pisa import LOG_PREFIX
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
from pisa.responder import Responder
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX)
@@ -33,13 +32,10 @@ class Watcher:
Args: Args:
db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance.
instance is created. Populated instances are useful when bootstrapping the system from backed-up data.
Attributes: Attributes:
@@ -48,11 +44,8 @@ class Watcher:
It's populated trough ``add_appointment``. It's populated trough ``add_appointment``.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
appointments with the same ``locator``. appointments with the same ``locator``.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`. populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
@@ -63,41 +56,27 @@ class Watcher:
""" """
def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): def __init__(self, db_manager, responder, sk_der, config):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict() self.locator_uuid_map = dict()
self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.chain_monitor = chain_monitor
self.config = config self.config = config
self.db_manager = db_manager self.db_manager = db_manager
self.responder = responder
self.signing_key = Cryptographer.load_private_key_der(sk_der) self.signing_key = Cryptographer.load_private_key_der(sk_der)
if not isinstance(responder, Responder):
self.responder = Responder(db_manager, chain_monitor)
def awake(self): def awake(self):
self.asleep = False watcher_thread = Thread(target=self.do_watch, daemon=True)
self.chain_monitor.watcher_asleep = False watcher_thread.start()
watcher_thread = Thread(target=self.do_watch, daemon=True).start()
logger.info("Waking up")
return watcher_thread return watcher_thread
def sleep(self):
self.asleep = True
self.chain_monitor.watcher_asleep = True
logger.info("No more pending appointments, going back to sleep")
def add_appointment(self, appointment): def add_appointment(self, appointment):
""" """
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher` ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring
is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty. the blockchain (``do_watch``) until ``appointments`` is empty.
It will go back to sleep once there are no more pending appointments.
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding
:obj:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` and pass the information to the :obj:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` and pass the information to the
@@ -132,9 +111,6 @@ class Watcher:
else: else:
self.locator_uuid_map[appointment.locator] = [uuid] self.locator_uuid_map[appointment.locator] = [uuid]
if self.asleep:
self.awake()
self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
self.db_manager.create_append_locator_map(appointment.locator, uuid) self.db_manager.create_append_locator_map(appointment.locator, uuid)
@@ -159,15 +135,13 @@ class Watcher:
:obj:`Responder <pisa.responder.Responder>` upon detecting a breach. :obj:`Responder <pisa.responder.Responder>` upon detecting a breach.
""" """
while len(self.appointments) > 0: while True:
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
logger.info("New block received", block_hash=block_hash)
block = BlockProcessor.get_block(block_hash) block = BlockProcessor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if block is not None: if len(self.appointments) > 0 and block is not None:
txids = block.get("tx") txids = block.get("tx")
logger.info("List of transactions", txids=txids) logger.info("List of transactions", txids=txids)
expired_appointments = [ expired_appointments = [
@@ -203,7 +177,7 @@ class Watcher:
block_hash, block_hash,
) )
# FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. # FIXME: Only necessary because of the triggered appointment approach. Fix if it changes.
if receipt.delivered: if receipt.delivered:
Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map)
@@ -219,14 +193,13 @@ class Watcher:
appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager
) )
# Register the last processed block for the watcher if len(self.appointments) is 0:
self.db_manager.store_last_block_hash_watcher(block_hash) logger.info("No more pending appointments")
# Register the last processed block for the watcher
self.db_manager.store_last_block_hash_watcher(block_hash)
self.block_queue.task_done() self.block_queue.task_done()
# Go back to sleep if there are no more appointments
self.sleep()
def get_breaches(self, txids): def get_breaches(self, txids):
""" """
Gets a list of channel breaches given the list of transaction ids. Gets a list of channel breaches given the list of transaction ids.