Updates Responder to minimize data replication operations and properly load data from db

- Uses an instance of the Carrier so it can benefit from issued_receipts and avoid resending multiple copies of the same triggered appointment
- Defines last_known_block to properly load data from db
- Uses task_done from Queue to properly signal task completion when boostraping from db
- Creates a checked_txs dict in get_completed_trackers to avoid querying bitcoind for the same transaction over and over
- Redefines completed_trackers as dict instead of tuple
This commit is contained in:
Sergi Delgado Segura
2020-01-31 12:55:21 +01:00
parent 4848b9a058
commit d9ce265c00

View File

@@ -145,11 +145,15 @@ class Responder:
self.block_queue = Queue() self.block_queue = Queue()
self.chain_monitor = chain_monitor self.chain_monitor = chain_monitor
self.db_manager = db_manager self.db_manager = db_manager
self.carrier = Carrier()
self.last_known_block = db_manager.load_last_block_hash_responder()
def awake(self): def awake(self):
self.asleep = False self.asleep = False
self.chain_monitor.responder_asleep = False self.chain_monitor.responder_asleep = False
Thread(target=self.do_watch).start() responder_thread = Thread(target=self.do_watch, daemon=True).start()
return responder_thread
def sleep(self): def sleep(self):
self.asleep = True self.asleep = True
@@ -211,8 +215,7 @@ class Responder:
if self.asleep: if self.asleep:
logger.info("Waking up") logger.info("Waking up")
carrier = Carrier() receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid)
receipt = carrier.send_transaction(penalty_rawtx, penalty_txid)
if receipt.delivered: if receipt.delivered:
self.add_tracker( self.add_tracker(
@@ -286,8 +289,9 @@ class Responder:
etc. etc.
""" """
# ToDo: change prev_block_hash to the last known tip when bootstrapping # Distinguish fresh bootstraps from bootstraps from db
prev_block_hash = BlockProcessor.get_best_block_hash() if self.last_known_block is None:
self.last_known_block = BlockProcessor.get_best_block_hash()
while len(self.trackers) > 0: while len(self.trackers) > 0:
# We get notified for every new received block # We get notified for every new received block
@@ -301,7 +305,7 @@ class Responder:
"New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs "New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs
) )
if prev_block_hash == block.get("previousblockhash"): if self.last_known_block == block.get("previousblockhash"):
self.check_confirmations(txs) self.check_confirmations(txs)
height = block.get("height") height = block.get("height")
@@ -317,7 +321,7 @@ class Responder:
else: else:
logger.warning( logger.warning(
"Reorg found", "Reorg found",
local_prev_block_hash=prev_block_hash, local_prev_block_hash=self.last_known_block,
remote_prev_block_hash=block.get("previousblockhash"), remote_prev_block_hash=block.get("previousblockhash"),
) )
@@ -327,7 +331,10 @@ class Responder:
# Register the last processed block for the responder # Register the last processed block for the responder
self.db_manager.store_last_block_hash_responder(block_hash) self.db_manager.store_last_block_hash_responder(block_hash)
prev_block_hash = block.get("hash") self.last_known_block = block.get("hash")
self.block_queue.task_done()
self.carrier.issued_receipts = {}
# Go back to sleep if there are no more pending trackers # Go back to sleep if there are no more pending trackers
self.sleep() self.sleep()
@@ -387,23 +394,29 @@ class Responder:
height (:obj:`int`): the height of the last received block. height (:obj:`int`): the height of the last received block.
Returns: Returns:
:obj:`list`: a list of tuples ``uuid:confirmations`` for the completed trackers. :obj:`dict`: a dict (``uuid:confirmations``) of the completed trackers.
""" """
completed_trackers = [] completed_trackers = {}
checked_txs = {}
for uuid, tracker_data in self.trackers.items(): for uuid, tracker_data in self.trackers.items():
appointment_end = tracker_data.get("appointment_end") appointment_end = tracker_data.get("appointment_end")
penalty_txid = tracker_data.get("penalty_txid") penalty_txid = tracker_data.get("penalty_txid")
if appointment_end <= height and penalty_txid not in self.unconfirmed_txs: if appointment_end <= height and penalty_txid not in self.unconfirmed_txs:
tx = Carrier.get_transaction(penalty_txid)
if penalty_txid not in checked_txs:
tx = Carrier.get_transaction(penalty_txid)
else:
tx = checked_txs.get(penalty_txid)
if tx is not None: if tx is not None:
confirmations = tx.get("confirmations") confirmations = tx.get("confirmations")
checked_txs[penalty_txid] = tx
if confirmations is not None and confirmations >= MIN_CONFIRMATIONS: if confirmations is not None and confirmations >= MIN_CONFIRMATIONS:
# The end of the appointment has been reached # The end of the appointment has been reached
completed_trackers.append((uuid, confirmations)) completed_trackers[uuid] = confirmations
return completed_trackers return completed_trackers
@@ -427,7 +440,6 @@ class Responder:
# ToDo: #23-define-behaviour-approaching-end # ToDo: #23-define-behaviour-approaching-end
receipts = [] receipts = []
carrier = Carrier()
for txid in txs_to_rebroadcast: for txid in txs_to_rebroadcast:
self.missed_confirmations[txid] = 0 self.missed_confirmations[txid] = 0
@@ -440,7 +452,7 @@ class Responder:
"Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid "Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid
) )
receipt = carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid) receipt = self.carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid)
receipts.append((txid, receipt)) receipts.append((txid, receipt))
if not receipt.delivered: if not receipt.delivered:
@@ -459,17 +471,16 @@ class Responder:
block_hash (:obj:`str`): the hash of the last block received (which triggered the reorg). block_hash (:obj:`str`): the hash of the last block received (which triggered the reorg).
""" """
carrier = Carrier()
for uuid in self.trackers.keys(): for uuid in self.trackers.keys():
tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid)) tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid))
# First we check if the dispute transaction is known (exists either in mempool or blockchain) # First we check if the dispute transaction is known (exists either in mempool or blockchain)
dispute_tx = carrier.get_transaction(tracker.dispute_txid) dispute_tx = self.carrier.get_transaction(tracker.dispute_txid)
if dispute_tx is not None: if dispute_tx is not None:
# If the dispute is there, we check the penalty # If the dispute is there, we check the penalty
penalty_tx = carrier.get_transaction(tracker.penalty_txid) penalty_tx = self.carrier.get_transaction(tracker.penalty_txid)
if penalty_tx is not None: if penalty_tx is not None:
# If the penalty exists we need to check is it's on the blockchain or not so we can update the # If the penalty exists we need to check is it's on the blockchain or not so we can update the