diff --git a/pisa/__init__.py b/pisa/__init__.py index 5fd2667..a279a5c 100644 --- a/pisa/__init__.py +++ b/pisa/__init__.py @@ -1,2 +1,20 @@ +import logging + +from pisa.utils.auth_proxy import AuthServiceProxy +import pisa.conf as conf + + HOST = 'localhost' -PORT = 9814 \ No newline at end of file +PORT = 9814 + +# Configure logging +logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[ + logging.FileHandler(conf.SERVER_LOG_FILE), + logging.StreamHandler() +]) + +# Create RPC connection with bitcoind +# TODO: Check if a long lived connection like this may create problems (timeouts) +bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST, + conf.BTC_RPC_PORT)) + diff --git a/pisa/api.py b/pisa/api.py index 15bf4a2..660e5f0 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -1,15 +1,12 @@ -from pisa import * +import json +from flask import Flask, request, Response, abort, jsonify + +from pisa import HOST, PORT, logging, bitcoin_cli from pisa.watcher import Watcher from pisa.inspector import Inspector from pisa.appointment import Appointment -from flask import Flask, request, Response, abort, jsonify -import json -# FIXME: HERE FOR TESTING (get_block_count). REMOVE WHEN REMOVING THE FUNCTION -from pisa.utils.authproxy import AuthServiceProxy -from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT - # ToDo: #5-add-async-to-api app = Flask(__name__) HTTP_OK = 200 @@ -22,15 +19,14 @@ def add_appointment(): remote_addr = request.environ.get('REMOTE_ADDR') remote_port = request.environ.get('REMOTE_PORT') - if debug: - logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port)) + logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port)) # Check content type once if properly defined request_data = json.loads(request.get_json()) appointment = inspector.inspect(request_data) if type(appointment) == Appointment: - appointment_added = watcher.add_appointment(appointment, debug, logging) + appointment_added = watcher.add_appointment(appointment) # ToDo: #13-create-server-side-signature-receipt if appointment_added: @@ -49,9 +45,7 @@ def add_appointment(): rcode = HTTP_BAD_REQUEST response = "appointment rejected. Request does not match the standard" - if debug: - logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, - remote_port)) + logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port)) return Response(response, status=rcode, mimetype='text/plain') @@ -115,21 +109,16 @@ def get_all_appointments(): @app.route('/get_block_count', methods=['GET']) def get_block_count(): - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) - return jsonify({"block_count": bitcoin_cli.getblockcount()}) -def start_api(d, l): +def start_api(): # FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment - global debug, logging, watcher, inspector - debug = d - logging = l + global watcher, inspector # ToDo: #18-separate-api-from-watcher watcher = Watcher() - inspector = Inspector(debug, logging) + inspector = Inspector() # Setting Flask log t ERROR only so it does not mess with out logging logging.getLogger('werkzeug').setLevel(logging.ERROR) diff --git a/pisa/appointment.py b/pisa/appointment.py index af304ee..a4d5718 100644 --- a/pisa/appointment.py +++ b/pisa/appointment.py @@ -19,7 +19,5 @@ class Appointment: return appointment - # ToDO: #3-improve-appointment-strcuture - - + # ToDO: #3-improve-appointment-structure diff --git a/pisa/block_processor.py b/pisa/block_processor.py new file mode 100644 index 0000000..b047cc3 --- /dev/null +++ b/pisa/block_processor.py @@ -0,0 +1,90 @@ +import binascii +from hashlib import sha256 + +from pisa import logging, bitcoin_cli +from pisa.utils.auth_proxy import JSONRPCException + + +class BlockProcessor: + @staticmethod + def get_block(block_hash): + block = None + + try: + block = bitcoin_cli.getblock(block_hash) + + except JSONRPCException as e: + logging.error("[BlockProcessor] couldn't get block from bitcoind. Error code {}".format(e)) + + return block + + @staticmethod + def get_best_block_hash(): + block_hash = None + + try: + block_hash = bitcoin_cli.getbestblockhash() + + except JSONRPCException as e: + logging.error("[BlockProcessor] couldn't get block hash. Error code {}".format(e)) + + return block_hash + + @staticmethod + def get_potential_matches(txids, locator_uuid_map): + potential_locators = {sha256(binascii.unhexlify(txid)).hexdigest(): txid for txid in txids} + + # Check is any of the tx_ids in the received block is an actual match + intersection = set(locator_uuid_map.keys()).intersection(potential_locators.keys()) + potential_matches = {locator: potential_locators[locator] for locator in intersection} + + if len(potential_matches) > 0: + logging.info("[BlockProcessor] list of potential matches: {}".format(potential_matches)) + + else: + logging.info("[BlockProcessor] no potential matches found") + + @staticmethod + def get_matches(potential_matches, locator_uuid_map, appointments): + matches = [] + + for locator, dispute_txid in potential_matches.items(): + for uuid in locator_uuid_map[locator]: + try: + # ToDo: #20-test-tx-decrypting-edge-cases + justice_rawtx = appointments[uuid].encrypted_blob.decrypt(binascii.unhexlify(dispute_txid)) + justice_rawtx = binascii.hexlify(justice_rawtx).decode() + justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') + matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) + + logging.info("[BlockProcessor] match found for locator {} (uuid: {}): {}".format( + locator, uuid, justice_txid)) + + except JSONRPCException as e: + # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple + # for the POC + logging.error("[BlockProcessor] can't build transaction from decoded data. Error code {}".format(e)) + + return matches + + @staticmethod + def check_confirmations(txs, unconfirmed_txs, tx_job_map, missed_confirmations): + + for tx in txs: + if tx in tx_job_map and tx in unconfirmed_txs: + unconfirmed_txs.remove(tx) + + logging.info("[Responder] confirmation received for tx {}".format(tx)) + + elif tx in unconfirmed_txs: + if tx in missed_confirmations: + missed_confirmations[tx] += 1 + + else: + missed_confirmations[tx] = 1 + + logging.info("[Responder] tx {} missed a confirmation (total missed: {})" + .format(tx, missed_confirmations[tx])) + + return unconfirmed_txs, missed_confirmations + diff --git a/pisa/carrier.py b/pisa/carrier.py new file mode 100644 index 0000000..0526f5c --- /dev/null +++ b/pisa/carrier.py @@ -0,0 +1,77 @@ +from pisa.utils.auth_proxy import JSONRPCException +from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION +from pisa import logging, bitcoin_cli +from pisa.rpc_errors import * + + +class Carrier: + class Receipt: + def __init__(self, delivered, confirmations=0, reason=None): + self.delivered = delivered + self.confirmations = confirmations + self.reason = reason + + def send_transaction(self, rawtx, txid): + try: + logging.info("[Carrier] pushing transaction to the network (txid: {})".format(rawtx)) + bitcoin_cli.sendrawtransaction(rawtx) + + receipt = self.Receipt(delivered=True) + + except JSONRPCException as e: + errno = e.error.get('code') + # Since we're pushing a raw transaction to the network we can get two kind of rejections: + # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected + # due to network rules, whereas the later implies that the transaction is already in the blockchain. + if errno == RPC_VERIFY_REJECTED: + # DISCUSS: what to do in this case + # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. + # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. + # DISCUSS: check errors -9 and -10 + # TODO: UNKNOWN_JSON_RPC_EXCEPTION is not the proper exception here. This is long due. + receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) + + elif errno == RPC_VERIFY_ALREADY_IN_CHAIN: + logging.info("[Carrier] {} is already in the blockchain. Getting confirmation count".format(txid)) + + # If the transaction is already in the chain, we get the number of confirmations and watch the job + # until the end of the appointment + tx_info = self.get_transaction(txid) + + if tx_info is not None: + confirmations = int(tx_info.get("confirmations")) + receipt = self.Receipt(delivered=True, confirmations=confirmations) + + else: + # There's a really unlike edge case where a transaction can be reorged between receiving the + # notification and querying the data. In such a case we just resend + self.send_transaction(rawtx, txid) + + else: + # If something else happens (unlikely but possible) log it so we can treat it in future releases + logging.error("[Responder] JSONRPCException. Error {}".format(e)) + receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) + + return receipt + + @staticmethod + def get_transaction(txid): + tx_info = None + + try: + tx_info = bitcoin_cli.getrawtransaction(txid, 1) + + except JSONRPCException as e: + # While it's quite unlikely, the transaction that was already in the blockchain could have been + # reorged while we were querying bitcoind to get the confirmation count. In such a case we just + # restart the job + if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: + logging.info("[Carrier] transaction {} got reorged before obtaining information".format(txid)) + + # TODO: Check RPC methods to see possible returns and avoid general else + # else: + # # If something else happens (unlikely but possible) log it so we can treat it in future releases + # logging.error("[Responder] JSONRPCException. Error {}".format(e)) + + return tx_info + diff --git a/pisa/cleaner.py b/pisa/cleaner.py new file mode 100644 index 0000000..d7808b9 --- /dev/null +++ b/pisa/cleaner.py @@ -0,0 +1,45 @@ +import pisa.conf as conf +from pisa import logging + + +class Cleaner: + @staticmethod + def delete_expired_appointment(block, appointments, locator_uuid_map): + to_delete = [uuid for uuid, appointment in appointments.items() + if block["height"] > appointment.end_time + conf.EXPIRY_DELTA] + + for uuid in to_delete: + locator = appointments[uuid].locator + + appointments.pop(uuid) + + if len(locator_uuid_map[locator]) == 1: + locator_uuid_map.pop(locator) + + else: + locator_uuid_map[locator].remove(uuid) + + logging.info("[Cleaner] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator, + uuid)) + + return appointments, locator_uuid_map + + @staticmethod + def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): + for uuid in completed_jobs: + logging.info("[Cleaner] job completed (uuid = {}). Appointment ended at block {} after {} confirmations" + .format(uuid, jobs[uuid].justice_txid, height, jobs[uuid].confirmations)) + + # ToDo: #9-add-data-persistence + justice_txid = jobs[uuid].justice_txid + jobs.pop(uuid) + + if len(tx_job_map[justice_txid]) == 1: + tx_job_map.pop(justice_txid) + + logging.info("[Cleaner] no more jobs for justice_txid {}".format(justice_txid)) + + else: + tx_job_map[justice_txid].remove(uuid) + + return jobs, tx_job_map diff --git a/pisa/encrypted_blob.py b/pisa/encrypted_blob.py index 49fd4af..5c1b78f 100644 --- a/pisa/encrypted_blob.py +++ b/pisa/encrypted_blob.py @@ -1,5 +1,5 @@ -from binascii import unhexlify, hexlify from hashlib import sha256 +from binascii import unhexlify, hexlify from cryptography.hazmat.primitives.ciphers.aead import AESGCM diff --git a/pisa/inspector.py b/pisa/inspector.py index f2c2bb0..25bb344 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -1,16 +1,13 @@ import re -from pisa.appointment import Appointment + from pisa import errors -from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException -from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MIN_DISPUTE_DELTA, \ - SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS +import pisa.conf as conf +from pisa import logging, bitcoin_cli +from pisa.appointment import Appointment +from pisa.utils.auth_proxy import JSONRPCException class Inspector: - def __init__(self, debug=False, logging=None): - self.debug = debug - self.logging = logging - def inspect(self, data): locator = data.get('locator') start_time = data.get('start_time') @@ -20,8 +17,6 @@ class Inspector: cipher = data.get('cipher') hash_function = data.get('hash_function') - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) try: block_height = bitcoin_cli.getblockcount() @@ -45,8 +40,7 @@ class Inspector: r = (rcode, message) except JSONRPCException as e: - if self.debug: - self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e)) + logging.error("[Inspector] JSONRPCException. Error code {}".format(e)) # In case of an unknown exception, assign a special rcode and reason. r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred") @@ -71,8 +65,7 @@ class Inspector: rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT message = "wrong locator format ({})".format(locator) - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -95,8 +88,7 @@ class Inspector: else: message = "start_time too close to current height" - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -122,8 +114,7 @@ class Inspector: rcode = errors.APPOINTMENT_FIELD_TOO_SMALL message = 'end_time is in the past' - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -139,13 +130,12 @@ class Inspector: elif t != int: rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE message = "wrong dispute_delta data type ({})".format(t) - elif dispute_delta < MIN_DISPUTE_DELTA: + elif dispute_delta < conf.MIN_DISPUTE_DELTA: rcode = errors.APPOINTMENT_FIELD_TOO_SMALL message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format( - MIN_DISPUTE_DELTA, dispute_delta) + conf.MIN_DISPUTE_DELTA, dispute_delta) - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -166,8 +156,8 @@ class Inspector: # ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using rcode = errors.APPOINTMENT_WRONG_FIELD message = "wrong encrypted_blob" - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -183,12 +173,11 @@ class Inspector: elif t != str: rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE message = "wrong cipher data type ({})".format(t) - elif cipher not in SUPPORTED_CIPHERS: + elif cipher not in conf.SUPPORTED_CIPHERS: rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED message = "cipher not supported: {}".format(cipher) - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message @@ -204,11 +193,10 @@ class Inspector: elif t != str: rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE message = "wrong hash_function data type ({})".format(t) - elif hash_function not in SUPPORTED_HASH_FUNCTIONS: + elif hash_function not in conf.SUPPORTED_HASH_FUNCTIONS: rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED message = "hash_function not supported {}".format(hash_function) - if self.debug and message: - self.logging.error("[Inspector] {}".format(message)) + logging.error("[Inspector] {}".format(message)) return rcode, message diff --git a/pisa/pisad.py b/pisa/pisad.py index 29ba8a1..8fbce83 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -1,36 +1,26 @@ -import logging from sys import argv from getopt import getopt -from threading import Thread + +from pisa import logging from pisa.api import start_api from pisa.tools import can_connect_to_bitcoind, in_correct_network -from pisa.utils.authproxy import AuthServiceProxy -from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, BTC_NETWORK, SERVER_LOG_FILE if __name__ == '__main__': debug = False opts, _ = getopt(argv[1:], 'd', ['debug']) for opt, arg in opts: - if opt in ['-d', '--debug']: - debug = True + # FIXME: Leaving this here for future option/arguments + pass - # Configure logging - logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[ - logging.FileHandler(SERVER_LOG_FILE), - logging.StreamHandler() - ]) + if can_connect_to_bitcoind(): + if in_correct_network(): + # Fire the api + start_api() - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) - - if can_connect_to_bitcoind(bitcoin_cli): - if in_correct_network(bitcoin_cli, BTC_NETWORK): - # ToDo: This may not have to be a thead. The main thread only creates this and terminates. - api_thread = Thread(target=start_api, args=[debug, logging]) - api_thread.start() else: logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " "Shutting down") + else: logging.error("[Pisad] can't connect to bitcoind. Shutting down") diff --git a/pisa/responder.py b/pisa/responder.py index 9146599..0f04662 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -2,25 +2,25 @@ from queue import Queue from threading import Thread from hashlib import sha256 from binascii import unhexlify -from pisa.zmq_subscriber import ZMQHandler -from pisa.rpc_errors import * + +from pisa.cleaner import Cleaner +from pisa.carrier import Carrier +from pisa import logging from pisa.tools import check_tx_in_chain -from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException -from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT +from pisa.block_processor import BlockProcessor +from pisa.utils.zmq_subscriber import ZMQHandler CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 class Job: - def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0): + def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0): self.dispute_txid = dispute_txid self.justice_txid = justice_txid self.justice_rawtx = justice_rawtx self.appointment_end = appointment_end - self.confirmations = confirmations - self.missed_confirmations = 0 self.retry_counter = retry_counter # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info @@ -28,8 +28,7 @@ class Job: self.locator = sha256(unhexlify(dispute_txid)).hexdigest() def to_json(self): - job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations, - "appointment_end": self.appointment_end} + job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "appointment_end": self.appointment_end} return job @@ -38,40 +37,37 @@ class Responder: def __init__(self): self.jobs = dict() self.tx_job_map = dict() + self.unconfirmed_txs = [] + self.missed_confirmations = dict() self.block_queue = None self.asleep = True self.zmq_subscriber = None - def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - retry=False): + def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): + if self.asleep: + logging.info("[Responder] waking up!") - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) + carrier = Carrier() + receipt = carrier.send_transaction(justice_rawtx, justice_txid) - try: - if debug: - if self.asleep: - logging.info("[Responder] waking up!") - logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid)) - - bitcoin_cli.sendrawtransaction(justice_rawtx) - - # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations + if receipt.delivered: + # do_watch can call add_response recursively if a broadcast transaction does not get confirmations # retry holds such information. - self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - retry=retry) + self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry, + confirmations=receipt.confirmations) - except JSONRPCException as e: - self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, - debug, logging, retry) + else: + # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) + pass - def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - confirmations=0, retry=False): + def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, + retry=False): # ToDo: #23-define-behaviour-approaching-end if retry: self.jobs[uuid].retry_counter += 1 self.jobs[uuid].missed_confirmations = 0 + else: self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations) @@ -81,174 +77,119 @@ class Responder: else: self.tx_job_map[justice_txid] = [uuid] - if debug: - logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'. - format(dispute_txid, justice_txid, appointment_end)) + if confirmations == 0: + self.unconfirmed_txs.append(justice_txid) + + logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})' + .format(dispute_txid, justice_txid, appointment_end)) if self.asleep: self.asleep = False self.block_queue = Queue() - zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) - responder = Thread(target=self.handle_responses, args=[debug, logging]) + zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue]) + responder = Thread(target=self.do_watch) zmq_thread.start() responder.start() - def do_subscribe(self, block_queue, debug, logging): + def do_subscribe(self, block_queue): self.zmq_subscriber = ZMQHandler(parent='Responder') - self.zmq_subscriber.handle(block_queue, debug, logging) + self.zmq_subscriber.handle(block_queue) - def handle_responses(self, debug, logging): - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) + def do_watch(self): + # ToDo: #9-add-data-persistence + # change prev_block_hash to the last known tip when bootstrapping prev_block_hash = 0 + while len(self.jobs) > 0: # We get notified for every new received block block_hash = self.block_queue.get() + block = BlockProcessor.get_block(block_hash) - try: - block = bitcoin_cli.getblock(block_hash) + if block is not None: txs = block.get('tx') height = block.get('height') - if debug: - logging.info("[Responder] new block received {}".format(block_hash)) - logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) - logging.info("[Responder] list of transactions: {}".format(txs)) + logging.info("[Responder] new block received {}".format(block_hash)) + logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) + logging.info("[Responder] list of transactions: {}".format(txs)) - except JSONRPCException as e: - if debug: - logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e)) + # ToDo: #9-add-data-persistence + # change prev_block_hash condition + if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: + self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations( + txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations) - continue + txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs) + self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, + self.get_completed_jobs(height), height) - completed_jobs = [] - if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: - # Keep count of the confirmations each tx gets - for justice_txid, jobs in self.tx_job_map.items(): - for uuid in jobs: - if justice_txid in txs or self.jobs[uuid].confirmations > 0: - self.jobs[uuid].confirmations += 1 + self.rebroadcast(txs_to_rebroadcast) - if debug: - logging.info("[Responder] new confirmation received for job = {}, txid = {}".format( - uuid, justice_txid)) - - elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: - # If a transactions has missed too many confirmations for a while we'll try to rebroadcast - # ToDO: #22-discuss-confirmations-before-retry - # ToDo: #23-define-behaviour-approaching-end - self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid, - self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug, - logging, retry=True) - if debug: - logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" - .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY)) - - else: - # Otherwise we increase the number of missed confirmations - self.jobs[uuid].missed_confirmations += 1 - - if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \ - MIN_CONFIRMATIONS: - # The end of the appointment has been reached - completed_jobs.append(uuid) - - self.remove_completed_jobs(completed_jobs, height, debug, logging) - - else: - if debug: + else: logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" .format(prev_block_hash, block.get('previousblockhash'))) - self.handle_reorgs(bitcoin_cli, debug, logging) + self.handle_reorgs() - prev_block_hash = block.get('hash') + prev_block_hash = block.get('hash') # Go back to sleep if there are no more jobs self.asleep = True self.zmq_subscriber.terminate = True - if debug: - logging.info("[Responder] no more pending jobs, going back to sleep") + logging.info("[Responder] no more pending jobs, going back to sleep") - def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, - debug, logging, retry): - # Since we're pushing a raw transaction to the network we can get two kind of rejections: - # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected - # due to network rules, whereas the later implies that the transaction is already in the blockchain. - if e.error.get('code') == RPC_VERIFY_REJECTED: - # DISCUSS: what to do in this case - # DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too. - # DISCUSS: RPC_VERIFY_ERROR could also be a possible case. - # DISCUSS: check errors -9 and -10 - pass + def get_txs_to_rebroadcast(self, txs): + txs_to_rebroadcast = [] - elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: - try: - if debug: - logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and " - "start monitoring the transaction".format(justice_txid)) + for tx in txs: + if self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY: + # If a transactions has missed too many confirmations we add it to the rebroadcast list + txs_to_rebroadcast.append(tx) - # If the transaction is already in the chain, we get the number of confirmations and watch the job - # until the end of the appointment - tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) - confirmations = int(tx_info.get("confirmations")) - self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, - retry=retry, confirmations=confirmations) + return txs_to_rebroadcast - except JSONRPCException as e: - # While it's quite unlikely, the transaction that was already in the blockchain could have been - # reorged while we were querying bitcoind to get the confirmation count. In such a case we just - # restart the job - if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, - logging, retry=retry) - elif debug: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) + def get_completed_jobs(self, height): + completed_jobs = [] - elif debug: - # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) + for uuid, job in self.jobs: + if job.appointment_end <= height: + tx = Carrier.get_transaction(job.dispute_txid) - def remove_completed_jobs(self, completed_jobs, height, debug, logging): - for uuid in completed_jobs: - if debug: - logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at " - "block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height, - self.jobs[uuid].confirmations)) + # FIXME: Should be improved with the librarian + if tx is not None and tx.get('confirmations') > MIN_CONFIRMATIONS: + # The end of the appointment has been reached + completed_jobs.append(uuid) - # ToDo: #9-add-data-persistency - justice_txid = self.jobs[uuid].justice_txid - self.jobs.pop(uuid) + return completed_jobs - if len(self.tx_job_map[justice_txid]) == 1: - self.tx_job_map.pop(justice_txid) + def rebroadcast(self, jobs_to_rebroadcast): + # ToDO: #22-discuss-confirmations-before-retry + # ToDo: #23-define-behaviour-approaching-end - if debug: - logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid)) + for tx in jobs_to_rebroadcast: + for uuid in self.tx_job_map[tx]: + self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, + self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) - else: - self.tx_job_map[justice_txid].remove(uuid) + logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting" + .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) - def handle_reorgs(self, bitcoin_cli, debug, logging): + # FIXME: Legacy code, must be checked and updated/fixed + def handle_reorgs(self): for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # there either, so we'll need to call the reorg manager straight away - dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, - parent='Responder', - tx_label='dispute tx') + dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx') # If the dispute is there, we can check the justice tx if dispute_in_chain: - justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug, - logging, parent='Responder', + justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder', tx_label='justice tx') # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: - if debug: - logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( + logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( job.justice_txid, job.confirmations, justice_confirmations)) job.confirmations = justice_confirmations @@ -258,9 +199,7 @@ class Responder: # DISCUSS: Adding job back, should we flag it as retried? # FIXME: Whether we decide to increase the retried counter or not, the current counter should be # maintained. There is no way of doing so with the current approach. Update if required - self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, - job.appointment_end, - debug, logging) + self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end) else: # ToDo: #24-properly-handle-reorgs @@ -268,4 +207,3 @@ class Responder: # reorg manager logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") logging.error("[Responder] reorg manager not yet implemented") - pass diff --git a/pisa/tools.py b/pisa/tools.py index 42ad73f..909c64c 100644 --- a/pisa/tools.py +++ b/pisa/tools.py @@ -1,10 +1,13 @@ import re -from pisa.utils.authproxy import JSONRPCException -from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY from http.client import HTTPException +import pisa.conf as conf +from pisa import logging, bitcoin_cli +from pisa.utils.auth_proxy import JSONRPCException +from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY -def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='transaction'): + +def check_tx_in_chain(tx_id, parent='', tx_label='transaction'): tx_in_chain = False confirmations = 0 @@ -14,22 +17,23 @@ def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='t if tx_info.get("confirmations"): confirmations = int(tx_info.get("confirmations")) tx_in_chain = True - if debug: - logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id)) - elif debug: + logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id)) + + else: logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id)) + except JSONRPCException as e: if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - if debug: - logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id)) - elif debug: + logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id)) + + else: # ToDO: Unhandled errors, check this properly logging.error("[{}] JSONRPCException. Error code {}".format(parent, e)) return tx_in_chain, confirmations -def can_connect_to_bitcoind(bitcoin_cli): +def can_connect_to_bitcoind(): can_connect = True try: @@ -40,18 +44,18 @@ def can_connect_to_bitcoind(bitcoin_cli): return can_connect -def in_correct_network(bitcoin_cli, network): +def in_correct_network(): mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" correct_network = False genesis_block_hash = bitcoin_cli.getblockhash(0) - if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash: + if conf.BTC_NETWORK == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash: correct_network = True - elif network == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash: + elif conf.BTC_NETWORK == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash: correct_network = True - elif network == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]: + elif conf.BTC_NETWORK == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]: correct_network = True return correct_network diff --git a/pisa/utils/authproxy.py b/pisa/utils/auth_proxy.py similarity index 100% rename from pisa/utils/authproxy.py rename to pisa/utils/auth_proxy.py diff --git a/pisa/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py similarity index 84% rename from pisa/zmq_subscriber.py rename to pisa/utils/zmq_subscriber.py index 90e706c..9ff9043 100644 --- a/pisa/zmq_subscriber.py +++ b/pisa/utils/zmq_subscriber.py @@ -1,7 +1,9 @@ import zmq import binascii +from pisa import logging from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT + # ToDo: #7-add-async-back-to-zmq class ZMQHandler: """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" @@ -14,7 +16,7 @@ class ZMQHandler: self.parent = parent self.terminate = False - def handle(self, block_queue, debug, logging): + def handle(self, block_queue): while not self.terminate: msg = self.zmqSubSocket.recv_multipart() @@ -27,5 +29,4 @@ class ZMQHandler: block_hash = binascii.hexlify(body).decode('UTF-8') block_queue.put(block_hash) - if debug: - logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash)) + logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash)) diff --git a/pisa/watcher.py b/pisa/watcher.py index 680db56..bbf066c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -1,12 +1,17 @@ -from binascii import hexlify, unhexlify +from uuid import uuid4 from queue import Queue from threading import Thread + +from pisa import logging from pisa.responder import Responder -from pisa.zmq_subscriber import ZMQHandler -from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException -from hashlib import sha256 -from uuid import uuid4 -from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA +from pisa.conf import MAX_APPOINTMENTS +from pisa.block_processor import BlockProcessor +from pisa.cleaner import Cleaner +from pisa.utils.zmq_subscriber import ZMQHandler + + +# WIP: MOVED BLOCKCHAIN RELATED TASKS TO BLOCK PROCESSOR IN AN AIM TO MAKE THE CODE MORE MODULAR. THIS SHOULD HELP +# WITH CODE REUSE WHEN MERGING THE DATA PERSISTENCE PART. class Watcher: @@ -19,7 +24,7 @@ class Watcher: self.zmq_subscriber = None self.responder = Responder() - def add_appointment(self, appointment, debug, logging): + def add_appointment(self, appointment): # Rationale: # The Watcher will analyze every received block looking for appointment matches. If there is no work # to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block @@ -45,135 +50,68 @@ class Watcher: if self.asleep: self.asleep = False self.block_queue = Queue() - zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) - watcher = Thread(target=self.do_watch, args=[debug, logging]) + zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue]) + watcher = Thread(target=self.do_watch) zmq_thread.start() watcher.start() - if debug: - logging.info("[Watcher] waking up!") + logging.info("[Watcher] waking up!") appointment_added = True - if debug: - logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator)) + logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator)) else: appointment_added = False - if debug: - logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})' - .format(appointment.locator)) + logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format( + appointment.locator)) return appointment_added - def do_subscribe(self, block_queue, debug, logging): + def do_subscribe(self, block_queue): self.zmq_subscriber = ZMQHandler(parent='Watcher') - self.zmq_subscriber.handle(block_queue, debug, logging) - - def do_watch(self, debug, logging): - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, - BTC_RPC_PORT)) + self.zmq_subscriber.handle(block_queue) + def do_watch(self): while len(self.appointments) > 0: block_hash = self.block_queue.get() + logging.info("[Watcher] new block received {}".format(block_hash)) - try: - block = bitcoin_cli.getblock(block_hash) + block = BlockProcessor.getblock(block_hash) + + if block is not None: txids = block.get('tx') - if debug: - logging.info("[Watcher] new block received {}".format(block_hash)) - logging.info("[Watcher] list of transactions: {}".format(txids)) + logging.info("[Watcher] list of transactions: {}".format(txids)) - self.delete_expired_appointment(block, debug, logging) + self.appointments, self.locator_uuid_map = Cleaner.delete_expired_appointment( + block, self.appointments, self.locator_uuid_map) - potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} - - # Check is any of the tx_ids in the received block is an actual match - # Get the locators that are both in the map and in the potential locators dict. - intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) - potential_matches = {locator: potential_locators[locator] for locator in intersection} - - if debug: - if len(potential_matches) > 0: - logging.info("[Watcher] list of potential matches: {}".format(potential_matches)) - else: - logging.info("[Watcher] no potential matches found") - - matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging) + potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map) + matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: - if debug: - logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" - .format(justice_txid, locator, uuid)) + logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" + .format(justice_txid, locator, uuid)) self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, - self.appointments[uuid].end_time, debug, logging) + self.appointments[uuid].end_time) # Delete the appointment self.appointments.pop(uuid) # If there was only one appointment that matches the locator we can delete the whole list if len(self.locator_uuid_map[locator]) == 1: - # ToDo: #9-add-data-persistency + # ToDo: #9-add-data-persistence self.locator_uuid_map.pop(locator) else: # Otherwise we just delete the appointment that matches locator:appointment_pos - # ToDo: #9-add-data-persistency + # ToDo: #9-add-data-persistence self.locator_uuid_map[locator].remove(uuid) - except JSONRPCException as e: - if debug: - logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e)) - # Go back to sleep if there are no more appointments self.asleep = True self.zmq_subscriber.terminate = True - if debug: - logging.error("[Watcher] no more pending appointments, going back to sleep") - - def delete_expired_appointment(self, block, debug, logging): - to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time - + EXPIRY_DELTA] - - for uuid in to_delete: - # ToDo: #9-add-data-persistency - locator = self.appointments[uuid].locator - - self.appointments.pop(uuid) - - if len(self.locator_uuid_map[locator]) == 1: - self.locator_uuid_map.pop(locator) - - else: - self.locator_uuid_map[locator].remove(uuid) - - if debug: - logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})" - .format(locator, uuid)) - - def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging): - matches = [] - - for locator, dispute_txid in potential_matches.items(): - for uuid in self.locator_uuid_map[locator]: - try: - # ToDo: #20-test-tx-decrypting-edge-cases - justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug, - logging) - justice_rawtx = hexlify(justice_rawtx).decode() - justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') - matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) - - if debug: - logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid, - justice_txid)) - except JSONRPCException as e: - # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple - # for the POC - if debug: - logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e)) - - return matches + logging.error("[Watcher] no more pending appointments, going back to sleep")