diff --git a/pisa/responder.py b/pisa/responder.py index 51c7988..bdda0bb 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -44,7 +44,8 @@ class Responder: def create_job(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, conf_counter=0, retry=False): - # DISCUSS: Check what to do if the retry counter gets too big + + # ToDo: #23-define-behaviour-approaching-end if retry: self.jobs[justice_txid].retry_counter += 1 self.jobs[justice_txid].missed_confirmations = 0 @@ -60,6 +61,7 @@ class Responder: self.asleep = False self.block_queue = Queue() zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) + # ToDo: This may not have to be a thead. The main thread only creates this and terminates. responder = Thread(target=self.handle_responses, args=[debug, logging]) zmq_thread.start() responder.start() @@ -67,6 +69,9 @@ class Responder: def add_response(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=False): bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) + + # ToDo: Moving the sending functionality to a separate function would improve readability. Also try to use + # check_tx_in_chain if possible. try: if debug: if self.asleep: @@ -85,6 +90,9 @@ class Responder: # due to network rules, whereas the later implies that the transaction is already in the blockchain. if e.error.get('code') == RPC_VERIFY_REJECTED: # DISCUSS: what to do in this case + # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. + # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. + # DISCUSS: check errors -9 and -10 pass elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: try: @@ -101,7 +109,7 @@ class Responder: except JSONRPCException as e: # While it's quite unlikely, the transaction that was already in the blockchain could have been - # reorged while we were querying bitcoind to get the confirmation count. in such a case we just + # reorged while we were querying bitcoind to get the confirmation count. In such a case we just # restart the job if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: self.add_response(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, @@ -141,8 +149,9 @@ class Responder: if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: # Keep count of the confirmations each tx gets for job_id, confirmations in self.confirmation_counter.items(): - # If we see the transaction for the first time, or MIN_CONFIRMATIONS hasn't been reached - if job_id in txs or (0 < confirmations): + # If we see the transaction for the first time, or appointment_end & MIN_CONFIRMATIONS hasn't been + # reached + if job_id in txs or confirmations > 0: self.confirmation_counter[job_id] += 1 if debug: @@ -150,9 +159,8 @@ class Responder: elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: # If a transactions has missed too many confirmations for a while we'll try to rebroadcast - # DISCUSS: How many confirmations before retry - # DISCUSS: recursion vs setting confirmations to 0 and rebroadcast here - # DISCUSS: how many max retries and what to do if the cap is reached + # ToDO: #22-discuss-confirmations-before-retry + # ToDo: #23-define-behaviour-approaching-end self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].justice_rawtx, self.jobs[job_id].appointment_end, debug, logging, retry=True) if debug: @@ -168,6 +176,7 @@ class Responder: jobs_to_delete.append(job_id) for job_id in jobs_to_delete: + # ToDo: Find a better way to solve this. Deepcopy of the keys maybe? # Trying to delete directly when iterating the last for causes dictionary changed size error during # iteration in Python3 (can not be solved iterating only trough keys in Python3 either) @@ -175,7 +184,7 @@ class Responder: logging.info("[Responder] {} completed. Appointment ended at block {} after {} confirmations" .format(job_id, height, self.confirmation_counter[job_id])) - # ToDo: record job in DB + # ToDo: #9-add-data-persistency del self.jobs[job_id] del self.confirmation_counter[job_id] @@ -200,28 +209,30 @@ class Responder: # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # there either, so we'll need to call the reorg manager straight away dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, parent='Responder', - tx_label='dispute') + tx_label='dispute tx') # If the dispute is there, we can check the justice tx if dispute_in_chain: justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job_id, debug, logging, - parent='Responder', tx_label='dispute') + parent='Responder', tx_label='justice tx') # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: if debug: logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( job_id, self.confirmation_counter[job_id], justice_confirmations)) + self.confirmation_counter[job_id] = justice_confirmations - if debug: - logging.info("[Responder] no more pending jobs, going back to sleep") else: # Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again # DISCUSS: Adding job back, should we flag it as retried? + # FIXME: Whether we decide to increase the retried counter or not, the current counter should be + # maintained. There is no way of doing so with the current approach. Update if required self.add_response(job.dispute_txid, job_id, job.justice_rawtx, job.appointment_end, debug, logging) else: + # ToDo: #24-properly-handle-reorgs # FIXME: if the dispute is not on chain (either in mempool or not there al all), we need to call the # reorg manager logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") diff --git a/pisa/watcher.py b/pisa/watcher.py index c5936d5..e84ddb9 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -18,8 +18,6 @@ class Watcher: self.responder = Responder() def add_appointment(self, appointment, debug, logging): - # DISCUSS: about validation of input data - # Rationale: # The Watcher will analyze every received block looking for appointment matches. If there is no work # to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block @@ -76,13 +74,12 @@ class Watcher: block = bitcoin_cli.getblock(block_hash) txids = block.get('tx') - potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} - if debug: logging.info("[Watcher] new block received {}".format(block_hash)) logging.info("[Watcher] list of transactions: {}".format(txids)) # Delete expired appointments + # ToDo: #9: also move this to a function to_delete = {} for locator in self.appointments: for appointment in self.appointments[locator]: @@ -102,16 +99,21 @@ class Watcher: .format(locator)) del self.appointments[locator] + # ToDo: #9-add-data-persistency else: for i in indexes: if debug: logging.info("[Watcher] end time reached with no match! Deleting appointment {}:{}" .format(locator, i)) - del self.appointments[locator][i] + del self.appointments[locator][i] + # ToDo: #9-add-data-persistency + + potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} # Check is any of the tx_ids in the received block is an actual match potential_matches = {} + # ToDo: set intersection should be a more optimal solution for locator in self.appointments.keys(): if locator in potential_locators: # This is locator:txid @@ -134,17 +136,17 @@ class Watcher: self.appointments[locator][appointment_pos].end_time, debug, logging) # If there was only one appointment that matches the locator we can delete the whole list - # DISCUSS: We may want to use locks before adding / removing appointment if len(self.appointments[locator]) == 1: + # ToDo: #9-add-data-persistency del self.appointments[locator] else: # Otherwise we just delete the appointment that matches locator:appointment_pos + # ToDo: #9-add-data-persistency del self.appointments[locator][appointment_pos] except JSONRPCException as e: if debug: - logging.error("[Watcher] JSONRPCException. Error code {}".format(e)) - continue + logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e)) # Go back to sleep if there are no more appointments self.asleep = True @@ -159,6 +161,7 @@ class Watcher: for locator, dispute_txid in potential_matches.items(): for appointment_pos, appointment in enumerate(self.appointments.get(locator)): try: + # ToDo: #20-test-tx-decrypting-edge-cases justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging) justice_rawtx = hexlify(justice_rawtx).decode() justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') @@ -172,6 +175,5 @@ class Watcher: # for the POC if debug: logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e)) - continue return matches