diff --git a/pisa/api.py b/pisa/api.py index ec8dbda..6b26301 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -25,14 +25,14 @@ class API: """ Main endpoint of the Watchtower. - The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be json - encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields. + The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be + json encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields. Returns: - :obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted appointments, - the ``rcode`` is always 0 and the response contains the signed receipt. For rejected appointments, the ``rcode`` - is a negative value and the response contains the error message. Error messages can be found at - :mod:`Errors `. + :obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted + appointments, the ``rcode`` is always 0 and the response contains the signed receipt. For rejected + appointments, the ``rcode`` is a negative value and the response contains the error message. Error messages + can be found at :mod:`Errors `. """ remote_addr = request.environ.get("REMOTE_ADDR") @@ -167,8 +167,8 @@ class API: """ Provides the block height of the Watchtower. - This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information to - testers about the current block so they can define a dummy appointment without having to run a bitcoin node. + This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information + to testers about the current block so they can define a dummy appointment without having to run a bitcoin node. Returns: :obj:`dict`: A json encoded dictionary containing the block height. diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py new file mode 100644 index 0000000..689a223 --- /dev/null +++ b/pisa/chain_monitor.py @@ -0,0 +1,182 @@ +import zmq +import binascii +from threading import Thread, Event, Condition + +from common.logger import Logger +from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT, POLLING_DELTA, BLOCK_WINDOW_SIZE +from pisa.block_processor import BlockProcessor + +logger = Logger("ChainMonitor") + + +class ChainMonitor: + """ + The :class:`ChainMonitor` is the class in charge of monitoring the blockchain (via ``bitcoind``) to detect new + blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the + :obj:`Watcher ` and the :obj:`Responder ` using ``Queues``. + + The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified + once per queue and the notification is triggered by the method that detects the block faster. + + Attributes: + best_tip (:obj:`str`): a block hash representing the current best tip. + last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. + terminate (:obj:`bool`): a flag to signal the termination of the :class:`ChainMonitor` (shutdown the tower). + check_tip (:obj:`Event`): an event that's triggered at fixed time intervals and controls the polling thread. + lock (:obj:`Condition`): a lock used to protect concurrent access to the queues and ``best_tip`` by the zmq and + polling threads. + zmqSubSocket (:obj:`socket`): a socket to connect to ``bitcoind`` via ``zmq``. + watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher `. + responder_queue (:obj:`Queue`): a queue to send new best tips to the + :obj:`Responder `. + watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not. + responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not. + """ + + def __init__(self): + self.best_tip = None + self.last_tips = [] + self.terminate = False + + self.check_tip = Event() + self.lock = Condition() + + self.zmqContext = zmq.Context() + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + + self.watcher_queue = None + self.responder_queue = None + self.watcher_asleep = True + self.responder_asleep = True + + def attach_watcher(self, queue, asleep): + """ + Attaches a :obj:`Watcher ` to the :class:`ChainMonitor`. The ``Watcher`` and the + ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag. + + Args: + queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``. + asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from + the ``Watcher`` when the state changes. + """ + + self.watcher_queue = queue + self.watcher_asleep = asleep + + def attach_responder(self, queue, asleep): + """ + Attaches a :obj:`Responder ` to the :class:`ChainMonitor`. The ``Responder`` and the + ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag. + + Args: + queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. + asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from + the ``Responder`` when the state changes. + """ + + self.responder_queue = queue + self.responder_asleep = asleep + + def notify_subscribers(self, block_hash): + """ + Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so + by putting the hash in the corresponding queue(s). + + Args: + block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. + """ + + if not self.watcher_asleep: + self.watcher_queue.put(block_hash) + + if not self.responder_asleep: + self.responder_queue.put(block_hash) + + def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): + """ + Updates the state of the ``ChainMonitor``. The state is represented as the ``best_tip`` and the list of + ``last_tips``. ``last_tips`` is bounded to ``max_block_window_size``. + + Args: + block_hash (:obj:`block_hash`): the new best tip. + max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list. + + Returns: + (:obj:`bool`): ``True`` is the state was successfully updated, ``False`` otherwise. + """ + + if block_hash != self.best_tip and block_hash not in self.last_tips: + self.last_tips.append(self.best_tip) + self.best_tip = block_hash + + if len(self.last_tips) > max_block_window_size: + self.last_tips.pop(0) + + return True + + else: + return False + + def monitor_chain_polling(self, polling_delta=POLLING_DELTA): + """ + Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not + set. Polling is performed once every ``polling_delta`` seconds. If a new best tip if found, the shared lock is + acquired, the state is updated and the subscribers are notified, and finally the lock is released. + + Args: + polling_delta (:obj:`int`): the time delta between polls. + """ + + while not self.terminate: + self.check_tip.wait(timeout=polling_delta) + + # Terminate could have been set while the thread was blocked in wait + if not self.terminate: + current_tip = BlockProcessor.get_best_block_hash() + + self.lock.acquire() + if self.update_state(current_tip): + self.notify_subscribers(current_tip) + logger.info("New block received via polling", block_hash=current_tip) + self.lock.release() + + def monitor_chain_zmq(self): + """ + Monitors ``bitcoind`` via zmq. Once the method is fired, it keeps monitoring as long as ``terminate`` is not + set. If a new best tip if found, the shared lock is acquired, the state is updated and the subscribers are + notified, and finally the lock is released. + """ + + while not self.terminate: + msg = self.zmqSubSocket.recv_multipart() + + # Terminate could have been set while the thread was blocked in recv + if not self.terminate: + topic = msg[0] + body = msg[1] + + if topic == b"hashblock": + block_hash = binascii.hexlify(body).decode("utf-8") + + self.lock.acquire() + if self.update_state(block_hash): + self.notify_subscribers(block_hash) + logger.info("New block received via zmq", block_hash=block_hash) + self.lock.release() + + def monitor_chain(self, polling_delta=POLLING_DELTA): + """ + Main :class:`ChainMonitor` method. It initializes the ``best_tip`` to the current one (by querying the + :obj:`BlockProcessor `) and creates two threads, one per each monitoring + approach (``zmq`` and ``polling``). + + Args: + polling_delta (:obj:`int`): the time delta between polls by the ``monitor_chain_polling`` thread. + """ + + self.best_tip = BlockProcessor.get_best_block_hash() + Thread(target=self.monitor_chain_polling, daemon=True, kwargs={"polling_delta": polling_delta}).start() + Thread(target=self.monitor_chain_zmq, daemon=True).start() diff --git a/pisa/pisad.py b/pisa/pisad.py index 5e84a72..6bc8b9c 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -7,8 +7,8 @@ from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder import pisa.conf as conf -from pisa.responder import Responder from pisa.db_manager import DBManager +from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor from pisa.tools import can_connect_to_bitcoind, in_correct_network @@ -18,6 +18,7 @@ logger = Logger("Daemon") def handle_signals(signal_received, frame): logger.info("Closing connection with appointments db") db_manager.db.close() + chain_monitor.terminate = True logger.info("Shutting down PISA") exit(0) @@ -94,13 +95,19 @@ if __name__ == "__main__": try: db_manager = DBManager(pisa_config.get("DB_PATH")) + # Create the chain monitor and start monitoring the chain + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() with open(pisa_config.get("PISA_SECRET_KEY"), "rb") as key_file: secret_key_der = key_file.read() - watcher = Watcher(db_manager, secret_key_der, config=pisa_config) + watcher = Watcher(db_manager, chain_monitor, secret_key_der, pisa_config) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") @@ -113,7 +120,6 @@ if __name__ == "__main__": last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - responder = Responder(db_manager, pisa_config) last_common_ancestor_responder = None missed_blocks_responder = None @@ -124,12 +130,12 @@ if __name__ == "__main__": ) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) - responder.trackers, responder.tx_tracker_map = Builder.build_trackers(responder_trackers_data) - responder.block_queue = Builder.build_block_queue(missed_blocks_responder) + watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( + responder_trackers_data + ) + watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder) - # Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the - # search twice. - watcher.responder = responder + # Build Watcher. If the blocks of both match we don't perform the search twice. if last_block_watcher is not None: if last_block_watcher == last_block_responder: missed_blocks_watcher = missed_blocks_responder diff --git a/pisa/responder.py b/pisa/responder.py index 2d856e4..bf02391 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -6,7 +6,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 @@ -127,23 +126,22 @@ class Responder: has missed. Used to trigger rebroadcast if needed. asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It - is populated by the :obj:`ZMQSubscriber `. - zmq_subscriber (:obj:`ZMQSubscriber `): a ``ZMQSubscriber`` instance - used to receive new block notifications from ``bitcoind``. + is populated by the :obj:`ChainMonitor `. + chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track + new blocks received by ``bitcoind``. db_manager (:obj:`DBManager `): A ``DBManager`` instance to interact with the database. """ - def __init__(self, db_manager, config): + def __init__(self, db_manager, chain_monitor): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.asleep = True self.block_queue = Queue() - self.config = config - self.zmq_subscriber = None + self.chain_monitor = chain_monitor self.db_manager = db_manager @staticmethod @@ -225,8 +223,7 @@ class Responder: The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database. - ``add_tracker`` awakes the :obj:`Responder` and creates a connection with the - :obj:`ZMQSubscriber ` if he is asleep. + ``add_tracker`` awakes the :obj:`Responder` if it is asleep. Args: uuid (:obj:`str`): a unique identifier for the appointment. @@ -261,19 +258,8 @@ class Responder: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - responder = Thread(target=self.do_watch) - zmq_thread.start() - responder.start() - - def do_subscribe(self): - """ - Initializes a :obj:`ZMQSubscriber ` instance to listen to new blocks - from ``bitcoind``. Block ids are received trough the ``block_queue``. - """ - - self.zmq_subscriber = ZMQSubscriber(self.config, parent="Responder") - self.zmq_subscriber.handle(self.block_queue) + self.chain_monitor.responder_asleep = False + Thread(target=self.do_watch).start() def do_watch(self): """ @@ -328,8 +314,7 @@ class Responder: # Go back to sleep if there are no more pending trackers self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.responder_asleep = True logger.info("No more pending trackers, going back to sleep") @@ -482,9 +467,6 @@ class Responder: else: # If the penalty transaction is missing, we need to reset the tracker. - # DISCUSS: Adding tracker back, should we flag it as retried? - # FIXME: Whether we decide to increase the retried counter or not, the current counter should be - # maintained. There is no way of doing so with the current approach. Update if required self.handle_breach( tracker.locator, uuid, diff --git a/pisa/sample_conf.py b/pisa/sample_conf.py index 71e64a0..8d08590 100644 --- a/pisa/sample_conf.py +++ b/pisa/sample_conf.py @@ -5,6 +5,10 @@ BTC_RPC_HOST = "localhost" BTC_RPC_PORT = 18443 BTC_NETWORK = "regtest" +# CHAIN MONITOR +POLLING_DELTA = 60 +BLOCK_WINDOW_SIZE = 10 + # ZMQ FEED_PROTOCOL = "tcp" FEED_ADDR = "127.0.0.1" diff --git a/pisa/watcher.py b/pisa/watcher.py index abf0290..b114b7f 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -9,7 +9,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner from pisa.responder import Responder from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber logger = Logger("Watcher") @@ -27,15 +26,17 @@ class Watcher: If an appointment reaches its end with no breach, the data is simply deleted. The :class:`Watcher` receives information about new received blocks via the ``block_queue`` that is populated by the - :obj:`ZMQSubscriber `. + :obj:`ChainMonitor `. Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. + chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track + new blocks received by ``bitcoind``. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). + config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve + ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. responder (:obj:`Responder `): a ``Responder`` instance. If ``None`` is passed, a new instance is created. Populated instances are useful when bootstrapping the system from backed-up data. - max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given - time. Defaults to ``MAX_APPOINTMENTS``. Attributes: @@ -45,30 +46,31 @@ class Watcher: appointments with the same ``locator``. asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is - populated by the :obj:`ZMQSubscriber `. - max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given - time. - zmq_subscriber (:obj:`ZMQSubscriber `): a ZMQSubscriber instance used - to receive new block notifications from ``bitcoind``. + populated by the :obj:`ChainMonitor `. + chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track + new blocks received by ``bitcoind``. + config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve + ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. db_manager (:obj:`DBManager `): A db manager instance to interact with the database. + signing_key (:mod:`EllipticCurvePrivateKey`): a private key used to sign accepted appointments. Raises: ValueError: if `pisa_sk_file` is not found. """ - def __init__(self, db_manager, sk_der, config, responder=None): + def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None): self.appointments = dict() self.locator_uuid_map = dict() self.asleep = True self.block_queue = Queue() + self.chain_monitor = chain_monitor self.config = config - self.zmq_subscriber = None self.db_manager = db_manager self.signing_key = Cryptographer.load_private_key_der(sk_der) if not isinstance(responder, Responder): - self.responder = Responder(db_manager, self.config) + self.responder = Responder(db_manager, chain_monitor) def add_appointment(self, appointment): """ @@ -112,10 +114,8 @@ class Watcher: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - watcher = Thread(target=self.do_watch) - zmq_thread.start() - watcher.start() + self.chain_monitor.watcher_asleep = False + Thread(target=self.do_watch).start() logger.info("Waking up") @@ -136,15 +136,6 @@ class Watcher: return appointment_added, signature - def do_subscribe(self): - """ - Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received - trough the ``block_queue``. - """ - - self.zmq_subscriber = ZMQSubscriber(self.config, parent="Watcher") - self.zmq_subscriber.handle(self.block_queue) - def do_watch(self): """ Monitors the blockchain whilst there are pending appointments. @@ -206,8 +197,7 @@ class Watcher: # Go back to sleep if there are no more appointments self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.watcher_asleep = True logger.info("No more pending appointments, going back to sleep") diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 74da650..ff79fee 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -12,9 +12,9 @@ from cryptography.hazmat.primitives import serialization from apps.cli.blob import Blob from pisa.responder import TransactionTracker -from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager +from pisa.chain_monitor import ChainMonitor from common.appointment import Appointment from common.tools import compute_locator @@ -51,6 +51,17 @@ def db_manager(): rmtree("test_db") +@pytest.fixture(scope="module") +def chain_monitor(): + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + + yield chain_monitor + + chain_monitor.terminate = True + generate_block() + + def generate_keypair(): client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_pk = client_sk.public_key() diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index 820b11a..f94ab40 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -32,14 +32,17 @@ config = get_config() @pytest.fixture(scope="module") -def run_api(db_manager): +def run_api(db_manager, chain_monitor): sk, pk = generate_keypair() sk_der = sk.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) - watcher = Watcher(db_manager, sk_der, get_config()) + + watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) api_thread = Thread(target=API(watcher, config).start) api_thread.daemon = True diff --git a/test/pisa/unit/test_chain_monitor.py b/test/pisa/unit/test_chain_monitor.py new file mode 100644 index 0000000..3c2bee6 --- /dev/null +++ b/test/pisa/unit/test_chain_monitor.py @@ -0,0 +1,225 @@ +import zmq +import time +from threading import Thread, Event, Condition + +from pisa.watcher import Watcher +from pisa.responder import Responder +from pisa.block_processor import BlockProcessor +from pisa.chain_monitor import ChainMonitor + +from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config + + +def test_init(run_bitcoind): + # run_bitcoind is started here instead of later on to avoid race conditions while it initializes + + # Not much to test here, just sanity checks to make sure nothing goes south in the future + chain_monitor = ChainMonitor() + + assert chain_monitor.best_tip is None + assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0 + assert chain_monitor.terminate is False + assert isinstance(chain_monitor.check_tip, Event) + assert isinstance(chain_monitor.lock, Condition) + assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket) + + # The Queues and asleep flags are initialized when attaching the corresponding subscriber + assert chain_monitor.watcher_queue is None + assert chain_monitor.responder_queue is None + assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep + + +def test_attach_watcher(chain_monitor): + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config()) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + + # booleans are not passed as reference in Python, so the flags need to be set separately + assert watcher.asleep == chain_monitor.watcher_asleep + watcher.asleep = False + assert chain_monitor.watcher_asleep != watcher.asleep + + # Test that the Queue work + r_hash = get_random_value_hex(32) + chain_monitor.watcher_queue.put(r_hash) + assert watcher.block_queue.get() == r_hash + + +def test_attach_responder(chain_monitor): + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + + # Same kind of testing as with the attach watcher + assert responder.asleep == chain_monitor.watcher_asleep + responder.asleep = False + assert chain_monitor.watcher_asleep != responder.asleep + + r_hash = get_random_value_hex(32) + chain_monitor.responder_queue.put(r_hash) + assert responder.block_queue.get() == r_hash + + +def test_notify_subscribers(chain_monitor): + # Subscribers are only notified as long as they are awake + new_block = get_random_value_hex(32) + + # Queues should be empty to start with + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + chain_monitor.watcher_asleep = True + chain_monitor.responder_asleep = True + chain_monitor.notify_subscribers(new_block) + + # And remain empty afterwards since both subscribers were asleep + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # Let's flag them as awake and try again + chain_monitor.watcher_asleep = False + chain_monitor.responder_asleep = False + chain_monitor.notify_subscribers(new_block) + + assert chain_monitor.watcher_queue.get() == new_block + assert chain_monitor.responder_queue.get() == new_block + + +def test_update_state(chain_monitor): + # The state is updated after receiving a new block (and only if the block is not already known). + # Let's start by setting a best_tip and a couple of old tips + new_block_hash = get_random_value_hex(32) + chain_monitor.best_tip = new_block_hash + chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)] + + # Now we can try to update the state with an old best_tip and see how it doesn't work + assert chain_monitor.update_state(chain_monitor.last_tips[0]) is False + + # Same should happen with the current tip + assert chain_monitor.update_state(chain_monitor.best_tip) is False + + # The state should be correctly updated with a new block hash, the chain tip should change and the old tip should + # have been added to the last_tips + another_block_hash = get_random_value_hex(32) + assert chain_monitor.update_state(another_block_hash) is True + assert chain_monitor.best_tip == another_block_hash and new_block_hash == chain_monitor.last_tips[-1] + + +def test_monitor_chain_polling(): + # Try polling with the Watcher + chain_monitor = ChainMonitor() + chain_monitor.best_tip = BlockProcessor.get_best_block_hash() + + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config()) + chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + + # monitor_chain_polling runs until terminate if set + polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True) + polling_thread.start() + + # Check that nothing changes as long as a block is not generated + for _ in range(5): + assert chain_monitor.watcher_queue.empty() + time.sleep(0.1) + + # And that it does if we generate a block + generate_block() + + chain_monitor.watcher_queue.get() + assert chain_monitor.watcher_queue.empty() + + chain_monitor.terminate = True + polling_thread.join() + + +def test_monitor_chain_zmq(): + # Try zmq with the Responder + chain_monitor = ChainMonitor() + chain_monitor.best_tip = BlockProcessor.get_best_block_hash() + + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, asleep=False) + + zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True) + zmq_thread.start() + + # Queues should start empty + assert chain_monitor.responder_queue.empty() + + # And have a new block every time we generate one + for _ in range(3): + generate_block() + chain_monitor.responder_queue.get() + assert chain_monitor.responder_queue.empty() + + # If we flag it to sleep no notification is sent + chain_monitor.responder_asleep = True + + for _ in range(3): + generate_block() + assert chain_monitor.responder_queue.empty() + + chain_monitor.terminate = True + # The zmq thread needs a block generation to release from the recv method. + generate_block() + + zmq_thread.join() + + +def test_monitor_chain(): + # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate + chain_monitor = ChainMonitor() + + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config()) + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, asleep=False) + chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + + chain_monitor.best_tip = None + chain_monitor.monitor_chain() + + # The tip is updated before starting the threads, so it should have changed. + assert chain_monitor.best_tip is not None + + # Blocks should be received + for _ in range(5): + generate_block() + watcher_block = chain_monitor.watcher_queue.get() + responder_block = chain_monitor.responder_queue.get() + assert watcher_block == responder_block + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # And the thread be terminated on terminate + chain_monitor.terminate = True + # The zmq thread needs a block generation to release from the recv method. + generate_block() + + +def test_monitor_chain_single_update(): + # This test tests that if both threads try to add the same block to the queue, only the first one will make it + chain_monitor = ChainMonitor() + + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config()) + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, asleep=False) + chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + + chain_monitor.best_tip = None + + # We will create a block and wait for the polling thread. Then check the queues to see that the block hash has only + # been added once. + chain_monitor.monitor_chain(polling_delta=2) + generate_block() + + watcher_block = chain_monitor.watcher_queue.get() + responder_block = chain_monitor.responder_queue.get() + assert watcher_block == responder_block + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # The delta for polling is 2 secs, so let's wait and see + time.sleep(2) + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # We can also force an update and see that it won't go through + assert chain_monitor.update_state(watcher_block) is False diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index ddd8105..b5d1616 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -5,24 +5,26 @@ from uuid import uuid4 from shutil import rmtree from copy import deepcopy from threading import Thread -from queue import Queue, Empty from pisa.db_manager import DBManager from pisa.responder import Responder, TransactionTracker from pisa.block_processor import BlockProcessor +from pisa.chain_monitor import ChainMonitor from pisa.tools import bitcoin_cli from common.constants import LOCATOR_LEN_HEX -from common.tools import check_sha256_hex_format from bitcoind_mock.utils import sha256d from bitcoind_mock.transaction import TX -from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex, get_config +from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex @pytest.fixture(scope="module") -def responder(db_manager): - return Responder(db_manager, get_config()) +def responder(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + + return responder @pytest.fixture() @@ -145,18 +147,19 @@ def test_tracker_from_dict_invalid_data(): def test_init_responder(responder): - assert type(responder.trackers) is dict and len(responder.trackers) == 0 - assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0 - assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0 - assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0 + assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0 + assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0 + assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0 + assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0 + assert isinstance(responder.chain_monitor, ChainMonitor) assert responder.block_queue.empty() assert responder.asleep is True - assert type(responder.config) is dict - assert responder.zmq_subscriber is None -def test_handle_breach(db_manager): - responder = Responder(db_manager, get_config()) +def test_handle_breach(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + uuid = uuid4().hex tracker = create_dummy_tracker() @@ -173,11 +176,10 @@ def test_handle_breach(db_manager): assert receipt.delivered is True - # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now. - # To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method + # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes + # now. To do so we delete all the trackers, and generate a new block. responder.trackers = dict() - responder.zmq_subscriber.terminate = True - responder.block_queue.put(get_random_value_hex(32)) + generate_block() def test_add_bad_response(responder): @@ -185,7 +187,7 @@ def test_add_bad_response(responder): tracker = create_dummy_tracker() # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting - # to awake. That will prevent the zmq thread to be launched again. + # to awake. That will prevent the chain_monitor thread to be launched again. responder.asleep = False # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. @@ -206,7 +208,7 @@ def test_add_bad_response(responder): def test_add_tracker(responder): - responder.asleep = False + # Responder is asleep for _ in range(20): uuid = uuid4().hex @@ -238,7 +240,8 @@ def test_add_tracker(responder): def test_add_tracker_same_penalty_txid(responder): - # Create the same tracker using two different uuids + # Responder is asleep + confirmations = 0 locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) uuid_1 = uuid4().hex @@ -265,7 +268,7 @@ def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_already_confirmed(responder): - responder.asleep = False + # Responder is asleep for i in range(20): uuid = uuid4().hex @@ -279,29 +282,10 @@ def test_add_tracker_already_confirmed(responder): assert penalty_txid not in responder.unconfirmed_txs -def test_do_subscribe(responder): - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() - - try: - generate_block() - block_hash = responder.block_queue.get() - assert check_sha256_hex_format(block_hash) - - except Empty: - assert False - - -def test_do_watch(temp_db_manager): - responder = Responder(temp_db_manager, get_config()) - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() +def test_do_watch(temp_db_manager, chain_monitor): + # Create a fresh responder to simplify the test + responder = Responder(temp_db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, False) trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)] @@ -315,9 +299,7 @@ def test_do_watch(temp_db_manager): responder.unconfirmed_txs.append(tracker.penalty_txid) # Let's start to watch - watch_thread = Thread(target=responder.do_watch) - watch_thread.daemon = True - watch_thread.start() + Thread(target=responder.do_watch, daemon=True).start() # And broadcast some of the transactions broadcast_txs = [] @@ -351,13 +333,9 @@ def test_do_watch(temp_db_manager): assert responder.asleep is True -def test_check_confirmations(temp_db_manager): - responder = Responder(temp_db_manager, get_config()) - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() +def test_check_confirmations(temp_db_manager, chain_monitor): + responder = Responder(temp_db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) # check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have # been confirmed. To test this we need to create a list of transactions and the state of the responder @@ -387,7 +365,7 @@ def test_check_confirmations(temp_db_manager): assert responder.missed_confirmations[tx] == 1 -# WIP: Check this properly, a bug pass unnoticed! +# TODO: Check this properly, a bug pass unnoticed! def test_get_txs_to_rebroadcast(responder): # Let's create a few fake txids and assign at least 6 missing confirmations to each txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)} @@ -411,13 +389,13 @@ def test_get_txs_to_rebroadcast(responder): assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) -def test_get_completed_trackers(db_manager): +def test_get_completed_trackers(db_manager, chain_monitor): initial_height = bitcoin_cli().getblockcount() - # Let's use a fresh responder for this to make it easier to compare the results - responder = Responder(db_manager, get_config()) + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) - # A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) + # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached trackers_end_conf = { uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10) @@ -462,9 +440,10 @@ def test_get_completed_trackers(db_manager): assert set(completed_trackers_ids) == set(ended_trackers_keys) -def test_rebroadcast(db_manager): - responder = Responder(db_manager, get_config()) +def test_rebroadcast(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) responder.asleep = False + chain_monitor.attach_responder(responder.block_queue, responder.asleep) txs_to_rebroadcast = [] diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 4a5cf7b..25af8e0 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -1,14 +1,15 @@ import pytest from uuid import uuid4 from threading import Thread -from queue import Queue, Empty +from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import serialization from pisa.watcher import Watcher from pisa.responder import Responder from pisa.tools import bitcoin_cli +from pisa.chain_monitor import ChainMonitor + from test.pisa.unit.conftest import ( - generate_block, generate_blocks, generate_dummy_appointment, get_random_value_hex, @@ -17,7 +18,7 @@ from test.pisa.unit.conftest import ( ) from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS -from common.tools import check_sha256_hex_format, compute_locator +from common.tools import compute_locator from common.cryptographer import Cryptographer @@ -36,8 +37,12 @@ sk_der = signing_key.private_bytes( @pytest.fixture(scope="module") -def watcher(db_manager): - return Watcher(db_manager, sk_der, get_config()) +def watcher(db_manager, chain_monitor): + watcher = Watcher(db_manager, chain_monitor, sk_der, get_config()) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) + + return watcher @pytest.fixture(scope="module") @@ -68,17 +73,18 @@ def create_appointments(n): return appointments, locator_uuid_map, dispute_txs -def test_init(watcher): - assert type(watcher.appointments) is dict and len(watcher.appointments) == 0 - assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0 - assert watcher.block_queue.empty() +def test_init(run_bitcoind, watcher): + assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 + assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert watcher.asleep is True - assert type(watcher.config) is dict - assert watcher.zmq_subscriber is None - assert type(watcher.responder) is Responder + assert watcher.block_queue.empty() + assert isinstance(watcher.chain_monitor, ChainMonitor) + assert isinstance(watcher.config, dict) + assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey) + assert isinstance(watcher.responder, Responder) -def test_add_appointment(run_bitcoind, watcher): +def test_add_appointment(watcher): # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state) # Avoid this by setting the state to awake. watcher.asleep = False @@ -122,36 +128,18 @@ def test_add_too_many_appointments(watcher): assert sig is None -def test_do_subscribe(watcher): - watcher.block_queue = Queue() - - zmq_thread = Thread(target=watcher.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() - - try: - generate_block() - block_hash = watcher.block_queue.get() - assert check_sha256_hex_format(block_hash) - - except Empty: - assert False - - def test_do_watch(watcher): # We will wipe all the previous data and add 5 appointments watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) + watcher.chain_monitor.watcher_asleep = False - watch_thread = Thread(target=watcher.do_watch) - watch_thread.daemon = True - watch_thread.start() + Thread(target=watcher.do_watch, daemon=True).start() # Broadcast the first two for dispute_tx in dispute_txs[:2]: bitcoin_cli().sendrawtransaction(dispute_tx) - # After leaving some time for the block to be mined and processed, the number of appointments should have reduced - # by two + # After generating enough blocks, the number of appointments should have reduced by two generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET) assert len(watcher.appointments) == APPOINTMENTS - 2