Added Logger class; refactored logging accordingly

This commit is contained in:
Salvatore Ingala
2019-10-08 19:08:12 +07:00
parent 7f9c7d8609
commit bae9b6b913
11 changed files with 109 additions and 82 deletions

View File

@@ -18,7 +18,22 @@ class StructuredMessage(object):
def __str__(self):
return json.dumps({**self.kwargs, "message": self.message, "time": self.time})
M = StructuredMessage # to improve readability
class Logger(object):
def __init__(self, actor=None):
self.actor = actor
def _add_prefix(self, msg):
return msg if self.actor is None else "[{}] {}".format(self.actor, msg)
def info(msg, **kwargs):
logging.info(StructuredMessage(self._add_prefix(msg), **kwargs))
def debug(msg, **kwargs):
logging.debug(StructuredMessage(self._add_prefix(msg), **kwargs))
def error(msg, **kwargs):
logging.error(StructuredMessage(self._add_prefix(msg), **kwargs))
# Configure logging
logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[

View File

@@ -1,7 +1,7 @@
import json
from flask import Flask, request, Response, abort, jsonify
from pisa import HOST, PORT, logging, bitcoin_cli, M
from pisa import HOST, PORT, logging, bitcoin_cli, Logger
from pisa.watcher import Watcher
from pisa.inspector import Inspector
from pisa import HOST, PORT, logging
@@ -15,13 +15,15 @@ HTTP_OK = 200
HTTP_BAD_REQUEST = 400
HTTP_SERVICE_UNAVAILABLE = 503
logger = Logger("API")
@app.route('/', methods=['POST'])
def add_appointment():
remote_addr = request.environ.get('REMOTE_ADDR')
remote_port = request.environ.get('REMOTE_PORT')
logging.info(M('[API] connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port)))
logger.info('connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))
# Check content type once if properly defined
request_data = json.loads(request.get_json())
@@ -47,8 +49,8 @@ def add_appointment():
rcode = HTTP_BAD_REQUEST
response = "appointment rejected. Request does not match the standard"
logging.info(M('[API] sending response and disconnecting',
from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response))
logger.info('sending response and disconnecting',
from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)
return Response(response, status=rcode, mimetype='text/plain')

View File

@@ -1,9 +1,11 @@
import binascii
from hashlib import sha256
from pisa import logging, bitcoin_cli, M
from pisa import bitcoin_cli, Logger
from pisa.utils.auth_proxy import JSONRPCException
logger = Logger("BlockProcessor")
class BlockProcessor:
@staticmethod
@@ -14,7 +16,7 @@ class BlockProcessor:
except JSONRPCException as e:
block = None
logging.error(M("[BlockProcessor] couldn't get block from bitcoind.", error_code=e))
logger.error("couldn't get block from bitcoind.", error_code=e)
return block
@@ -26,7 +28,7 @@ class BlockProcessor:
except JSONRPCException as e:
block_hash = None
logging.error(M("[BlockProcessor] couldn't get block hash.", error_code=e))
logger.error("couldn't get block hash.", error_code=e)
return block_hash
@@ -38,7 +40,7 @@ class BlockProcessor:
except JSONRPCException as e:
block_count = None
logging.error("[BlockProcessor] couldn't get block block count. Error code {}".format(e))
logger.error("couldn't get block block count", error_code=e)
return block_count
@@ -54,10 +56,10 @@ class BlockProcessor:
potential_matches = {locator: potential_locators[locator] for locator in intersection}
if len(potential_matches) > 0:
logging.info(M("[BlockProcessor] list of potential matches", potential_matches=potential_matches))
logger.info("list of potential matches", potential_matches=potential_matches)
else:
logging.info(M("[BlockProcessor] no potential matches found"))
logger.info("no potential matches found")
return potential_matches
@@ -75,13 +77,12 @@ class BlockProcessor:
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
logging.info(M("[BlockProcessor] match found for locator.",
locator=locator, uuid=uuid, justice_txid=justice_txid))
logger.info("match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
logging.error(M("[BlockProcessor] can't build transaction from decoded data.", error_code=e))
logger.error("can't build transaction from decoded data.", error_code=e)
return matches
@@ -93,7 +94,7 @@ class BlockProcessor:
if tx in tx_job_map and tx in unconfirmed_txs:
unconfirmed_txs.remove(tx)
logging.info(M("[Responder] confirmation received for transaction", tx=tx))
logger.info("confirmation received for transaction", tx=tx)
elif tx in unconfirmed_txs:
if tx in missed_confirmations:
@@ -102,8 +103,6 @@ class BlockProcessor:
else:
missed_confirmations[tx] = 1
logging.info(M("[Responder] transaction missed a confirmation",
tx=tx, missed_confirmations=missed_confirmations[tx]))
logger.info("transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])
return unconfirmed_txs, missed_confirmations

View File

@@ -1,8 +1,10 @@
from pisa.rpc_errors import *
from pisa import logging, bitcoin_cli, M
from pisa import bitcoin_cli, Logger
from pisa.utils.auth_proxy import JSONRPCException
from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION
logger = Logger("Carrier")
class Carrier:
class Receipt:
@@ -13,7 +15,7 @@ class Carrier:
def send_transaction(self, rawtx, txid):
try:
logging.info(M("[Carrier] pushing transaction to the network", txid=txid, rawtx=rawtx))
logger.info("pushing transaction to the network", txid=txid, rawtx=rawtx)
bitcoin_cli.sendrawtransaction(rawtx)
receipt = self.Receipt(delivered=True)
@@ -32,8 +34,7 @@ class Carrier:
receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
elif errno == RPC_VERIFY_ALREADY_IN_CHAIN:
logging.info(M("[Carrier] Transaction is already in the blockchain. Getting confirmation count",
txid=txid))
logger.info("Transaction is already in the blockchain. Getting confirmation count", txid=txid)
# If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment
@@ -50,7 +51,7 @@ class Carrier:
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error(M("[Responder] JSONRPCException.", error_code=e))
logger.error("JSONRPCException.", error_code=e)
receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
return receipt
@@ -67,12 +68,11 @@ class Carrier:
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
logging.info(M("[Carrier] transaction got reorged before obtaining information", txid=txid))
logger.info("transaction got reorged before obtaining information", txid=txid)
# TODO: Check RPC methods to see possible returns and avoid general else
# else:
# # If something else happens (unlikely but possible) log it so we can treat it in future releases
# logging.error(M("[Responder] JSONRPCException.", error_code=e)
# logger.error("JSONRPCException.", error_code=e)
return tx_info

View File

@@ -1,4 +1,6 @@
from pisa import logging, M
from pisa import Logger
logger = Logger("Cleaner")
# Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner"
# https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference
@@ -18,14 +20,13 @@ class Cleaner:
else:
locator_uuid_map[locator].remove(uuid)
logging.info(M("[Cleaner] end time reached with no match! Deleting appointment.",
locator=locator, uuid=uuid))
logger.info("end time reached with no match! Deleting appointment.", locator=locator, uuid=uuid)
@staticmethod
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height):
for uuid, confirmations in completed_jobs:
logging.info(M("[Cleaner] job completed. Appointment ended after reaching enough confirmations.",
uuid=uuid, height=height, confirmations=confirmations))
logger.info("job completed. Appointment ended after reaching enough confirmations.",
uuid=uuid, height=height, confirmations=confirmations)
# ToDo: #9-add-data-persistence
justice_txid = jobs[uuid].justice_txid
@@ -34,7 +35,7 @@ class Cleaner:
if len(tx_job_map[justice_txid]) == 1:
tx_job_map.pop(justice_txid)
logging.info(M("[Cleaner] no more jobs for justice transaction.", justice_txid=justice_txid))
logger.info("no more jobs for justice transaction.", justice_txid=justice_txid)
else:
tx_job_map[justice_txid].remove(uuid)

View File

@@ -1,7 +1,9 @@
from hashlib import sha256
from binascii import unhexlify, hexlify
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from pisa import logging, M
from pisa import Logger
logger = Logger("Watcher")
# FIXME: EncryptedBlob is assuming AES-128-GCM. A cipher field should be part of the object and the decryption should be
@@ -22,11 +24,11 @@ class EncryptedBlob:
sk = master_key[:16]
nonce = master_key[16:]
logging.info(M("[Watcher] creating new blob.",
master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(),
nonce=hexlify(sk).decode(),
encrypted_blob=self.data))
logger.info("[Watcher] creating new blob.",
master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(),
nonce=hexlify(sk).decode(),
encrypted_blob=self.data)
# Decrypt
aesgcm = AESGCM(sk)

View File

@@ -2,10 +2,12 @@ import re
from pisa import errors
import pisa.conf as conf
from pisa import logging, bitcoin_cli, M
from pisa import bitcoin_cli, Logger
from pisa.appointment import Appointment
from pisa.block_processor import BlockProcessor
logger = Logger("Inspector")
# FIXME: The inspector logs the wrong messages sent form the users. A possible attack surface would be to send a really
# long field that, even if not accepted by PISA, would be stored in the logs. This is a possible DoS surface
# since pisa would store any kind of message (no matter the length). Solution: truncate the length of the fields
@@ -70,7 +72,7 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong locator format ({})".format(locator)
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -97,7 +99,7 @@ class Inspector:
else:
message = "start_time is too close to current height"
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -130,7 +132,7 @@ class Inspector:
else:
message = 'end_time is too close to current height'
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -152,7 +154,7 @@ class Inspector:
message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
conf.MIN_DISPUTE_DELTA, dispute_delta)
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -174,7 +176,7 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong encrypted_blob format ({})".format(encrypted_blob)
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -195,7 +197,7 @@ class Inspector:
rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
message = "cipher not supported: {}".format(cipher)
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message
@@ -216,6 +218,6 @@ class Inspector:
rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
message = "hash_function not supported {}".format(hash_function)
logging.error(M("[Inspector] {}".format(message)))
logger.error(message)
return rcode, message

View File

@@ -1,10 +1,11 @@
from sys import argv
from getopt import getopt
from pisa import logging, M
from pisa import logging, Logger
from pisa.api import start_api
from pisa.tools import can_connect_to_bitcoind, in_correct_network
logger = Logger("Pisad")
if __name__ == '__main__':
debug = False
@@ -19,8 +20,8 @@ if __name__ == '__main__':
start_api()
else:
logging.error(M("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
"Shutting down"))
logger.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
"Shutting down")
else:
logging.error(M("[Pisad] can't connect to bitcoind. Shutting down"))
logging.error("[Pisad] can't connect to bitcoind. Shutting down")

View File

@@ -3,7 +3,7 @@ from threading import Thread
from hashlib import sha256
from binascii import unhexlify
from pisa import logging, M
from pisa import Logger
from pisa.cleaner import Cleaner
from pisa.carrier import Carrier
from pisa.tools import check_tx_in_chain
@@ -13,6 +13,8 @@ from pisa.utils.zmq_subscriber import ZMQHandler
CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6
logger = Logger("Responder")
class Job:
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0):
@@ -45,7 +47,7 @@ class Responder:
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
if self.asleep:
logging.info(M("[Responder] waking up!"))
logger.info("waking up!")
carrier = Carrier()
receipt = carrier.send_transaction(justice_rawtx, justice_txid)
@@ -80,8 +82,8 @@ class Responder:
if confirmations == 0:
self.unconfirmed_txs.append(justice_txid)
logging.info(M("[Responder] new job added.",
dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end))
logger.info("new job added.",
dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end)
if self.asleep:
self.asleep = False
@@ -109,8 +111,8 @@ class Responder:
txs = block.get('tx')
height = block.get('height')
logging.info(M("[Responder] new block received",
block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs))
logger.info("new block received",
block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
# ToDo: #9-add-data-persistence
# change prev_block_hash condition
@@ -124,9 +126,9 @@ class Responder:
self.rebroadcast(txs_to_rebroadcast)
else:
logging.warning(M("[Responder] reorg found!",
local_prev_block_hash=prev_block_hash,
remote_prev_block_hash=block.get('previousblockhash')))
logger.warning("reorg found!",
local_prev_block_hash=prev_block_hash,
remote_prev_block_hash=block.get('previousblockhash'))
self.handle_reorgs()
@@ -136,7 +138,7 @@ class Responder:
self.asleep = True
self.zmq_subscriber.terminate = True
logging.info(M("[Responder] no more pending jobs, going back to sleep"))
logger.info("no more pending jobs, going back to sleep")
def get_txs_to_rebroadcast(self, txs):
txs_to_rebroadcast = []
@@ -172,9 +174,9 @@ class Responder:
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
logging.warning(M("[Responder] Transaction has missed many confirmations. Rebroadcasting.",
justice_txid=self.jobs[uuid].justice_txid,
confirmations_missed=CONFIRMATIONS_BEFORE_RETRY))
logger.warning("Transaction has missed many confirmations. Rebroadcasting.",
justice_txid=self.jobs[uuid].justice_txid,
confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
# FIXME: Legacy code, must be checked and updated/fixed
def handle_reorgs(self):
@@ -190,10 +192,10 @@ class Responder:
# If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain:
logging.info(M("[Responder] updating confirmation count for transaction.",
justice_txid=job.justice_txid,
prev_count=job.confirmations,
curr_count=justice_confirmations))
logger.info("updating confirmation count for transaction.",
justice_txid=job.justice_txid,
prev_count=job.confirmations,
curr_count=justice_confirmations)
job.confirmations = justice_confirmations
@@ -208,5 +210,5 @@ class Responder:
# ToDo: #24-properly-handle-reorgs
# FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the
# reorg manager
logging.warning(M("[Responder] dispute and justice transaction missing. Calling the reorg manager"))
logging.error(M("[Responder] reorg manager not yet implemented"))
logger.warning("dispute and justice transaction missing. Calling the reorg manager")
logger.error("reorg manager not yet implemented")

View File

@@ -1,8 +1,10 @@
import zmq
import binascii
from pisa import logging, M
from pisa import Logger
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
logger = Logger("ZMQHandler")
# ToDo: #7-add-async-back-to-zmq
class ZMQHandler:
@@ -29,5 +31,6 @@ class ZMQHandler:
block_hash = binascii.hexlify(body).decode('UTF-8')
block_queue.put(block_hash)
logging.info(M("[ZMQHandler-{}] new block received via ZMQ".format(self.parent),
block_hash=block_hash))
logger.info("new block received via ZMQ",
parent=self.parent,
block_hash=block_hash)

View File

@@ -2,7 +2,7 @@ from uuid import uuid4
from queue import Queue
from threading import Thread
from pisa import logging, M
from pisa import Logger
from pisa.cleaner import Cleaner
from pisa.conf import EXPIRY_DELTA
from pisa.responder import Responder
@@ -10,6 +10,7 @@ from pisa.conf import MAX_APPOINTMENTS
from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQHandler
logging = Logger("Watcher")
class Watcher:
def __init__(self, max_appointments=MAX_APPOINTMENTS):
@@ -52,17 +53,16 @@ class Watcher:
zmq_thread.start()
watcher.start()
logging.info(M("[Watcher] waking up!"))
logger.info("waking up!")
appointment_added = True
logging.info(M("[Watcher] new appointment accepted.", locator=appointment.locator))
logger.info("new appointment accepted.", locator=appointment.locator)
else:
appointment_added = False
logging.info(M("[Watcher] maximum appointments reached, appointment rejected.",
locator=appointment.locator))
logger.info("maximum appointments reached, appointment rejected.", locator=appointment.locator)
return appointment_added
@@ -73,14 +73,14 @@ class Watcher:
def do_watch(self):
while len(self.appointments) > 0:
block_hash = self.block_queue.get()
logging.info(M("[Watcher] new block received", block_hash=block_hash))
logger.info("new block received", block_hash=block_hash)
block = BlockProcessor.get_block(block_hash)
if block is not None:
txids = block.get('tx')
logging.info(M("[Watcher] list of transactions.", txids=txids))
logger.info("list of transactions.", txids=txids)
expired_appointments = [uuid for uuid, appointment in self.appointments.items()
if block["height"] > appointment.end_time + EXPIRY_DELTA]
@@ -91,8 +91,8 @@ class Watcher:
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
logging.info(M("[Watcher] notifying responder and deleting appointment.",
justice_txid=justice_txid, locator=locator, uuid=uuid))
logger.info("notifying responder and deleting appointment.",
justice_txid=justice_txid, locator=locator, uuid=uuid)
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[uuid].end_time)
@@ -113,4 +113,4 @@ class Watcher:
self.asleep = True
self.zmq_subscriber.terminate = True
logging.error(M("[Watcher] no more pending appointments, going back to sleep"))
logger.error("no more pending appointments, going back to sleep")