Merge pull request #93 from sr-gi/remove-sleep

Remove sleep
This commit is contained in:
Sergi Delgado Segura
2020-02-10 16:27:23 +01:00
committed by GitHub
11 changed files with 169 additions and 435 deletions

View File

@@ -120,7 +120,6 @@ class Builder:
set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index set(missed_blocks_responder).difference(missed_blocks_watcher), key=missed_blocks_responder.index
) )
Builder.populate_block_queue(watcher.responder.block_queue, block_diff) Builder.populate_block_queue(watcher.responder.block_queue, block_diff)
watcher.responder.awake()
watcher.responder.block_queue.join() watcher.responder.block_queue.join()
elif len(missed_blocks_watcher) > len(missed_blocks_responder): elif len(missed_blocks_watcher) > len(missed_blocks_responder):
@@ -128,27 +127,12 @@ class Builder:
set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index set(missed_blocks_watcher).difference(missed_blocks_responder), key=missed_blocks_watcher.index
) )
Builder.populate_block_queue(watcher.block_queue, block_diff) Builder.populate_block_queue(watcher.block_queue, block_diff)
watcher.awake()
watcher.block_queue.join() watcher.block_queue.join()
# Awake the actors if they are asleep and have pending work. No new inputs are provided, so if the Watcher is # Once they are at the same height, we update them one by one
# asleep it will remain asleep. However, the Responder may come and go to sleep since it will be awaken if
# appointments are passed trough from the Watcher.
if watcher.appointments and watcher.asleep:
watcher.awake()
if watcher.responder.trackers and watcher.responder.asleep:
watcher.responder.awake()
for block in missed_blocks_watcher: for block in missed_blocks_watcher:
if not watcher.asleep: watcher.block_queue.put(block)
watcher.block_queue.put(block) watcher.block_queue.join()
watcher.block_queue.join()
if not watcher.responder.asleep: watcher.responder.block_queue.put(block)
watcher.responder.block_queue.put(block) watcher.responder.block_queue.join()
watcher.responder.block_queue.join()
else:
# The Responder keeps track of last know block for reorgs, so it has to be updated even if there're no
# trackers
watcher.responder.last_known_block = block

View File

@@ -19,6 +19,10 @@ class ChainMonitor:
The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified
once per queue and the notification is triggered by the method that detects the block faster. once per queue and the notification is triggered by the method that detects the block faster.
Args:
watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
Attributes: Attributes:
best_tip (:obj:`str`): a block hash representing the current best tip. best_tip (:obj:`str`): a block hash representing the current best tip.
last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips.
@@ -30,11 +34,9 @@ class ChainMonitor:
watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`. watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`.
responder_queue (:obj:`Queue`): a queue to send new best tips to the responder_queue (:obj:`Queue`): a queue to send new best tips to the
:obj:`Responder <pisa.responder.Responder>`. :obj:`Responder <pisa.responder.Responder>`.
watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not.
responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not.
""" """
def __init__(self): def __init__(self, watcher_queue, responder_queue):
self.best_tip = None self.best_tip = None
self.last_tips = [] self.last_tips = []
self.terminate = False self.terminate = False
@@ -48,53 +50,21 @@ class ChainMonitor:
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
self.watcher_queue = None self.watcher_queue = watcher_queue
self.responder_queue = None self.responder_queue = responder_queue
self.watcher_asleep = True
self.responder_asleep = True
def attach_watcher(self, queue, asleep):
"""
Attaches a :obj:`Watcher <pisa.watcher.Watcher>` to the :class:`ChainMonitor`. The ``Watcher`` and the
``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag.
Args:
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from
the ``Watcher`` when the state changes.
"""
self.watcher_queue = queue
self.watcher_asleep = asleep
def attach_responder(self, queue, asleep):
"""
Attaches a :obj:`Responder <pisa.responder.Responder>` to the :class:`ChainMonitor`. The ``Responder`` and the
``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag.
Args:
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from
the ``Responder`` when the state changes.
"""
self.responder_queue = queue
self.responder_asleep = asleep
def notify_subscribers(self, block_hash): def notify_subscribers(self, block_hash):
""" """
Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in
by putting the hash in the corresponding queue(s). the corresponding queue(s).
Args: Args:
block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
""" """
if not self.watcher_asleep: self.watcher_queue.put(block_hash)
self.watcher_queue.put(block_hash) self.responder_queue.put(block_hash)
if not self.responder_asleep:
self.responder_queue.put(block_hash)
def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE):
""" """

View File

@@ -8,6 +8,7 @@ from pisa import config, LOG_PREFIX
from pisa.api import API from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.builder import Builder from pisa.builder import Builder
from pisa.responder import Responder
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.chain_monitor import ChainMonitor from pisa.chain_monitor import ChainMonitor
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
@@ -43,33 +44,50 @@ def main():
else: else:
try: try:
with open(config.get("PISA_SECRET_KEY"), "rb") as key_file:
secret_key_der = key_file.read()
watcher = Watcher(db_manager, Responder(db_manager), secret_key_der, config)
# Create the chain monitor and start monitoring the chain # Create the chain monitor and start monitoring the chain
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
chain_monitor.monitor_chain()
watcher_appointments_data = db_manager.load_watcher_appointments() watcher_appointments_data = db_manager.load_watcher_appointments()
responder_trackers_data = db_manager.load_responder_trackers() responder_trackers_data = db_manager.load_responder_trackers()
with open(config.get("PISA_SECRET_KEY"), "rb") as key_file:
secret_key_der = key_file.read()
watcher = Watcher(db_manager, chain_monitor, secret_key_der, config)
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
logger.info("Fresh bootstrap") logger.info("Fresh bootstrap")
watcher.awake()
watcher.responder.awake()
else: else:
logger.info("Bootstrapping from backed up data") logger.info("Bootstrapping from backed up data")
block_processor = BlockProcessor()
# Update the Watcher backed up data if found.
if len(watcher_appointments_data) != 0:
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data
)
# Update the Responder with backed up data if found.
if len(responder_trackers_data) != 0:
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data
)
# Awaking components so the states can be updated.
watcher.awake()
watcher.responder.awake()
last_block_watcher = db_manager.load_last_block_hash_watcher() last_block_watcher = db_manager.load_last_block_hash_watcher()
last_block_responder = db_manager.load_last_block_hash_responder() last_block_responder = db_manager.load_last_block_hash_responder()
# Populate the block queues with data if they've missed some while offline. If the blocks of both match
# we don't perform the search twice.
block_processor = BlockProcessor()
# FIXME: 32-reorgs-offline dropped txs are not used at this point. # FIXME: 32-reorgs-offline dropped txs are not used at this point.
# Get the blocks missed by both the Watcher and the Responder. If the blocks of both match we don't
# perform the search twice.
last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor( last_common_ancestor_watcher, dropped_txs_watcher = block_processor.find_last_common_ancestor(
last_block_watcher last_block_watcher
) )
@@ -85,40 +103,22 @@ def main():
) )
missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder)
# Build and update the Watcher.
if len(watcher_appointments_data) != 0:
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(
watcher_appointments_data
)
# Build Responder with backed up data if found
if len(responder_trackers_data) != 0:
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder_trackers_data
)
# If only one of the instances needs to be updated, it can be done separately. # If only one of the instances needs to be updated, it can be done separately.
if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0: if len(missed_blocks_watcher) == 0 and len(missed_blocks_responder) != 0:
Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder) Builder.populate_block_queue(watcher.responder.block_queue, missed_blocks_responder)
watcher.responder.awake()
watcher.responder.block_queue.join() watcher.responder.block_queue.join()
elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0: elif len(missed_blocks_responder) == 0 and len(missed_blocks_watcher) != 0:
Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher) Builder.populate_block_queue(watcher.block_queue, missed_blocks_watcher)
watcher.awake()
watcher.block_queue.join() watcher.block_queue.join()
# Otherwise the need to be updated at the same time, block by block # Otherwise they need to be updated at the same time, block by block
elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0: elif len(missed_blocks_responder) != 0 and len(missed_blocks_watcher) != 0:
Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder) Builder.update_states(watcher, missed_blocks_watcher, missed_blocks_responder)
# Awake the Watcher/Responder if they ended up with pending work # Fire the API and the ChainMonitor
if watcher.appointments and watcher.asleep: # FIXME: 92-block-data-during-bootstrap-db
watcher.awake() chain_monitor.monitor_chain()
if watcher.responder.trackers and watcher.responder.asleep:
watcher.responder.awake()
# Fire the API
API(watcher, config=config).start() API(watcher, config=config).start()
except Exception as e: except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e)) logger.error("An error occurred: {}. Shutting down".format(e))

View File

@@ -108,11 +108,6 @@ class Responder:
the decrypted ``penalty_txs`` handed by the :obj:`Watcher <pisa.watcher.Watcher>` and ensuring the they make it to the decrypted ``penalty_txs`` handed by the :obj:`Watcher <pisa.watcher.Watcher>` and ensuring the they make it to
the blockchain. the blockchain.
The :class:`Responder` can be in two states:
- Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty).
- Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain).
Args: Args:
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): a ``DBManager`` instance to interact with the db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database. database.
@@ -126,41 +121,29 @@ class Responder:
unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``.
missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx`` missed_confirmations (:obj:`dict`): A dictionary that keeps count of how many confirmations each ``penalty_tx``
has missed. Used to trigger rebroadcast if needed. has missed. Used to trigger rebroadcast if needed.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It
is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`. is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the
database. database.
""" """
def __init__(self, db_manager, chain_monitor): def __init__(self, db_manager):
self.trackers = dict() self.trackers = dict()
self.tx_tracker_map = dict() self.tx_tracker_map = dict()
self.unconfirmed_txs = [] self.unconfirmed_txs = []
self.missed_confirmations = dict() self.missed_confirmations = dict()
self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.chain_monitor = chain_monitor
self.db_manager = db_manager self.db_manager = db_manager
self.carrier = Carrier() self.carrier = Carrier()
self.last_known_block = db_manager.load_last_block_hash_responder() self.last_known_block = db_manager.load_last_block_hash_responder()
def awake(self): def awake(self):
self.asleep = False responder_thread = Thread(target=self.do_watch, daemon=True)
self.chain_monitor.responder_asleep = False responder_thread.start()
responder_thread = Thread(target=self.do_watch, daemon=True).start()
return responder_thread return responder_thread
def sleep(self):
self.asleep = True
self.chain_monitor.responder_asleep = True
logger.info("No more pending trackers, going back to sleep")
@staticmethod @staticmethod
def on_sync(block_hash): def on_sync(block_hash):
""" """
@@ -212,9 +195,6 @@ class Responder:
into the blockchain. into the blockchain.
""" """
if self.asleep:
logger.info("Waking up")
receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid)
if receipt.delivered: if receipt.delivered:
@@ -239,8 +219,6 @@ class Responder:
``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
database. database.
``add_tracker`` awakes the :obj:`Responder` if it is asleep.
Args: Args:
uuid (:obj:`str`): a unique identifier for the appointment. uuid (:obj:`str`): a unique identifier for the appointment.
locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded). locator (:obj:`str`): the appointment locator provided by the user (16-byte hex-encoded).
@@ -278,9 +256,6 @@ class Responder:
"New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end
) )
if self.asleep:
self.awake()
def do_watch(self): def do_watch(self):
""" """
Monitors the blockchain whilst there are pending trackers. Monitors the blockchain whilst there are pending trackers.
@@ -293,20 +268,17 @@ class Responder:
if self.last_known_block is None: if self.last_known_block is None:
self.last_known_block = BlockProcessor.get_best_block_hash() self.last_known_block = BlockProcessor.get_best_block_hash()
while len(self.trackers) > 0: while True:
# We get notified for every new received block
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
block = BlockProcessor.get_block(block_hash) block = BlockProcessor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if block is not None: if len(self.trackers) > 0 and block is not None:
txs = block.get("tx") txids = block.get("tx")
logger.info("List of transactions", txids=txids)
logger.info(
"New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs
)
if self.last_known_block == block.get("previousblockhash"): if self.last_known_block == block.get("previousblockhash"):
self.check_confirmations(txs) self.check_confirmations(txids)
height = block.get("height") height = block.get("height")
completed_trackers = self.get_completed_trackers(height) completed_trackers = self.get_completed_trackers(height)
@@ -328,16 +300,16 @@ class Responder:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs
self.handle_reorgs(block_hash) self.handle_reorgs(block_hash)
# Register the last processed block for the responder # Clear the receipts issued in this block
self.db_manager.store_last_block_hash_responder(block_hash) self.carrier.issued_receipts = {}
self.last_known_block = block.get("hash") if len(self.trackers) is 0:
logger.info("No more pending trackers")
# Register the last processed block for the responder
self.db_manager.store_last_block_hash_responder(block_hash)
self.last_known_block = block.get("hash")
self.block_queue.task_done() self.block_queue.task_done()
self.carrier.issued_receipts = {}
# Go back to sleep if there are no more pending trackers
self.sleep()
def check_confirmations(self, txs): def check_confirmations(self, txs):
""" """

View File

@@ -10,7 +10,6 @@ from common.logger import Logger
from pisa import LOG_PREFIX from pisa import LOG_PREFIX
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
from pisa.responder import Responder
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX)
@@ -33,13 +32,10 @@ class Watcher:
Args: Args:
db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance.
instance is created. Populated instances are useful when bootstrapping the system from backed-up data.
Attributes: Attributes:
@@ -48,11 +44,8 @@ class Watcher:
It's populated trough ``add_appointment``. It's populated trough ``add_appointment``.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
appointments with the same ``locator``. appointments with the same ``locator``.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`. populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
@@ -63,41 +56,27 @@ class Watcher:
""" """
def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): def __init__(self, db_manager, responder, sk_der, config):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict() self.locator_uuid_map = dict()
self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.chain_monitor = chain_monitor
self.config = config self.config = config
self.db_manager = db_manager self.db_manager = db_manager
self.responder = responder
self.signing_key = Cryptographer.load_private_key_der(sk_der) self.signing_key = Cryptographer.load_private_key_der(sk_der)
if not isinstance(responder, Responder):
self.responder = Responder(db_manager, chain_monitor)
def awake(self): def awake(self):
self.asleep = False watcher_thread = Thread(target=self.do_watch, daemon=True)
self.chain_monitor.watcher_asleep = False watcher_thread.start()
watcher_thread = Thread(target=self.do_watch, daemon=True).start()
logger.info("Waking up")
return watcher_thread return watcher_thread
def sleep(self):
self.asleep = True
self.chain_monitor.watcher_asleep = True
logger.info("No more pending appointments, going back to sleep")
def add_appointment(self, appointment): def add_appointment(self, appointment):
""" """
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher` ``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring
is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty. the blockchain (``do_watch``) until ``appointments`` is empty.
It will go back to sleep once there are no more pending appointments.
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding
:obj:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` and pass the information to the :obj:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` and pass the information to the
@@ -132,9 +111,6 @@ class Watcher:
else: else:
self.locator_uuid_map[appointment.locator] = [uuid] self.locator_uuid_map[appointment.locator] = [uuid]
if self.asleep:
self.awake()
self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
self.db_manager.create_append_locator_map(appointment.locator, uuid) self.db_manager.create_append_locator_map(appointment.locator, uuid)
@@ -159,15 +135,13 @@ class Watcher:
:obj:`Responder <pisa.responder.Responder>` upon detecting a breach. :obj:`Responder <pisa.responder.Responder>` upon detecting a breach.
""" """
while len(self.appointments) > 0: while True:
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
logger.info("New block received", block_hash=block_hash)
block = BlockProcessor.get_block(block_hash) block = BlockProcessor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if block is not None: if len(self.appointments) > 0 and block is not None:
txids = block.get("tx") txids = block.get("tx")
logger.info("List of transactions", txids=txids) logger.info("List of transactions", txids=txids)
expired_appointments = [ expired_appointments = [
@@ -203,7 +177,7 @@ class Watcher:
block_hash, block_hash,
) )
# FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. # FIXME: Only necessary because of the triggered appointment approach. Fix if it changes.
if receipt.delivered: if receipt.delivered:
Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map)
@@ -219,14 +193,13 @@ class Watcher:
appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager
) )
# Register the last processed block for the watcher if len(self.appointments) is 0:
self.db_manager.store_last_block_hash_watcher(block_hash) logger.info("No more pending appointments")
# Register the last processed block for the watcher
self.db_manager.store_last_block_hash_watcher(block_hash)
self.block_queue.task_done() self.block_queue.task_done()
# Go back to sleep if there are no more appointments
self.sleep()
def get_breaches(self, txids): def get_breaches(self, txids):
""" """
Gets a list of channel breaches given the list of transaction ids. Gets a list of channel breaches given the list of transaction ids.

View File

@@ -15,7 +15,6 @@ from apps.cli.blob import Blob
from pisa.responder import TransactionTracker from pisa.responder import TransactionTracker
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.chain_monitor import ChainMonitor
from common.appointment import Appointment from common.appointment import Appointment
from common.tools import compute_locator from common.tools import compute_locator
@@ -46,23 +45,13 @@ def prng_seed():
def db_manager(): def db_manager():
manager = DBManager("test_db") manager = DBManager("test_db")
# Add last know block for the Responder in the db # Add last know block for the Responder in the db
yield manager yield manager
manager.db.close() manager.db.close()
rmtree("test_db") rmtree("test_db")
@pytest.fixture(scope="module")
def chain_monitor():
chain_monitor = ChainMonitor()
chain_monitor.monitor_chain()
yield chain_monitor
chain_monitor.terminate = True
generate_block()
def generate_keypair(): def generate_keypair():
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
client_pk = client_sk.public_key() client_pk = client_sk.public_key()

View File

@@ -7,8 +7,10 @@ from cryptography.hazmat.primitives import serialization
from pisa.api import API from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.responder import Responder
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa import HOST, PORT from pisa import HOST, PORT
from pisa.chain_monitor import ChainMonitor
from test.pisa.unit.conftest import ( from test.pisa.unit.conftest import (
generate_block, generate_block,
@@ -32,7 +34,7 @@ config = get_config()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def run_api(db_manager, chain_monitor): def run_api(db_manager):
sk, pk = generate_keypair() sk, pk = generate_keypair()
sk_der = sk.private_bytes( sk_der = sk.private_bytes(
encoding=serialization.Encoding.DER, encoding=serialization.Encoding.DER,
@@ -40,9 +42,10 @@ def run_api(db_manager, chain_monitor):
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config())
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) watcher.awake()
chain_monitor.monitor_chain()
api_thread = Thread(target=API(watcher, config).start) api_thread = Thread(target=API(watcher, config).start)
api_thread.daemon = True api_thread.daemon = True

View File

@@ -4,6 +4,7 @@ from queue import Queue
from pisa.builder import Builder from pisa.builder import Builder
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.responder import Responder
from test.pisa.unit.conftest import ( from test.pisa.unit.conftest import (
get_random_value_hex, get_random_value_hex,
generate_dummy_appointment, generate_dummy_appointment,
@@ -89,7 +90,7 @@ def test_populate_block_queue():
def test_update_states_empty_list(db_manager): def test_update_states_empty_list(db_manager):
w = Watcher(db_manager=db_manager, chain_monitor=None, sk_der=None, config=None) w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=None)
missed_blocks_watcher = [] missed_blocks_watcher = []
missed_blocks_responder = [get_random_value_hex(32)] missed_blocks_responder = [get_random_value_hex(32)]
@@ -102,121 +103,35 @@ def test_update_states_empty_list(db_manager):
Builder.update_states(w, missed_blocks_responder, missed_blocks_watcher) Builder.update_states(w, missed_blocks_responder, missed_blocks_watcher)
def test_update_states_different_sizes(run_bitcoind, db_manager, chain_monitor): def test_update_states_responder_misses_more(run_bitcoind, db_manager):
w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config())
chain_monitor.attach_watcher(w.responder, True)
chain_monitor.attach_responder(w.responder, True)
# For the states to be updated data needs to be present in the actors (either appointments or trackers).
# Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss.
w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200}
blocks = [] blocks = []
for _ in range(5): for _ in range(5):
generate_block() generate_block()
blocks.append(bitcoin_cli().getbestblockhash()) blocks.append(bitcoin_cli().getbestblockhash())
# Updating the states should bring both to the same last known block. The Watcher's is stored in the db since it has # Updating the states should bring both to the same last known block.
# gone over do_watch, whereas the Responders in only updated by update state. w.awake()
w.responder.awake()
Builder.update_states(w, blocks, blocks[1:]) Builder.update_states(w, blocks, blocks[1:])
assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert db_manager.load_last_block_hash_watcher() == blocks[-1]
assert w.responder.last_known_block == blocks[-1] assert w.responder.last_known_block == blocks[-1]
# If both have work, both last known blocks are updated
w.sleep()
w.responder.sleep()
w.responder.trackers[uuid4().hex] = { def test_update_states_watcher_misses_more(run_bitcoind, db_manager):
"penalty_txid": get_random_value_hex(32), # Same as before, but data is now in the Responder
"locator": get_random_value_hex(16), w = Watcher(db_manager=db_manager, responder=Responder(db_manager), sk_der=None, config=get_config())
"appointment_end": 200,
}
blocks = [] blocks = []
for _ in range(5): for _ in range(5):
generate_block() generate_block()
blocks.append(bitcoin_cli().getbestblockhash()) blocks.append(bitcoin_cli().getbestblockhash())
w.awake()
w.responder.awake()
Builder.update_states(w, blocks[1:], blocks) Builder.update_states(w, blocks[1:], blocks)
assert db_manager.load_last_block_hash_watcher() == blocks[-1]
assert db_manager.load_last_block_hash_responder() == blocks[-1]
# Let's try the opposite of the first test (Responder with data, Watcher without)
w.sleep()
w.responder.sleep()
w.appointments = {}
last_block_prev = blocks[-1]
blocks = []
for _ in range(5):
generate_block()
blocks.append(bitcoin_cli().getbestblockhash())
# The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't
# change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last
# known block.
Builder.update_states(w, blocks[1:], blocks)
assert db_manager.load_last_block_hash_watcher() == last_block_prev
assert db_manager.load_last_block_hash_responder() == blocks[-1]
def test_update_states_same_sizes(db_manager, chain_monitor):
# The exact same behaviour of the last test is expected here, since different sizes are even using
# populate_block_queue and then run with the same list size.
w = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config())
chain_monitor.attach_watcher(w.responder, True)
chain_monitor.attach_responder(w.responder, True)
# For the states to be updated data needs to be present in the actors (either appointments or trackers).
# Let's start from the Watcher. We add one appointment and mine some blocks that both are gonna miss.
w.appointments[uuid4().hex] = {"locator": get_random_value_hex(16), "end_time": 200}
blocks = []
for _ in range(5):
generate_block()
blocks.append(bitcoin_cli().getbestblockhash())
Builder.update_states(w, blocks, blocks)
assert db_manager.load_last_block_hash_watcher() == blocks[-1]
assert w.responder.last_known_block == blocks[-1]
# If both have work, both last known blocks are updated
w.sleep()
w.responder.sleep()
w.responder.trackers[uuid4().hex] = {
"penalty_txid": get_random_value_hex(32),
"locator": get_random_value_hex(16),
"appointment_end": 200,
}
blocks = []
for _ in range(5):
generate_block()
blocks.append(bitcoin_cli().getbestblockhash())
Builder.update_states(w, blocks, blocks)
assert db_manager.load_last_block_hash_watcher() == blocks[-1] assert db_manager.load_last_block_hash_watcher() == blocks[-1]
assert db_manager.load_last_block_hash_responder() == blocks[-1] assert db_manager.load_last_block_hash_responder() == blocks[-1]
# Let's try the opposite of the first test (Responder with data, Watcher without)
w.sleep()
w.responder.sleep()
w.appointments = {}
last_block_prev = blocks[-1]
blocks = []
for _ in range(5):
generate_block()
blocks.append(bitcoin_cli().getbestblockhash())
# The Responder should have been brought up to date via do_watch, whereas the Watcher's last known block hash't
# change. The Watcher does not keep track of reorgs, so if he has no work to do he does not even update the last
# known block.
Builder.update_states(w, blocks, blocks)
assert db_manager.load_last_block_hash_watcher() == last_block_prev
assert db_manager.load_last_block_hash_responder() == blocks[-1]

View File

@@ -1,20 +1,19 @@
import zmq import zmq
import time import time
from queue import Queue
from threading import Thread, Event, Condition from threading import Thread, Event, Condition
from pisa.watcher import Watcher
from pisa.responder import Responder
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.chain_monitor import ChainMonitor from pisa.chain_monitor import ChainMonitor
from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config from test.pisa.unit.conftest import get_random_value_hex, generate_block
def test_init(run_bitcoind): def test_init(run_bitcoind):
# run_bitcoind is started here instead of later on to avoid race conditions while it initializes # run_bitcoind is started here instead of later on to avoid race conditions while it initializes
# Not much to test here, just sanity checks to make sure nothing goes south in the future # Not much to test here, just sanity checks to make sure nothing goes south in the future
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(Queue(), Queue())
assert chain_monitor.best_tip is None assert chain_monitor.best_tip is None
assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0 assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0
@@ -24,41 +23,12 @@ def test_init(run_bitcoind):
assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket) assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket)
# The Queues and asleep flags are initialized when attaching the corresponding subscriber # The Queues and asleep flags are initialized when attaching the corresponding subscriber
assert chain_monitor.watcher_queue is None assert isinstance(chain_monitor.watcher_queue, Queue)
assert chain_monitor.responder_queue is None assert isinstance(chain_monitor.responder_queue, Queue)
assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep
def test_attach_watcher(chain_monitor, db_manager): def test_notify_subscribers():
watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config()) chain_monitor = ChainMonitor(Queue(), Queue())
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
# booleans are not passed as reference in Python, so the flags need to be set separately
assert watcher.asleep == chain_monitor.watcher_asleep
watcher.asleep = False
assert chain_monitor.watcher_asleep != watcher.asleep
# Test that the Queue work
r_hash = get_random_value_hex(32)
chain_monitor.watcher_queue.put(r_hash)
assert watcher.block_queue.get() == r_hash
def test_attach_responder(chain_monitor, db_manager):
responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor)
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
# Same kind of testing as with the attach watcher
assert responder.asleep == chain_monitor.watcher_asleep
responder.asleep = False
assert chain_monitor.watcher_asleep != responder.asleep
r_hash = get_random_value_hex(32)
chain_monitor.responder_queue.put(r_hash)
assert responder.block_queue.get() == r_hash
def test_notify_subscribers(chain_monitor):
# Subscribers are only notified as long as they are awake # Subscribers are only notified as long as they are awake
new_block = get_random_value_hex(32) new_block = get_random_value_hex(32)
@@ -66,27 +36,17 @@ def test_notify_subscribers(chain_monitor):
assert chain_monitor.watcher_queue.empty() assert chain_monitor.watcher_queue.empty()
assert chain_monitor.responder_queue.empty() assert chain_monitor.responder_queue.empty()
chain_monitor.watcher_asleep = True
chain_monitor.responder_asleep = True
chain_monitor.notify_subscribers(new_block)
# And remain empty afterwards since both subscribers were asleep
assert chain_monitor.watcher_queue.empty()
assert chain_monitor.responder_queue.empty()
# Let's flag them as awake and try again
chain_monitor.watcher_asleep = False
chain_monitor.responder_asleep = False
chain_monitor.notify_subscribers(new_block) chain_monitor.notify_subscribers(new_block)
assert chain_monitor.watcher_queue.get() == new_block assert chain_monitor.watcher_queue.get() == new_block
assert chain_monitor.responder_queue.get() == new_block assert chain_monitor.responder_queue.get() == new_block
def test_update_state(chain_monitor): def test_update_state():
# The state is updated after receiving a new block (and only if the block is not already known). # The state is updated after receiving a new block (and only if the block is not already known).
# Let's start by setting a best_tip and a couple of old tips # Let's start by setting a best_tip and a couple of old tips
new_block_hash = get_random_value_hex(32) new_block_hash = get_random_value_hex(32)
chain_monitor = ChainMonitor(Queue(), Queue())
chain_monitor.best_tip = new_block_hash chain_monitor.best_tip = new_block_hash
chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)] chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)]
@@ -105,12 +65,10 @@ def test_update_state(chain_monitor):
def test_monitor_chain_polling(db_manager): def test_monitor_chain_polling(db_manager):
# Try polling with the Watcher # Try polling with the Watcher
chain_monitor = ChainMonitor() wq = Queue()
chain_monitor = ChainMonitor(wq, Queue())
chain_monitor.best_tip = BlockProcessor.get_best_block_hash() chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config())
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
# monitor_chain_polling runs until terminate if set # monitor_chain_polling runs until terminate if set
polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True) polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True)
polling_thread.start() polling_thread.start()
@@ -131,13 +89,10 @@ def test_monitor_chain_polling(db_manager):
def test_monitor_chain_zmq(db_manager): def test_monitor_chain_zmq(db_manager):
# Try zmq with the Responder rq = Queue()
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(Queue(), rq)
chain_monitor.best_tip = BlockProcessor.get_best_block_hash() chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor)
chain_monitor.attach_responder(responder.block_queue, asleep=False)
zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True) zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True)
zmq_thread.start() zmq_thread.start()
@@ -150,28 +105,10 @@ def test_monitor_chain_zmq(db_manager):
chain_monitor.responder_queue.get() chain_monitor.responder_queue.get()
assert chain_monitor.responder_queue.empty() assert chain_monitor.responder_queue.empty()
# If we flag it to sleep no notification is sent
chain_monitor.responder_asleep = True
for _ in range(3):
generate_block()
assert chain_monitor.responder_queue.empty()
chain_monitor.terminate = True
# The zmq thread needs a block generation to release from the recv method.
generate_block()
zmq_thread.join()
def test_monitor_chain(db_manager): def test_monitor_chain(db_manager):
# Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(Queue(), Queue())
watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config())
responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor)
chain_monitor.attach_responder(responder.block_queue, asleep=False)
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
chain_monitor.best_tip = None chain_monitor.best_tip = None
chain_monitor.monitor_chain() chain_monitor.monitor_chain()
@@ -196,12 +133,7 @@ def test_monitor_chain(db_manager):
def test_monitor_chain_single_update(db_manager): def test_monitor_chain_single_update(db_manager):
# This test tests that if both threads try to add the same block to the queue, only the first one will make it # This test tests that if both threads try to add the same block to the queue, only the first one will make it
chain_monitor = ChainMonitor() chain_monitor = ChainMonitor(Queue(), Queue())
watcher = Watcher(db_manager=db_manager, chain_monitor=chain_monitor, sk_der=None, config=get_config())
responder = Responder(db_manager=db_manager, chain_monitor=chain_monitor)
chain_monitor.attach_responder(responder.block_queue, asleep=False)
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
chain_monitor.best_tip = None chain_monitor.best_tip = None

View File

@@ -1,6 +1,7 @@
import json import json
import pytest import pytest
import random import random
from queue import Queue
from uuid import uuid4 from uuid import uuid4
from shutil import rmtree from shutil import rmtree
from copy import deepcopy from copy import deepcopy
@@ -18,17 +19,19 @@ from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def responder(db_manager, chain_monitor): def responder(db_manager):
responder = Responder(db_manager, chain_monitor) responder = Responder(db_manager)
chain_monitor.attach_responder(responder.block_queue, responder.asleep) chain_monitor = ChainMonitor(Queue(), responder.block_queue)
chain_monitor.monitor_chain()
return responder return responder
@pytest.fixture() @pytest.fixture(scope="session")
def temp_db_manager(): def temp_db_manager():
db_name = get_random_value_hex(8) db_name = get_random_value_hex(8)
db_manager = DBManager(db_name) db_manager = DBManager(db_name)
yield db_manager yield db_manager
db_manager.db.close() db_manager.db.close()
@@ -144,19 +147,17 @@ def test_tracker_from_dict_invalid_data():
assert True assert True
def test_init_responder(responder): def test_init_responder(temp_db_manager):
responder = Responder(temp_db_manager)
assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0 assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0
assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0 assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0
assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0 assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0
assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0 assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0
assert isinstance(responder.chain_monitor, ChainMonitor)
assert responder.block_queue.empty() assert responder.block_queue.empty()
assert responder.asleep is True
def test_handle_breach(db_manager, chain_monitor): def test_handle_breach(db_manager):
responder = Responder(db_manager, chain_monitor) responder = Responder(db_manager)
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
uuid = uuid4().hex uuid = uuid4().hex
tracker = create_dummy_tracker() tracker = create_dummy_tracker()
@@ -174,20 +175,11 @@ def test_handle_breach(db_manager, chain_monitor):
assert receipt.delivered is True assert receipt.delivered is True
# The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes
# now. To do so we delete all the trackers, and generate a new block.
responder.trackers = dict()
generate_block()
def test_handle_breach_bad_response(responder):
def test_add_bad_response(responder):
uuid = uuid4().hex uuid = uuid4().hex
tracker = create_dummy_tracker() tracker = create_dummy_tracker()
# Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting
# to awake. That will prevent the chain_monitor thread to be launched again.
responder.asleep = False
# A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though.
tracker.penalty_rawtx = tracker.penalty_txid tracker.penalty_rawtx = tracker.penalty_txid
@@ -206,8 +198,6 @@ def test_add_bad_response(responder):
def test_add_tracker(responder): def test_add_tracker(responder):
# Responder is asleep
for _ in range(20): for _ in range(20):
uuid = uuid4().hex uuid = uuid4().hex
confirmations = 0 confirmations = 0
@@ -236,8 +226,6 @@ def test_add_tracker(responder):
def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_same_penalty_txid(responder):
# Responder is asleep
confirmations = 0 confirmations = 0
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True)
uuid_1 = uuid4().hex uuid_1 = uuid4().hex
@@ -262,8 +250,6 @@ def test_add_tracker_same_penalty_txid(responder):
def test_add_tracker_already_confirmed(responder): def test_add_tracker_already_confirmed(responder):
# Responder is asleep
for i in range(20): for i in range(20):
uuid = uuid4().hex uuid = uuid4().hex
confirmations = i + 1 confirmations = i + 1
@@ -276,10 +262,11 @@ def test_add_tracker_already_confirmed(responder):
assert penalty_txid not in responder.unconfirmed_txs assert penalty_txid not in responder.unconfirmed_txs
def test_do_watch(temp_db_manager, chain_monitor): def test_do_watch(temp_db_manager):
# Create a fresh responder to simplify the test # Create a fresh responder to simplify the test
responder = Responder(temp_db_manager, chain_monitor) responder = Responder(temp_db_manager)
chain_monitor.attach_responder(responder.block_queue, False) chain_monitor = ChainMonitor(Queue(), responder.block_queue)
chain_monitor.monitor_chain()
trackers = [create_dummy_tracker(penalty_rawtx=create_dummy_transaction().hex()) for _ in range(20)] trackers = [create_dummy_tracker(penalty_rawtx=create_dummy_transaction().hex()) for _ in range(20)]
@@ -332,12 +319,12 @@ def test_do_watch(temp_db_manager, chain_monitor):
generate_blocks(6) generate_blocks(6)
assert len(responder.tx_tracker_map) == 0 assert len(responder.tx_tracker_map) == 0
assert responder.asleep is True
def test_check_confirmations(temp_db_manager, chain_monitor): def test_check_confirmations(db_manager):
responder = Responder(temp_db_manager, chain_monitor) responder = Responder(db_manager)
chain_monitor.attach_responder(responder.block_queue, responder.asleep) chain_monitor = ChainMonitor(Queue(), responder.block_queue)
chain_monitor.monitor_chain()
# check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have # check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have
# been confirmed. To test this we need to create a list of transactions and the state of the responder # been confirmed. To test this we need to create a list of transactions and the state of the responder
@@ -391,11 +378,12 @@ def test_get_txs_to_rebroadcast(responder):
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
def test_get_completed_trackers(db_manager, chain_monitor): def test_get_completed_trackers(db_manager):
initial_height = bitcoin_cli().getblockcount() initial_height = bitcoin_cli().getblockcount()
responder = Responder(db_manager, chain_monitor) responder = Responder(db_manager)
chain_monitor.attach_responder(responder.block_queue, responder.asleep) chain_monitor = ChainMonitor(Queue(), responder.block_queue)
chain_monitor.monitor_chain()
# A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS) # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS)
# We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached
@@ -450,10 +438,10 @@ def test_get_completed_trackers(db_manager, chain_monitor):
assert set(completed_trackers_ids) == set(ended_trackers_keys) assert set(completed_trackers_ids) == set(ended_trackers_keys)
def test_rebroadcast(db_manager, chain_monitor): def test_rebroadcast(db_manager):
responder = Responder(db_manager, chain_monitor) responder = Responder(db_manager)
responder.asleep = False chain_monitor = ChainMonitor(Queue(), responder.block_queue)
chain_monitor.attach_responder(responder.block_queue, responder.asleep) chain_monitor.monitor_chain()
txs_to_rebroadcast = [] txs_to_rebroadcast = []

View File

@@ -1,5 +1,6 @@
import pytest import pytest
from uuid import uuid4 from uuid import uuid4
from shutil import rmtree
from threading import Thread from threading import Thread
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
@@ -8,6 +9,7 @@ from pisa.watcher import Watcher
from pisa.responder import Responder from pisa.responder import Responder
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa.chain_monitor import ChainMonitor from pisa.chain_monitor import ChainMonitor
from pisa.db_manager import DBManager
from test.pisa.unit.conftest import ( from test.pisa.unit.conftest import (
generate_blocks, generate_blocks,
@@ -36,11 +38,22 @@ sk_der = signing_key.private_bytes(
) )
@pytest.fixture(scope="session")
def temp_db_manager():
db_name = get_random_value_hex(8)
db_manager = DBManager(db_name)
yield db_manager
db_manager.db.close()
rmtree(db_name)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def watcher(db_manager, chain_monitor): def watcher(db_manager):
watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config())
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) chain_monitor.monitor_chain()
return watcher return watcher
@@ -76,19 +89,13 @@ def create_appointments(n):
def test_init(run_bitcoind, watcher): def test_init(run_bitcoind, watcher):
assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0
assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
assert watcher.asleep is True
assert watcher.block_queue.empty() assert watcher.block_queue.empty()
assert isinstance(watcher.chain_monitor, ChainMonitor)
assert isinstance(watcher.config, dict) assert isinstance(watcher.config, dict)
assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey) assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey)
assert isinstance(watcher.responder, Responder) assert isinstance(watcher.responder, Responder)
def test_add_appointment(watcher): def test_add_appointment(watcher):
# The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
# Avoid this by setting the state to awake.
watcher.asleep = False
# We should be able to add appointments up to the limit # We should be able to add appointments up to the limit
for _ in range(10): for _ in range(10):
appointment, dispute_tx = generate_dummy_appointment( appointment, dispute_tx = generate_dummy_appointment(
@@ -128,10 +135,11 @@ def test_add_too_many_appointments(watcher):
assert sig is None assert sig is None
def test_do_watch(watcher): def test_do_watch(watcher, temp_db_manager):
watcher.db_manager = temp_db_manager
# We will wipe all the previous data and add 5 appointments # We will wipe all the previous data and add 5 appointments
appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
watcher.chain_monitor.watcher_asleep = False
# Set the data into the Watcher and in the db # Set the data into the Watcher and in the db
watcher.locator_uuid_map = locator_uuid_map watcher.locator_uuid_map = locator_uuid_map
@@ -142,7 +150,8 @@ def test_do_watch(watcher):
watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json()) watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
watcher.db_manager.create_append_locator_map(appointment.locator, uuid) watcher.db_manager.create_append_locator_map(appointment.locator, uuid)
Thread(target=watcher.do_watch, daemon=True).start() do_watch_thread = Thread(target=watcher.do_watch, daemon=True)
do_watch_thread.start()
# Broadcast the first two # Broadcast the first two
for dispute_tx in dispute_txs[:2]: for dispute_tx in dispute_txs[:2]:
@@ -158,7 +167,6 @@ def test_do_watch(watcher):
generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET) generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == 0 assert len(watcher.appointments) == 0
assert watcher.asleep is True
def test_get_breaches(watcher, txids, locator_uuid_map): def test_get_breaches(watcher, txids, locator_uuid_map):