Improves responder.

Several changes have been performed:

- Retry counter has been removed (#23)
- Rebroadcast return receipts now
- Re-calling send_transaction if a fixable error occurs should be handled in the responder now (missing)
- Fixes some small bugs
This commit is contained in:
Sergi Delgado Segura
2019-10-22 15:03:18 +01:00
parent 45552e390c
commit be16d8af73

View File

@@ -18,14 +18,12 @@ logger = Logger("Responder")
class Job: class Job:
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0): def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end):
self.dispute_txid = dispute_txid self.dispute_txid = dispute_txid
self.justice_txid = justice_txid self.justice_txid = justice_txid
self.justice_rawtx = justice_rawtx self.justice_rawtx = justice_rawtx
self.appointment_end = appointment_end self.appointment_end = appointment_end
self.retry_counter = retry_counter
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
# can be directly got from DB # can be directly got from DB
self.locator = sha256(unhexlify(dispute_txid)).hexdigest() self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
@@ -56,11 +54,11 @@ class Responder:
carrier = Carrier() carrier = Carrier()
receipt = carrier.send_transaction(justice_rawtx, justice_txid) receipt = carrier.send_transaction(justice_rawtx, justice_txid)
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds that information. If retry is true the job already exists
if receipt.delivered: if receipt.delivered:
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations if not retry:
# retry holds such information. self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, receipt.confirmations)
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
confirmations=receipt.confirmations)
else: else:
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
@@ -68,25 +66,17 @@ class Responder:
return receipt return receipt
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0):
retry=False): self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
# ToDo: #23-define-behaviour-approaching-end if justice_txid in self.tx_job_map:
if retry: self.tx_job_map[justice_txid].append(uuid)
self.jobs[uuid].retry_counter += 1
self.missed_confirmations[justice_txid] = 0
else: else:
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations) self.tx_job_map[justice_txid] = [uuid]
if justice_txid in self.tx_job_map: if confirmations == 0:
self.tx_job_map[justice_txid].append(uuid) self.unconfirmed_txs.append(justice_txid)
else:
self.tx_job_map[justice_txid] = [uuid]
if confirmations == 0:
self.unconfirmed_txs.append(justice_txid)
logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid, logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid,
appointment_end=appointment_end) appointment_end=appointment_end)
@@ -106,7 +96,7 @@ class Responder:
def do_watch(self): def do_watch(self):
# ToDo: #9-add-data-persistence # ToDo: #9-add-data-persistence
# change prev_block_hash to the last known tip when bootstrapping # change prev_block_hash to the last known tip when bootstrapping
prev_block_hash = 0 prev_block_hash = BlockProcessor.get_best_block_hash()
while len(self.jobs) > 0: while len(self.jobs) > 0:
# We get notified for every new received block # We get notified for every new received block
@@ -121,21 +111,21 @@ class Responder:
block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs) block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
# ToDo: #9-add-data-persistence # ToDo: #9-add-data-persistence
# change prev_block_hash condition if prev_block_hash == block.get('previousblockhash'):
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations( self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations) txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations)
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs) txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, self.get_completed_jobs(height), height) completed_jobs = self.get_completed_jobs(height)
Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs, height)
self.rebroadcast(txs_to_rebroadcast) self.rebroadcast(txs_to_rebroadcast)
else: else:
logger.warning("Reorg found", logger.warning("Reorg found", local_prev_block_hash=prev_block_hash,
local_prev_block_hash=prev_block_hash,
remote_prev_block_hash=block.get('previousblockhash')) remote_prev_block_hash=block.get('previousblockhash'))
# ToDo: #24-properly-handle-reorgs
self.handle_reorgs() self.handle_reorgs()
prev_block_hash = block.get('hash') prev_block_hash = block.get('hash')
@@ -160,31 +150,42 @@ class Responder:
completed_jobs = [] completed_jobs = []
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
if job.appointment_end <= height: if job.appointment_end <= height and job.justice_txid not in self.unconfirmed_txs:
tx = Carrier.get_transaction(job.justice_txid) tx = Carrier.get_transaction(job.justice_txid)
# FIXME: Should be improved with the librarian # FIXME: Should be improved with the librarian
confirmations = tx.get('confirmations') if tx is not None:
if tx is not None and confirmations > MIN_CONFIRMATIONS: confirmations = tx.get('confirmations')
# The end of the appointment has been reached
completed_jobs.append((uuid, confirmations)) if confirmations >= MIN_CONFIRMATIONS:
# The end of the appointment has been reached
completed_jobs.append((uuid, confirmations))
return completed_jobs return completed_jobs
def rebroadcast(self, jobs_to_rebroadcast): def rebroadcast(self, txs_to_rebroadcast):
# DISCUSS: #22-discuss-confirmations-before-retry # DISCUSS: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end # ToDo: #23-define-behaviour-approaching-end
for tx in jobs_to_rebroadcast: receipts = []
for uuid in self.tx_job_map[tx]:
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, for txid in txs_to_rebroadcast:
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) self.missed_confirmations[txid] = 0
for uuid in self.tx_job_map[txid]:
job = self.jobs[uuid]
receipt = self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
job.appointment_end, retry=True)
logger.warning("Transaction has missed many confirmations. Rebroadcasting.", logger.warning("Transaction has missed many confirmations. Rebroadcasting.",
justice_txid=self.jobs[uuid].justice_txid, justice_txid=job.justice_txid, confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
receipts.append((txid, receipt))
return receipts
# FIXME: Legacy code, must be checked and updated/fixed # FIXME: Legacy code, must be checked and updated/fixed
# NOTCOVERED
def handle_reorgs(self): def handle_reorgs(self):
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be