diff --git a/.gitignore b/.gitignore index dfdbf33..6560657 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ bitcoin.conf* apps/cli/*.json appointments/ test.py +*.pyc +.cache diff --git a/apps/cli/__init__.py b/apps/cli/__init__.py index 40e495c..20bb9fb 100644 --- a/apps/cli/__init__.py +++ b/apps/cli/__init__.py @@ -12,7 +12,7 @@ SUPPORTED_HASH_FUNCTIONS = ["SHA256"] SUPPORTED_CIPHERS = ["AES-GCM-128"] # Configure logging -logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[ +logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[ logging.FileHandler(CLIENT_LOG_FILE), logging.StreamHandler() ]) diff --git a/apps/cli/blob.py b/apps/cli/blob.py index 6041050..5e6f9da 100644 --- a/apps/cli/blob.py +++ b/apps/cli/blob.py @@ -3,8 +3,10 @@ from hashlib import sha256 from binascii import hexlify, unhexlify from cryptography.hazmat.primitives.ciphers.aead import AESGCM -from apps.cli import logging from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS +from pisa.logger import Logger + +logger = Logger("Client") class Blob: @@ -50,10 +52,10 @@ class Blob: encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None) encrypted_blob = hexlify(encrypted_blob).decode() - logging.info("[Client] creating new blob") - logging.info("[Client] master key: {}".format(hexlify(master_key).decode())) - logging.info("[Client] sk: {}".format(hexlify(sk).decode())) - logging.info("[Client] nonce: {}".format(hexlify(nonce).decode())) - logging.info("[Client] encrypted_blob: {}".format(encrypted_blob)) + logger.info("Creating new blob", + master_key=hexlify(master_key).decode(), + sk=hexlify(sk).decode(), + nonce=hexlify(nonce).decode(), + encrypted_blob=encrypted_blob) return encrypted_blob diff --git a/apps/cli/pisa-cli.py b/apps/cli/pisa-cli.py index 6327214..8b181a1 100644 --- a/apps/cli/pisa-cli.py +++ b/apps/cli/pisa-cli.py @@ -2,7 +2,6 @@ import re import os import sys import json -import logging import requests from sys import argv from hashlib import sha256 @@ -10,11 +9,15 @@ from binascii import unhexlify from getopt import getopt, GetoptError from requests import ConnectTimeout, ConnectionError +from pisa.logger import Logger from apps.cli.blob import Blob from apps.cli.help import help_add_appointment, help_get_appointment from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT +logger = Logger("Client") + + # FIXME: TESTING ENDPOINT, WON'T BE THERE IN PRODUCTION def generate_dummy_appointment(): get_block_count_end_point = "http://{}:{}/get_block_count".format(pisa_api_server, pisa_api_port) @@ -49,14 +52,14 @@ def add_appointment(args): if os.path.isfile(fin): appointment_data = json.load(open(fin)) else: - logging.error("[Client] can't find file " + fin) + logger.error("Can't find file " + fin) else: - logging.error("[Client] no file provided as appointment. " + use_help) + logger.error("No file provided as appointment. " + use_help) else: appointment_data = json.loads(arg_opt) except json.JSONDecodeError: - logging.error("[Client] non-JSON encoded data provided as appointment. " + use_help) + logger.error("Non-JSON encoded data provided as appointment. " + use_help) if appointment_data: valid_locator = check_txid_format(appointment_data.get('tx_id')) @@ -67,22 +70,22 @@ def add_appointment(args): appointment_data.get('start_time'), appointment_data.get('end_time'), appointment_data.get('dispute_delta')) - logging.info("[Client] sending appointment to PISA") + logger.info("Sending appointment to PISA") try: r = requests.post(url=add_appointment_endpoint, json=json.dumps(appointment), timeout=5) - logging.info("[Client] {} (code: {}).".format(r.text, r.status_code)) + logger.info("{} (code: {}).".format(r.text, r.status_code)) except ConnectTimeout: - logging.error("[Client] can't connect to pisa API. Connection timeout.") + logger.error("Can't connect to pisa API. Connection timeout.") except ConnectionError: - logging.error("[Client] can't connect to pisa API. Server cannot be reached.") + logger.error("Can't connect to pisa API. Server cannot be reached.") else: - logging.error("[Client] the provided locator is not valid.") + logger.error("The provided locator is not valid.") else: - logging.error("[Client] no appointment data provided. " + use_help) + logger.error("No appointment data provided. " + use_help) def get_appointment(args): @@ -104,16 +107,16 @@ def get_appointment(args): print(json.dumps(r.json(), indent=4, sort_keys=True)) except ConnectTimeout: - logging.error("[Client] can't connect to pisa API. Connection timeout.") + logger.error("Can't connect to pisa API. Connection timeout.") except ConnectionError: - logging.error("[Client] can't connect to pisa API. Server cannot be reached.") + logger.error("Can't connect to pisa API. Server cannot be reached.") else: - logging.error("[Client] the provided locator is not valid.") + logger.error("The provided locator is not valid.") else: - logging.error("[Client] the provided locator is not valid.") + logger.error("The provided locator is not valid.") def build_appointment(tx, tx_id, start_block, end_block, dispute_delta): @@ -199,7 +202,7 @@ if __name__ == '__main__': sys.exit(help_get_appointment()) else: - logging.error("[Client] unknown command. Use help to check the list of available commands") + logger.error("Unknown command. Use help to check the list of available commands") else: sys.exit(show_usage()) @@ -210,14 +213,14 @@ if __name__ == '__main__': generate_dummy_appointment() else: - logging.error("[Client] unknown command. Use help to check the list of available commands") + logger.error("Unknown command. Use help to check the list of available commands") else: - logging.error("[Client] no command provided. Use help to check the list of available commands.") + logger.error("No command provided. Use help to check the list of available commands.") except GetoptError as e: - logging.error("[Client] {}".format(e)) + logger.error("{}".format(e)) except json.JSONDecodeError as e: - logging.error("[Client] non-JSON encoded appointment passed as parameter.") + logger.error("Non-JSON encoded appointment passed as parameter.") diff --git a/pisa/__init__.py b/pisa/__init__.py index a279a5c..5d03345 100644 --- a/pisa/__init__.py +++ b/pisa/__init__.py @@ -3,12 +3,11 @@ import logging from pisa.utils.auth_proxy import AuthServiceProxy import pisa.conf as conf - HOST = 'localhost' PORT = 9814 # Configure logging -logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[ +logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[ logging.FileHandler(conf.SERVER_LOG_FILE), logging.StreamHandler() ]) @@ -17,4 +16,3 @@ logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handle # TODO: Check if a long lived connection like this may create problems (timeouts) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST, conf.BTC_RPC_PORT)) - diff --git a/pisa/api.py b/pisa/api.py index 44244ee..cd05e02 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -1,9 +1,10 @@ import json from flask import Flask, request, Response, abort, jsonify +from pisa import HOST, PORT, logging +from pisa.logger import Logger from pisa.watcher import Watcher from pisa.inspector import Inspector -from pisa import HOST, PORT, logging from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor @@ -14,13 +15,15 @@ HTTP_OK = 200 HTTP_BAD_REQUEST = 400 HTTP_SERVICE_UNAVAILABLE = 503 +logger = Logger("API") + @app.route('/', methods=['POST']) def add_appointment(): remote_addr = request.environ.get('REMOTE_ADDR') remote_port = request.environ.get('REMOTE_PORT') - logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port)) + logger.info('Connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port)) # Check content type once if properly defined request_data = json.loads(request.get_json()) @@ -46,7 +49,8 @@ def add_appointment(): rcode = HTTP_BAD_REQUEST response = "appointment rejected. Request does not match the standard" - logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port)) + logger.info('Sending response and disconnecting', + from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response) return Response(response, status=rcode, mimetype='text/plain') diff --git a/pisa/block_processor.py b/pisa/block_processor.py index eb09e45..fdd71a8 100644 --- a/pisa/block_processor.py +++ b/pisa/block_processor.py @@ -1,9 +1,12 @@ import binascii from hashlib import sha256 -from pisa import logging, bitcoin_cli +from pisa import bitcoin_cli +from pisa.logger import Logger from pisa.utils.auth_proxy import JSONRPCException +logger = Logger("BlockProcessor") + class BlockProcessor: @staticmethod @@ -14,7 +17,7 @@ class BlockProcessor: except JSONRPCException as e: block = None - logging.error("[BlockProcessor] couldn't get block from bitcoind. Error code {}".format(e)) + logger.error("Couldn't get block from bitcoind.", error_code=e) return block @@ -26,7 +29,7 @@ class BlockProcessor: except JSONRPCException as e: block_hash = None - logging.error("[BlockProcessor] couldn't get block hash. Error code {}".format(e)) + logger.error("Couldn't get block hash.", error_code=e) return block_hash @@ -38,7 +41,7 @@ class BlockProcessor: except JSONRPCException as e: block_count = None - logging.error("[BlockProcessor] couldn't get block block count. Error code {}".format(e)) + logger.error("Couldn't get block count", error_code=e) return block_count @@ -54,10 +57,10 @@ class BlockProcessor: potential_matches = {locator: potential_locators[locator] for locator in intersection} if len(potential_matches) > 0: - logging.info("[BlockProcessor] list of potential matches: {}".format(potential_matches)) + logger.info("List of potential matches", potential_matches=potential_matches) else: - logging.info("[BlockProcessor] no potential matches found") + logger.info("No potential matches found") return potential_matches @@ -73,13 +76,12 @@ class BlockProcessor: justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) - logging.info("[BlockProcessor] match found for locator {} (uuid: {}): {}".format( - locator, uuid, justice_txid)) + logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid) except JSONRPCException as e: # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple # for the POC - logging.error("[BlockProcessor] can't build transaction from decoded data. Error code {}".format(e)) + logger.error("Can't build transaction from decoded data.", error_code=e) return matches @@ -91,7 +93,7 @@ class BlockProcessor: if tx in tx_job_map and tx in unconfirmed_txs: unconfirmed_txs.remove(tx) - logging.info("[Responder] confirmation received for tx {}".format(tx)) + logger.info("Confirmation received for transaction", tx=tx) elif tx in unconfirmed_txs: if tx in missed_confirmations: @@ -100,8 +102,6 @@ class BlockProcessor: else: missed_confirmations[tx] = 1 - logging.info("[Responder] tx {} missed a confirmation (total missed: {})" - .format(tx, missed_confirmations[tx])) + logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx]) return unconfirmed_txs, missed_confirmations - diff --git a/pisa/carrier.py b/pisa/carrier.py index 2286070..20990fa 100644 --- a/pisa/carrier.py +++ b/pisa/carrier.py @@ -1,8 +1,11 @@ from pisa.rpc_errors import * -from pisa import logging, bitcoin_cli +from pisa import bitcoin_cli +from pisa.logger import Logger from pisa.utils.auth_proxy import JSONRPCException from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION +logger = Logger("Carrier") + class Receipt: def __init__(self, delivered, confirmations=0, reason=None): @@ -14,7 +17,7 @@ class Receipt: class Carrier: def send_transaction(self, rawtx, txid): try: - logging.info("[Carrier] pushing transaction to the network (txid: {})".format(rawtx)) + logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx) bitcoin_cli.sendrawtransaction(rawtx) receipt = Receipt(delivered=True) @@ -41,7 +44,7 @@ class Carrier: receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) elif errno == RPC_VERIFY_ALREADY_IN_CHAIN: - logging.info("[Carrier] {} is already in the blockchain. Getting confirmation count".format(txid)) + logger.info("Transaction is already in the blockchain. Getting confirmation count", txid=txid) # If the transaction is already in the chain, we get the number of confirmations and watch the job # until the end of the appointment @@ -52,7 +55,7 @@ class Carrier: receipt = Receipt(delivered=True, confirmations=confirmations, reason=RPC_VERIFY_ALREADY_IN_CHAIN) else: - # There's a really unlike edge case where a transaction can be reorged between receiving the + # There's a really unlikely edge case where a transaction can be reorged between receiving the # notification and querying the data. In such a case we just resend self.send_transaction(rawtx, txid) @@ -65,8 +68,8 @@ class Carrier: else: # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) - receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) + logger.error("JSONRPCException.", error_code=e) + receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) return receipt @@ -81,12 +84,11 @@ class Carrier: # reorged while we were querying bitcoind to get the confirmation count. In such a case we just # restart the job if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - logging.info("[Carrier] transaction {} got reorged before obtaining information".format(txid)) + logger.info("Transaction got reorged before obtaining information", txid=txid) # TODO: Check RPC methods to see possible returns and avoid general else # else: # # If something else happens (unlikely but possible) log it so we can treat it in future releases - # logging.error("[Responder] JSONRPCException. Error {}".format(e)) + # logger.error("JSONRPCException.", error_code=e) return tx_info - diff --git a/pisa/cleaner.py b/pisa/cleaner.py index b7c2947..00076b4 100644 --- a/pisa/cleaner.py +++ b/pisa/cleaner.py @@ -1,4 +1,6 @@ -from pisa import logging +from pisa.logger import Logger + +logger = Logger("Cleaner") # Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner" # https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference @@ -18,14 +20,13 @@ class Cleaner: else: locator_uuid_map[locator].remove(uuid) - logging.info("[Cleaner] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator, - uuid)) + logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid) @staticmethod def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): for uuid, confirmations in completed_jobs: - logging.info("[Cleaner] job completed (uuid = {}). Appointment ended at block {} after {} confirmations" - .format(uuid, height, confirmations)) + logger.info("Job completed. Appointment ended after reaching enough confirmations.", + uuid=uuid, height=height, confirmations=confirmations) # ToDo: #9-add-data-persistence justice_txid = jobs[uuid].justice_txid @@ -34,7 +35,7 @@ class Cleaner: if len(tx_job_map[justice_txid]) == 1: tx_job_map.pop(justice_txid) - logging.info("[Cleaner] no more jobs for justice_txid {}".format(justice_txid)) + logger.info("No more jobs for justice transaction.", justice_txid=justice_txid) else: tx_job_map[justice_txid].remove(uuid) diff --git a/pisa/encrypted_blob.py b/pisa/encrypted_blob.py index ffc3e38..68c1fe5 100644 --- a/pisa/encrypted_blob.py +++ b/pisa/encrypted_blob.py @@ -1,8 +1,9 @@ from hashlib import sha256 from binascii import unhexlify, hexlify from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from pisa.logger import Logger -from pisa import logging +logger = Logger("Watcher") # FIXME: EncryptedBlob is assuming AES-128-GCM. A cipher field should be part of the object and the decryption should be @@ -23,11 +24,11 @@ class EncryptedBlob: sk = master_key[:16] nonce = master_key[16:] - logging.info("[Watcher] creating new blob") - logging.info("[Watcher] master key: {}".format(hexlify(master_key).decode())) - logging.info("[Watcher] sk: {}".format(hexlify(sk).decode())) - logging.info("[Watcher] nonce: {}".format(hexlify(nonce).decode())) - logging.info("[Watcher] encrypted_blob: {}".format(self.data)) + logger.info("Creating new blob.", + master_key=hexlify(master_key).decode(), + sk=hexlify(sk).decode(), + nonce=hexlify(sk).decode(), + encrypted_blob=self.data) # Decrypt aesgcm = AESGCM(sk) diff --git a/pisa/inspector.py b/pisa/inspector.py index ae9ca89..53055b8 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -2,10 +2,13 @@ import re from pisa import errors import pisa.conf as conf -from pisa import logging +from pisa import bitcoin_cli +from pisa.logger import Logger from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor +logger = Logger("Inspector") + # FIXME: The inspector logs the wrong messages sent form the users. A possible attack surface would be to send a really # long field that, even if not accepted by PISA, would be stored in the logs. This is a possible DoS surface # since pisa would store any kind of message (no matter the length). Solution: truncate the length of the fields @@ -71,7 +74,7 @@ class Inspector: message = "wrong locator format ({})".format(locator) if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -99,7 +102,7 @@ class Inspector: message = "start_time is too close to current height" if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -133,7 +136,7 @@ class Inspector: message = 'end_time is too close to current height' if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -156,7 +159,7 @@ class Inspector: conf.MIN_DISPUTE_DELTA, dispute_delta) if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -179,7 +182,7 @@ class Inspector: message = "wrong encrypted_blob format ({})".format(encrypted_blob) if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -201,7 +204,7 @@ class Inspector: message = "cipher not supported: {}".format(cipher) if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message @@ -223,6 +226,6 @@ class Inspector: message = "hash_function not supported {}".format(hash_function) if message is not None: - logging.error("[Inspector] {}".format(message)) + logger.error(message) return rcode, message diff --git a/pisa/logger.py b/pisa/logger.py new file mode 100644 index 0000000..e6a1f4e --- /dev/null +++ b/pisa/logger.py @@ -0,0 +1,33 @@ +import logging +import time +import json + + +class StructuredMessage(object): + def __init__(self, message, **kwargs): + self.message = message + self.time = time.asctime() + self.kwargs = kwargs + + def __str__(self): + return json.dumps({**self.kwargs, "message": self.message, "time": self.time}) + + +class Logger(object): + def __init__(self, actor=None): + self.actor = actor + + def _add_prefix(self, msg): + return msg if self.actor is None else "[{}] {}".format(self.actor, msg) + + def info(self, msg, **kwargs): + logging.info(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs)) + + def debug(self, msg, **kwargs): + logging.debug(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs)) + + def error(self, msg, **kwargs): + logging.error(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs)) + + def warning(self, msg, **kwargs): + logging.warning(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs)) diff --git a/pisa/pisad.py b/pisa/pisad.py index 8fbce83..152d7c9 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -1,10 +1,11 @@ from sys import argv from getopt import getopt -from pisa import logging +from pisa.logger import Logger from pisa.api import start_api from pisa.tools import can_connect_to_bitcoind, in_correct_network +logger = Logger("Daemon") if __name__ == '__main__': debug = False @@ -19,8 +20,7 @@ if __name__ == '__main__': start_api() else: - logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " - "Shutting down") + logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down") else: - logging.error("[Pisad] can't connect to bitcoind. Shutting down") + logger.error("Can't connect to bitcoind. Shutting down") diff --git a/pisa/responder.py b/pisa/responder.py index 08f082e..87f64ec 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -3,9 +3,9 @@ from threading import Thread from hashlib import sha256 from binascii import unhexlify +from pisa.logger import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier -from pisa import logging from pisa.tools import check_tx_in_chain from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler @@ -13,6 +13,8 @@ from pisa.utils.zmq_subscriber import ZMQHandler CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 +logger = Logger("Responder") + class Job: def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0): @@ -45,7 +47,7 @@ class Responder: def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): if self.asleep: - logging.info("[Responder] waking up!") + logger.info("Waking up") carrier = Carrier() receipt = carrier.send_transaction(justice_rawtx, justice_txid) @@ -80,8 +82,8 @@ class Responder: if confirmations == 0: self.unconfirmed_txs.append(justice_txid) - logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})' - .format(dispute_txid, justice_txid, appointment_end)) + logger.info("New job added.", + dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end) if self.asleep: self.asleep = False @@ -109,9 +111,8 @@ class Responder: txs = block.get('tx') height = block.get('height') - logging.info("[Responder] new block received {}".format(block_hash)) - logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) - logging.info("[Responder] list of transactions: {}".format(txs)) + logger.info("New block received", + block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs) # ToDo: #9-add-data-persistence # change prev_block_hash condition @@ -125,8 +126,9 @@ class Responder: self.rebroadcast(txs_to_rebroadcast) else: - logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" - .format(prev_block_hash, block.get('previousblockhash'))) + logger.warning("Reorg found", + local_prev_block_hash=prev_block_hash, + remote_prev_block_hash=block.get('previousblockhash')) self.handle_reorgs() @@ -136,7 +138,7 @@ class Responder: self.asleep = True self.zmq_subscriber.terminate = True - logging.info("[Responder] no more pending jobs, going back to sleep") + logger.info("No more pending jobs, going back to sleep") def get_txs_to_rebroadcast(self, txs): txs_to_rebroadcast = [] @@ -172,25 +174,28 @@ class Responder: self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) - logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting" - .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) + logger.warning("Transaction has missed many confirmations. Rebroadcasting.", + justice_txid=self.jobs[uuid].justice_txid, + confirmations_missed=CONFIRMATIONS_BEFORE_RETRY) # FIXME: Legacy code, must be checked and updated/fixed def handle_reorgs(self): for uuid, job in self.jobs.items(): # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # there either, so we'll need to call the reorg manager straight away - dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx') + dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label='Dispute tx') # If the dispute is there, we can check the justice tx if dispute_in_chain: - justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder', - tx_label='justice tx') + justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, logger=logger, + tx_label='Justice tx') # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: - logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( - job.justice_txid, job.confirmations, justice_confirmations)) + logger.info("Updating confirmation count for transaction.", + justice_txid=job.justice_txid, + prev_count=job.confirmations, + curr_count=justice_confirmations) job.confirmations = justice_confirmations @@ -203,7 +208,7 @@ class Responder: else: # ToDo: #24-properly-handle-reorgs - # FIXME: if the dispute is not on chain (either in mempool or not there al all), we need to call the + # FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the # reorg manager - logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") - logging.error("[Responder] reorg manager not yet implemented") + logger.warning("Dispute and justice transaction missing. Calling the reorg manager") + logger.error("Reorg manager not yet implemented") diff --git a/pisa/tools.py b/pisa/tools.py index 909c64c..db6c33d 100644 --- a/pisa/tools.py +++ b/pisa/tools.py @@ -2,12 +2,14 @@ import re from http.client import HTTPException import pisa.conf as conf -from pisa import logging, bitcoin_cli +from pisa import bitcoin_cli +from pisa.logger import Logger from pisa.utils.auth_proxy import JSONRPCException from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY -def check_tx_in_chain(tx_id, parent='', tx_label='transaction'): +# TODO: currently only used in the Responder; might move there or in the BlockProcessor +def check_tx_in_chain(tx_id, logger=Logger(), tx_label='Transaction'): tx_in_chain = False confirmations = 0 @@ -17,18 +19,18 @@ def check_tx_in_chain(tx_id, parent='', tx_label='transaction'): if tx_info.get("confirmations"): confirmations = int(tx_info.get("confirmations")) tx_in_chain = True - logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id)) + logger.error("{} found in the blockchain".format(tx_label), txid=tx_id) else: - logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id)) + logger.error("{} found in mempool".format(tx_label), txid=tx_id) except JSONRPCException as e: if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id)) + logger.error("{} not found in mempool nor blockchain".format(tx_label), txid=tx_id) else: # ToDO: Unhandled errors, check this properly - logging.error("[{}] JSONRPCException. Error code {}".format(parent, e)) + logger.error("JSONRPCException.", error_code=e) return tx_in_chain, confirmations diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py index 75e175d..76f0150 100644 --- a/pisa/utils/zmq_subscriber.py +++ b/pisa/utils/zmq_subscriber.py @@ -1,7 +1,6 @@ import zmq import binascii - -from pisa import logging +from pisa.logger import Logger from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT @@ -14,7 +13,8 @@ class ZMQHandler: self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.parent = parent + self.logger = Logger("ZMQHandler-{}".format(parent)) + self.terminate = False def handle(self, block_queue): @@ -30,4 +30,4 @@ class ZMQHandler: block_hash = binascii.hexlify(body).decode('UTF-8') block_queue.put(block_hash) - logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash)) + self.logger.info("New block received via ZMQ", block_hash=block_hash) diff --git a/pisa/watcher.py b/pisa/watcher.py index ab8da44..5248b77 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -2,7 +2,7 @@ from uuid import uuid4 from queue import Queue from threading import Thread -from pisa import logging +from pisa.logger import Logger from pisa.cleaner import Cleaner from pisa.conf import EXPIRY_DELTA from pisa.responder import Responder @@ -10,6 +10,8 @@ from pisa.conf import MAX_APPOINTMENTS from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler +logger = Logger("Watcher") + class Watcher: def __init__(self, max_appointments=MAX_APPOINTMENTS): @@ -52,35 +54,34 @@ class Watcher: zmq_thread.start() watcher.start() - logging.info("[Watcher] waking up!") + logger.info("Waking up") appointment_added = True - logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator)) + logger.info("New appointment accepted.", locator=appointment.locator) else: appointment_added = False - logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format( - appointment.locator)) + logger.info("Maximum appointments reached, appointment rejected.", locator=appointment.locator) return appointment_added def do_subscribe(self, block_queue): - self.zmq_subscriber = ZMQHandler(parent='Watcher') + self.zmq_subscriber = ZMQHandler(parent="Watcher") self.zmq_subscriber.handle(block_queue) def do_watch(self): while len(self.appointments) > 0: block_hash = self.block_queue.get() - logging.info("[Watcher] new block received {}".format(block_hash)) + logger.info("New block received", block_hash=block_hash) block = BlockProcessor.get_block(block_hash) if block is not None: txids = block.get('tx') - logging.info("[Watcher] list of transactions: {}".format(txids)) + logger.info("List of transactions.", txids=txids) expired_appointments = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time + EXPIRY_DELTA] @@ -91,8 +92,8 @@ class Watcher: matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: - logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" - .format(justice_txid, locator, uuid)) + logger.info("Notifying responder and deleting appointment.", + justice_txid=justice_txid, locator=locator, uuid=uuid) self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, self.appointments[uuid].end_time) @@ -113,4 +114,4 @@ class Watcher: self.asleep = True self.zmq_subscriber.terminate = True - logging.error("[Watcher] no more pending appointments, going back to sleep") + logger.error("No more pending appointments, going back to sleep") diff --git a/test/unit/test_api.py b/test/unit/test_api.py index 8505bfa..5e4a6ee 100644 --- a/test/unit/test_api.py +++ b/test/unit/test_api.py @@ -183,8 +183,3 @@ def test_get_all_appointments_responder(): assert (set(responder_jobs) == set(local_locators)) assert (len(received_appointments["watcher_appointments"]) == 0) - - - - - diff --git a/test/unit/test_blob.py b/test/unit/test_blob.py index efd9e1a..7eb3418 100644 --- a/test/unit/test_blob.py +++ b/test/unit/test_blob.py @@ -87,5 +87,3 @@ def test_encrypt(): encrypted_blob2 = blob.encrypt(key) assert(encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2)) - - diff --git a/test/unit/test_block_processor.py b/test/unit/test_block_processor.py index b0a2bba..a57fbb0 100644 --- a/test/unit/test_block_processor.py +++ b/test/unit/test_block_processor.py @@ -73,4 +73,3 @@ def test_potential_matches_random_data(locator_uuid_map): # None of the txids should match assert len(potential_matches) == 0 - diff --git a/test/unit/test_cleaner.py b/test/unit/test_cleaner.py index 5206118..237a2e3 100644 --- a/test/unit/test_cleaner.py +++ b/test/unit/test_cleaner.py @@ -78,4 +78,3 @@ def test_delete_completed_jobs(): Cleaner.delete_completed_jobs(jobs, tx_job_map, completed_jobs, 0) assert not set(completed_jobs).issubset(jobs.keys()) - diff --git a/test/unit/test_encrypted_blob.py b/test/unit/test_encrypted_blob.py index 096c316..67270c2 100644 --- a/test/unit/test_encrypted_blob.py +++ b/test/unit/test_encrypted_blob.py @@ -34,6 +34,3 @@ def test_decrypt(): encrypted_blob = EncryptedBlob(encrypted_data) assert(encrypted_blob.decrypt(key) == data) - - - diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index 3aa68f6..551cfe3 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -230,4 +230,3 @@ def test_inspect(): and appointment.end_time == end_time and appointment.dispute_delta == dispute_delta and appointment.encrypted_blob.data == encrypted_blob and appointment.cipher == cipher and appointment.hash_function == hash_function) - diff --git a/test/unit/test_tools.py b/test/unit/test_tools.py new file mode 100644 index 0000000..0e96b7c --- /dev/null +++ b/test/unit/test_tools.py @@ -0,0 +1,17 @@ +from pisa.tools import check_txid_format +from pisa import logging + +logging.getLogger().disabled = True + + +def test_check_txid_format(): + assert(check_txid_format(None) is False) + assert(check_txid_format("") is False) + assert(check_txid_format(0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef) is False) # wrong type + assert(check_txid_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True) # lowercase + assert(check_txid_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True) # uppercase + assert(check_txid_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True) # mixed case + assert(check_txid_format("0123456789012345678901234567890123456789012345678901234567890123") is True) # only nums + assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False) # too short + assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False) # too long + assert(check_txid_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False) # non-hex