diff --git a/pisa/responder.py b/pisa/responder.py index bdda0bb..9146599 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -13,18 +13,22 @@ MIN_CONFIRMATIONS = 6 class Job: - def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0): + def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0): self.dispute_txid = dispute_txid - # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info - # can be directly got from DB - self.locator = sha256(unhexlify(dispute_txid)).hexdigest() + self.justice_txid = justice_txid self.justice_rawtx = justice_rawtx self.appointment_end = appointment_end + self.confirmations = confirmations + self.missed_confirmations = 0 self.retry_counter = retry_counter + # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info + # can be directly got from DB + self.locator = sha256(unhexlify(dispute_txid)).hexdigest() + def to_json(self): - job = {"locator": self.dispute_txid, "justice_rawtx": self.justice_rawtx, + job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations, "appointment_end": self.appointment_end} return job @@ -33,45 +37,17 @@ class Job: class Responder: def __init__(self): self.jobs = dict() - self.confirmation_counter = dict() + self.tx_job_map = dict() self.block_queue = None self.asleep = True self.zmq_subscriber = None - def do_subscribe(self, block_queue, debug, logging): - self.zmq_subscriber = ZMQHandler(parent='Responder') - self.zmq_subscriber.handle(block_queue, debug, logging) + def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, + retry=False): - def create_job(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, conf_counter=0, - retry=False): - - # ToDo: #23-define-behaviour-approaching-end - if retry: - self.jobs[justice_txid].retry_counter += 1 - self.jobs[justice_txid].missed_confirmations = 0 - else: - self.confirmation_counter[justice_txid] = conf_counter - self.jobs[justice_txid] = Job(dispute_txid, justice_rawtx, appointment_end) - - if debug: - logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'. - format(dispute_txid, justice_txid, appointment_end)) - - if self.asleep: - self.asleep = False - self.block_queue = Queue() - zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) - # ToDo: This may not have to be a thead. The main thread only creates this and terminates. - responder = Thread(target=self.handle_responses, args=[debug, logging]) - zmq_thread.start() - responder.start() - - def add_response(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=False): bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) - # ToDo: Moving the sending functionality to a separate function would improve readability. Also try to use - # check_tx_in_chain if possible. try: if debug: if self.asleep: @@ -82,44 +58,44 @@ class Responder: # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations # retry holds such information. - self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=retry) + self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, + retry=retry) except JSONRPCException as e: - # Since we're pushing a raw transaction to the network we can get two kind of rejections: - # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected - # due to network rules, whereas the later implies that the transaction is already in the blockchain. - if e.error.get('code') == RPC_VERIFY_REJECTED: - # DISCUSS: what to do in this case - # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. - # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. - # DISCUSS: check errors -9 and -10 - pass - elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: - try: - if debug: - logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and " - "start monitoring the transaction".format(justice_txid)) + self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, + debug, logging, retry) - # If the transaction is already in the chain, we get the number of confirmations and watch the job - # until the end of the appointment - tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) - confirmations = int(tx_info.get("confirmations")) - self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - retry=retry, conf_counter=confirmations) + def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, + confirmations=0, retry=False): - except JSONRPCException as e: - # While it's quite unlikely, the transaction that was already in the blockchain could have been - # reorged while we were querying bitcoind to get the confirmation count. In such a case we just - # restart the job - if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - self.add_response(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - retry=retry) - elif debug: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error code {}".format(e)) - elif debug: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error code {}".format(e)) + # ToDo: #23-define-behaviour-approaching-end + if retry: + self.jobs[uuid].retry_counter += 1 + self.jobs[uuid].missed_confirmations = 0 + else: + self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations) + + if justice_txid in self.tx_job_map: + self.tx_job_map[justice_txid].append(uuid) + + else: + self.tx_job_map[justice_txid] = [uuid] + + if debug: + logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'. + format(dispute_txid, justice_txid, appointment_end)) + + if self.asleep: + self.asleep = False + self.block_queue = Queue() + zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) + responder = Thread(target=self.handle_responses, args=[debug, logging]) + zmq_thread.start() + responder.start() + + def do_subscribe(self, block_queue, debug, logging): + self.zmq_subscriber = ZMQHandler(parent='Responder') + self.zmq_subscriber.handle(block_queue, debug, logging) def handle_responses(self, debug, logging): bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, @@ -145,48 +121,39 @@ class Responder: continue - jobs_to_delete = [] + completed_jobs = [] if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: # Keep count of the confirmations each tx gets - for job_id, confirmations in self.confirmation_counter.items(): - # If we see the transaction for the first time, or appointment_end & MIN_CONFIRMATIONS hasn't been - # reached - if job_id in txs or confirmations > 0: - self.confirmation_counter[job_id] += 1 + for justice_txid, jobs in self.tx_job_map.items(): + for uuid in jobs: + if justice_txid in txs or self.jobs[uuid].confirmations > 0: + self.jobs[uuid].confirmations += 1 - if debug: - logging.info("[Responder] new confirmation received for txid = {}".format(job_id)) + if debug: + logging.info("[Responder] new confirmation received for job = {}, txid = {}".format( + uuid, justice_txid)) - elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: - # If a transactions has missed too many confirmations for a while we'll try to rebroadcast - # ToDO: #22-discuss-confirmations-before-retry - # ToDo: #23-define-behaviour-approaching-end - self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].justice_rawtx, - self.jobs[job_id].appointment_end, debug, logging, retry=True) - if debug: - logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" - .format(job_id, CONFIRMATIONS_BEFORE_RETRY)) - else: - # Otherwise we increase the number of missed confirmations - self.jobs[job_id].missed_confirmations += 1 + elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: + # If a transactions has missed too many confirmations for a while we'll try to rebroadcast + # ToDO: #22-discuss-confirmations-before-retry + # ToDo: #23-define-behaviour-approaching-end + self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid, + self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug, + logging, retry=True) + if debug: + logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" + .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY)) - for job_id, job in self.jobs.items(): - if job.appointment_end <= height and self.confirmation_counter[job_id] >= MIN_CONFIRMATIONS: - # The end of the appointment has been reached - jobs_to_delete.append(job_id) + else: + # Otherwise we increase the number of missed confirmations + self.jobs[uuid].missed_confirmations += 1 - for job_id in jobs_to_delete: - # ToDo: Find a better way to solve this. Deepcopy of the keys maybe? - # Trying to delete directly when iterating the last for causes dictionary changed size error during - # iteration in Python3 (can not be solved iterating only trough keys in Python3 either) + if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \ + MIN_CONFIRMATIONS: + # The end of the appointment has been reached + completed_jobs.append(uuid) - if debug: - logging.info("[Responder] {} completed. Appointment ended at block {} after {} confirmations" - .format(job_id, height, self.confirmation_counter[job_id])) - - # ToDo: #9-add-data-persistency - del self.jobs[job_id] - del self.confirmation_counter[job_id] + self.remove_completed_jobs(completed_jobs, height, debug, logging) else: if debug: @@ -204,32 +171,96 @@ class Responder: if debug: logging.info("[Responder] no more pending jobs, going back to sleep") + def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, + debug, logging, retry): + # Since we're pushing a raw transaction to the network we can get two kind of rejections: + # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected + # due to network rules, whereas the later implies that the transaction is already in the blockchain. + if e.error.get('code') == RPC_VERIFY_REJECTED: + # DISCUSS: what to do in this case + # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. + # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. + # DISCUSS: check errors -9 and -10 + pass + + elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: + try: + if debug: + logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and " + "start monitoring the transaction".format(justice_txid)) + + # If the transaction is already in the chain, we get the number of confirmations and watch the job + # until the end of the appointment + tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) + confirmations = int(tx_info.get("confirmations")) + self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, + retry=retry, confirmations=confirmations) + + except JSONRPCException as e: + # While it's quite unlikely, the transaction that was already in the blockchain could have been + # reorged while we were querying bitcoind to get the confirmation count. In such a case we just + # restart the job + if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: + self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, + logging, retry=retry) + elif debug: + # If something else happens (unlikely but possible) log it so we can treat it in future releases + logging.error("[Responder] JSONRPCException. Error {}".format(e)) + + elif debug: + # If something else happens (unlikely but possible) log it so we can treat it in future releases + logging.error("[Responder] JSONRPCException. Error {}".format(e)) + + def remove_completed_jobs(self, completed_jobs, height, debug, logging): + for uuid in completed_jobs: + if debug: + logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at " + "block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height, + self.jobs[uuid].confirmations)) + + # ToDo: #9-add-data-persistency + justice_txid = self.jobs[uuid].justice_txid + self.jobs.pop(uuid) + + if len(self.tx_job_map[justice_txid]) == 1: + self.tx_job_map.pop(justice_txid) + + if debug: + logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid)) + + else: + self.tx_job_map[justice_txid].remove(uuid) + def handle_reorgs(self, bitcoin_cli, debug, logging): - for job_id, job in self.jobs.items(): + for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # there either, so we'll need to call the reorg manager straight away - dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, parent='Responder', + dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, + parent='Responder', tx_label='dispute tx') # If the dispute is there, we can check the justice tx if dispute_in_chain: - justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job_id, debug, logging, - parent='Responder', tx_label='justice tx') + justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug, + logging, parent='Responder', + tx_label='justice tx') # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: if debug: logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( - job_id, self.confirmation_counter[job_id], justice_confirmations)) + job.justice_txid, job.confirmations, justice_confirmations)) - self.confirmation_counter[job_id] = justice_confirmations + job.confirmations = justice_confirmations else: # Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again # DISCUSS: Adding job back, should we flag it as retried? # FIXME: Whether we decide to increase the retried counter or not, the current counter should be # maintained. There is no way of doing so with the current approach. Update if required - self.add_response(job.dispute_txid, job_id, job.justice_rawtx, job.appointment_end, debug, logging) + self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, + job.appointment_end, + debug, logging) else: # ToDo: #24-properly-handle-reorgs diff --git a/pisa/watcher.py b/pisa/watcher.py index ee9eb7b..a4590f5 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -12,6 +12,7 @@ from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, class Watcher: def __init__(self, max_appointments=MAX_APPOINTMENTS): self.appointments = dict() + self.locator_uuid_map = dict() self.block_queue = None self.asleep = True self.max_appointments = max_appointments @@ -31,10 +32,15 @@ class Watcher: # Two different nodes may ask for appointments using the same commitment txid, what will result in a # collision in our appointments structure (and may be an attack surface). In order to avoid such collisions # we will identify every appointment with a uuid - if not self.appointments.get(appointment.locator): - self.appointments[appointment.locator] = {} - self.appointments[appointment.locator][uuid4().hex] = appointment + uuid = uuid4().hex + self.appointments[uuid] = appointment + + if appointment.locator in self.locator_uuid_map: + self.locator_uuid_map[appointment.locator].append(uuid) + + else: + self.locator_uuid_map[appointment.locator] = [uuid] if self.asleep: self.asleep = False @@ -85,12 +91,17 @@ class Watcher: potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} # Check is any of the tx_ids in the received block is an actual match - potential_matches = {} + # ToDo: set intersection should be a more optimal solution - for locator in self.appointments.keys(): - if locator in potential_locators: - # This is locator:txid - potential_matches[locator] = potential_locators[locator] + # Get the locators that are both in the map and in the potential locators dict. + intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) + potential_matches = {locator: potential_locators[locator] for locator in intersection} + + # potential_matches = {} + # for locator in self.locator_uuid_map.keys(): + # if locator in potential_locators: + # # This is locator:txid + # potential_matches[locator] = potential_locators[locator] if debug: if len(potential_matches) > 0: @@ -105,17 +116,20 @@ class Watcher: logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" .format(justice_txid, locator, uuid)) - self.responder.add_response(dispute_txid, justice_txid, justice_rawtx, - self.appointments[locator][uuid].end_time, debug, logging) + self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, + self.appointments[uuid].end_time, debug, logging) + + # Delete the appointment + self.appointments.pop(uuid) # If there was only one appointment that matches the locator we can delete the whole list - if len(self.appointments[locator]) == 1: + if len(self.locator_uuid_map[locator]) == 1: # ToDo: #9-add-data-persistency - del self.appointments[locator] + self.locator_uuid_map.pop(locator) else: # Otherwise we just delete the appointment that matches locator:appointment_pos # ToDo: #9-add-data-persistency - del self.appointments[locator][uuid] + self.locator_uuid_map[locator].remove(uuid) except JSONRPCException as e: if debug: @@ -129,41 +143,41 @@ class Watcher: logging.error("[Watcher] no more pending appointments, going back to sleep") def delete_expired_appointment(self, block, debug, logging): - to_delete = {} + # to_delete = [] + # + # for uuid, appointment in self.appointments.items(): + # if block["height"] > appointment.end_time + EXPIRY_DELTA: + # # Add the appointment to the deletion list + # to_delete.append(uuid) - for locator, appointments in self.appointments.items(): - for uuid, appointment in appointments: - if block["height"] > appointment.end_time + EXPIRY_DELTA: - # Add the appointment to the deletion list - if locator in to_delete: - to_delete[locator].append(uuid) - else: - to_delete[locator] = [uuid] + to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time + + EXPIRY_DELTA] - for locator, uuids in to_delete.items(): - if len(uuids) == len(self.appointments[locator]): - if debug: - logging.info("[Watcher] end time reached with no match! Deleting appointment {}".format(locator)) + for uuid in to_delete: + # ToDo: #9-add-data-persistency + locator = self.appointments[uuid].locator + + self.appointments.pop(uuid) + + if len(self.locator_uuid_map[locator]) == 1: + self.locator_uuid_map.pop(locator) - del self.appointments[locator] - # ToDo: #9-add-data-persistency else: - for uuid in uuids: - if debug: - logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})" - .format(locator, uuid)) + self.locator_uuid_map[locator].remove(uuid) - del self.appointments[locator][uuid] - # ToDo: #9-add-data-persistency + if debug: + logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})" + .format(locator, uuid)) def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging): matches = [] for locator, dispute_txid in potential_matches.items(): - for uuid, appointment in self.appointments.get(locator): + for uuid in self.locator_uuid_map[locator]: try: # ToDo: #20-test-tx-decrypting-edge-cases - justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging) + justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug, + logging) justice_rawtx = hexlify(justice_rawtx).decode() justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) diff --git a/tests/simulator/appointment_test.py b/tests/add_appointment_test.py similarity index 70% rename from tests/simulator/appointment_test.py rename to tests/add_appointment_test.py index 2af8496..95a85e3 100644 --- a/tests/simulator/appointment_test.py +++ b/tests/add_appointment_test.py @@ -42,11 +42,34 @@ appointment = generate_dummy_appointment(dispute_txid) print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator"))) r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5) -print(r, r.reason) +print(r, r.reason, r.content) + +print("Requesting it back from PISA") +r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"]) +print(r, r.reason, r.content) + +time.sleep(2) +print("Sending it again") +appointment["end_time"] += 1 +r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5) +print(r, r.reason, r.content) print("Sleeping 10 sec") time.sleep(10) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) +print("Getting all appointments") +r = requests.get(url=PISA_API+"/get_all_appointments") +print(r, r.reason, r.content) + print("Triggering PISA with dispute tx") -bitcoin_cli.sendrawtransaction(dispute_txid) \ No newline at end of file +bitcoin_cli.sendrawtransaction(dispute_txid) + +time.sleep(10) +print("Requesting it again") +r = requests.get(url=PISA_API+"/get_appointment?locator="+appointment["locator"]) +print(r, r.reason, r.content) + +print("Getting all appointments") +r = requests.get(url=PISA_API+"/get_all_appointments") +print(r, r.reason, r.content) \ No newline at end of file