diff --git a/pisa/responder.py b/pisa/responder.py index d1c2aa3..6f18d19 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -145,11 +145,15 @@ class Responder: self.block_queue = Queue() self.chain_monitor = chain_monitor self.db_manager = db_manager + self.carrier = Carrier() + self.last_known_block = db_manager.load_last_block_hash_responder() def awake(self): self.asleep = False self.chain_monitor.responder_asleep = False - Thread(target=self.do_watch).start() + responder_thread = Thread(target=self.do_watch, daemon=True).start() + + return responder_thread def sleep(self): self.asleep = True @@ -211,8 +215,7 @@ class Responder: if self.asleep: logger.info("Waking up") - carrier = Carrier() - receipt = carrier.send_transaction(penalty_rawtx, penalty_txid) + receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) if receipt.delivered: self.add_tracker( @@ -286,8 +289,9 @@ class Responder: etc. """ - # ToDo: change prev_block_hash to the last known tip when bootstrapping - prev_block_hash = BlockProcessor.get_best_block_hash() + # Distinguish fresh bootstraps from bootstraps from db + if self.last_known_block is None: + self.last_known_block = BlockProcessor.get_best_block_hash() while len(self.trackers) > 0: # We get notified for every new received block @@ -301,7 +305,7 @@ class Responder: "New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs ) - if prev_block_hash == block.get("previousblockhash"): + if self.last_known_block == block.get("previousblockhash"): self.check_confirmations(txs) height = block.get("height") @@ -317,7 +321,7 @@ class Responder: else: logger.warning( "Reorg found", - local_prev_block_hash=prev_block_hash, + local_prev_block_hash=self.last_known_block, remote_prev_block_hash=block.get("previousblockhash"), ) @@ -327,7 +331,10 @@ class Responder: # Register the last processed block for the responder self.db_manager.store_last_block_hash_responder(block_hash) - prev_block_hash = block.get("hash") + self.last_known_block = block.get("hash") + + self.block_queue.task_done() + self.carrier.issued_receipts = {} # Go back to sleep if there are no more pending trackers self.sleep() @@ -387,23 +394,29 @@ class Responder: height (:obj:`int`): the height of the last received block. Returns: - :obj:`list`: a list of tuples ``uuid:confirmations`` for the completed trackers. + :obj:`dict`: a dict (``uuid:confirmations``) of the completed trackers. """ - completed_trackers = [] + completed_trackers = {} + checked_txs = {} for uuid, tracker_data in self.trackers.items(): appointment_end = tracker_data.get("appointment_end") penalty_txid = tracker_data.get("penalty_txid") if appointment_end <= height and penalty_txid not in self.unconfirmed_txs: - tx = Carrier.get_transaction(penalty_txid) + + if penalty_txid not in checked_txs: + tx = Carrier.get_transaction(penalty_txid) + else: + tx = checked_txs.get(penalty_txid) if tx is not None: confirmations = tx.get("confirmations") + checked_txs[penalty_txid] = tx if confirmations is not None and confirmations >= MIN_CONFIRMATIONS: # The end of the appointment has been reached - completed_trackers.append((uuid, confirmations)) + completed_trackers[uuid] = confirmations return completed_trackers @@ -427,7 +440,6 @@ class Responder: # ToDo: #23-define-behaviour-approaching-end receipts = [] - carrier = Carrier() for txid in txs_to_rebroadcast: self.missed_confirmations[txid] = 0 @@ -440,7 +452,7 @@ class Responder: "Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid ) - receipt = carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid) + receipt = self.carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid) receipts.append((txid, receipt)) if not receipt.delivered: @@ -459,17 +471,16 @@ class Responder: block_hash (:obj:`str`): the hash of the last block received (which triggered the reorg). """ - carrier = Carrier() for uuid in self.trackers.keys(): tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid)) # First we check if the dispute transaction is known (exists either in mempool or blockchain) - dispute_tx = carrier.get_transaction(tracker.dispute_txid) + dispute_tx = self.carrier.get_transaction(tracker.dispute_txid) if dispute_tx is not None: # If the dispute is there, we check the penalty - penalty_tx = carrier.get_transaction(tracker.penalty_txid) + penalty_tx = self.carrier.get_transaction(tracker.penalty_txid) if penalty_tx is not None: # If the penalty exists we need to check is it's on the blockchain or not so we can update the