diff --git a/pisa/responder.py b/pisa/responder.py index c4b6616..e277eea 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -2,9 +2,12 @@ from queue import Queue from threading import Thread from hashlib import sha256 from binascii import unhexlify -from pisa import logging, bitcoin_cli + from pisa.rpc_errors import * +from pisa.cleaner import Cleaner +from pisa import logging, bitcoin_cli from pisa.tools import check_tx_in_chain +from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler from pisa.utils.auth_proxy import JSONRPCException @@ -90,12 +93,16 @@ class Responder: self.zmq_subscriber.handle(block_queue) def handle_responses(self): + # ToDo: #9-add-data-persistence + # change prev_block_hash to the last known tip when bootstrapping prev_block_hash = 0 + while len(self.jobs) > 0: # We get notified for every new received block block_hash = self.block_queue.get() + block = BlockProcessor.getblock(block_hash) - try: + if block is not None: block = bitcoin_cli.getblock(block_hash) txs = block.get('tx') height = block.get('height') @@ -104,12 +111,13 @@ class Responder: logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) logging.info("[Responder] list of transactions: {}".format(txs)) - except JSONRPCException as e: - logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e)) - + else: continue completed_jobs = [] + jobs_to_rebroadcast = [] + # ToDo: #9-add-data-persistence + # change prev_block_hash condition if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: # Keep count of the confirmations each tx gets for justice_txid, jobs in self.tx_job_map.items(): @@ -121,15 +129,8 @@ class Responder: uuid, justice_txid)) elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: - # If a transactions has missed too many confirmations for a while we'll try to rebroadcast - # ToDO: #22-discuss-confirmations-before-retry - # ToDo: #23-define-behaviour-approaching-end - self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid, - self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, - retry=True) - - logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" - .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY)) + # If a transactions has missed too many confirmations we add it to the rebroadcast list + jobs_to_rebroadcast.append(uuid) else: # Otherwise we increase the number of missed confirmations @@ -140,7 +141,10 @@ class Responder: # The end of the appointment has been reached completed_jobs.append(uuid) - self.remove_completed_jobs(completed_jobs, height) + self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs, + height) + + self.rebroadcast(jobs_to_rebroadcast) else: logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" @@ -156,6 +160,17 @@ class Responder: logging.info("[Responder] no more pending jobs, going back to sleep") + def rebroadcast(self, jobs_to_rebroadcast): + # ToDO: #22-discuss-confirmations-before-retry + # ToDo: #23-define-behaviour-approaching-end + + for uuid in jobs_to_rebroadcast: + self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, + self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) + + logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" + .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) + def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry): # Since we're pushing a raw transaction to the network we can get two kind of rejections: # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected @@ -194,24 +209,6 @@ class Responder: # If something else happens (unlikely but possible) log it so we can treat it in future releases logging.error("[Responder] JSONRPCException. Error {}".format(e)) - def remove_completed_jobs(self, completed_jobs, height): - for uuid in completed_jobs: - logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at " - "block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height, - self.jobs[uuid].confirmations)) - - # ToDo: #9-add-data-persistency - justice_txid = self.jobs[uuid].justice_txid - self.jobs.pop(uuid) - - if len(self.tx_job_map[justice_txid]) == 1: - self.tx_job_map.pop(justice_txid) - - logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid)) - - else: - self.tx_job_map[justice_txid].remove(uuid) - def handle_reorgs(self): for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be diff --git a/pisa/watcher.py b/pisa/watcher.py index 2f517c1..bbf066c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -1,13 +1,17 @@ -from binascii import hexlify, unhexlify +from uuid import uuid4 from queue import Queue from threading import Thread -from pisa import logging, bitcoin_cli + +from pisa import logging from pisa.responder import Responder +from pisa.conf import MAX_APPOINTMENTS +from pisa.block_processor import BlockProcessor +from pisa.cleaner import Cleaner from pisa.utils.zmq_subscriber import ZMQHandler -from pisa.utils.auth_proxy import JSONRPCException -from hashlib import sha256 -from uuid import uuid4 -from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA + + +# WIP: MOVED BLOCKCHAIN RELATED TASKS TO BLOCK PROCESSOR IN AN AIM TO MAKE THE CODE MORE MODULAR. THIS SHOULD HELP +# WITH CODE REUSE WHEN MERGING THE DATA PERSISTENCE PART. class Watcher: @@ -72,29 +76,20 @@ class Watcher: def do_watch(self): while len(self.appointments) > 0: block_hash = self.block_queue.get() + logging.info("[Watcher] new block received {}".format(block_hash)) - try: - block = bitcoin_cli.getblock(block_hash) + block = BlockProcessor.getblock(block_hash) + + if block is not None: txids = block.get('tx') - logging.info("[Watcher] new block received {}".format(block_hash)) logging.info("[Watcher] list of transactions: {}".format(txids)) - self.delete_expired_appointment(block) + self.appointments, self.locator_uuid_map = Cleaner.delete_expired_appointment( + block, self.appointments, self.locator_uuid_map) - potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} - - # Check is any of the tx_ids in the received block is an actual match - # Get the locators that are both in the map and in the potential locators dict. - intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) - potential_matches = {locator: potential_locators[locator] for locator in intersection} - - if len(potential_matches) > 0: - logging.info("[Watcher] list of potential matches: {}".format(potential_matches)) - else: - logging.info("[Watcher] no potential matches found") - - matches = self.check_potential_matches(potential_matches) + potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map) + matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" @@ -108,58 +103,15 @@ class Watcher: # If there was only one appointment that matches the locator we can delete the whole list if len(self.locator_uuid_map[locator]) == 1: - # ToDo: #9-add-data-persistency + # ToDo: #9-add-data-persistence self.locator_uuid_map.pop(locator) else: # Otherwise we just delete the appointment that matches locator:appointment_pos - # ToDo: #9-add-data-persistency + # ToDo: #9-add-data-persistence self.locator_uuid_map[locator].remove(uuid) - except JSONRPCException as e: - logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e)) - # Go back to sleep if there are no more appointments self.asleep = True self.zmq_subscriber.terminate = True logging.error("[Watcher] no more pending appointments, going back to sleep") - - def delete_expired_appointment(self, block): - to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time - + EXPIRY_DELTA] - - for uuid in to_delete: - # ToDo: #9-add-data-persistency - locator = self.appointments[uuid].locator - - self.appointments.pop(uuid) - - if len(self.locator_uuid_map[locator]) == 1: - self.locator_uuid_map.pop(locator) - - else: - self.locator_uuid_map[locator].remove(uuid) - - logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator, - uuid)) - - def check_potential_matches(self, potential_matches): - matches = [] - - for locator, dispute_txid in potential_matches.items(): - for uuid in self.locator_uuid_map[locator]: - try: - # ToDo: #20-test-tx-decrypting-edge-cases - justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid)) - justice_rawtx = hexlify(justice_rawtx).decode() - justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') - matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) - - logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid, - justice_txid)) - except JSONRPCException as e: - # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple - # for the POC - logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e)) - - return matches