From 3dfa7e2978fe733429785201047788ce3460441c Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Tue, 13 Aug 2019 17:40:08 +0100
Subject: [PATCH 1/6] Changes appointment structure

Updates the former appointment structure from `dicts` containing `lists`
accessed by `index` to `dicts` containing `dicts` identified by `uuid4`.

Needs testing.
---
 pisa/watcher.py | 85 +++++++++++++++++++++++++------------------------
 1 file changed, 43 insertions(+), 42 deletions(-)

diff --git a/pisa/watcher.py b/pisa/watcher.py
index e84ddb9..ee9eb7b 100644
--- a/pisa/watcher.py
+++ b/pisa/watcher.py
@@ -5,6 +5,7 @@ from pisa.responder import Responder
 from pisa.zmq_subscriber import ZMQHandler
 from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
 from hashlib import sha256
+from uuid import uuid4

 from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA

@@ -28,11 +29,12 @@ class Watcher:
         if len(self.appointments) < self.max_appointments:
             # Appointments are identified by the locator: the sha256 of the commitment txid (H(tx_id)).
             # Two different nodes may ask for appointments using the same commitment txid, which will result in a
-            # collision in our appointments structure (and may be an attack surface), we use lists to avoid that.
+            # collision in our appointments structure (and may be an attack surface). In order to avoid such collisions
+            # we will identify every appointment with a uuid
             if not self.appointments.get(appointment.locator):
-                self.appointments[appointment.locator] = []
+                self.appointments[appointment.locator] = {}

-            self.appointments[appointment.locator].append(appointment)
+            self.appointments[appointment.locator][uuid4().hex] = appointment

             if self.asleep:
                 self.asleep = False
@@ -78,36 +80,7 @@ class Watcher:
                         logging.info("[Watcher] new block received {}".format(block_hash))
                         logging.info("[Watcher] list of transactions: {}".format(txids))

-                    # Delete expired appointments
-                    # ToDo: #9: also move this to a function
-                    to_delete = {}
-                    for locator in self.appointments:
-                        for appointment in self.appointments[locator]:
-                            if block["height"] > appointment.end_time + EXPIRY_DELTA:
-                                # Get the appointment index and add the appointment to the deletion list
-                                appointment_pos = self.appointments[locator].index(appointment)
-
-                                if locator in to_delete:
-                                    to_delete[locator].append(appointment_pos)
-                                else:
-                                    to_delete[locator] = [appointment_pos]
-
-                    for locator, indexes in to_delete.items():
-                        if len(indexes) == len(self.appointments[locator]):
-                            if debug:
-                                logging.info("[Watcher] end time reached with no match! Deleting appointment {}"
-                                             .format(locator))
-
-                            del self.appointments[locator]
-                            # ToDo: #9-add-data-persistency
-                        else:
-                            for i in indexes:
-                                if debug:
-                                    logging.info("[Watcher] end time reached with no match! Deleting appointment {}:{}"
-                                                 .format(locator, i))
-
-                                del self.appointments[locator][i]
-                                # ToDo: #9-add-data-persistency
+                    self.delete_expired_appointment(block, debug, logging)

                     potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}

@@ -127,13 +100,13 @@ class Watcher:

                     matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)

-                    for locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx in matches:
+                    for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
                         if debug:
-                            logging.info("[Watcher] notifying responder about {} and deleting appointment {}:{}".format(
-                                justice_txid, locator, appointment_pos))
+                            logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
+                                         .format(justice_txid, locator, uuid))

                         self.responder.add_response(dispute_txid, justice_txid, justice_rawtx,
-                                                    self.appointments[locator][appointment_pos].end_time, debug, logging)
+                                                    self.appointments[locator][uuid].end_time, debug, logging)

                         # If there was only one appointment that matches the locator we can delete the whole list
                         if len(self.appointments[locator]) == 1:
@@ -142,7 +115,7 @@ class Watcher:
                         else:
                             # Otherwise we just delete the appointment that matches locator:uuid
                             # ToDo: #9-add-data-persistency
-                            del self.appointments[locator][appointment_pos]
+                            del self.appointments[locator][uuid]

                 except JSONRPCException as e:
                     if debug:
@@ -155,21 +128,49 @@ class Watcher:
             if debug:
                 logging.error("[Watcher] no more pending appointments, going back to sleep")

+    def delete_expired_appointment(self, block, debug, logging):
+        to_delete = {}
+
+        for locator, appointments in self.appointments.items():
+            for uuid, appointment in appointments.items():
+                if block["height"] > appointment.end_time + EXPIRY_DELTA:
+                    # Add the appointment to the deletion list
+                    if locator in to_delete:
+                        to_delete[locator].append(uuid)
+                    else:
+                        to_delete[locator] = [uuid]
+
+        for locator, uuids in to_delete.items():
+            if len(uuids) == len(self.appointments[locator]):
+                if debug:
+                    logging.info("[Watcher] end time reached with no match! Deleting appointment {}".format(locator))
+
+                del self.appointments[locator]
+                # ToDo: #9-add-data-persistency
+            else:
+                for uuid in uuids:
+                    if debug:
+                        logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
+                                     .format(locator, uuid))
+
+                    del self.appointments[locator][uuid]
+                    # ToDo: #9-add-data-persistency
+
     def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
         matches = []

         for locator, dispute_txid in potential_matches.items():
-            for appointment_pos, appointment in enumerate(self.appointments.get(locator)):
+            for uuid, appointment in self.appointments.get(locator).items():
                 try:
                     # ToDo: #20-test-tx-decrypting-edge-cases
                     justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging)
                     justice_rawtx = hexlify(justice_rawtx).decode()
                     justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
-                    matches.append((locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx))
+                    matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))

                     if debug:
-                        logging.info("[Watcher] match found for {}:{}! {}".format(locator, appointment_pos,
-                                                                                  justice_txid))
+                        logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
+                                                                                                  justice_txid))
                 except JSONRPCException as e:
                     # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
                     # for the POC
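The structure this patch adopts is easiest to see in isolation. Below is a minimal, self-contained sketch of the dict-of-dicts registry keyed by `uuid4().hex`; the `Appointment` stand-in and the sample locator are made up for illustration, and only the nesting and the uuid keys mirror the patch.

    from uuid import uuid4
    from collections import namedtuple

    # Hypothetical stand-in for pisa's Appointment class; only the fields used here.
    Appointment = namedtuple("Appointment", ["locator", "end_time"])

    appointments = {}

    def add_appointment(appointment):
        # Appointments sharing a locator no longer collide: each one gets its own
        # uuid instead of a position in a list.
        if not appointments.get(appointment.locator):
            appointments[appointment.locator] = {}

        uuid = uuid4().hex
        appointments[appointment.locator][uuid] = appointment
        return uuid

    locator = "f0" * 32  # fake locator (sha256 hex digest of a commitment txid)
    first = add_appointment(Appointment(locator, end_time=150))
    second = add_appointment(Appointment(locator, end_time=160))

    assert len(appointments[locator]) == 2

    # Deleting by uuid leaves the other entries untouched.
    del appointments[locator][first]
    assert list(appointments[locator]) == [second]

Deleting by list index, by contrast, shifts every later position, which is what made the old list-based layout fragile under concurrent additions and deletions.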
From ae2cf26195ea1fbf0ef055c4b0fe58d4d2366d0a Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Mon, 26 Aug 2019 16:26:29 +0200
Subject: [PATCH 3/6] Improves internal data structures and reorganizes the
 code for readability

---
 pisa/responder.py                             | 255 ++++++++++--------
 pisa/watcher.py                               |  86 +++---
 ...ntment_test.py => add_appointment_test.py} |  27 +-
 3 files changed, 218 insertions(+), 150 deletions(-)
 rename tests/{simulator/appointment_test.py => add_appointment_test.py} (70%)

diff --git a/pisa/responder.py b/pisa/responder.py
index bdda0bb..9146599 100644
--- a/pisa/responder.py
+++ b/pisa/responder.py
@@ -13,18 +13,22 @@ MIN_CONFIRMATIONS = 6


 class Job:
-    def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0):
+    def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0):
         self.dispute_txid = dispute_txid
-        # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
-        # can be directly got from DB
-        self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
+        self.justice_txid = justice_txid
         self.justice_rawtx = justice_rawtx
         self.appointment_end = appointment_end
+        self.confirmations = confirmations
+        self.missed_confirmations = 0
         self.retry_counter = retry_counter

+        # FIXME: locator is here so we can give info about jobs for now. It can either be passed from the watcher or
+        # fetched directly from the DB
+        self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
+
     def to_json(self):
-        job = {"locator": self.dispute_txid, "justice_rawtx": self.justice_rawtx,
+        job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations,
                "appointment_end": self.appointment_end}

         return job

@@ -33,45 +37,17 @@ class Job:
 class Responder:
     def __init__(self):
         self.jobs = dict()
-        self.confirmation_counter = dict()
+        self.tx_job_map = dict()
         self.block_queue = None
         self.asleep = True
         self.zmq_subscriber = None

-    def do_subscribe(self, block_queue, debug, logging):
-        self.zmq_subscriber = ZMQHandler(parent='Responder')
-        self.zmq_subscriber.handle(block_queue, debug, logging)
+    def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
+                     retry=False):

-    def create_job(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, conf_counter=0,
-                   retry=False):
-
-        # ToDo: #23-define-behaviour-approaching-end
-        if retry:
-            self.jobs[justice_txid].retry_counter += 1
-            self.jobs[justice_txid].missed_confirmations = 0
-        else:
-            self.confirmation_counter[justice_txid] = conf_counter
-            self.jobs[justice_txid] = Job(dispute_txid, justice_rawtx, appointment_end)
-
-        if debug:
-            logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
-                         format(dispute_txid, justice_txid, appointment_end))
-
-        if self.asleep:
-            self.asleep = False
-            self.block_queue = Queue()
-            zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
-            # ToDo: This may not have to be a thead. The main thread only creates this and terminates.
-            responder = Thread(target=self.handle_responses, args=[debug, logging])
-            zmq_thread.start()
-            responder.start()
-
-    def add_response(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=False):
         bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
                                                                BTC_RPC_PORT))

-        # ToDo: Moving the sending functionality to a separate function would improve readability. Also try to use
-        # check_tx_in_chain if possible.
         try:
             if debug:
                 if self.asleep:
@@ -82,44 +58,44 @@ class Responder:

             # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations;
             # retry holds such information.
-            self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=retry)
+            self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
+                            retry=retry)

         except JSONRPCException as e:
-            # Since we're pushing a raw transaction to the network we can get two kind of rejections:
-            # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
-            # due to network rules, whereas the later implies that the transaction is already in the blockchain.
-            if e.error.get('code') == RPC_VERIFY_REJECTED:
-                # DISCUSS: what to do in this case
-                # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
-                # DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
-                # DISCUSS: check errors -9 and -10
-                pass
-            elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
-                try:
-                    if debug:
-                        logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
-                                     "start monitoring the transaction".format(justice_txid))
+            self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
+                                      debug, logging, retry)

-                    # If the transaction is already in the chain, we get the number of confirmations and watch the job
-                    # until the end of the appointment
-                    tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
-                    confirmations = int(tx_info.get("confirmations"))
-                    self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
-                                    retry=retry, conf_counter=confirmations)
+    def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
+                   confirmations=0, retry=False):

-                except JSONRPCException as e:
-                    # While it's quite unlikely, the transaction that was already in the blockchain could have been
-                    # reorged while we were querying bitcoind to get the confirmation count. In such a case we just
-                    # restart the job
-                    if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
-                        self.add_response(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
-                                          retry=retry)
-                    elif debug:
-                        # If something else happens (unlikely but possible) log it so we can treat it in future releases
-                        logging.error("[Responder] JSONRPCException. Error code {}".format(e))
-            elif debug:
-                # If something else happens (unlikely but possible) log it so we can treat it in future releases
-                logging.error("[Responder] JSONRPCException. Error code {}".format(e))
+        # ToDo: #23-define-behaviour-approaching-end
+        if retry:
+            self.jobs[uuid].retry_counter += 1
+            self.jobs[uuid].missed_confirmations = 0
+        else:
+            self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)

+        if justice_txid in self.tx_job_map:
+            # Guard against mapping the same uuid twice when a job is re-added (retry or reorg)
+            if uuid not in self.tx_job_map[justice_txid]:
+                self.tx_job_map[justice_txid].append(uuid)
+
+        else:
+            self.tx_job_map[justice_txid] = [uuid]
+
+        if debug:
+            logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
+                         format(dispute_txid, justice_txid, appointment_end))
+
+        if self.asleep:
+            self.asleep = False
+            self.block_queue = Queue()
+            zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
+            responder = Thread(target=self.handle_responses, args=[debug, logging])
+            zmq_thread.start()
+            responder.start()
+
+    def do_subscribe(self, block_queue, debug, logging):
+        self.zmq_subscriber = ZMQHandler(parent='Responder')
+        self.zmq_subscriber.handle(block_queue, debug, logging)

     def handle_responses(self, debug, logging):
         bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
@@ -145,48 +121,39 @@ class Responder:

             continue

-            jobs_to_delete = []
+            completed_jobs = []
             if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
                 # Keep count of the confirmations each tx gets
-                for job_id, confirmations in self.confirmation_counter.items():
-                    # If we see the transaction for the first time, or appointment_end & MIN_CONFIRMATIONS hasn't been
-                    # reached
-                    if job_id in txs or confirmations > 0:
-                        self.confirmation_counter[job_id] += 1
+                for justice_txid, jobs in self.tx_job_map.items():
+                    for uuid in jobs:
+                        if justice_txid in txs or self.jobs[uuid].confirmations > 0:
+                            self.jobs[uuid].confirmations += 1

-                        if debug:
-                            logging.info("[Responder] new confirmation received for txid = {}".format(job_id))
+                            if debug:
+                                logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
+                                    uuid, justice_txid))

-                    elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
-                        # If a transactions has missed too many confirmations for a while we'll try to rebroadcast
-                        # ToDO: #22-discuss-confirmations-before-retry
-                        # ToDo: #23-define-behaviour-approaching-end
-                        self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].justice_rawtx,
-                                          self.jobs[job_id].appointment_end, debug, logging, retry=True)
-                        if debug:
-                            logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
-                                            .format(job_id, CONFIRMATIONS_BEFORE_RETRY))
-                    else:
-                        # Otherwise we increase the number of missed confirmations
-                        self.jobs[job_id].missed_confirmations += 1
+                        elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
+                            # If a transaction has missed too many confirmations for a while we'll try to rebroadcast
+                            # ToDO: #22-discuss-confirmations-before-retry
+                            # ToDo: #23-define-behaviour-approaching-end
+                            self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
+                                              self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug,
+                                              logging, retry=True)
+                            if debug:
+                                logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
Rebroadcasting" + .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY)) - for job_id, job in self.jobs.items(): - if job.appointment_end <= height and self.confirmation_counter[job_id] >= MIN_CONFIRMATIONS: - # The end of the appointment has been reached - jobs_to_delete.append(job_id) + else: + # Otherwise we increase the number of missed confirmations + self.jobs[uuid].missed_confirmations += 1 - for job_id in jobs_to_delete: - # ToDo: Find a better way to solve this. Deepcopy of the keys maybe? - # Trying to delete directly when iterating the last for causes dictionary changed size error during - # iteration in Python3 (can not be solved iterating only trough keys in Python3 either) + if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \ + MIN_CONFIRMATIONS: + # The end of the appointment has been reached + completed_jobs.append(uuid) - if debug: - logging.info("[Responder] {} completed. Appointment ended at block {} after {} confirmations" - .format(job_id, height, self.confirmation_counter[job_id])) - - # ToDo: #9-add-data-persistency - del self.jobs[job_id] - del self.confirmation_counter[job_id] + self.remove_completed_jobs(completed_jobs, height, debug, logging) else: if debug: @@ -204,32 +171,96 @@ class Responder: if debug: logging.info("[Responder] no more pending jobs, going back to sleep") + def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, + debug, logging, retry): + # Since we're pushing a raw transaction to the network we can get two kind of rejections: + # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected + # due to network rules, whereas the later implies that the transaction is already in the blockchain. + if e.error.get('code') == RPC_VERIFY_REJECTED: + # DISCUSS: what to do in this case + # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. + # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. + # DISCUSS: check errors -9 and -10 + pass + + elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: + try: + if debug: + logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and " + "start monitoring the transaction".format(justice_txid)) + + # If the transaction is already in the chain, we get the number of confirmations and watch the job + # until the end of the appointment + tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) + confirmations = int(tx_info.get("confirmations")) + self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, + retry=retry, confirmations=confirmations) + + except JSONRPCException as e: + # While it's quite unlikely, the transaction that was already in the blockchain could have been + # reorged while we were querying bitcoind to get the confirmation count. In such a case we just + # restart the job + if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: + self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, + logging, retry=retry) + elif debug: + # If something else happens (unlikely but possible) log it so we can treat it in future releases + logging.error("[Responder] JSONRPCException. Error {}".format(e)) + + elif debug: + # If something else happens (unlikely but possible) log it so we can treat it in future releases + logging.error("[Responder] JSONRPCException. 
Error {}".format(e)) + + def remove_completed_jobs(self, completed_jobs, height, debug, logging): + for uuid in completed_jobs: + if debug: + logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at " + "block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height, + self.jobs[uuid].confirmations)) + + # ToDo: #9-add-data-persistency + justice_txid = self.jobs[uuid].justice_txid + self.jobs.pop(uuid) + + if len(self.tx_job_map[justice_txid]) == 1: + self.tx_job_map.pop(justice_txid) + + if debug: + logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid)) + + else: + self.tx_job_map[justice_txid].remove(uuid) + def handle_reorgs(self, bitcoin_cli, debug, logging): - for job_id, job in self.jobs.items(): + for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # there either, so we'll need to call the reorg manager straight away - dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, parent='Responder', + dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, + parent='Responder', tx_label='dispute tx') # If the dispute is there, we can check the justice tx if dispute_in_chain: - justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job_id, debug, logging, - parent='Responder', tx_label='justice tx') + justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug, + logging, parent='Responder', + tx_label='justice tx') # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: if debug: logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( - job_id, self.confirmation_counter[job_id], justice_confirmations)) + job.justice_txid, job.confirmations, justice_confirmations)) - self.confirmation_counter[job_id] = justice_confirmations + job.confirmations = justice_confirmations else: # Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again # DISCUSS: Adding job back, should we flag it as retried? # FIXME: Whether we decide to increase the retried counter or not, the current counter should be # maintained. There is no way of doing so with the current approach. Update if required - self.add_response(job.dispute_txid, job_id, job.justice_rawtx, job.appointment_end, debug, logging) + self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, + job.appointment_end, + debug, logging) else: # ToDo: #24-properly-handle-reorgs diff --git a/pisa/watcher.py b/pisa/watcher.py index ee9eb7b..a4590f5 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -12,6 +12,7 @@ from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, class Watcher: def __init__(self, max_appointments=MAX_APPOINTMENTS): self.appointments = dict() + self.locator_uuid_map = dict() self.block_queue = None self.asleep = True self.max_appointments = max_appointments @@ -31,10 +32,15 @@ class Watcher: # Two different nodes may ask for appointments using the same commitment txid, what will result in a # collision in our appointments structure (and may be an attack surface). 
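The responder above now keeps two views of the same data: `jobs`, keyed by uuid, and `tx_job_map`, mapping a justice txid to every uuid tracking it. The add/remove bookkeeping has to keep both in sync. A rough sketch of that invariant, assuming plain dicts in place of the real `Job` objects:

    jobs = {}        # uuid -> job data
    tx_job_map = {}  # justice_txid -> [uuid, ...]

    def track_job(uuid, justice_txid, job):
        jobs[uuid] = job
        # Several jobs may share the same justice tx, hence the list.
        tx_job_map.setdefault(justice_txid, []).append(uuid)

    def untrack_job(uuid, justice_txid):
        jobs.pop(uuid)
        # Mirror remove_completed_jobs: drop the whole map entry with the last
        # uuid, otherwise just remove this uuid from the list.
        if len(tx_job_map[justice_txid]) == 1:
            tx_job_map.pop(justice_txid)
        else:
            tx_job_map[justice_txid].remove(uuid)

    track_job("uuid-1", "txid-a", {"confirmations": 0})
    track_job("uuid-2", "txid-a", {"confirmations": 0})

    untrack_job("uuid-1", "txid-a")
    assert tx_job_map == {"txid-a": ["uuid-2"]}

    untrack_job("uuid-2", "txid-a")
    assert tx_job_map == {} and jobs == {}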
diff --git a/pisa/watcher.py b/pisa/watcher.py
index ee9eb7b..a4590f5 100644
--- a/pisa/watcher.py
+++ b/pisa/watcher.py
@@ -12,6 +12,7 @@ from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT,
 class Watcher:
     def __init__(self, max_appointments=MAX_APPOINTMENTS):
         self.appointments = dict()
+        self.locator_uuid_map = dict()
         self.block_queue = None
         self.asleep = True
         self.max_appointments = max_appointments
@@ -31,10 +32,15 @@ class Watcher:
         # Two different nodes may ask for appointments using the same commitment txid, which will result in a
         # collision in our appointments structure (and may be an attack surface). In order to avoid such collisions
         # we will identify every appointment with a uuid
-        if not self.appointments.get(appointment.locator):
-            self.appointments[appointment.locator] = {}

-        self.appointments[appointment.locator][uuid4().hex] = appointment
+        uuid = uuid4().hex
+        self.appointments[uuid] = appointment
+
+        if appointment.locator in self.locator_uuid_map:
+            self.locator_uuid_map[appointment.locator].append(uuid)
+
+        else:
+            self.locator_uuid_map[appointment.locator] = [uuid]

         if self.asleep:
             self.asleep = False
@@ -85,12 +91,17 @@ class Watcher:
                     potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}

                     # Check if any of the tx_ids in the received block is an actual match
-                    potential_matches = {}
+                    # ToDo: set intersection should be a more optimal solution

-                    for locator in self.appointments.keys():
-                        if locator in potential_locators:
-                            # This is locator:txid
-                            potential_matches[locator] = potential_locators[locator]
+                    # Get the locators that are both in the map and in the potential locators dict.
+                    intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
+                    potential_matches = {locator: potential_locators[locator] for locator in intersection}
+
+                    # potential_matches = {}
+                    # for locator in self.locator_uuid_map.keys():
+                    #     if locator in potential_locators:
+                    #         # This is locator:txid
+                    #         potential_matches[locator] = potential_locators[locator]

                     if debug:
                         if len(potential_matches) > 0:
@@ -105,17 +116,20 @@ class Watcher:
                             logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
                                          .format(justice_txid, locator, uuid))

-                        self.responder.add_response(dispute_txid, justice_txid, justice_rawtx,
-                                                    self.appointments[locator][uuid].end_time, debug, logging)
+                        self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
+                                                    self.appointments[uuid].end_time, debug, logging)
+
+                        # Delete the appointment
+                        self.appointments.pop(uuid)

                         # If there was only one appointment that matches the locator we can delete the whole list
-                        if len(self.appointments[locator]) == 1:
+                        if len(self.locator_uuid_map[locator]) == 1:
                             # ToDo: #9-add-data-persistency
-                            del self.appointments[locator]
+                            self.locator_uuid_map.pop(locator)
                         else:
                             # Otherwise we just delete the appointment that matches locator:uuid
                             # ToDo: #9-add-data-persistency
-                            del self.appointments[locator][uuid]
+                            self.locator_uuid_map[locator].remove(uuid)

                 except JSONRPCException as e:
                     if debug:
@@ -129,41 +143,41 @@ class Watcher:
                 logging.error("[Watcher] no more pending appointments, going back to sleep")

     def delete_expired_appointment(self, block, debug, logging):
-        to_delete = {}
+        # to_delete = []
+        #
+        # for uuid, appointment in self.appointments.items():
+        #     if block["height"] > appointment.end_time + EXPIRY_DELTA:
+        #         # Add the appointment to the deletion list
+        #         to_delete.append(uuid)

-        for locator, appointments in self.appointments.items():
-            for uuid, appointment in appointments.items():
-                if block["height"] > appointment.end_time + EXPIRY_DELTA:
-                    # Add the appointment to the deletion list
-                    if locator in to_delete:
-                        to_delete[locator].append(uuid)
-                    else:
-                        to_delete[locator] = [uuid]
+        to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+                     + EXPIRY_DELTA]

-        for locator, uuids in to_delete.items():
-            if len(uuids) == len(self.appointments[locator]):
-                if debug:
-                    logging.info("[Watcher] end time reached with no match! Deleting appointment {}".format(locator))
+        for uuid in to_delete:
+            # ToDo: #9-add-data-persistency
+            locator = self.appointments[uuid].locator
+
+            self.appointments.pop(uuid)
+
+            if len(self.locator_uuid_map[locator]) == 1:
+                self.locator_uuid_map.pop(locator)

-                del self.appointments[locator]
-                # ToDo: #9-add-data-persistency
             else:
-                for uuid in uuids:
-                    if debug:
-                        logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
-                                     .format(locator, uuid))
+                self.locator_uuid_map[locator].remove(uuid)

-                    del self.appointments[locator][uuid]
-                    # ToDo: #9-add-data-persistency
+            if debug:
+                logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
+                             .format(locator, uuid))

     def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
         matches = []

         for locator, dispute_txid in potential_matches.items():
-            for uuid, appointment in self.appointments.get(locator).items():
+            for uuid in self.locator_uuid_map[locator]:
                 try:
                     # ToDo: #20-test-tx-decrypting-edge-cases
-                    justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging)
+                    justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug,
+                                                                                   logging)
                     justice_rawtx = hexlify(justice_rawtx).decode()
                     justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
                     matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))

diff --git a/tests/simulator/appointment_test.py b/tests/add_appointment_test.py
similarity index 70%
rename from tests/simulator/appointment_test.py
rename to tests/add_appointment_test.py
index 2af8496..95a85e3 100644
--- a/tests/simulator/appointment_test.py
+++ b/tests/add_appointment_test.py
@@ -42,11 +42,34 @@ appointment = generate_dummy_appointment(dispute_txid)

 print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
 r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
-print(r, r.reason)
+print(r, r.reason, r.content)
+
+print("Requesting it back from PISA")
+r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"])
+print(r, r.reason, r.content)
+
+time.sleep(2)
+print("Sending it again")
+appointment["end_time"] += 1
+r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
+print(r, r.reason, r.content)

 print("Sleeping 10 sec")
 time.sleep(10)
 bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))

+print("Getting all appointments")
+r = requests.get(url=PISA_API+"/get_all_appointments")
+print(r, r.reason, r.content)
+
 print("Triggering PISA with dispute tx")
-bitcoin_cli.sendrawtransaction(dispute_txid)
\ No newline at end of file
+bitcoin_cli.sendrawtransaction(dispute_txid)
+
+time.sleep(10)
+print("Requesting it again")
+r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"])
+print(r, r.reason, r.content)
+
+print("Getting all appointments")
+r = requests.get(url=PISA_API+"/get_all_appointments")
+print(r, r.reason, r.content)
\ No newline at end of file
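The set intersection that replaces the commented-out membership loop in the watcher is worth seeing with concrete data. A small sketch with toy txids and a made-up uuid map; the locator derivation (sha256 over the raw txid bytes) matches the patches:

    from hashlib import sha256
    from binascii import unhexlify

    block_txids = ["aa" * 32, "bb" * 32]
    watched_txid = "aa" * 32
    unmatched_locator = "00" * 32

    # locator -> txid for every tx in the new block
    potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in block_txids}

    # What the watcher holds: locator -> [uuid, ...]
    locator_uuid_map = {sha256(unhexlify(watched_txid)).hexdigest(): ["uuid-1"],
                        unmatched_locator: ["uuid-2"]}

    # A single set operation instead of a Python-level loop over every locator.
    intersection = set(locator_uuid_map.keys()).intersection(potential_locators.keys())
    potential_matches = {locator: potential_locators[locator] for locator in intersection}

    assert list(potential_matches.values()) == [watched_txid]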
From ebac9b003e168c3d9d8326a1f92f4b6395636cf3 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Mon, 26 Aug 2019 16:26:53 +0200
Subject: [PATCH 4/6] Updates the API to match the new internal data
 structures

---
 pisa/api.py | 25 ++++++++++---------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/pisa/api.py b/pisa/api.py
index 9cfc901..15bf4a2 100644
--- a/pisa/api.py
+++ b/pisa/api.py
@@ -65,22 +65,21 @@ def get_appointment():

     # ToDo: #15-add-system-monitor

-    appointment_in_watcher = watcher.appointments.get(locator)
+    appointment_in_watcher = watcher.locator_uuid_map.get(locator)

     if appointment_in_watcher:
-        for appointment in appointment_in_watcher:
-            appointment_data = appointment.to_json()
+        for uuid in appointment_in_watcher:
+            appointment_data = watcher.appointments[uuid].to_json()
             appointment_data['status'] = "being_watched"
             response.append(appointment_data)

     if watcher.responder:
         responder_jobs = watcher.responder.jobs

-        for job_id, job in responder_jobs.items():
+        for job in responder_jobs.values():
             if job.locator == locator:
                 job_data = job.to_json()
                 job_data['status'] = "dispute_responded"
-                job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
                 response.append(job_data)

     if not response:
@@ -93,22 +92,18 @@ def get_appointment():

 @app.route('/get_all_appointments', methods=['GET'])
 def get_all_appointments():
-    watcher_appointments = []
-    responder_jobs = []
+    watcher_appointments = {}
+    responder_jobs = {}

     # ToDo: #15-add-system-monitor

     if request.remote_addr in request.host or request.remote_addr == '127.0.0.1':
-        for app_id, appointment in watcher.appointments.items():
-            jobs_data = [job.to_json() for job in appointment]
-
-            watcher_appointments.append({app_id: jobs_data})
+        for uuid, appointment in watcher.appointments.items():
+            watcher_appointments[uuid] = appointment.to_json()

         if watcher.responder:
-            for job_id, job in watcher.responder.jobs.items():
-                job_data = job.to_json()
-                job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
-                responder_jobs.append({job_id: job_data})
+            for uuid, job in watcher.responder.jobs.items():
+                responder_jobs[uuid] = job.to_json()

     response = jsonify({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs})
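With the new maps, `get_appointment` becomes a two-step lookup: locator to uuids through the watcher's map, plus a scan of the responder's jobs by their locator field. A rough sketch of that assembly with plain dicts standing in for the watcher and responder; the not-found fallback is an assumption, since that branch is elided from the diff:

    watcher_appointments = {"uuid-1": {"locator": "ab" * 16, "end_time": 150}}
    locator_uuid_map = {"ab" * 16: ["uuid-1"]}
    responder_jobs = {"uuid-2": {"locator": "ab" * 16, "confirmations": 1}}

    def get_appointment(locator):
        response = []

        # Step 1: appointments still being watched, found through the locator map.
        for uuid in locator_uuid_map.get(locator, []):
            data = dict(watcher_appointments[uuid])
            data["status"] = "being_watched"
            response.append(data)

        # Step 2: jobs already handed to the responder, matched by locator field.
        for job in responder_jobs.values():
            if job["locator"] == locator:
                data = dict(job)
                data["status"] = "dispute_responded"
                response.append(data)

        # Assumed fallback; the actual branch is not shown in the patch.
        if not response:
            response.append({"locator": locator, "status": "not_found"})

        return response

    statuses = [entry["status"] for entry in get_appointment("ab" * 16)]
    assert statuses == ["being_watched", "dispute_responded"]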
From ea037da5308b4ea2f518eadb136de941435aeaa5 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Mon, 26 Aug 2019 16:28:05 +0200
Subject: [PATCH 5/6] Small fixes

---
 tests/simulator/bitcoind_sim.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/simulator/bitcoind_sim.py b/tests/simulator/bitcoind_sim.py
index 06cfeae..d9b902d 100644
--- a/tests/simulator/bitcoind_sim.py
+++ b/tests/simulator/bitcoind_sim.py
@@ -77,7 +77,7 @@ def process_request():

             if isinstance(txid, str):
                 if check_txid_format(txid):
-                    if txid not in mempool and txid not in list(mined_transactions.keys()):
+                    if txid not in list(mined_transactions.keys()):
                         mempool.append(txid)

                     else:
@@ -100,6 +100,9 @@ def process_request():
             if block:
                 response["result"] = {"confirmations": len(blockchain) - block.get('height')}

+            elif txid in mempool:
+                response["result"] = {"confirmations": 0}
+
             else:
                 response["error"] = {'code': RPC_INVALID_ADDRESS_OR_KEY,
                                      'message': 'No such mempool or blockchain transaction. Use gettransaction for '

From 1c9e6d474deeefe242d309c2ed037f3986316b81 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Mon, 26 Aug 2019 16:28:49 +0200
Subject: [PATCH 6/6] Improves tests

---
 tests/add_appointment_test.py | 104 ++++++++++++++++++++++++----------
 1 file changed, 75 insertions(+), 29 deletions(-)

diff --git a/tests/add_appointment_test.py b/tests/add_appointment_test.py
index 95a85e3..caccfdc 100644
--- a/tests/add_appointment_test.py
+++ b/tests/add_appointment_test.py
@@ -2,6 +2,7 @@ import os
 import json
 import requests
 import time
+from copy import deepcopy
 from hashlib import sha256
 from binascii import hexlify, unhexlify
 from apps.cli.blob import Blob
@@ -13,7 +14,7 @@ PISA_API = "http://{}:{}".format(HOST, PORT)


 def generate_dummy_appointment(dispute_txid):
-    r = requests.get(url=PISA_API+'/get_block_count', timeout=5)
+    r = requests.get(url=PISA_API + '/get_block_count', timeout=5)

     current_height = r.json().get("block_count")

@@ -37,39 +38,84 @@ def generate_dummy_appointment(dispute_txid):
     return appointment


-dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
-appointment = generate_dummy_appointment(dispute_txid)
+def test_add_appointment(appointment=None):
+    if not appointment:
+        dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
+        appointment = generate_dummy_appointment(dispute_txid)

-print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
-r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
-print(r, r.reason, r.content)
+    print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
+    r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)

-print("Requesting it back from PISA")
-r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"])
-print(r, r.reason, r.content)
+    assert (r.status_code == 200 and r.reason == 'OK')
+    print(r.content.decode())

-time.sleep(2)
-print("Sending it again")
-appointment["end_time"] += 1
-r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
-print(r, r.reason, r.content)
+    print("Requesting it back from PISA")
+    r = requests.get(url=PISA_API + "/get_appointment?locator=" + appointment["locator"])

-print("Sleeping 10 sec")
-time.sleep(10)
-bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
+    assert (r.status_code == 200 and r.reason == 'OK')

-print("Getting all appointments")
-r = requests.get(url=PISA_API+"/get_all_appointments")
-print(r, r.reason, r.content)
+    received_appointments = json.loads(r.content)

-print("Triggering PISA with dispute tx")
-bitcoin_cli.sendrawtransaction(dispute_txid)
+    # Take the status out and leave the received appointments ready to compare
+    appointment_status = [appointment.pop("status") for appointment in received_appointments]

-time.sleep(10)
-print("Requesting it again")
-r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"])
-print(r, r.reason, r.content)
+    # Check that the appointment is within the received appointments
+    assert (appointment in received_appointments)

-print("Getting all appointments")
-r = requests.get(url=PISA_API+"/get_all_appointments")
-print(r, r.reason, r.content)
\ No newline at end of file
+    # Check that all the appointments are being watched
+    assert (all([status == "being_watched" for status in appointment_status]))
+
+
+def test_same_locator_multiple_appointments():
+    dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
+    appointment = generate_dummy_appointment(dispute_txid)
+
+    # Send it once
+    test_add_appointment(appointment)
+    time.sleep(0.5)
+
+    # Try again with the same data
+    print("Sending it again")
+    test_add_appointment(appointment)
+    time.sleep(0.5)
+
+    # Try again with the same data but increasing the end time
+    print("Sending once more")
+    dup_appointment = deepcopy(appointment)
+    dup_appointment["end_time"] += 1
+    test_add_appointment(dup_appointment)
+
+    print("Sleeping 5 sec")
+    time.sleep(5)
+
+    bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
+
+    print("Triggering PISA with dispute tx")
+    bitcoin_cli.sendrawtransaction(dispute_txid)
+
+    print("Sleeping 10 sec (waiting for a new block)")
+    time.sleep(10)
+
+    print("Getting all appointments")
+    r = requests.get(url=PISA_API + "/get_all_appointments")
+
+    assert (r.status_code == 200 and r.reason == 'OK')
+
+    received_appointments = json.loads(r.content)
+
+    # Make sure there is no pending instance of the locator in the watcher.
+    # watcher_appointments is keyed by uuid, so iterate its values.
+    watcher_locators = [appointment["locator"] for appointment in
+                        received_appointments["watcher_appointments"].values()]
+    assert (appointment["locator"] not in watcher_locators)
+
+    # Make sure all the appointments went through
+    target_jobs = [v for k, v in received_appointments["responder_jobs"].items() if v["locator"] ==
+                   appointment["locator"]]
+
+    assert (len(target_jobs) == 3)
+
+
+if __name__ == '__main__':
+
+    test_same_locator_multiple_appointments()
+
+    print("All good!")
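For reference, the closing assertions expect a get_all_appointments payload shaped roughly like the sketch below once the dispute has triggered: nothing left in the watcher for the locator, and three responder jobs (the original, its duplicate, and the copy with the extended end time). The field values here are made up; only the field names and the overall shape follow the patches:

    import json

    # Hypothetical response body after the dispute tx confirms.
    sample = json.dumps({
        "watcher_appointments": {},
        "responder_jobs": {
            "uuid-1": {"locator": "ab" * 16, "appointment_end": 151, "confirmations": 1},
            "uuid-2": {"locator": "ab" * 16, "appointment_end": 151, "confirmations": 1},
            "uuid-3": {"locator": "ab" * 16, "appointment_end": 152, "confirmations": 1},
        },
    })

    received = json.loads(sample)

    # Both top-level maps are keyed by uuid, so iterate values to read locators.
    watcher_locators = [a["locator"] for a in received["watcher_appointments"].values()]
    assert ("ab" * 16) not in watcher_locators

    target_jobs = [v for v in received["responder_jobs"].values() if v["locator"] == "ab" * 16]
    assert len(target_jobs) == 3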