diff --git a/pisa/responder.py b/pisa/responder.py
index d5f0f63..058b658 100644
--- a/pisa/responder.py
+++ b/pisa/responder.py
@@ -18,14 +18,12 @@ logger = Logger("Responder")
 
 
 class Job:
-    def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0):
+    def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end):
         self.dispute_txid = dispute_txid
         self.justice_txid = justice_txid
         self.justice_rawtx = justice_rawtx
         self.appointment_end = appointment_end
-        self.retry_counter = retry_counter
-
         # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
         # can be directly got from DB
         self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
 
@@ -56,11 +54,11 @@ class Responder:
         carrier = Carrier()
         receipt = carrier.send_transaction(justice_rawtx, justice_txid)
 
+        # do_watch can call add_response recursively if a broadcast transaction does not get confirmations
+        # retry holds that information. If retry is true the job already exists
         if receipt.delivered:
-            # do_watch can call add_response recursively if a broadcast transaction does not get confirmations
-            # retry holds such information.
-            self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
-                            confirmations=receipt.confirmations)
+            if not retry:
+                self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, receipt.confirmations)
 
         else:
             # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
@@ -68,25 +66,17 @@ class Responder:
         return receipt
 
-    def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
-                   retry=False):
+    def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0):
+        self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
 
-        # ToDo: #23-define-behaviour-approaching-end
-        if retry:
-            self.jobs[uuid].retry_counter += 1
-            self.missed_confirmations[justice_txid] = 0
+        if justice_txid in self.tx_job_map:
+            self.tx_job_map[justice_txid].append(uuid)
 
         else:
-            self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
+            self.tx_job_map[justice_txid] = [uuid]
 
-            if justice_txid in self.tx_job_map:
-                self.tx_job_map[justice_txid].append(uuid)
-
-            else:
-                self.tx_job_map[justice_txid] = [uuid]
-
-            if confirmations == 0:
-                self.unconfirmed_txs.append(justice_txid)
+        if confirmations == 0:
+            self.unconfirmed_txs.append(justice_txid)
 
         logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid,
                     appointment_end=appointment_end)
 
@@ -106,7 +96,7 @@ class Responder:
     def do_watch(self):
         # ToDo: #9-add-data-persistence
         # change prev_block_hash to the last known tip when bootstrapping
-        prev_block_hash = 0
+        prev_block_hash = BlockProcessor.get_best_block_hash()
 
         while len(self.jobs) > 0:
             # We get notified for every new received block
@@ -121,21 +111,21 @@ class Responder:
                             block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
 
                 # ToDo: #9-add-data-persistence
-                # change prev_block_hash condition
-                if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
+                if prev_block_hash == block.get('previousblockhash'):
                     self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
                         txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations)
 
                     txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
-                    Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, self.get_completed_jobs(height), height)
+                    completed_jobs = self.get_completed_jobs(height)
+                    Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs, height)
 
                     self.rebroadcast(txs_to_rebroadcast)
 
                 else:
-                    logger.warning("Reorg found",
-                                   local_prev_block_hash=prev_block_hash,
+                    logger.warning("Reorg found", local_prev_block_hash=prev_block_hash,
                                    remote_prev_block_hash=block.get('previousblockhash'))
 
+                    # ToDo: #24-properly-handle-reorgs
                     self.handle_reorgs()
 
                 prev_block_hash = block.get('hash')
@@ -160,31 +150,42 @@ class Responder:
         completed_jobs = []
 
         for uuid, job in self.jobs.items():
-            if job.appointment_end <= height:
+            if job.appointment_end <= height and job.justice_txid not in self.unconfirmed_txs:
                 tx = Carrier.get_transaction(job.justice_txid)
 
                 # FIXME: Should be improved with the librarian
-                confirmations = tx.get('confirmations')
-                if tx is not None and confirmations > MIN_CONFIRMATIONS:
-                    # The end of the appointment has been reached
-                    completed_jobs.append((uuid, confirmations))
+                if tx is not None:
+                    confirmations = tx.get('confirmations')
+
+                    if confirmations >= MIN_CONFIRMATIONS:
+                        # The end of the appointment has been reached
+                        completed_jobs.append((uuid, confirmations))
 
         return completed_jobs
 
-    def rebroadcast(self, jobs_to_rebroadcast):
+    def rebroadcast(self, txs_to_rebroadcast):
         # DISCUSS: #22-discuss-confirmations-before-retry
         # ToDo: #23-define-behaviour-approaching-end
 
-        for tx in jobs_to_rebroadcast:
-            for uuid in self.tx_job_map[tx]:
-                self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
-                                  self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
+        receipts = []
+
+        for txid in txs_to_rebroadcast:
+            self.missed_confirmations[txid] = 0
+
+            for uuid in self.tx_job_map[txid]:
+                job = self.jobs[uuid]
+                receipt = self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
+                                            job.appointment_end, retry=True)
 
                 logger.warning("Transaction has missed many confirmations. Rebroadcasting.",
-                               justice_txid=self.jobs[uuid].justice_txid,
-                               confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
+                               justice_txid=job.justice_txid, confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
+
+                receipts.append((txid, receipt))
+
+        return receipts
 
     # FIXME: Legacy code, must be checked and updated/fixed
+    # NOTCOVERED
    def handle_reorgs(self):
         for uuid, job in self.jobs.items():
             # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
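
Reviewer note (not part of the patch): with the retry branch removed, add_response only calls create_job on the first broadcast, so create_job now does nothing but bookkeeping. Below is a minimal standalone sketch of that bookkeeping, using plain dicts and lists that mirror the jobs, tx_job_map and unconfirmed_txs attributes touched by the patch; the helper name track_job and the sample uuids/txids are made up for illustration and are not part of the Responder API.

# Standalone sketch of the bookkeeping create_job now performs (illustrative only,
# not the Responder class itself; track_job and the sample data are hypothetical).
jobs = {}              # uuid -> job data
tx_job_map = {}        # justice_txid -> [uuid, ...]
unconfirmed_txs = []   # justice txids broadcast but not yet confirmed


def track_job(uuid, justice_txid, job_data, confirmations=0):
    # The retry branch from the old code is gone: add_response skips create_job
    # entirely when retry=True, so every call here registers a brand new job.
    jobs[uuid] = job_data

    # Several appointments may be resolved by the same justice transaction,
    # so one txid maps to a list of job uuids.
    if justice_txid in tx_job_map:
        tx_job_map[justice_txid].append(uuid)
    else:
        tx_job_map[justice_txid] = [uuid]

    # Only transactions that start with zero confirmations are tracked as
    # unconfirmed; do_watch later checks them on every new block.
    if confirmations == 0:
        unconfirmed_txs.append(justice_txid)


# Two jobs backed by the same justice transaction:
track_job("uuid-1", "txid-a", {"appointment_end": 150})
track_job("uuid-2", "txid-a", {"appointment_end": 160}, confirmations=1)

print(tx_job_map)       # {'txid-a': ['uuid-1', 'uuid-2']}
print(unconfirmed_txs)  # ['txid-a']

This is the same structure the reworked rebroadcast walks when a justice transaction misses too many confirmations: every uuid mapped to the txid is re-sent via add_response(..., retry=True), and the new return value collects one (txid, receipt) pair per job so callers can, for example, inspect receipt.delivered instead of the call being fire-and-forget.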