From ed0cb4f63291b7b23c5a0bf543095fc9854be9d3 Mon Sep 17 00:00:00 2001
From: Salvatore Ingala <6681844+bigspider@users.noreply.github.com>
Date: Tue, 8 Oct 2019 16:32:09 +0700
Subject: [PATCH] Changed log format to JSON; fixed missing return value in
 get_potential_matches

---
 pisa/__init__.py             | 15 +++++++++++++--
 pisa/api.py                  |  5 +++--
 pisa/block_processor.py      | 22 +++++++++++-----------
 pisa/carrier.py              | 14 +++++++-------
 pisa/cleaner.py              | 11 +++++------
 pisa/encrypted_blob.py       | 14 +++++++-------
 pisa/inspector.py            | 23 ++++++++---------------
 pisa/pisad.py                |  8 ++++----
 pisa/responder.py            | 31 ++++++++++++++-----------------
 pisa/utils/zmq_subscriber.py |  5 ++---
 pisa/watcher.py              | 21 ++++++++++-----------
 11 files changed, 84 insertions(+), 85 deletions(-)

diff --git a/pisa/__init__.py b/pisa/__init__.py
index a279a5c..7fa2901 100644
--- a/pisa/__init__.py
+++ b/pisa/__init__.py
@@ -1,14 +1,25 @@
 import logging
+import json
+import time
 
 from pisa.utils.auth_proxy import AuthServiceProxy
 import pisa.conf as conf
 
-
 HOST = 'localhost'
 PORT = 9814
 
+class StructuredMessage(object):
+    def __init__(self, message, **kwargs):
+        self.message = message
+        self.time = time.asctime()
+        self.kwargs = kwargs
+
+    def __str__(self):
+        return json.dumps({ **self.kwargs, "message": self.message, "time": self.time })
+
+M = StructuredMessage  # to improve readability
+
 # Configure logging
-logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
+logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
     logging.FileHandler(conf.SERVER_LOG_FILE),
     logging.StreamHandler()
 ])
diff --git a/pisa/api.py b/pisa/api.py
index 44244ee..df98cd0 100644
--- a/pisa/api.py
+++ b/pisa/api.py
@@ -1,6 +1,7 @@
 import json
 from flask import Flask, request, Response, abort, jsonify
+from pisa import HOST, PORT, logging, bitcoin_cli, M
 from pisa.watcher import Watcher
 from pisa.inspector import
Inspector from pisa import HOST, PORT, logging @@ -20,7 +21,7 @@ def add_appointment(): remote_addr = request.environ.get('REMOTE_ADDR') remote_port = request.environ.get('REMOTE_PORT') - logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port)) + logging.info(M('[API] connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))) # Check content type once if properly defined request_data = json.loads(request.get_json()) @@ -46,7 +47,7 @@ def add_appointment(): rcode = HTTP_BAD_REQUEST response = "appointment rejected. Request does not match the standard" - logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port)) + logging.info(M('[API] sending response and disconnecting', from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)) return Response(response, status=rcode, mimetype='text/plain') diff --git a/pisa/block_processor.py b/pisa/block_processor.py index eb09e45..4343bb9 100644 --- a/pisa/block_processor.py +++ b/pisa/block_processor.py @@ -1,7 +1,7 @@ import binascii from hashlib import sha256 -from pisa import logging, bitcoin_cli +from pisa import logging, bitcoin_cli, M from pisa.utils.auth_proxy import JSONRPCException @@ -14,7 +14,7 @@ class BlockProcessor: except JSONRPCException as e: block = None - logging.error("[BlockProcessor] couldn't get block from bitcoind. Error code {}".format(e)) + logging.error(M("[BlockProcessor] couldn't get block from bitcoind.", error_code=e)) return block @@ -26,7 +26,7 @@ class BlockProcessor: except JSONRPCException as e: block_hash = None - logging.error("[BlockProcessor] couldn't get block hash. 
Error code {}".format(e))
+            logging.error(M("[BlockProcessor] couldn't get block hash.", error_code=e))
 
         return block_hash
 
@@ -54,8 +54,10 @@ class BlockProcessor:
 
         potential_matches = {locator: potential_locators[locator] for locator in intersection}
 
         if len(potential_matches) > 0:
-            logging.info("[BlockProcessor] list of potential matches: {}".format(potential_matches))
+            logging.info(M("[BlockProcessor] list of potential matches", potential_matches=potential_matches))
         else:
-            logging.info("[BlockProcessor] no potential matches found")
+            logging.info(M("[BlockProcessor] no potential matches found"))
+
+        return potential_matches
 
@@ -73,13 +75,12 @@ class BlockProcessor:
                 justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
                 matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
 
-                logging.info("[BlockProcessor] match found for locator {} (uuid: {}): {}".format(
-                    locator, uuid, justice_txid))
+                logging.info(M("[BlockProcessor] match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid))
 
             except JSONRPCException as e:
                 # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
                 # for the POC
-                logging.error("[BlockProcessor] can't build transaction from decoded data. 
Error code {}".format(e)) + logging.error(M("[BlockProcessor] can't build transaction from decoded data.", error_code=e)) return matches @@ -91,7 +92,7 @@ class BlockProcessor: if tx in tx_job_map and tx in unconfirmed_txs: unconfirmed_txs.remove(tx) - logging.info("[Responder] confirmation received for tx {}".format(tx)) + logging.info(M("[Responder] confirmation received for transaction", tx=tx)) elif tx in unconfirmed_txs: if tx in missed_confirmations: @@ -100,8 +101,7 @@ class BlockProcessor: else: missed_confirmations[tx] = 1 - logging.info("[Responder] tx {} missed a confirmation (total missed: {})" - .format(tx, missed_confirmations[tx])) + logging.info(M("[Responder] transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])) return unconfirmed_txs, missed_confirmations diff --git a/pisa/carrier.py b/pisa/carrier.py index eb0319d..946af01 100644 --- a/pisa/carrier.py +++ b/pisa/carrier.py @@ -1,5 +1,5 @@ from pisa.rpc_errors import * -from pisa import logging, bitcoin_cli +from pisa import logging, bitcoin_cli, M from pisa.utils.auth_proxy import JSONRPCException from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION @@ -13,7 +13,7 @@ class Carrier: def send_transaction(self, rawtx, txid): try: - logging.info("[Carrier] pushing transaction to the network (txid: {})".format(rawtx)) + logging.info(M("[Carrier] pushing transaction to the network", txid=txid, rawtx=rawtx)) bitcoin_cli.sendrawtransaction(rawtx) receipt = self.Receipt(delivered=True) @@ -32,7 +32,7 @@ class Carrier: receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) elif errno == RPC_VERIFY_ALREADY_IN_CHAIN: - logging.info("[Carrier] {} is already in the blockchain. Getting confirmation count".format(txid)) + logging.info(M("[Carrier] Transaction is already in the blockchain. 
Getting confirmation count", txid=txid)) # If the transaction is already in the chain, we get the number of confirmations and watch the job # until the end of the appointment @@ -43,13 +43,13 @@ class Carrier: receipt = self.Receipt(delivered=True, confirmations=confirmations) else: - # There's a really unlike edge case where a transaction can be reorged between receiving the + # There's a really unlikely edge case where a transaction can be reorged between receiving the # notification and querying the data. In such a case we just resend self.send_transaction(rawtx, txid) else: # If something else happens (unlikely but possible) log it so we can treat it in future releases - logging.error("[Responder] JSONRPCException. Error {}".format(e)) + logging.error(M("[Responder] JSONRPCException.", error_code=e)) receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION) return receipt @@ -66,12 +66,12 @@ class Carrier: # reorged while we were querying bitcoind to get the confirmation count. In such a case we just # restart the job if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: - logging.info("[Carrier] transaction {} got reorged before obtaining information".format(txid)) + logging.info(M("[Carrier] transaction got reorged before obtaining information", txid=txid)) # TODO: Check RPC methods to see possible returns and avoid general else # else: # # If something else happens (unlikely but possible) log it so we can treat it in future releases - # logging.error("[Responder] JSONRPCException. 
Error {}".format(e)) + # logging.error(M("[Responder] JSONRPCException.", error_code=e) return tx_info diff --git a/pisa/cleaner.py b/pisa/cleaner.py index b7c2947..68ad5e2 100644 --- a/pisa/cleaner.py +++ b/pisa/cleaner.py @@ -1,4 +1,4 @@ -from pisa import logging +from pisa import logging, M # Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner" # https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference @@ -18,14 +18,13 @@ class Cleaner: else: locator_uuid_map[locator].remove(uuid) - logging.info("[Cleaner] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator, - uuid)) + logging.info(M("[Cleaner] end time reached with no match! Deleting appointment.", locator=locator, uuid=uuid)) @staticmethod def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): for uuid, confirmations in completed_jobs: - logging.info("[Cleaner] job completed (uuid = {}). Appointment ended at block {} after {} confirmations" - .format(uuid, height, confirmations)) + logging.info(M("[Cleaner] job completed. 
Appointment ended after reaching enough confirmations.",
+                          uuid=uuid, height=height, confirmations=confirmations))
 
             # ToDo: #9-add-data-persistence
             justice_txid = jobs[uuid].justice_txid
@@ -34,7 +33,7 @@
             if len(tx_job_map[justice_txid]) == 1:
                 tx_job_map.pop(justice_txid)
 
-                logging.info("[Cleaner] no more jobs for justice_txid {}".format(justice_txid))
+                logging.info(M("[Cleaner] no more jobs for justice transaction.", justice_txid=justice_txid))
 
             else:
                 tx_job_map[justice_txid].remove(uuid)
diff --git a/pisa/encrypted_blob.py b/pisa/encrypted_blob.py
index ffc3e38..3aeee13 100644
--- a/pisa/encrypted_blob.py
+++ b/pisa/encrypted_blob.py
@@ -1,8 +1,7 @@
 from hashlib import sha256
 from binascii import unhexlify, hexlify
 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-
-from pisa import logging
+from pisa import logging, M
 
 
 # FIXME: EncryptedBlob is assuming AES-128-GCM. A cipher field should be part of the object and the decryption should be
@@ -23,11 +22,12 @@ class EncryptedBlob:
         sk = master_key[:16]
         nonce = master_key[16:]
 
-        logging.info("[Watcher] creating new blob")
-        logging.info("[Watcher] master key: {}".format(hexlify(master_key).decode()))
-        logging.info("[Watcher] sk: {}".format(hexlify(sk).decode()))
-        logging.info("[Watcher] nonce: {}".format(hexlify(nonce).decode()))
-        logging.info("[Watcher] encrypted_blob: {}".format(self.data))
+        logging.info(M("[Watcher] creating new blob.",
+                       master_key=hexlify(master_key).decode(),
+                       sk=hexlify(sk).decode(),
+                       nonce=hexlify(nonce).decode(),
+                       encrypted_blob=self.data
+                       ))
 
         # Decrypt
         aesgcm = AESGCM(sk)
diff --git a/pisa/inspector.py b/pisa/inspector.py
index ae9ca89..945afc1 100644
--- a/pisa/inspector.py
+++ b/pisa/inspector.py
@@ -2,7 +2,7 @@ import re
 
 from pisa import errors
 import pisa.conf as conf
-from pisa import logging
+from pisa import logging, bitcoin_cli, M
 from pisa.appointment import Appointment
 from pisa.block_processor import BlockProcessor
 
@@ -70,8 +70,7 @@ class Inspector:
             rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
             message = "wrong locator format ({})".format(locator)
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -98,8 +97,7 @@ class Inspector:
             else:
                 message = "start_time is too close to current height"
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -132,8 +130,7 @@ class Inspector:
             else:
                 message = 'end_time is too close to current height'
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -155,8 +152,7 @@ class Inspector:
             message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
                 conf.MIN_DISPUTE_DELTA, dispute_delta)
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -178,8 +174,7 @@ class Inspector:
             rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
             message = "wrong encrypted_blob format ({})".format(encrypted_blob)
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -200,8 +195,7 @@ class Inspector:
             rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
             message = "cipher not supported: {}".format(cipher)
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
 
@@ -222,7 +216,6 @@ class Inspector:
         rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
         message = "hash_function not supported {}".format(hash_function)
 
-        if message is not None:
-            logging.error("[Inspector] {}".format(message))
+        if message is not None: logging.error(M("[Inspector] {}".format(message)))
 
         return rcode, message
diff --git a/pisa/pisad.py 
b/pisa/pisad.py index 8fbce83..f7b3603 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -1,7 +1,7 @@ from sys import argv from getopt import getopt -from pisa import logging +from pisa import logging, M from pisa.api import start_api from pisa.tools import can_connect_to_bitcoind, in_correct_network @@ -19,8 +19,8 @@ if __name__ == '__main__': start_api() else: - logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " - "Shutting down") + logging.error(M("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " + "Shutting down")) else: - logging.error("[Pisad] can't connect to bitcoind. Shutting down") + logging.error(M("[Pisad] can't connect to bitcoind. Shutting down")) diff --git a/pisa/responder.py b/pisa/responder.py index 08f082e..e44ee61 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -3,9 +3,9 @@ from threading import Thread from hashlib import sha256 from binascii import unhexlify +from pisa import logging, M from pisa.cleaner import Cleaner from pisa.carrier import Carrier -from pisa import logging from pisa.tools import check_tx_in_chain from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQHandler @@ -45,7 +45,7 @@ class Responder: def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): if self.asleep: - logging.info("[Responder] waking up!") + logging.info(M("[Responder] waking up!")) carrier = Carrier() receipt = carrier.send_transaction(justice_rawtx, justice_txid) @@ -80,8 +80,7 @@ class Responder: if confirmations == 0: self.unconfirmed_txs.append(justice_txid) - logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})' - .format(dispute_txid, justice_txid, appointment_end)) + logging.info(M("[Responder] new job added.", dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end)) if self.asleep: self.asleep = 
False @@ -109,9 +108,7 @@ class Responder: txs = block.get('tx') height = block.get('height') - logging.info("[Responder] new block received {}".format(block_hash)) - logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) - logging.info("[Responder] list of transactions: {}".format(txs)) + logging.info(M("[Responder] new block received", block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)) # ToDo: #9-add-data-persistence # change prev_block_hash condition @@ -125,8 +122,8 @@ class Responder: self.rebroadcast(txs_to_rebroadcast) else: - logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" - .format(prev_block_hash, block.get('previousblockhash'))) + logging.warning(M("[Responder] reorg found!", + local_prev_block_hash=prev_block_hash, remote_prev_block_hash=block.get('previousblockhash'))) self.handle_reorgs() @@ -136,7 +133,7 @@ class Responder: self.asleep = True self.zmq_subscriber.terminate = True - logging.info("[Responder] no more pending jobs, going back to sleep") + logging.info(M("[Responder] no more pending jobs, going back to sleep")) def get_txs_to_rebroadcast(self, txs): txs_to_rebroadcast = [] @@ -172,8 +169,8 @@ class Responder: self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) - logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting" - .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY)) + logging.warning(M("[Responder] Transaction has missed many confirmations. 
Rebroadcasting.", + justice_txid=self.jobs[uuid].justice_txid, confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)) # FIXME: Legacy code, must be checked and updated/fixed def handle_reorgs(self): @@ -189,8 +186,8 @@ class Responder: # If both transactions are there, we only need to update the justice tx confirmation count if justice_in_chain: - logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( - job.justice_txid, job.confirmations, justice_confirmations)) + logging.info(M("[Responder] updating confirmation count for transaction.", + justice_txid=job.justice_txid, prev_count=job.confirmations, curr_count=justice_confirmations)) job.confirmations = justice_confirmations @@ -203,7 +200,7 @@ class Responder: else: # ToDo: #24-properly-handle-reorgs - # FIXME: if the dispute is not on chain (either in mempool or not there al all), we need to call the + # FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the # reorg manager - logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") - logging.error("[Responder] reorg manager not yet implemented") + logging.warning(M("[Responder] dispute and justice transaction missing. 
Calling the reorg manager")) + logging.error(M("[Responder] reorg manager not yet implemented")) diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py index 75e175d..0545d9c 100644 --- a/pisa/utils/zmq_subscriber.py +++ b/pisa/utils/zmq_subscriber.py @@ -1,7 +1,6 @@ import zmq import binascii - -from pisa import logging +from pisa import logging, M from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT @@ -30,4 +29,4 @@ class ZMQHandler: block_hash = binascii.hexlify(body).decode('UTF-8') block_queue.put(block_hash) - logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash)) + logging.info(M("[ZMQHandler-{}] new block received via ZMQ".format(self.parent), block_hash=block_hash)) diff --git a/pisa/watcher.py b/pisa/watcher.py index ab8da44..a70bb43 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -2,7 +2,7 @@ from uuid import uuid4 from queue import Queue from threading import Thread -from pisa import logging +from pisa import logging, M from pisa.cleaner import Cleaner from pisa.conf import EXPIRY_DELTA from pisa.responder import Responder @@ -52,35 +52,34 @@ class Watcher: zmq_thread.start() watcher.start() - logging.info("[Watcher] waking up!") + logging.info(M("[Watcher] waking up!")) appointment_added = True - logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator)) + logging.info(M("[Watcher] new appointment accepted.", locator=appointment.locator)) else: appointment_added = False - logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format( - appointment.locator)) + logging.info(M("[Watcher] maximum appointments reached, appointment rejected.", locator=appointment.locator)) return appointment_added def do_subscribe(self, block_queue): - self.zmq_subscriber = ZMQHandler(parent='Watcher') + self.zmq_subscriber = ZMQHandler(parent="Watcher") self.zmq_subscriber.handle(block_queue) def do_watch(self): while 
len(self.appointments) > 0: block_hash = self.block_queue.get() - logging.info("[Watcher] new block received {}".format(block_hash)) + logging.info(M("[Watcher] new block received", block_hash=block_hash)) block = BlockProcessor.get_block(block_hash) if block is not None: txids = block.get('tx') - logging.info("[Watcher] list of transactions: {}".format(txids)) + logging.info(M("[Watcher] list of transactions.", txids=txids)) expired_appointments = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time + EXPIRY_DELTA] @@ -91,8 +90,8 @@ class Watcher: matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: - logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" - .format(justice_txid, locator, uuid)) + logging.info(M("[Watcher] notifying responder and deleting appointment.", + justice_txid=justice_txid, locator=locator, uuid=uuid)) self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, self.appointments[uuid].end_time) @@ -113,4 +112,4 @@ class Watcher: self.asleep = True self.zmq_subscriber.terminate = True - logging.error("[Watcher] no more pending appointments, going back to sleep") + logging.error(M("[Watcher] no more pending appointments, going back to sleep"))