From 47044625022c7781b497c31669bd4abcdc38887f Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 4 Oct 2019 13:52:51 +0100 Subject: [PATCH] Increases responder modularity The responder had way too complex functions. Separate them into smaller / more specific ones to increse modularity and code reuse. --- pisa/responder.py | 172 +++++++++++++++++++--------------------------- 1 file changed, 69 insertions(+), 103 deletions(-) diff --git a/pisa/responder.py b/pisa/responder.py index e277eea..0f04662 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -3,27 +3,24 @@ from threading import Thread from hashlib import sha256 from binascii import unhexlify -from pisa.rpc_errors import * from pisa.cleaner import Cleaner -from pisa import logging, bitcoin_cli +from pisa.carrier import Carrier +from pisa import logging from pisa.tools import check_tx_in_chain from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler -from pisa.utils.auth_proxy import JSONRPCException CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 class Job: - def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0): + def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0): self.dispute_txid = dispute_txid self.justice_txid = justice_txid self.justice_rawtx = justice_rawtx self.appointment_end = appointment_end - self.confirmations = confirmations - self.missed_confirmations = 0 self.retry_counter = retry_counter # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info @@ -31,8 +28,7 @@ class Job: self.locator = sha256(unhexlify(dispute_txid)).hexdigest() def to_json(self): - job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations, - "appointment_end": self.appointment_end} + job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "appointment_end": self.appointment_end} return job @@ -41,25 +37,28 @@ class Responder: def __init__(self): self.jobs = dict() self.tx_job_map = dict() + self.unconfirmed_txs = [] + self.missed_confirmations = dict() self.block_queue = None self.asleep = True self.zmq_subscriber = None def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): + if self.asleep: + logging.info("[Responder] waking up!") - try: - if self.asleep: - logging.info("[Responder] waking up!") - logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid)) + carrier = Carrier() + receipt = carrier.send_transaction(justice_rawtx, justice_txid) - bitcoin_cli.sendrawtransaction(justice_rawtx) - - # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations + if receipt.delivered: + # do_watch can call add_response recursively if a broadcast transaction does not get confirmations # retry holds such information. - self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry) + self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry, + confirmations=receipt.confirmations) - except JSONRPCException as e: - self.handle_send_failures(e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry) + else: + # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) + pass def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry=False): @@ -68,6 +67,7 @@ class Responder: if retry: self.jobs[uuid].retry_counter += 1 self.jobs[uuid].missed_confirmations = 0 + else: self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations) @@ -77,6 +77,9 @@ class Responder: else: self.tx_job_map[justice_txid] = [uuid] + if confirmations == 0: + self.unconfirmed_txs.append(justice_txid) + logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})' .format(dispute_txid, justice_txid, appointment_end)) @@ -84,7 +87,7 @@ class Responder: self.asleep = False self.block_queue = Queue() zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue]) - responder = Thread(target=self.handle_responses) + responder = Thread(target=self.do_watch) zmq_thread.start() responder.start() @@ -92,7 +95,7 @@ class Responder: self.zmq_subscriber = ZMQHandler(parent='Responder') self.zmq_subscriber.handle(block_queue) - def handle_responses(self): + def do_watch(self): # ToDo: #9-add-data-persistence # change prev_block_hash to the last known tip when bootstrapping prev_block_hash = 0 @@ -100,10 +103,9 @@ class Responder: while len(self.jobs) > 0: # We get notified for every new received block block_hash = self.block_queue.get() - block = BlockProcessor.getblock(block_hash) + block = BlockProcessor.get_block(block_hash) if block is not None: - block = bitcoin_cli.getblock(block_hash) txs = block.get('tx') height = block.get('height') @@ -111,48 +113,25 @@ class Responder: logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) logging.info("[Responder] list of transactions: {}".format(txs)) - else: - continue + # ToDo: #9-add-data-persistence + # change prev_block_hash condition + if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: + self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations( + txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations) - completed_jobs = [] - jobs_to_rebroadcast = [] - # ToDo: #9-add-data-persistence - # change prev_block_hash condition - if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: - # Keep count of the confirmations each tx gets - for justice_txid, jobs in self.tx_job_map.items(): - for uuid in jobs: - if justice_txid in txs or self.jobs[uuid].confirmations > 0: - self.jobs[uuid].confirmations += 1 + txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs) + self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, + self.get_completed_jobs(height), height) - logging.info("[Responder] new confirmation received for job = {}, txid = {}".format( - uuid, justice_txid)) + self.rebroadcast(txs_to_rebroadcast) - elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: - # If a transactions has missed too many confirmations we add it to the rebroadcast list - jobs_to_rebroadcast.append(uuid) + else: + logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" + .format(prev_block_hash, block.get('previousblockhash'))) - else: - # Otherwise we increase the number of missed confirmations - self.jobs[uuid].missed_confirmations += 1 + self.handle_reorgs() - if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \ - MIN_CONFIRMATIONS: - # The end of the appointment has been reached - completed_jobs.append(uuid) - - self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs, - height) - - self.rebroadcast(jobs_to_rebroadcast) - - else: - logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" - .format(prev_block_hash, block.get('previousblockhash'))) - - self.handle_reorgs() - - prev_block_hash = block.get('hash') + prev_block_hash = block.get('hash') # Go back to sleep if there are no more jobs self.asleep = True @@ -160,55 +139,43 @@ class Responder: logging.info("[Responder] no more pending jobs, going back to sleep") + def get_txs_to_rebroadcast(self, txs): + txs_to_rebroadcast = [] + + for tx in txs: + if self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY: + # If a transactions has missed too many confirmations we add it to the rebroadcast list + txs_to_rebroadcast.append(tx) + + return txs_to_rebroadcast + + def get_completed_jobs(self, height): + completed_jobs = [] + + for uuid, job in self.jobs: + if job.appointment_end <= height: + tx = Carrier.get_transaction(job.dispute_txid) + + # FIXME: Should be improved with the librarian + if tx is not None and tx.get('confirmations') > MIN_CONFIRMATIONS: + # The end of the appointment has been reached + completed_jobs.append(uuid) + + return completed_jobs + def rebroadcast(self, jobs_to_rebroadcast): # ToDO: #22-discuss-confirmations-before-retry # ToDo: #23-define-behaviour-approaching-end - for uuid in jobs_to_rebroadcast: - self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, - self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) + for tx in jobs_to_rebroadcast: + for uuid in self.tx_job_map[tx]: + self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, + self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) - logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" - .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) - - def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry): - # Since we're pushing a raw transaction to the network we can get two kind of rejections: - # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected - # due to network rules, whereas the later implies that the transaction is already in the blockchain. - if e.error.get('code') == RPC_VERIFY_REJECTED: - # DISCUSS: what to do in this case - # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. - # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. - # DISCUSS: check errors -9 and -10 - pass - - elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: - try: - logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and start " - "monitoring the transaction".format(justice_txid)) - - # If the transaction is already in the chain, we get the number of confirmations and watch the job - # until the end of the appointment - tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) - confirmations = int(tx_info.get("confirmations")) - self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry, - confirmations=confirmations) - - except JSONRPCException as e: - # While it's quite unlikely, the transaction that was already in the blockchain could have been - # reorged while we were querying bitcoind to get the confirmation count. In such a case we just - # restart the job - if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry) - - else: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) - - else: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) + logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting" + .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) + # FIXME: Legacy code, must be checked and updated/fixed def handle_reorgs(self): for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be @@ -240,4 +207,3 @@ class Responder: # reorg manager logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") logging.error("[Responder] reorg manager not yet implemented") - pass