diff --git a/pisa-btc/pisa/api.py b/pisa-btc/pisa/api.py index c5e3f05..c5329d5 100644 --- a/pisa-btc/pisa/api.py +++ b/pisa-btc/pisa/api.py @@ -39,6 +39,7 @@ def manage_request(conn, remote_addr, remote_port, inspector, watcher, debug, lo if appointment: appointment_added = watcher.add_appointment(appointment, debug, logging) + # FIXME: Response should be signed receipt (created and signed by the API) if appointment_added: response = "Appointment accepted" else: diff --git a/pisa-btc/pisa/appointment.py b/pisa-btc/pisa/appointment.py index 757bc99..2546faa 100644 --- a/pisa-btc/pisa/appointment.py +++ b/pisa-btc/pisa/appointment.py @@ -1,9 +1,13 @@ + + # Basic appointment structure +# DISCUSS: about the field the appointment will have class Appointment: - def __init__(self, locator, start_time, end_time, encrypted_blob, cypher): + def __init__(self, locator, start_time, end_time, dispute_delta, encrypted_blob, cypher): self.locator = locator self.start_time = start_time self.end_time = end_time + self.dispute_delta = dispute_delta self.encrypted_blob = encrypted_blob self.cypher = cypher diff --git a/pisa-btc/pisa/inspector.py b/pisa-btc/pisa/inspector.py index 487dd71..5d2bbb7 100644 --- a/pisa-btc/pisa/inspector.py +++ b/pisa-btc/pisa/inspector.py @@ -1,9 +1,11 @@ from pisa.appointment import Appointment +# FIXME: Implement a proper inspector class Inspector: def __init__(self): pass def inspect(self, appointment, debug): - return Appointment(appointment, None, None, None, None) + # Return Appointment if success, None otherwise + return Appointment(appointment, None, None, None, None, None) diff --git a/pisa-btc/pisa/responder.py b/pisa-btc/pisa/responder.py new file mode 100644 index 0000000..464f4e1 --- /dev/null +++ b/pisa-btc/pisa/responder.py @@ -0,0 +1,160 @@ +from queue import Queue +from threading import Thread +from pisa.zmq_subscriber import ZMQHandler +from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException +from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT + + +CONFIRMATIONS_BEFORE_RETRY = 6 +MIN_CONFIRMATIONS = 6 + + +class Job: + def __init__(self, dispute_txid, rawtx, appointment_end, retry_counter=0): + self.dispute_txid = dispute_txid + self.rawtx = rawtx + self.appointment_end = appointment_end + self.in_block_height = None + self.missed_confirmations = 0 + self.retry_counter = retry_counter + + +class Responder: + def __init__(self): + self.jobs = dict() + self.confirmation_counter = dict() + self.block_queue = Queue() + self.asleep = True + + def add_response(self, dispute_txid, txid, rawtx, appointment_end, debug, logging, retry=False): + if self.asleep: + self.asleep = False + zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) + responder = Thread(target=self.handle_responses, args=[debug, logging]) + zmq_subscriber.start() + responder.start() + + bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, + BTC_RPC_PORT)) + try: + # ToDo: All errors should be handled as JSONRPCException, check that holds (if so response if no needed) + response = bitcoin_cli.sendrawtransaction(rawtx) + + # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations + # retry holds such information. + # DISCUSS: Check what to do if the retry counter gets too big + if retry: + self.jobs[txid].retry_counter += 1 + else: + self.confirmation_counter[txid] = 0 + self.jobs[txid] = Job(dispute_txid, rawtx, appointment_end) + + if debug: + logging.info('[Responder] new job added (dispute txid = {}, txid = {}, appointment end = {})'.format( + dispute_txid, txid, appointment_end)) + + except JSONRPCException as e: + if debug: + # ToDo: Check type of error if transaction does not get through + logging.error("[Responder] JSONRPCException. Error code {}".format(e)) + + def handle_responses(self, debug, logging): + bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, + BTC_RPC_PORT)) + prev_block_hash = None + while len(self.jobs) > 0: + # We get notified for every new received block + block_hash = self.block_queue.get() + + try: + block = bitcoin_cli.getblock(block_hash) + txs = block.get('tx') + height = block.get('height') + + if debug: + logging.info("[Responder] new block received {}".format(block_hash)) + logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) + logging.info("[Responder] list of transactions: {}".format(txs)) + + except JSONRPCException as e: + if debug: + logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e)) + + continue + + if prev_block_hash == block.get('previousblockhash'): + for job_id, job in self.jobs.items(): + if job.appointment_end <= height: + # The end of the appointment has been reached + # ToDo: record job in DB + del (self.jobs[job_id]) + if debug: + logging.info("[Responder] job completed. Appointment ended at height {}".format(job_id, + height)) + + # Handling new jobs (aka jobs with not enough confirmations), when a job receives MIN_CONFIRMATIONS + # it will be passed to jobs and we will simply check for chain forks. + for job_id, confirmations in self.confirmation_counter.items(): + # If we see the transaction for the first time, or MIN_CONFIRMATIONS hasn't been reached + if job_id in txs or (0 < confirmations < MIN_CONFIRMATIONS): + confirmations += 1 + + if debug: + logging.info("[Responder] new confirmation received for txid = {}".format(job_id)) + + elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: + # If a transactions has missed too many confirmations for a while we'll try to rebroadcast + # DISCUSS: How many confirmations before retry + # DISCUSS: recursion vs setting confirmations to 0 and rebroadcast here + # DISCUSS: how many max retries and what to do if the cap is reached + self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].tx, + self.jobs[job_id].appointment_end, debug, logging, retry=True) + if debug: + logging.info("[Responder] txid = {} has missed {} confirmations. Rebroadcast" + .format(job_id, CONFIRMATIONS_BEFORE_RETRY)) + else: + # Otherwise we increase the number of missed confirmations + self.jobs[job_id].missed_confirmations += 1 + + else: + # ToDo: REORG!! + if debug: + logging.error("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" + .format(prev_block_hash, block.get('previousblockhash'))) + + self.handle_reorgs(bitcoin_cli, debug, logging) + + prev_block_hash = block.get('previousblockhash') + + # Go back to sleep if there are no more jobs + self.asleep = True + + if debug: + logging.error("[Responder] no more pending jobs, going back to sleep.") + + def handle_reorgs(self, bitcoin_cli, debug, logging): + for job_id, job in self.jobs: + try: + tx_info = bitcoin_cli.gettransaction(job_id) + job.confirmations = int(tx_info.get("confirmations")) + + except JSONRPCException as e: + # FIXME: It should be safe but check Exception code anyway + if debug: + logging.error("[Responder] justice transaction (txid = {}) not found!".format(job_id)) + + try: + bitcoin_cli.gettransaction(job.dispute_txid) + # DISCUSS: Add job back, should we flag it as retried? + self.add_response(job.dispute_txid, job_id, job.rawtx, job.appointment_end, debug, logging) + except JSONRPCException as e: + # FIXME: It should be safe but check Exception code anyway + # ToDO: Dispute transaction if not there either, call reorg manager + if debug: + logging.error("[Responder] dispute transaction (txid = {}) not found either!" + .format(job.dispute_txid)) + pass + + def do_subscribe(self, block_queue, debug, logging): + daemon = ZMQHandler() + daemon.handle(block_queue, debug, logging) diff --git a/pisa-btc/pisa/watcher.py b/pisa-btc/pisa/watcher.py index be8b540..e464132 100644 --- a/pisa-btc/pisa/watcher.py +++ b/pisa-btc/pisa/watcher.py @@ -1,6 +1,7 @@ from queue import Queue from threading import Thread from pisa.tools import decrypt_tx +from pisa.responder import Responder from pisa.zmq_subscriber import ZMQHandler from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS @@ -14,7 +15,7 @@ class Watcher: self.max_appointments = max_appointments def add_appointment(self, appointment, debug, logging): - # ToDo: Discuss about validation of input data + # DISCUSS: about validation of input data # Rationale: # The Watcher will analyze every received block looking for appointment matches. If there is no work @@ -23,8 +24,6 @@ class Watcher: # If the watcher is awake, every new appointment will just be added to the appointment list until # max_appointments is reached. - # ToDo: Check how to handle appointment completion - if len(self.appointments) < self.max_appointments: # Appointments are identified by the locator: the most significant 16 bytes of the commitment txid. # While 16-byte hash collisions are not likely, they are possible, so we will store appointments in lists @@ -37,7 +36,8 @@ class Watcher: if self.asleep: self.asleep = False zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) - watcher = Thread(target=self.do_watch, args=[debug, logging]) + responder = Responder() + watcher = Thread(target=self.do_watch, args=[responder, debug, logging]) zmq_subscriber.start() watcher.start() @@ -59,7 +59,7 @@ class Watcher: daemon = ZMQHandler() daemon.handle(block_queue, debug, logging) - def do_watch(self, debug, logging): + def do_watch(self, responder, debug, logging): bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) @@ -68,13 +68,10 @@ class Watcher: try: block = bitcoin_cli.getblock(block_hash) - # ToDo: prev_block_id will be used to store chain state and handle reorgs - prev_block_id = block.get('previousblockhash') txs = block.get('tx') if debug: logging.info("[Watcher] new block received {}".format(block_hash)) - logging.info("[Watcher] prev. block hash {}".format(prev_block_id)) logging.info("[Watcher] list of transactions: {}".format(txs)) potential_matches = [] @@ -90,12 +87,12 @@ class Watcher: matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging) - for locator, appointment_pos, transaction in matches: - # ToDo: Notify responder with every match. - # notify_responder(transaction) + for locator, appointment_pos, txid, tx in matches: + # FIXME: Notify responder with every match. + responder.add_response(txid, tx, self.appointments[locator].end_time, debug, logging) # If there was only one appointment that matches the locator we can delete the whole list - # ToDo: We may want to use locks before adding / removing appointment + # DISCUSS: We may want to use locks before adding / removing appointment if len(self.appointments[locator]) == 1: del self.appointments[locator] else: @@ -103,34 +100,42 @@ class Watcher: del self.appointments[locator][appointment_pos] if debug: - logging.error("[Watcher] Notifying responder about {}:{} and deleting appointment" + logging.error("[Watcher] notifying responder about {}:{} and deleting appointment" .format(locator, appointment_pos)) except JSONRPCException as e: - logging.error("[Watcher] JSONRPCException. Error code {}".format(e)) + if debug: + logging.error("[Watcher] JSONRPCException. Error code {}".format(e)) continue + # Go back to sleep if there are no more appointments + self.asleep = True + + if debug: + logging.error("[Watcher] no more pending appointments, going back to sleep.") + def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging): matches = [] for locator, k in potential_matches: for appointment_pos, appointment in enumerate(self.appointments.get(locator)): try: - # ToDo: Put this back - # decrypted_data = decrypt_tx(appointment.encrypted_blob, k, appointment.cypher) - # ToDo: Remove this. Temporary hack, since we are not working with blobs but with ids for now - # ToDo: just get the raw transaction that matches both parts of the id - decrypted_data = bitcoin_cli.getrawtransaction(locator + k) + txid = locator + k + # FIXME: Put this back + # tx = decrypt_tx(appointment.encrypted_blob, k, appointment.cypher) + # FIXME: Remove this. Temporary hack, since we are not working with blobs but with ids for now + # FIXME: just get the raw transaction that matches both parts of the id + tx = bitcoin_cli.getrawtransaction(txid) - bitcoin_cli.decoderawtransaction(decrypted_data) - matches.append((locator, appointment_pos, decrypted_data)) + bitcoin_cli.decoderawtransaction(tx) + matches.append((locator, appointment_pos, txid, tx)) if debug: - logging.error("[Watcher] Match found for {}:{}! {}".format(locator, appointment_pos, locator+k)) + logging.error("[Watcher] match found for {}:{}! {}".format(locator, appointment_pos, locator+k)) except JSONRPCException as e: # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple # for the POC if debug: - logging.error("[Watcher] Can't build transaction from decoded data. Error code {}".format(e)) + logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e)) continue return matches