diff --git a/pisa/responder.py b/pisa/responder.py index 87ec9c7..cc57335 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -6,7 +6,7 @@ from pisa.logger import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQHandler +from pisa.utils.zmq_subscriber import ZMQSubscriber CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 @@ -14,7 +14,24 @@ MIN_CONFIRMATIONS = 6 logger = Logger("Responder") -class Job: +class TransactionTracker: + """ + A ``TransactionTracker`` is used to monitor a ``penalty_tx``. Once the dispute is seen by the + :mod:`Watcher ` the penalty transaction is decrypted and the relevant appointment data is passed + along to the ``Responder``. + + Once the ``Responder`` has succeeded on broadcasting the penalty transaction it will create a ``TransactionTracker`` + and monitor the blockchain until the end of the appointment. + + Args: + locator (str): A 16-byte hex-encoded value used by the tower to detect channel breaches. It serves as a trigger + for the tower to decrypt and broadcast the penalty transaction. + dispute_txid (str): the id of the transaction that created the channel breach and triggered the penalty. + penalty_txid (str): the id of the transaction that was encrypted under ``dispute_txid``. + penalty_rawtx (str): the raw transaction that was broadcast as a consequence of the channel breach. + appointment_end (int): the block at which the tower will stop monitoring the blockchain for this appointment. + """ + def __init__(self, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end): self.locator = locator self.dispute_txid = dispute_txid @@ -23,23 +40,47 @@ class Job: self.appointment_end = appointment_end @classmethod - def from_dict(cls, job_data): - locator = job_data.get("locator") - dispute_txid = job_data.get("dispute_txid") - penalty_txid = job_data.get("penalty_txid") - penalty_rawtx = job_data.get("penalty_rawtx") - appointment_end = job_data.get("appointment_end") + def from_dict(cls, tx_tracker_data): + """ + Constructs a ``TransactionTracker`` instance from a dictionary. Requires that all the fields are populated + (``not None``). + + Useful to load data from the database. + + Args: + tx_tracker_data (dict): a dictionary with an entry per each field required to create the + ``TransactionTracker``. + + Returns: + ``TransactionTracker``: A ``TransactionTracker`` instantiated with the provided data. + + Raises: + ValueError: if any of the required fields is missing. + """ + + locator = tx_tracker_data.get("locator") + dispute_txid = tx_tracker_data.get("dispute_txid") + penalty_txid = tx_tracker_data.get("penalty_txid") + penalty_rawtx = tx_tracker_data.get("penalty_rawtx") + appointment_end = tx_tracker_data.get("appointment_end") if any(v is None for v in [locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end]): - raise ValueError("Wrong job data, some fields are missing") + raise ValueError("Wrong transaction tracker data, some fields are missing") else: - job = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) + tx_tracker = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) - return job + return tx_tracker def to_dict(self): - job = { + """ + Exports a ``TransactionTracker`` as a dictionary. + + Returns: + ``dict``: A dictionary containing the ``TransactionTracker`` data. + """ + + tx_tracker = { "locator": self.locator, "dispute_txid": self.dispute_txid, "penalty_txid": self.penalty_txid, @@ -47,16 +88,50 @@ class Job: "appointment_end": self.appointment_end, } - return job + return tx_tracker def to_json(self): + """ + Exports a ``TransactionTracker`` as a json-encoded dictionary. + + Returns: + ``str``: A json-encoded dictionary containing the ``TransactionTracker`` data. + """ + return json.dumps(self.to_dict()) class Responder: + """ + The ``Responder`` is the class in charge of ensuring that channel breaches are dealt with. It does so handling the + decrypted ``penalty_txs`` handed by the :mod:`Watcher ` and ensuring the they make it to the + blockchain. + + The ``Responder`` can be in two states: + - Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty). + - Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain). + + Args: + db_manager (DBManager): a :mod:`DBManager ` instance to interact with the database. + + Attributes: + trackers (``dict``): A dictionary containing all the ``TransactionTrackers`` handled by the ``Responder``. + Each entry is identified by a ``uuid``. + tx_tracker_map (``dict``): A ``penalty_txid:uuid`` map used to allow the ``Responder`` to deal with several + trackers triggered by the same ``penalty_txid``. + unconfirmed_txs (``list``): A list that keeps track of all unconfirmed ``penalty_txs``. + missed_confirmations (``dict``): a dictionary that keeps count of how many confirmations each ``penalty_tx`` has + missed. Used to trigger rebroadcast if needed. + asleep (``bool``): A flag that signals whether the ``Responder`` is asleep or awake. + block_queue (``Queue``): A queue used by the ``Responder`` to receive block hashes from ``bitcoind``. It is + populated by the ``ZMQSubscriber``. + db_manager (``DBManager``): A :mod:`DBManager ` instance to interact with the database. + + """ + def __init__(self, db_manager): - self.jobs = dict() - self.tx_job_map = dict() + self.trackers = dict() + self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.asleep = True @@ -66,6 +141,25 @@ class Responder: @staticmethod def on_sync(block_hash): + """ + Whether the ``Responder`` is on sync with ``bitcoind`` or not. Used when recovering from a crash. + + The Watchtower can be instantiated with fresh or with backed up data. In the later, some triggers may have been + missed. In order to go back on sync both the ``Watcher`` and the ``Responder`` need to perform the state + transitions until they catch up. + + If a transaction is broadcast by the ``Responder`` and it is rejected (due to a double-spending for example) + and the ``Responder`` is off-sync then the ``TransactionTracker`` is abandoned. + + This method helps making that decision. + + Args: + block_hash (str): the block hash passed to the ``Responder`` in the ``handle_breach`` request. + + Returns: + ``bool``: Whether or not the ``Responder`` and ``bitcoind`` are on sync. + """ + block_processor = BlockProcessor() distance_from_tip = block_processor.get_distance_to_tip(block_hash) @@ -77,49 +171,83 @@ class Responder: return synchronized - def add_response( - self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, block_hash, retry=False - ): + def handle_breach(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, block_hash): + """ + Requests the ``Responder`` to handle a channel breach. This is the entry point of the ``Responder``. + + Args: + uuid (str): a unique identifier for the appointment. + locator (str): the appointment locator provided by the user (16-byte hex-encoded). + dispute_txid (str): the id of the transaction that created the channel breach. + penalty_txid (str): the id of the decrypted transaction included in the appointment. + penalty_rawtx (str): the raw transaction to be broadcast in response of the breach. + appointment_end (int): the block height at which the ``Responder`` will stop monitoring for this penalty + transaction. + block_hash (str): the block hash at which the breach was seen (used to see if we are on sync). + + Returns: + ``Receipt``: A :mod:`Receipt ` indicating whether or not the ``penalty_tx`` made it + into the blockchain. + """ + if self.asleep: logger.info("Waking up") carrier = Carrier() receipt = carrier.send_transaction(penalty_rawtx, penalty_txid) - # do_watch can call add_response recursively if a broadcast transaction does not get confirmations - # retry holds that information. If retry is true the job already exists if receipt.delivered: - if not retry: - self.create_job( - uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, receipt.confirmations - ) + self.add_tracker( + uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, receipt.confirmations + ) else: # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) # TODO: Use self.on_sync(block_hash) to check whether or not we failed because we are out of sync - logger.warning("Job failed.", uuid=uuid, on_sync=self.on_sync(block_hash)) + logger.warning( + "Tracker cannot be created.", reason=receipt.reason, uuid=uuid, on_sync=self.on_sync(block_hash) + ) pass return receipt - def create_job(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations=0): - job = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) - self.jobs[uuid] = job + def add_tracker(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations=0): + """ + Creates a ``TransactionTracker`` after successfully broadcasting a ``penalty_tx``. - if penalty_txid in self.tx_job_map: - self.tx_job_map[penalty_txid].append(uuid) + The ``TransactionTracker`` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to + ``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database. + + ``add_tracker`` awakes the ``Responder`` and creates a connection with the ``ZMQSubscriber`` if he is asleep. + + Args: + uuid (str): a unique identifier for the appointment. + locator (str): the appointment locator provided by the user (16-byte hex-encoded). + dispute_txid (str): the id of the transaction that created the channel breach. + penalty_txid (str): the id of the decrypted transaction included in the appointment. + penalty_rawtx (str): the raw transaction to be broadcast. + appointment_end (int): the block height at which the ``Responder`` will stop monitoring for the tracker. + confirmations (int): the confirmation count of the ``penalty_tx``. In normal conditions it will be zero, but + if the transaction is already on the blockchain this won't be the case. + """ + + tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) + self.trackers[uuid] = tracker + + if penalty_txid in self.tx_tracker_map: + self.tx_tracker_map[penalty_txid].append(uuid) else: - self.tx_job_map[penalty_txid] = [uuid] + self.tx_tracker_map[penalty_txid] = [uuid] - # In the case we receive two jobs with the same penalty txid we only add it to the unconfirmed txs list once + # In the case we receive two trackers with the same penalty txid we only add it to the unconfirmed txs list once if penalty_txid not in self.unconfirmed_txs and confirmations == 0: self.unconfirmed_txs.append(penalty_txid) - self.db_manager.store_responder_job(uuid, job.to_json()) + self.db_manager.store_responder_tracker(uuid, tracker.to_json()) logger.info( - "New job added.", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end + "New tracker added.", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end ) if self.asleep: @@ -130,14 +258,25 @@ class Responder: responder.start() def do_subscribe(self): - self.zmq_subscriber = ZMQHandler(parent="Responder") + """ + Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received + trough the ``block_queue``. + """ + + self.zmq_subscriber = ZMQSubscriber(parent="Responder") self.zmq_subscriber.handle(self.block_queue) def do_watch(self): + """ + Monitors the blockchain whilst there are pending trackers. + + This is the main method of the ``Responder`` and triggers tracker cleaning, rebroadcasting, reorg managing, etc. + """ + # ToDo: change prev_block_hash to the last known tip when bootstrapping prev_block_hash = BlockProcessor.get_best_block_hash() - while len(self.jobs) > 0: + while len(self.trackers) > 0: # We get notified for every new received block block_hash = self.block_queue.get() block = BlockProcessor.get_block(block_hash) @@ -153,11 +292,13 @@ class Responder: self.check_confirmations(txs) height = block.get("height") - txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs) - completed_jobs = self.get_completed_jobs(height) + completed_trackers = self.get_completed_trackers(height) + Cleaner.delete_completed_trackers( + completed_trackers, height, self.trackers, self.tx_tracker_map, self.db_manager + ) - Cleaner.delete_completed_jobs(completed_jobs, height, self.jobs, self.tx_job_map, self.db_manager) - self.rebroadcast(txs_to_rebroadcast, block_hash) + txs_to_rebroadcast = self.get_txs_to_rebroadcast() + self.rebroadcast(txs_to_rebroadcast) # NOTCOVERED else: @@ -175,17 +316,26 @@ class Responder: prev_block_hash = block.get("hash") - # Go back to sleep if there are no more jobs + # Go back to sleep if there are no more pendong trackers self.asleep = True self.zmq_subscriber.terminate = True self.block_queue = Queue() - logger.info("No more pending jobs, going back to sleep") + logger.info("No more pending trackers, going back to sleep") def check_confirmations(self, txs): + """ + Checks if any of the monitored ``penalty_txs`` has received it's first confirmation or keeps missing them. + + This method manages ``unconfirmed_txs`` and ``missed_confirmations``. + + Args: + txs (list): A list of confirmed tx ids (the list of transactions included in the last received block). + """ + # If a new confirmed tx matches a tx we are watching, then we remove it from the unconfirmed txs map for tx in txs: - if tx in self.tx_job_map and tx in self.unconfirmed_txs: + if tx in self.tx_tracker_map and tx in self.unconfirmed_txs: self.unconfirmed_txs.remove(tx) logger.info("Confirmation received for transaction", tx=tx) @@ -201,22 +351,40 @@ class Responder: logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=self.missed_confirmations[tx]) - def get_txs_to_rebroadcast(self, txs): + def get_txs_to_rebroadcast(self): + """ + Gets the transactions to be rebroadcast based on their ``missed_confirmation`` count. + + Returns: + ``list``: A list with all the ids of the transaction that have to be rebroadcast. + """ + txs_to_rebroadcast = [] - for tx in txs: - if tx in self.missed_confirmations and self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY: + for tx, missed_conf in self.missed_confirmations.items(): + if missed_conf >= CONFIRMATIONS_BEFORE_RETRY: # If a transactions has missed too many confirmations we add it to the rebroadcast list txs_to_rebroadcast.append(tx) return txs_to_rebroadcast - def get_completed_jobs(self, height): - completed_jobs = [] + def get_completed_trackers(self, height): + """ + Gets the trackers that has already been fulfilled based on a given height (``end_time`` was reached with a + minimum confirmation count). - for uuid, job in self.jobs.items(): - if job.appointment_end <= height and job.penalty_txid not in self.unconfirmed_txs: - tx = Carrier.get_transaction(job.penalty_txid) + Args: + height (int): the height of the last received block. + + Returns: + ``list``: a list of tuples ``uuid:confirmations`` for the completed trackers. + """ + + completed_trackers = [] + + for uuid, tracker in self.trackers.items(): + if tracker.appointment_end <= height and tracker.penalty_txid not in self.unconfirmed_txs: + tx = Carrier.get_transaction(tracker.penalty_txid) # FIXME: Should be improved with the librarian if tx is not None: @@ -224,81 +392,101 @@ class Responder: if confirmations >= MIN_CONFIRMATIONS: # The end of the appointment has been reached - completed_jobs.append((uuid, confirmations)) + completed_trackers.append((uuid, confirmations)) - return completed_jobs + return completed_trackers + + def rebroadcast(self, txs_to_rebroadcast): + """ + Rebroadcasts a ``penalty_tx`` that has missed too many confirmations. In the current approach this would loop + forever si the transaction keeps not getting it. + + Potentially the fees could be bumped here if the transaction has some tower dedicated outputs (or allows it + trough ``ANYONECANSPEND`` or something similar). + + Args: + txs_to_rebroadcast (list): a list of transactions to be rebroadcast. + + Returns: + ``list``: A list of ``Receipts`` with information about whether or not every transaction made it trough the + network. + """ - def rebroadcast(self, txs_to_rebroadcast, block_hash): # DISCUSS: #22-discuss-confirmations-before-retry # ToDo: #23-define-behaviour-approaching-end receipts = [] + carrier = Carrier() for txid in txs_to_rebroadcast: self.missed_confirmations[txid] = 0 - for uuid in self.tx_job_map[txid]: - job = self.jobs[uuid] - receipt = self.add_response( - job.locator, - uuid, - job.dispute_txid, - job.penalty_txid, - job.penalty_rawtx, - job.appointment_end, - block_hash, - retry=True, - ) - + # FIXME: This would potentially grab multiple instances of the same transaction and try to send them. + # should we do it only once? + for uuid in self.tx_tracker_map[txid]: + tracker = self.trackers[uuid] logger.warning( - "Transaction has missed many confirmations. Rebroadcasting.", - penalty_txid=job.penalty_txid, - confirmations_missed=CONFIRMATIONS_BEFORE_RETRY, + "Transaction has missed many confirmations. Rebroadcasting.", penalty_txid=tracker.penalty_txid ) + receipt = carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid) receipts.append((txid, receipt)) + if not receipt.delivered: + # FIXME: Can this actually happen? + logger.warning("Transaction failed.", penalty_txid=tracker.penalty_txid) + return receipts # NOTCOVERED def handle_reorgs(self, block_hash): + """ + Basic reorg handle. It deals with situations where a reorg has been found but the ``dispute_tx`` is still + on the chain. If the ``dispute_tx`` is reverted, it need to call the ``ReorgManager`` (not implemented yet). + + Args: + block_hash (str): the hash of the last block received (which triggered the reorg). + + """ carrier = Carrier() - for uuid, job in self.jobs.items(): + for uuid, tracker in self.trackers.items(): # First we check if the dispute transaction is known (exists either in mempool or blockchain) - dispute_tx = carrier.get_transaction(job.dispute_txid) + dispute_tx = carrier.get_transaction(tracker.dispute_txid) if dispute_tx is not None: # If the dispute is there, we check the penalty - penalty_tx = carrier.get_transaction(job.penalty_txid) + penalty_tx = carrier.get_transaction(tracker.penalty_txid) if penalty_tx is not None: # If the penalty exists we need to check is it's on the blockchain or not so we can update the # unconfirmed transactions list accordingly. if penalty_tx.get("confirmations") is None: - self.unconfirmed_txs.append(job.penalty_txid) + self.unconfirmed_txs.append(tracker.penalty_txid) logger.info( "Penalty transaction back in mempool. Updating unconfirmed transactions.", - penalty_txid=job.penalty_txid, + penalty_txid=tracker.penalty_txid, ) else: - # If the penalty transaction is missing, we need to reset the job. - # DISCUSS: Adding job back, should we flag it as retried? + # If the penalty transaction is missing, we need to reset the tracker. + # DISCUSS: Adding tracker back, should we flag it as retried? # FIXME: Whether we decide to increase the retried counter or not, the current counter should be # maintained. There is no way of doing so with the current approach. Update if required - self.add_response( - job.locator, + self.handle_breach( + tracker.locator, uuid, - job.dispute_txid, - job.penalty_txid, - job.penalty_rawtx, - job.appointment_end, + tracker.dispute_txid, + tracker.penalty_txid, + tracker.penalty_rawtx, + tracker.appointment_end, block_hash, ) - logger.warning("Penalty transaction banished. Resetting the job", penalty_tx=job.penalty_txid) + logger.warning( + "Penalty transaction banished. Resetting the tracker", penalty_tx=tracker.penalty_txid + ) else: # ToDo: #24-properly-handle-reorgs diff --git a/test/unit/test_responder.py b/test/unit/test_responder.py index 6f09a99..67846a6 100644 --- a/test/unit/test_responder.py +++ b/test/unit/test_responder.py @@ -9,7 +9,7 @@ from queue import Queue, Empty from pisa import c_logger from pisa.db_manager import DBManager -from pisa.responder import Responder, Job +from pisa.responder import Responder, TransactionTracker from pisa.block_processor import BlockProcessor from pisa.tools import bitcoin_cli @@ -38,7 +38,7 @@ def temp_db_manager(): rmtree(db_name) -def create_dummy_job_data(random_txid=False, penalty_rawtx=None): +def create_dummy_tracker_data(random_txid=False, penalty_rawtx=None): # The following transaction data corresponds to a valid transaction. For some test it may be interesting to have # some valid data, but for others we may need multiple different penalty_txids. @@ -67,22 +67,22 @@ def create_dummy_job_data(random_txid=False, penalty_rawtx=None): return locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end -def create_dummy_job(random_txid=False, penalty_rawtx=None): - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data( +def create_dummy_tracker(random_txid=False, penalty_rawtx=None): + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data( random_txid, penalty_rawtx ) - return Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) + return TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) -def test_job_init(run_bitcoind): - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data() - job = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) +def test_tracker_init(run_bitcoind): + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data() + tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) assert ( - job.dispute_txid == dispute_txid - and job.penalty_txid == penalty_txid - and job.penalty_rawtx == penalty_rawtx - and job.appointment_end == appointment_end + tracker.dispute_txid == dispute_txid + and tracker.penalty_txid == penalty_txid + and tracker.penalty_rawtx == penalty_rawtx + and tracker.appointment_end == appointment_end ) @@ -103,44 +103,44 @@ def test_on_sync_fail(responder): assert Responder.on_sync(chain_tip) is False -def test_job_to_dict(): - job = create_dummy_job() - job_dict = job.to_dict() +def test_tracker_to_dict(): + tracker = create_dummy_tracker() + tracker_dict = tracker.to_dict() assert ( - job.locator == job_dict["locator"] - and job.penalty_rawtx == job_dict["penalty_rawtx"] - and job.appointment_end == job_dict["appointment_end"] + tracker.locator == tracker_dict["locator"] + and tracker.penalty_rawtx == tracker_dict["penalty_rawtx"] + and tracker.appointment_end == tracker_dict["appointment_end"] ) -def test_job_to_json(): - job = create_dummy_job() - job_dict = json.loads(job.to_json()) +def test_tracker_to_json(): + tracker = create_dummy_tracker() + tracker_dict = json.loads(tracker.to_json()) assert ( - job.locator == job_dict["locator"] - and job.penalty_rawtx == job_dict["penalty_rawtx"] - and job.appointment_end == job_dict["appointment_end"] + tracker.locator == tracker_dict["locator"] + and tracker.penalty_rawtx == tracker_dict["penalty_rawtx"] + and tracker.appointment_end == tracker_dict["appointment_end"] ) -def test_job_from_dict(): - job_dict = create_dummy_job().to_dict() - new_job = Job.from_dict(job_dict) +def test_tracker_from_dict(): + tracker_dict = create_dummy_tracker().to_dict() + new_tracker = TransactionTracker.from_dict(tracker_dict) - assert job_dict == new_job.to_dict() + assert tracker_dict == new_tracker.to_dict() -def test_job_from_dict_invalid_data(): - job_dict = create_dummy_job().to_dict() +def test_tracker_from_dict_invalid_data(): + tracker_dict = create_dummy_tracker().to_dict() for value in ["dispute_txid", "penalty_txid", "penalty_rawtx", "appointment_end"]: - job_dict_copy = deepcopy(job_dict) - job_dict_copy[value] = None + tracker_dict_copy = deepcopy(tracker_dict) + tracker_dict_copy[value] = None try: - Job.from_dict(job_dict_copy) + TransactionTracker.from_dict(tracker_dict_copy) assert False except ValueError: @@ -148,8 +148,8 @@ def test_job_from_dict_invalid_data(): def test_init_responder(responder): - assert type(responder.jobs) is dict and len(responder.jobs) == 0 - assert type(responder.tx_job_map) is dict and len(responder.tx_job_map) == 0 + assert type(responder.trackers) is dict and len(responder.trackers) == 0 + assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0 assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0 assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0 assert responder.block_queue.empty() @@ -157,124 +157,126 @@ def test_init_responder(responder): assert responder.zmq_subscriber is None -def test_add_response(db_manager): +def test_handle_breach(db_manager): responder = Responder(db_manager) uuid = uuid4().hex - job = create_dummy_job() + tracker = create_dummy_tracker() # The block_hash passed to add_response does not matter much now. It will in the future to deal with errors - receipt = responder.add_response( - job.locator, + receipt = responder.handle_breach( + tracker.locator, uuid, - job.dispute_txid, - job.penalty_txid, - job.penalty_rawtx, - job.appointment_end, + tracker.dispute_txid, + tracker.penalty_txid, + tracker.penalty_rawtx, + tracker.appointment_end, block_hash=get_random_value_hex(32), ) assert receipt.delivered is True - # The responder automatically fires create_job on adding a job if it is asleep. We need to stop the processes now. - # To do so we delete all the jobs, stop the zmq and create a new fake block to unblock the queue.get method - responder.jobs = dict() + # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now. + # To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method + responder.trackers = dict() responder.zmq_subscriber.terminate = True responder.block_queue.put(get_random_value_hex(32)) def test_add_bad_response(responder): uuid = uuid4().hex - job = create_dummy_job() + tracker = create_dummy_tracker() # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting # to awake. That will prevent the zmq thread to be launched again. responder.asleep = False # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. - job.penalty_rawtx = job.penalty_txid + tracker.penalty_rawtx = tracker.penalty_txid # The block_hash passed to add_response does not matter much now. It will in the future to deal with errors - receipt = responder.add_response( - job.locator, + receipt = responder.handle_breach( + tracker.locator, uuid, - job.dispute_txid, - job.penalty_txid, - job.penalty_rawtx, - job.appointment_end, + tracker.dispute_txid, + tracker.penalty_txid, + tracker.penalty_rawtx, + tracker.appointment_end, block_hash=get_random_value_hex(32), ) assert receipt.delivered is False -def test_create_job(responder): +def test_add_tracker(responder): responder.asleep = False for _ in range(20): uuid = uuid4().hex confirmations = 0 - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(random_txid=True) + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data( + random_txid=True + ) - # Check the job is not within the responder jobs before adding it - assert uuid not in responder.jobs - assert penalty_txid not in responder.tx_job_map + # Check the tracker is not within the responder trackers before adding it + assert uuid not in responder.trackers + assert penalty_txid not in responder.tx_tracker_map assert penalty_txid not in responder.unconfirmed_txs # And that it is afterwards - responder.create_job(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) - assert uuid in responder.jobs - assert penalty_txid in responder.tx_job_map + responder.add_tracker(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) + assert uuid in responder.trackers + assert penalty_txid in responder.tx_tracker_map assert penalty_txid in responder.unconfirmed_txs - # Check that the rest of job data also matches - job = responder.jobs[uuid] + # Check that the rest of tracker data also matches + tracker = responder.trackers[uuid] assert ( - job.dispute_txid == dispute_txid - and job.penalty_txid == penalty_txid - and job.penalty_rawtx == penalty_rawtx - and job.appointment_end == appointment_end - and job.appointment_end == appointment_end + tracker.dispute_txid == dispute_txid + and tracker.penalty_txid == penalty_txid + and tracker.penalty_rawtx == penalty_rawtx + and tracker.appointment_end == appointment_end + and tracker.appointment_end == appointment_end ) -def test_create_job_same_penalty_txid(responder): - # Create the same job using two different uuids +def test_add_tracker_same_penalty_txid(responder): + # Create the same tracker using two different uuids confirmations = 0 - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(random_txid=True) + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) uuid_1 = uuid4().hex uuid_2 = uuid4().hex - responder.create_job(uuid_1, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) - responder.create_job(uuid_2, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) + responder.add_tracker(uuid_1, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) + responder.add_tracker(uuid_2, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) - # Check that both jobs have been added - assert uuid_1 in responder.jobs and uuid_2 in responder.jobs - assert penalty_txid in responder.tx_job_map + # Check that both trackers have been added + assert uuid_1 in responder.trackers and uuid_2 in responder.trackers + assert penalty_txid in responder.tx_tracker_map assert penalty_txid in responder.unconfirmed_txs - # Check that the rest of job data also matches + # Check that the rest of tracker data also matches for uuid in [uuid_1, uuid_2]: - job = responder.jobs[uuid] + tracker = responder.trackers[uuid] assert ( - job.dispute_txid == dispute_txid - and job.penalty_txid == penalty_txid - and job.penalty_rawtx == penalty_rawtx - and job.appointment_end == appointment_end - and job.appointment_end == appointment_end + tracker.dispute_txid == dispute_txid + and tracker.penalty_txid == penalty_txid + and tracker.penalty_rawtx == penalty_rawtx + and tracker.appointment_end == appointment_end + and tracker.appointment_end == appointment_end ) -def test_create_job_already_confirmed(responder): +def test_add_tracker_already_confirmed(responder): responder.asleep = False for i in range(20): uuid = uuid4().hex confirmations = i + 1 - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data( + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data( penalty_rawtx=TX.create_dummy_transaction() ) - responder.create_job(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) + responder.add_tracker(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations) assert penalty_txid not in responder.unconfirmed_txs @@ -303,16 +305,16 @@ def test_do_watch(temp_db_manager): zmq_thread.daemon = True zmq_thread.start() - jobs = [create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)] + trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)] - # Let's set up the jobs first - for job in jobs: + # Let's set up the trackers first + for tracker in trackers: uuid = uuid4().hex - responder.jobs[uuid] = job - responder.tx_job_map[job.penalty_txid] = [uuid] - responder.missed_confirmations[job.penalty_txid] = 0 - responder.unconfirmed_txs.append(job.penalty_txid) + responder.trackers[uuid] = tracker + responder.tx_tracker_map[tracker.penalty_txid] = [uuid] + responder.missed_confirmations[tracker.penalty_txid] = 0 + responder.unconfirmed_txs.append(tracker.penalty_txid) # Let's start to watch watch_thread = Thread(target=responder.do_watch) @@ -321,9 +323,9 @@ def test_do_watch(temp_db_manager): # And broadcast some of the transactions broadcast_txs = [] - for job in jobs[:5]: - bitcoin_cli().sendrawtransaction(job.penalty_rawtx) - broadcast_txs.append(job.penalty_txid) + for tracker in trackers[:5]: + bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx) + broadcast_txs.append(tracker.penalty_txid) # Mine a block generate_block() @@ -333,21 +335,21 @@ def test_do_watch(temp_db_manager): # TODO: test that reorgs can be detected once data persistence is merged (new version of the simulator) - # Generating 5 additional blocks should complete the 5 jobs + # Generating 5 additional blocks should complete the 5 trackers generate_blocks(5) - assert not set(broadcast_txs).issubset(responder.tx_job_map) + assert not set(broadcast_txs).issubset(responder.tx_tracker_map) # Do the rest broadcast_txs = [] - for job in jobs[5:]: - bitcoin_cli().sendrawtransaction(job.penalty_rawtx) - broadcast_txs.append(job.penalty_txid) + for tracker in trackers[5:]: + bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx) + broadcast_txs.append(tracker.penalty_txid) # Mine a block generate_blocks(6) - assert len(responder.tx_job_map) == 0 + assert len(responder.tx_tracker_map) == 0 assert responder.asleep is True @@ -368,9 +370,9 @@ def test_check_confirmations(temp_db_manager): txs_subset = random.sample(txs, k=10) responder.unconfirmed_txs.extend(txs_subset) - # We also need to add them to the tx_job_map since they would be there in normal conditions - responder.tx_job_map = { - txid: Job(txid[:LOCATOR_LEN_HEX], txid, None, None, None) for txid in responder.unconfirmed_txs + # We also need to add them to the tx_tracker_map since they would be there in normal conditions + responder.tx_tracker_map = { + txid: TransactionTracker(txid[:LOCATOR_LEN_HEX], txid, None, None, None) for txid in responder.unconfirmed_txs } # Let's make sure that there are no txs with missed confirmations yet @@ -387,6 +389,7 @@ def test_check_confirmations(temp_db_manager): assert responder.missed_confirmations[tx] == 1 +# WIP: Check this properly, a bug pass unnoticed! def test_get_txs_to_rebroadcast(responder): # Let's create a few fake txids and assign at least 6 missing confirmations to each txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)} @@ -396,67 +399,69 @@ def test_get_txs_to_rebroadcast(responder): # All the txs in the first dict should be flagged as to_rebroadcast responder.missed_confirmations = txs_missing_too_many_conf - txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_too_many_conf) + txs_to_rebroadcast = responder.get_txs_to_rebroadcast() assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) # Non of the txs in the second dict should be flagged responder.missed_confirmations = txs_missing_some_conf - txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf) + txs_to_rebroadcast = responder.get_txs_to_rebroadcast() assert txs_to_rebroadcast == [] # Let's check that it also works with a mixed dict responder.missed_confirmations.update(txs_missing_too_many_conf) - txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf) + txs_to_rebroadcast = responder.get_txs_to_rebroadcast() assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) -def test_get_completed_jobs(db_manager): +def test_get_completed_trackers(db_manager): initial_height = bitcoin_cli().getblockcount() # Let's use a fresh responder for this to make it easier to compare the results responder = Responder(db_manager) - # A complete job is a job that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) + # A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached - jobs_end_conf = {uuid4().hex: create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10)} + trackers_end_conf = { + uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10) + } - jobs_end_no_conf = {} + trackers_end_no_conf = {} for _ in range(10): - job = create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) - responder.unconfirmed_txs.append(job.penalty_txid) - jobs_end_no_conf[uuid4().hex] = job + tracker = create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) + responder.unconfirmed_txs.append(tracker.penalty_txid) + trackers_end_no_conf[uuid4().hex] = tracker - jobs_no_end = {} + trackers_no_end = {} for _ in range(10): - job = create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) - job.appointment_end += 10 - jobs_no_end[uuid4().hex] = job + tracker = create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) + tracker.appointment_end += 10 + trackers_no_end[uuid4().hex] = tracker # Let's add all to the responder - responder.jobs.update(jobs_end_conf) - responder.jobs.update(jobs_end_no_conf) - responder.jobs.update(jobs_no_end) + responder.trackers.update(trackers_end_conf) + responder.trackers.update(trackers_end_no_conf) + responder.trackers.update(trackers_no_end) - for uuid, job in responder.jobs.items(): - bitcoin_cli().sendrawtransaction(job.penalty_rawtx) + for uuid, tracker in responder.trackers.items(): + bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx) - # The dummy appointments have a end_appointment time of current + 2, but jobs need at least 6 confs by default + # The dummy appointments have a end_appointment time of current + 2, but trackers need at least 6 confs by default generate_blocks(6) # And now let's check - completed_jobs = responder.get_completed_jobs(initial_height + 6) - completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs] - ended_jobs_keys = list(jobs_end_conf.keys()) - assert set(completed_jobs_ids) == set(ended_jobs_keys) + completed_trackers = responder.get_completed_trackers(initial_height + 6) + completed_trackers_ids = [tracker_id for tracker_id, confirmations in completed_trackers] + ended_trackers_keys = list(trackers_end_conf.keys()) + assert set(completed_trackers_ids) == set(ended_trackers_keys) - # Generating 6 additional blocks should also confirm jobs_no_end + # Generating 6 additional blocks should also confirm trackers_no_end generate_blocks(6) - completed_jobs = responder.get_completed_jobs(initial_height + 12) - completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs] - ended_jobs_keys.extend(list(jobs_no_end.keys())) + completed_trackers = responder.get_completed_trackers(initial_height + 12) + completed_trackers_ids = [tracker_id for tracker_id, confirmations in completed_trackers] + ended_trackers_keys.extend(list(trackers_no_end.keys())) - assert set(completed_jobs_ids) == set(ended_jobs_keys) + assert set(completed_trackers_ids) == set(ended_trackers_keys) def test_rebroadcast(db_manager): @@ -465,15 +470,17 @@ def test_rebroadcast(db_manager): txs_to_rebroadcast = [] - # Rebroadcast calls add_response with retry=True. The job data is already in jobs. + # Rebroadcast calls add_response with retry=True. The tracker data is already in trackers. for i in range(20): uuid = uuid4().hex - locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data( + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data( penalty_rawtx=TX.create_dummy_transaction() ) - responder.jobs[uuid] = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) - responder.tx_job_map[penalty_txid] = [uuid] + responder.trackers[uuid] = TransactionTracker( + locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end + ) + responder.tx_tracker_map[penalty_txid] = [uuid] responder.unconfirmed_txs.append(penalty_txid) # Let's add some of the txs in the rebroadcast list @@ -481,7 +488,7 @@ def test_rebroadcast(db_manager): txs_to_rebroadcast.append(penalty_txid) # The block_hash passed to rebroadcast does not matter much now. It will in the future to deal with errors - receipts = responder.rebroadcast(txs_to_rebroadcast, get_random_value_hex(32)) + receipts = responder.rebroadcast(txs_to_rebroadcast) # All txs should have been delivered and the missed confirmation reset for txid, receipt in receipts: