From bae9b6b9133608eabb15ca033aeb7a83c147ae61 Mon Sep 17 00:00:00 2001 From: Salvatore Ingala <6681844+bigspider@users.noreply.github.com> Date: Tue, 8 Oct 2019 19:08:12 +0700 Subject: [PATCH] Added Logger class; refactored logging accordingly --- pisa/__init__.py | 20 ++++++++++++++- pisa/api.py | 10 +++++---- pisa/block_processor.py | 25 +++++++++++----------- pisa/carrier.py | 16 +++++++-------- pisa/cleaner.py | 13 ++++++------ pisa/encrypted_blob.py | 14 +++++++------ pisa/inspector.py | 18 ++++++++-------- pisa/pisad.py | 9 ++++---- pisa/responder.py | 40 +++++++++++++++++++----------------- pisa/utils/zmq_subscriber.py | 9 +++++--- pisa/watcher.py | 20 +++++++++--------- 11 files changed, 112 insertions(+), 82 deletions(-) diff --git a/pisa/__init__.py b/pisa/__init__.py index 0d58d71..77ae6b2 100644 --- a/pisa/__init__.py +++ b/pisa/__init__.py @@ -18,7 +18,25 @@ class StructuredMessage(object): def __str__(self): return json.dumps({**self.kwargs, "message": self.message, "time": self.time}) -M = StructuredMessage # to improve readability + +class Logger(object): + def __init__(self, actor=None): + self.actor = actor + + def _add_prefix(self, msg): + return msg if self.actor is None else "[{}] {}".format(self.actor, msg) + + def info(self, msg, **kwargs): + logging.info(StructuredMessage(self._add_prefix(msg), **kwargs)) + + def debug(self, msg, **kwargs): + logging.debug(StructuredMessage(self._add_prefix(msg), **kwargs)) + + def error(self, msg, **kwargs): + logging.error(StructuredMessage(self._add_prefix(msg), **kwargs)) + + def warning(self, msg, **kwargs): + logging.warning(StructuredMessage(self._add_prefix(msg), **kwargs)) # Configure logging logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[ diff --git a/pisa/api.py b/pisa/api.py index b437d80..301dfa5 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -1,7 +1,7 @@ import json from flask import Flask, request, Response, abort, jsonify -from pisa import HOST, PORT, logging, bitcoin_cli, M +from pisa import HOST, PORT, logging, bitcoin_cli, Logger from pisa.watcher import Watcher from pisa.inspector import 
Inspector from pisa import HOST, PORT, logging @@ -15,13 +15,15 @@ HTTP_OK = 200 HTTP_BAD_REQUEST = 400 HTTP_SERVICE_UNAVAILABLE = 503 +logger = Logger("API") + @app.route('/', methods=['POST']) def add_appointment(): remote_addr = request.environ.get('REMOTE_ADDR') remote_port = request.environ.get('REMOTE_PORT') - logging.info(M('[API] connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))) + logger.info('connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port)) # Check content type once if properly defined request_data = json.loads(request.get_json()) @@ -47,8 +49,8 @@ def add_appointment(): rcode = HTTP_BAD_REQUEST response = "appointment rejected. Request does not match the standard" - logging.info(M('[API] sending response and disconnecting', - from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)) + logger.info('sending response and disconnecting', + from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response) return Response(response, status=rcode, mimetype='text/plain') diff --git a/pisa/block_processor.py b/pisa/block_processor.py index 08bd1c9..ae9530b 100644 --- a/pisa/block_processor.py +++ b/pisa/block_processor.py @@ -1,9 +1,11 @@ import binascii from hashlib import sha256 -from pisa import logging, bitcoin_cli, M +from pisa import bitcoin_cli, Logger from pisa.utils.auth_proxy import JSONRPCException +logger = Logger("BlockProcessor") + class BlockProcessor: @staticmethod @@ -14,7 +16,7 @@ class BlockProcessor: except JSONRPCException as e: block = None - logging.error(M("[BlockProcessor] couldn't get block from bitcoind.", error_code=e)) + logger.error("couldn't get block from bitcoind.", error_code=e) return block @@ -26,7 +28,7 @@ class BlockProcessor: except JSONRPCException as e: block_hash = None - logging.error(M("[BlockProcessor] couldn't get block hash.", error_code=e)) + logger.error("couldn't get block hash.", error_code=e) return block_hash @@ -38,7 +40,7 
@@ class BlockProcessor: except JSONRPCException as e: block_count = None - logging.error("[BlockProcessor] couldn't get block block count. Error code {}".format(e)) + logger.error("couldn't get block block count", error_code=e) return block_count @@ -54,10 +56,10 @@ class BlockProcessor: potential_matches = {locator: potential_locators[locator] for locator in intersection} if len(potential_matches) > 0: - logging.info(M("[BlockProcessor] list of potential matches", potential_matches=potential_matches)) + logger.info("list of potential matches", potential_matches=potential_matches) else: - logging.info(M("[BlockProcessor] no potential matches found")) + logger.info("no potential matches found") return potential_matches @@ -75,13 +77,12 @@ class BlockProcessor: justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) - logging.info(M("[BlockProcessor] match found for locator.", - locator=locator, uuid=uuid, justice_txid=justice_txid)) + logger.info("match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid) except JSONRPCException as e: # Tx decode failed returns error code -22, maybe we should be more strict here. 
Leaving it simple # for the POC - logging.error(M("[BlockProcessor] can't build transaction from decoded data.", error_code=e)) + logger.error("can't build transaction from decoded data.", error_code=e) return matches @@ -93,7 +94,7 @@ class BlockProcessor: if tx in tx_job_map and tx in unconfirmed_txs: unconfirmed_txs.remove(tx) - logging.info(M("[Responder] confirmation received for transaction", tx=tx)) + logger.info("confirmation received for transaction", tx=tx) elif tx in unconfirmed_txs: if tx in missed_confirmations: @@ -102,8 +103,6 @@ class BlockProcessor: else: missed_confirmations[tx] = 1 - logging.info(M("[Responder] transaction missed a confirmation", - tx=tx, missed_confirmations=missed_confirmations[tx])) + logger.info("transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx]) return unconfirmed_txs, missed_confirmations - diff --git a/pisa/carrier.py b/pisa/carrier.py index 6efa58f..a82fb3f 100644 --- a/pisa/carrier.py +++ b/pisa/carrier.py @@ -1,8 +1,10 @@ from pisa.rpc_errors import * -from pisa import logging, bitcoin_cli, M +from pisa import bitcoin_cli, Logger from pisa.utils.auth_proxy import JSONRPCException from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION +logger = Logger("Carrier") + class Carrier: class Receipt: @@ -13,7 +15,7 @@ class Carrier: def send_transaction(self, rawtx, txid): try: - logging.info(M("[Carrier] pushing transaction to the network", txid=txid, rawtx=rawtx)) + logger.info("pushing transaction to the network", txid=txid, rawtx=rawtx) bitcoin_cli.sendrawtransaction(rawtx) receipt = self.Receipt(delivered=True) @@ -32,8 +34,7 @@ class Carrier: receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) elif errno == RPC_VERIFY_ALREADY_IN_CHAIN: - logging.info(M("[Carrier] Transaction is already in the blockchain. Getting confirmation count", - txid=txid)) + logger.info("Transaction is already in the blockchain. 
Getting confirmation count", txid=txid) # If the transaction is already in the chain, we get the number of confirmations and watch the job # until the end of the appointment @@ -50,7 +51,7 @@ class Carrier: else: # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error(M("[Responder] JSONRPCException.", error_code=e)) + logger.error("JSONRPCException.", error_code=e) receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) return receipt @@ -67,12 +68,11 @@ class Carrier: # reorged while we were querying bitcoind to get the confirmation count. In such a case we just # restart the job if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - logging.info(M("[Carrier] transaction got reorged before obtaining information", txid=txid)) + logger.info("transaction got reorged before obtaining information", txid=txid) # TODO: Check RPC methods to see possible returns and avoid general else # else: # # If something else happens (unlikely but possible) log it so we can treat it in future releases - # logging.error(M("[Responder] JSONRPCException.", error_code=e) + # logger.error("JSONRPCException.", error_code=e) return tx_info - diff --git a/pisa/cleaner.py b/pisa/cleaner.py index f9d748b..39af1d5 100644 --- a/pisa/cleaner.py +++ b/pisa/cleaner.py @@ -1,4 +1,6 @@ -from pisa import logging, M +from pisa import Logger + +logger = Logger("Cleaner") # Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner" # https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference @@ -18,14 +20,13 @@ class Cleaner: else: locator_uuid_map[locator].remove(uuid) - logging.info(M("[Cleaner] end time reached with no match! Deleting appointment.", - locator=locator, uuid=uuid)) + logger.info("end time reached with no match! 
Deleting appointment.", locator=locator, uuid=uuid) @staticmethod def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): for uuid, confirmations in completed_jobs: - logging.info(M("[Cleaner] job completed. Appointment ended after reaching enough confirmations.", - uuid=uuid, height=height, confirmations=confirmations)) + logger.info("job completed. Appointment ended after reaching enough confirmations.", + uuid=uuid, height=height, confirmations=confirmations) # ToDo: #9-add-data-persistence justice_txid = jobs[uuid].justice_txid @@ -34,7 +35,7 @@ class Cleaner: if len(tx_job_map[justice_txid]) == 1: tx_job_map.pop(justice_txid) - logging.info(M("[Cleaner] no more jobs for justice transaction.", justice_txid=justice_txid)) + logger.info("no more jobs for justice transaction.", justice_txid=justice_txid) else: tx_job_map[justice_txid].remove(uuid) diff --git a/pisa/encrypted_blob.py b/pisa/encrypted_blob.py index 1dcf644..a772241 100644 --- a/pisa/encrypted_blob.py +++ b/pisa/encrypted_blob.py @@ -1,7 +1,9 @@ from hashlib import sha256 from binascii import unhexlify, hexlify from cryptography.hazmat.primitives.ciphers.aead import AESGCM -from pisa import logging, M +from pisa import Logger + +logger = Logger("Watcher") # FIXME: EncryptedBlob is assuming AES-128-GCM. 
A cipher field should be part of the object and the decryption should be @@ -22,11 +24,11 @@ class EncryptedBlob: sk = master_key[:16] nonce = master_key[16:] - logging.info(M("[Watcher] creating new blob.", - master_key=hexlify(master_key).decode(), - sk=hexlify(sk).decode(), - nonce=hexlify(sk).decode(), - encrypted_blob=self.data)) + logger.info("creating new blob.", + master_key=hexlify(master_key).decode(), + sk=hexlify(sk).decode(), + nonce=hexlify(sk).decode(), + encrypted_blob=self.data) # Decrypt aesgcm = AESGCM(sk) diff --git a/pisa/inspector.py b/pisa/inspector.py index 945afc1..0bffa75 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -2,10 +2,12 @@ import re from pisa import errors import pisa.conf as conf -from pisa import logging, bitcoin_cli, M +from pisa import bitcoin_cli, Logger from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor +logger = Logger("Inspector") + # FIXME: The inspector logs the wrong messages sent form the users. A possible attack surface would be to send a really # long field that, even if not accepted by PISA, would be stored in the logs. This is a possible DoS surface # since pisa would store any kind of message (no matter the length). Solution: truncate the length of the fields @@ -70,7 +72,7 @@ class Inspector: rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT message = "wrong locator format ({})".format(locator) - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -97,7 +99,7 @@ class Inspector: else: message = "start_time is too close to current height" - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -130,7 +132,7 @@ class Inspector: else: message = 'end_time is too close to current height' - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -152,7 +154,7 @@ class Inspector: message = "dispute delta too small. 
The dispute delta should be at least {} (current: {})".format( conf.MIN_DISPUTE_DELTA, dispute_delta) - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -174,7 +176,7 @@ class Inspector: rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT message = "wrong encrypted_blob format ({})".format(encrypted_blob) - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -195,7 +197,7 @@ class Inspector: rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED message = "cipher not supported: {}".format(cipher) - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message @@ -216,6 +218,6 @@ class Inspector: rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED message = "hash_function not supported {}".format(hash_function) - logging.error(M("[Inspector] {}".format(message))) + logger.error(message) return rcode, message diff --git a/pisa/pisad.py b/pisa/pisad.py index f7b3603..c769201 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -1,10 +1,11 @@ from sys import argv from getopt import getopt -from pisa import logging, M +from pisa import logging, Logger from pisa.api import start_api from pisa.tools import can_connect_to_bitcoind, in_correct_network +logger = Logger("Pisad") if __name__ == '__main__': debug = False @@ -19,8 +20,8 @@ if __name__ == '__main__': start_api() else: - logging.error(M("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " - "Shutting down")) + logger.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " + "Shutting down") else: - logging.error(M("[Pisad] can't connect to bitcoind. Shutting down")) + logging.error("[Pisad] can't connect to bitcoind. 
Shutting down") diff --git a/pisa/responder.py b/pisa/responder.py index ae1859e..6e76904 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -3,7 +3,7 @@ from threading import Thread from hashlib import sha256 from binascii import unhexlify -from pisa import logging, M +from pisa import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier from pisa.tools import check_tx_in_chain @@ -13,6 +13,8 @@ from pisa.utils.zmq_subscriber import ZMQHandler CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 +logger = Logger("Responder") + class Job: def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0): @@ -45,7 +47,7 @@ class Responder: def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): if self.asleep: - logging.info(M("[Responder] waking up!")) + logger.info("waking up!") carrier = Carrier() receipt = carrier.send_transaction(justice_rawtx, justice_txid) @@ -80,8 +82,8 @@ class Responder: if confirmations == 0: self.unconfirmed_txs.append(justice_txid) - logging.info(M("[Responder] new job added.", - dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end)) + logger.info("new job added.", + dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end) if self.asleep: self.asleep = False @@ -109,8 +111,8 @@ class Responder: txs = block.get('tx') height = block.get('height') - logging.info(M("[Responder] new block received", - block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)) + logger.info("new block received", + block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs) # ToDo: #9-add-data-persistence # change prev_block_hash condition @@ -124,9 +126,9 @@ class Responder: self.rebroadcast(txs_to_rebroadcast) else: - logging.warning(M("[Responder] reorg found!", - local_prev_block_hash=prev_block_hash, - 
remote_prev_block_hash=block.get('previousblockhash'))) + logger.warning("reorg found!", + local_prev_block_hash=prev_block_hash, + remote_prev_block_hash=block.get('previousblockhash')) self.handle_reorgs() @@ -136,7 +138,7 @@ class Responder: self.asleep = True self.zmq_subscriber.terminate = True - logging.info(M("[Responder] no more pending jobs, going back to sleep")) + logger.info("no more pending jobs, going back to sleep") def get_txs_to_rebroadcast(self, txs): txs_to_rebroadcast = [] @@ -172,9 +174,9 @@ class Responder: self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) - logging.warning(M("[Responder] Transaction has missed many confirmations. Rebroadcasting.", - justice_txid=self.jobs[uuid].justice_txid, - confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)) + logger.warning("Transaction has missed many confirmations. Rebroadcasting.", + justice_txid=self.jobs[uuid].justice_txid, + confirmations_missed=CONFIRMATIONS_BEFORE_RETRY) # FIXME: Legacy code, must be checked and updated/fixed def handle_reorgs(self): @@ -190,10 +192,10 @@ class Responder: # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: - logging.info(M("[Responder] updating confirmation count for transaction.", - justice_txid=job.justice_txid, - prev_count=job.confirmations, - curr_count=justice_confirmations)) + logger.info("updating confirmation count for transaction.", + justice_txid=job.justice_txid, + prev_count=job.confirmations, + curr_count=justice_confirmations) job.confirmations = justice_confirmations @@ -208,5 +210,5 @@ class Responder: # ToDo: #24-properly-handle-reorgs # FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the # reorg manager - logging.warning(M("[Responder] dispute and justice transaction missing. 
Calling the reorg manager")) - logging.error(M("[Responder] reorg manager not yet implemented")) + logger.warning("dispute and justice transaction missing. Calling the reorg manager") + logger.error("reorg manager not yet implemented") diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py index 4fbc63b..d28ae64 100644 --- a/pisa/utils/zmq_subscriber.py +++ b/pisa/utils/zmq_subscriber.py @@ -1,8 +1,10 @@ import zmq import binascii -from pisa import logging, M +from pisa import Logger from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT +logger = Logger("ZMQHandler") + # ToDo: #7-add-async-back-to-zmq class ZMQHandler: @@ -29,5 +31,6 @@ class ZMQHandler: block_hash = binascii.hexlify(body).decode('UTF-8') block_queue.put(block_hash) - logging.info(M("[ZMQHandler-{}] new block received via ZMQ".format(self.parent), - block_hash=block_hash)) + logger.info("new block received via ZMQ", + parent=self.parent, + block_hash=block_hash) diff --git a/pisa/watcher.py b/pisa/watcher.py index 3e62f06..8dbaa1c 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -2,7 +2,7 @@ from uuid import uuid4 from queue import Queue from threading import Thread -from pisa import logging, M +from pisa import Logger from pisa.cleaner import Cleaner from pisa.conf import EXPIRY_DELTA from pisa.responder import Responder @@ -10,6 +10,7 @@ from pisa.conf import MAX_APPOINTMENTS from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler +logger = Logger("Watcher") class Watcher: def __init__(self, max_appointments=MAX_APPOINTMENTS): @@ -52,17 +53,16 @@ class Watcher: zmq_thread.start() watcher.start() - logging.info(M("[Watcher] waking up!")) + logger.info("waking up!") appointment_added = True - logging.info(M("[Watcher] new appointment accepted.", locator=appointment.locator)) + logger.info("new appointment accepted.", locator=appointment.locator) else: appointment_added = False - logging.info(M("[Watcher] maximum appointments 
reached, appointment rejected.", - locator=appointment.locator)) + logger.info("maximum appointments reached, appointment rejected.", locator=appointment.locator) return appointment_added @@ -73,14 +73,14 @@ class Watcher: def do_watch(self): while len(self.appointments) > 0: block_hash = self.block_queue.get() - logging.info(M("[Watcher] new block received", block_hash=block_hash)) + logger.info("new block received", block_hash=block_hash) block = BlockProcessor.get_block(block_hash) if block is not None: txids = block.get('tx') - logging.info(M("[Watcher] list of transactions.", txids=txids)) + logger.info("list of transactions.", txids=txids) expired_appointments = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time + EXPIRY_DELTA] @@ -91,8 +91,8 @@ class Watcher: matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: - logging.info(M("[Watcher] notifying responder and deleting appointment.", - justice_txid=justice_txid, locator=locator, uuid=uuid)) + logger.info("notifying responder and deleting appointment.", + justice_txid=justice_txid, locator=locator, uuid=uuid) self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, self.appointments[uuid].end_time) @@ -113,4 +113,4 @@ class Watcher: self.asleep = True self.zmq_subscriber.terminate = True - logging.error(M("[Watcher] no more pending appointments, going back to sleep")) + logger.error("no more pending appointments, going back to sleep")