Merge pull request #33 from sr-gi/21-json-logs

Changed log format to JSON

Salvatore Ingala, committed by GitHub, 2019-10-10 18:54:58 +07:00
24 changed files with 193 additions and 132 deletions
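For reference, the new pisa/logger.py added below serializes every record with json.dumps, so after this change a log line looks roughly like the following (a sketch with a hypothetical block_hash value; the exact field order and time format come from StructuredMessage):

{"actor": "Watcher", "block_hash": "00000000deadbeef", "message": "[Watcher] New block received", "time": "Thu Oct 10 18:54:58 2019"}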

.gitignore (vendored, 2 lines changed)

@@ -10,3 +10,5 @@ bitcoin.conf*
 apps/cli/*.json
 appointments/
 test.py
+*.pyc
+.cache


@@ -12,7 +12,7 @@ SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
 SUPPORTED_CIPHERS = ["AES-GCM-128"]
 # Configure logging
-logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
+logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
     logging.FileHandler(CLIENT_LOG_FILE),
     logging.StreamHandler()
 ])
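Dropping %(asctime)s here is deliberate: the timestamp now travels inside the JSON payload (the "time" field set by StructuredMessage in pisa/logger.py below), so a bare %(message)s format keeps each output line a single valid JSON object. A minimal sketch of the effect (illustrative only, not part of the commit):

import json
import logging
import time

logging.basicConfig(format='%(message)s', level=logging.INFO)
# One JSON object per line; the timestamp is a field of the payload, not a logging prefix.
logging.info(json.dumps({"message": "[Client] Creating new blob", "time": time.asctime()}))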


@@ -3,8 +3,10 @@ from hashlib import sha256
 from binascii import hexlify, unhexlify
 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-from apps.cli import logging
 from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS
+from pisa.logger import Logger
+logger = Logger("Client")
 class Blob:
@@ -50,10 +52,10 @@ class Blob:
 encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None)
 encrypted_blob = hexlify(encrypted_blob).decode()
-logging.info("[Client] creating new blob")
-logging.info("[Client] master key: {}".format(hexlify(master_key).decode()))
-logging.info("[Client] sk: {}".format(hexlify(sk).decode()))
-logging.info("[Client] nonce: {}".format(hexlify(nonce).decode()))
-logging.info("[Client] encrypted_blob: {}".format(encrypted_blob))
+logger.info("Creating new blob",
+            master_key=hexlify(master_key).decode(),
+            sk=hexlify(sk).decode(),
+            nonce=hexlify(nonce).decode(),
+            encrypted_blob=encrypted_blob)
 return encrypted_blob


@@ -2,7 +2,6 @@ import re
 import os
 import sys
 import json
-import logging
 import requests
 from sys import argv
 from hashlib import sha256
@@ -10,11 +9,15 @@ from binascii import unhexlify
 from getopt import getopt, GetoptError
 from requests import ConnectTimeout, ConnectionError
+from pisa.logger import Logger
 from apps.cli.blob import Blob
 from apps.cli.help import help_add_appointment, help_get_appointment
 from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT
+logger = Logger("Client")
 # FIXME: TESTING ENDPOINT, WON'T BE THERE IN PRODUCTION
 def generate_dummy_appointment():
 get_block_count_end_point = "http://{}:{}/get_block_count".format(pisa_api_server, pisa_api_port)
@@ -49,14 +52,14 @@ def add_appointment(args):
 if os.path.isfile(fin):
 appointment_data = json.load(open(fin))
 else:
-logging.error("[Client] can't find file " + fin)
+logger.error("Can't find file " + fin)
 else:
-logging.error("[Client] no file provided as appointment. " + use_help)
+logger.error("No file provided as appointment. " + use_help)
 else:
 appointment_data = json.loads(arg_opt)
 except json.JSONDecodeError:
-logging.error("[Client] non-JSON encoded data provided as appointment. " + use_help)
+logger.error("Non-JSON encoded data provided as appointment. " + use_help)
 if appointment_data:
 valid_locator = check_txid_format(appointment_data.get('tx_id'))
@@ -67,22 +70,22 @@ def add_appointment(args):
 appointment_data.get('start_time'), appointment_data.get('end_time'),
 appointment_data.get('dispute_delta'))
-logging.info("[Client] sending appointment to PISA")
+logger.info("Sending appointment to PISA")
 try:
 r = requests.post(url=add_appointment_endpoint, json=json.dumps(appointment), timeout=5)
-logging.info("[Client] {} (code: {}).".format(r.text, r.status_code))
+logger.info("{} (code: {}).".format(r.text, r.status_code))
 except ConnectTimeout:
-logging.error("[Client] can't connect to pisa API. Connection timeout.")
+logger.error("Can't connect to pisa API. Connection timeout.")
 except ConnectionError:
-logging.error("[Client] can't connect to pisa API. Server cannot be reached.")
+logger.error("Can't connect to pisa API. Server cannot be reached.")
 else:
-logging.error("[Client] the provided locator is not valid.")
+logger.error("The provided locator is not valid.")
 else:
-logging.error("[Client] no appointment data provided. " + use_help)
+logger.error("No appointment data provided. " + use_help)
 def get_appointment(args):
@@ -104,16 +107,16 @@ def get_appointment(args):
 print(json.dumps(r.json(), indent=4, sort_keys=True))
 except ConnectTimeout:
-logging.error("[Client] can't connect to pisa API. Connection timeout.")
+logger.error("Can't connect to pisa API. Connection timeout.")
 except ConnectionError:
-logging.error("[Client] can't connect to pisa API. Server cannot be reached.")
+logger.error("Can't connect to pisa API. Server cannot be reached.")
 else:
-logging.error("[Client] the provided locator is not valid.")
+logger.error("The provided locator is not valid.")
 else:
-logging.error("[Client] the provided locator is not valid.")
+logger.error("The provided locator is not valid.")
 def build_appointment(tx, tx_id, start_block, end_block, dispute_delta):
@@ -199,7 +202,7 @@ if __name__ == '__main__':
 sys.exit(help_get_appointment())
 else:
-logging.error("[Client] unknown command. Use help to check the list of available commands")
+logger.error("Unknown command. Use help to check the list of available commands")
 else:
 sys.exit(show_usage())
@@ -210,14 +213,14 @@ if __name__ == '__main__':
 generate_dummy_appointment()
 else:
-logging.error("[Client] unknown command. Use help to check the list of available commands")
+logger.error("Unknown command. Use help to check the list of available commands")
 else:
-logging.error("[Client] no command provided. Use help to check the list of available commands.")
+logger.error("No command provided. Use help to check the list of available commands.")
 except GetoptError as e:
-logging.error("[Client] {}".format(e))
+logger.error("{}".format(e))
 except json.JSONDecodeError as e:
-logging.error("[Client] non-JSON encoded appointment passed as parameter.")
+logger.error("Non-JSON encoded appointment passed as parameter.")


@@ -3,12 +3,11 @@ import logging
 from pisa.utils.auth_proxy import AuthServiceProxy
 import pisa.conf as conf
 HOST = 'localhost'
 PORT = 9814
 # Configure logging
-logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
+logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
     logging.FileHandler(conf.SERVER_LOG_FILE),
     logging.StreamHandler()
 ])
@@ -17,4 +16,3 @@ logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handle
 # TODO: Check if a long lived connection like this may create problems (timeouts)
 bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
                                                        conf.BTC_RPC_PORT))


@@ -1,9 +1,10 @@
 import json
 from flask import Flask, request, Response, abort, jsonify
-from pisa import HOST, PORT, logging
+from pisa.logger import Logger
 from pisa.watcher import Watcher
 from pisa.inspector import Inspector
+from pisa import HOST, PORT, logging
 from pisa.appointment import Appointment
 from pisa.block_processor import BlockProcessor
@@ -14,13 +15,15 @@ HTTP_OK = 200
 HTTP_BAD_REQUEST = 400
 HTTP_SERVICE_UNAVAILABLE = 503
+logger = Logger("API")
 @app.route('/', methods=['POST'])
 def add_appointment():
 remote_addr = request.environ.get('REMOTE_ADDR')
 remote_port = request.environ.get('REMOTE_PORT')
-logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
+logger.info('Connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))
 # Check content type once if properly defined
 request_data = json.loads(request.get_json())
@@ -46,7 +49,8 @@ def add_appointment():
 rcode = HTTP_BAD_REQUEST
 response = "appointment rejected. Request does not match the standard"
-logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port))
+logger.info('Sending response and disconnecting',
+            from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)
 return Response(response, status=rcode, mimetype='text/plain')


@@ -1,9 +1,12 @@
 import binascii
 from hashlib import sha256
-from pisa import logging, bitcoin_cli
+from pisa import bitcoin_cli
+from pisa.logger import Logger
 from pisa.utils.auth_proxy import JSONRPCException
+logger = Logger("BlockProcessor")
 class BlockProcessor:
 @staticmethod
@@ -14,7 +17,7 @@ class BlockProcessor:
 except JSONRPCException as e:
 block = None
-logging.error("[BlockProcessor] couldn't get block from bitcoind. Error code {}".format(e))
+logger.error("Couldn't get block from bitcoind.", error_code=e)
 return block
@@ -26,7 +29,7 @@ class BlockProcessor:
 except JSONRPCException as e:
 block_hash = None
-logging.error("[BlockProcessor] couldn't get block hash. Error code {}".format(e))
+logger.error("Couldn't get block hash.", error_code=e)
 return block_hash
@@ -38,7 +41,7 @@ class BlockProcessor:
 except JSONRPCException as e:
 block_count = None
-logging.error("[BlockProcessor] couldn't get block block count. Error code {}".format(e))
+logger.error("Couldn't get block count", error_code=e)
 return block_count
@@ -54,10 +57,10 @@ class BlockProcessor:
 potential_matches = {locator: potential_locators[locator] for locator in intersection}
 if len(potential_matches) > 0:
-logging.info("[BlockProcessor] list of potential matches: {}".format(potential_matches))
+logger.info("List of potential matches", potential_matches=potential_matches)
 else:
-logging.info("[BlockProcessor] no potential matches found")
+logger.info("No potential matches found")
 return potential_matches
@@ -73,13 +76,12 @@ class BlockProcessor:
 justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
 matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
-logging.info("[BlockProcessor] match found for locator {} (uuid: {}): {}".format(
-    locator, uuid, justice_txid))
+logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
 except JSONRPCException as e:
 # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
 # for the POC
-logging.error("[BlockProcessor] can't build transaction from decoded data. Error code {}".format(e))
+logger.error("Can't build transaction from decoded data.", error_code=e)
 return matches
@@ -91,7 +93,7 @@ class BlockProcessor:
 if tx in tx_job_map and tx in unconfirmed_txs:
 unconfirmed_txs.remove(tx)
-logging.info("[Responder] confirmation received for tx {}".format(tx))
+logger.info("Confirmation received for transaction", tx=tx)
 elif tx in unconfirmed_txs:
 if tx in missed_confirmations:
@@ -100,8 +102,6 @@ class BlockProcessor:
 else:
 missed_confirmations[tx] = 1
-logging.info("[Responder] tx {} missed a confirmation (total missed: {})"
-             .format(tx, missed_confirmations[tx]))
+logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])
 return unconfirmed_txs, missed_confirmations


@@ -1,8 +1,11 @@
 from pisa.rpc_errors import *
-from pisa import logging, bitcoin_cli
+from pisa import bitcoin_cli
+from pisa.logger import Logger
 from pisa.utils.auth_proxy import JSONRPCException
 from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION
+logger = Logger("Carrier")
 class Receipt:
 def __init__(self, delivered, confirmations=0, reason=None):
@@ -14,7 +17,7 @@ class Receipt:
 class Carrier:
 def send_transaction(self, rawtx, txid):
 try:
-logging.info("[Carrier] pushing transaction to the network (txid: {})".format(rawtx))
+logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx)
 bitcoin_cli.sendrawtransaction(rawtx)
 receipt = Receipt(delivered=True)
@@ -41,7 +44,7 @@ class Carrier:
 receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
 elif errno == RPC_VERIFY_ALREADY_IN_CHAIN:
-logging.info("[Carrier] {} is already in the blockchain. Getting confirmation count".format(txid))
+logger.info("Transaction is already in the blockchain. Getting confirmation count", txid=txid)
 # If the transaction is already in the chain, we get the number of confirmations and watch the job
 # until the end of the appointment
@@ -52,7 +55,7 @@ class Carrier:
 receipt = Receipt(delivered=True, confirmations=confirmations, reason=RPC_VERIFY_ALREADY_IN_CHAIN)
 else:
-# There's a really unlike edge case where a transaction can be reorged between receiving the
+# There's a really unlikely edge case where a transaction can be reorged between receiving the
 # notification and querying the data. In such a case we just resend
 self.send_transaction(rawtx, txid)
@@ -65,8 +68,8 @@ class Carrier:
 else:
 # If something else happens (unlikely but possible) log it so we can treat it in future releases
-logging.error("[Responder] JSONRPCException. Error {}".format(e))
+logger.error("JSONRPCException.", error_code=e)
 receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
 return receipt
@@ -81,12 +84,11 @@ class Carrier:
 # reorged while we were querying bitcoind to get the confirmation count. In such a case we just
 # restart the job
 if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
-logging.info("[Carrier] transaction {} got reorged before obtaining information".format(txid))
+logger.info("Transaction got reorged before obtaining information", txid=txid)
 # TODO: Check RPC methods to see possible returns and avoid general else
 # else:
 # # If something else happens (unlikely but possible) log it so we can treat it in future releases
-# logging.error("[Responder] JSONRPCException. Error {}".format(e))
+# logger.error("JSONRPCException.", error_code=e)
 return tx_info


@@ -1,4 +1,6 @@
-from pisa import logging
+from pisa.logger import Logger
+logger = Logger("Cleaner")
 # Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner
 # https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference
@@ -18,14 +20,13 @@ class Cleaner:
 else:
 locator_uuid_map[locator].remove(uuid)
-logging.info("[Cleaner] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator,
-             uuid))
+logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid)
 @staticmethod
 def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height):
 for uuid, confirmations in completed_jobs:
-logging.info("[Cleaner] job completed (uuid = {}). Appointment ended at block {} after {} confirmations"
-             .format(uuid, height, confirmations))
+logger.info("Job completed. Appointment ended after reaching enough confirmations.",
+            uuid=uuid, height=height, confirmations=confirmations)
 # ToDo: #9-add-data-persistence
 justice_txid = jobs[uuid].justice_txid
@@ -34,7 +35,7 @@ class Cleaner:
 if len(tx_job_map[justice_txid]) == 1:
 tx_job_map.pop(justice_txid)
-logging.info("[Cleaner] no more jobs for justice_txid {}".format(justice_txid))
+logger.info("No more jobs for justice transaction.", justice_txid=justice_txid)
 else:
 tx_job_map[justice_txid].remove(uuid)


@@ -1,8 +1,9 @@
 from hashlib import sha256
 from binascii import unhexlify, hexlify
 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
-from pisa import logging
+from pisa.logger import Logger
+logger = Logger("Watcher")
@@ -23,11 +24,11 @@ class EncryptedBlob:
 sk = master_key[:16]
 nonce = master_key[16:]
-logging.info("[Watcher] creating new blob")
-logging.info("[Watcher] master key: {}".format(hexlify(master_key).decode()))
-logging.info("[Watcher] sk: {}".format(hexlify(sk).decode()))
-logging.info("[Watcher] nonce: {}".format(hexlify(nonce).decode()))
-logging.info("[Watcher] encrypted_blob: {}".format(self.data))
+logger.info("Creating new blob.",
+            master_key=hexlify(master_key).decode(),
+            sk=hexlify(sk).decode(),
+            nonce=hexlify(nonce).decode(),
+            encrypted_blob=self.data)
 # Decrypt
 aesgcm = AESGCM(sk)


@@ -2,10 +2,13 @@ import re
 from pisa import errors
 import pisa.conf as conf
-from pisa import logging
 from pisa import bitcoin_cli
+from pisa.logger import Logger
 from pisa.appointment import Appointment
 from pisa.block_processor import BlockProcessor
+logger = Logger("Inspector")
 # FIXME: The inspector logs the wrong messages sent from the users. A possible attack surface would be to send a really
 # long field that, even if not accepted by PISA, would be stored in the logs. This is a possible DoS surface
 # since pisa would store any kind of message (no matter the length). Solution: truncate the length of the fields
@@ -71,7 +74,7 @@ class Inspector:
 message = "wrong locator format ({})".format(locator)
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -99,7 +102,7 @@ class Inspector:
 message = "start_time is too close to current height"
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -133,7 +136,7 @@ class Inspector:
 message = 'end_time is too close to current height'
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -156,7 +159,7 @@ class Inspector:
 conf.MIN_DISPUTE_DELTA, dispute_delta)
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -179,7 +182,7 @@ class Inspector:
 message = "wrong encrypted_blob format ({})".format(encrypted_blob)
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -201,7 +204,7 @@ class Inspector:
 message = "cipher not supported: {}".format(cipher)
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message
@@ -223,6 +226,6 @@ class Inspector:
 message = "hash_function not supported {}".format(hash_function)
 if message is not None:
-logging.error("[Inspector] {}".format(message))
+logger.error(message)
 return rcode, message

pisa/logger.py (new file, 33 lines)

@@ -0,0 +1,33 @@
+import logging
+import time
+import json
+
+
+class StructuredMessage(object):
+    def __init__(self, message, **kwargs):
+        self.message = message
+        self.time = time.asctime()
+        self.kwargs = kwargs
+
+    def __str__(self):
+        return json.dumps({**self.kwargs, "message": self.message, "time": self.time})
+
+
+class Logger(object):
+    def __init__(self, actor=None):
+        self.actor = actor
+
+    def _add_prefix(self, msg):
+        return msg if self.actor is None else "[{}] {}".format(self.actor, msg)
+
+    def info(self, msg, **kwargs):
+        logging.info(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
+
+    def debug(self, msg, **kwargs):
+        logging.debug(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
+
+    def error(self, msg, **kwargs):
+        logging.error(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
+
+    def warning(self, msg, **kwargs):
+        logging.warning(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
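A usage sketch of the new module (the basicConfig call mirrors the one in pisa/__init__.py; the printed line is approximate, with field order following the dict literal in __str__ and the time coming from time.asctime()):

import logging
from pisa.logger import Logger

logging.basicConfig(format='%(message)s', level=logging.INFO)  # normally done in pisa/__init__.py

logger = Logger("Watcher")
logger.info("New block received", block_hash="00000000deadbeef")  # hypothetical block hash
# -> {"actor": "Watcher", "block_hash": "00000000deadbeef", "message": "[Watcher] New block received", "time": "Thu Oct 10 18:54:58 2019"}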


@@ -1,10 +1,11 @@
 from sys import argv
 from getopt import getopt
-from pisa import logging
+from pisa.logger import Logger
 from pisa.api import start_api
 from pisa.tools import can_connect_to_bitcoind, in_correct_network
+logger = Logger("Daemon")
 if __name__ == '__main__':
 debug = False
@@ -19,8 +20,7 @@ if __name__ == '__main__':
 start_api()
 else:
-logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
-              "Shutting down")
+logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down")
 else:
-logging.error("[Pisad] can't connect to bitcoind. Shutting down")
+logger.error("Can't connect to bitcoind. Shutting down")


@@ -3,9 +3,9 @@ from threading import Thread
 from hashlib import sha256
 from binascii import unhexlify
+from pisa.logger import Logger
 from pisa.cleaner import Cleaner
 from pisa.carrier import Carrier
-from pisa import logging
 from pisa.tools import check_tx_in_chain
 from pisa.block_processor import BlockProcessor
 from pisa.utils.zmq_subscriber import ZMQHandler
@@ -13,6 +13,8 @@ from pisa.utils.zmq_subscriber import ZMQHandler
 CONFIRMATIONS_BEFORE_RETRY = 6
 MIN_CONFIRMATIONS = 6
+logger = Logger("Responder")
 class Job:
 def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0):
@@ -45,7 +47,7 @@ class Responder:
 def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
 if self.asleep:
-logging.info("[Responder] waking up!")
+logger.info("Waking up")
 carrier = Carrier()
 receipt = carrier.send_transaction(justice_rawtx, justice_txid)
@@ -80,8 +82,8 @@ class Responder:
 if confirmations == 0:
 self.unconfirmed_txs.append(justice_txid)
-logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'
-             .format(dispute_txid, justice_txid, appointment_end))
+logger.info("New job added.",
+            dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end)
 if self.asleep:
 self.asleep = False
@@ -109,9 +111,8 @@ class Responder:
 txs = block.get('tx')
 height = block.get('height')
-logging.info("[Responder] new block received {}".format(block_hash))
-logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
-logging.info("[Responder] list of transactions: {}".format(txs))
+logger.info("New block received",
+            block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
 # ToDo: #9-add-data-persistence
 # change prev_block_hash condition
@@ -125,8 +126,9 @@ class Responder:
 self.rebroadcast(txs_to_rebroadcast)
 else:
-logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
-                .format(prev_block_hash, block.get('previousblockhash')))
+logger.warning("Reorg found",
+               local_prev_block_hash=prev_block_hash,
+               remote_prev_block_hash=block.get('previousblockhash'))
 self.handle_reorgs()
@@ -136,7 +138,7 @@ class Responder:
 self.asleep = True
 self.zmq_subscriber.terminate = True
-logging.info("[Responder] no more pending jobs, going back to sleep")
+logger.info("No more pending jobs, going back to sleep")
 def get_txs_to_rebroadcast(self, txs):
 txs_to_rebroadcast = []
@@ -172,25 +174,28 @@ class Responder:
 self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
                   self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
-logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting"
-                .format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY))
+logger.warning("Transaction has missed many confirmations. Rebroadcasting.",
+               justice_txid=self.jobs[uuid].justice_txid,
+               confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
 # FIXME: Legacy code, must be checked and updated/fixed
 def handle_reorgs(self):
 for uuid, job in self.jobs.items():
 # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
 # there either, so we'll need to call the reorg manager straight away
-dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx')
+dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label='Dispute tx')
 # If the dispute is there, we can check the justice tx
 if dispute_in_chain:
-justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder',
-                                                            tx_label='justice tx')
+justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, logger=logger,
+                                                            tx_label='Justice tx')
 # If both transactions are there, we only need to update the justice tx confirmation count
 if justice_in_chain:
-logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
-    job.justice_txid, job.confirmations, justice_confirmations))
+logger.info("Updating confirmation count for transaction.",
+            justice_txid=job.justice_txid,
+            prev_count=job.confirmations,
+            curr_count=justice_confirmations)
 job.confirmations = justice_confirmations
@@ -203,7 +208,7 @@ class Responder:
 else:
 # ToDo: #24-properly-handle-reorgs
-# FIXME: if the dispute is not on chain (either in mempool or not there al all), we need to call the
+# FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the
 # reorg manager
-logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager")
-logging.error("[Responder] reorg manager not yet implemented")
+logger.warning("Dispute and justice transaction missing. Calling the reorg manager")
+logger.error("Reorg manager not yet implemented")


@@ -2,12 +2,14 @@ import re
 from http.client import HTTPException
 import pisa.conf as conf
-from pisa import logging, bitcoin_cli
+from pisa import bitcoin_cli
+from pisa.logger import Logger
 from pisa.utils.auth_proxy import JSONRPCException
 from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
-def check_tx_in_chain(tx_id, parent='', tx_label='transaction'):
+# TODO: currently only used in the Responder; might move there or in the BlockProcessor
+def check_tx_in_chain(tx_id, logger=Logger(), tx_label='Transaction'):
 tx_in_chain = False
 confirmations = 0
@@ -17,18 +19,18 @@ def check_tx_in_chain(tx_id, parent='', tx_label='transaction'):
 if tx_info.get("confirmations"):
 confirmations = int(tx_info.get("confirmations"))
 tx_in_chain = True
-logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
+logger.error("{} found in the blockchain".format(tx_label), txid=tx_id)
 else:
-logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id))
+logger.error("{} found in mempool".format(tx_label), txid=tx_id)
 except JSONRPCException as e:
 if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
-logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
+logger.error("{} not found in mempool nor blockchain".format(tx_label), txid=tx_id)
 else:
 # ToDO: Unhandled errors, check this properly
-logging.error("[{}] JSONRPCException. Error code {}".format(parent, e))
+logger.error("JSONRPCException.", error_code=e)
 return tx_in_chain, confirmations
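For reference, a hedged sketch of calling the updated helper (mirroring the Responder call above; the txid is a hypothetical placeholder):

from pisa.logger import Logger
from pisa.tools import check_tx_in_chain

dispute_txid = "aa" * 32  # hypothetical 64-character txid
tx_in_chain, confirmations = check_tx_in_chain(dispute_txid, logger=Logger("Responder"), tx_label='Dispute tx')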


@@ -1,7 +1,6 @@
 import zmq
 import binascii
-from pisa import logging
+from pisa.logger import Logger
 from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
@@ -14,7 +13,8 @@ class ZMQHandler:
 self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
 self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
 self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
 self.parent = parent
+self.logger = Logger("ZMQHandler-{}".format(parent))
 self.terminate = False
 def handle(self, block_queue):
@@ -30,4 +30,4 @@ class ZMQHandler:
 block_hash = binascii.hexlify(body).decode('UTF-8')
 block_queue.put(block_hash)
-logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))
+self.logger.info("New block received via ZMQ", block_hash=block_hash)


@@ -2,7 +2,7 @@ from uuid import uuid4
 from queue import Queue
 from threading import Thread
-from pisa import logging
+from pisa.logger import Logger
 from pisa.cleaner import Cleaner
 from pisa.conf import EXPIRY_DELTA
 from pisa.responder import Responder
@@ -10,6 +10,8 @@ from pisa.conf import MAX_APPOINTMENTS
 from pisa.block_processor import BlockProcessor
 from pisa.utils.zmq_subscriber import ZMQHandler
+logger = Logger("Watcher")
 class Watcher:
 def __init__(self, max_appointments=MAX_APPOINTMENTS):
@@ -52,35 +54,34 @@ class Watcher:
 zmq_thread.start()
 watcher.start()
-logging.info("[Watcher] waking up!")
+logger.info("Waking up")
 appointment_added = True
-logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
+logger.info("New appointment accepted.", locator=appointment.locator)
 else:
 appointment_added = False
-logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format(
-    appointment.locator))
+logger.info("Maximum appointments reached, appointment rejected.", locator=appointment.locator)
 return appointment_added
 def do_subscribe(self, block_queue):
-self.zmq_subscriber = ZMQHandler(parent='Watcher')
+self.zmq_subscriber = ZMQHandler(parent="Watcher")
 self.zmq_subscriber.handle(block_queue)
 def do_watch(self):
 while len(self.appointments) > 0:
 block_hash = self.block_queue.get()
-logging.info("[Watcher] new block received {}".format(block_hash))
+logger.info("New block received", block_hash=block_hash)
 block = BlockProcessor.get_block(block_hash)
 if block is not None:
 txids = block.get('tx')
-logging.info("[Watcher] list of transactions: {}".format(txids))
+logger.info("List of transactions.", txids=txids)
 expired_appointments = [uuid for uuid, appointment in self.appointments.items()
                         if block["height"] > appointment.end_time + EXPIRY_DELTA]
@@ -91,8 +92,8 @@ class Watcher:
 matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
 for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
-logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
-             .format(justice_txid, locator, uuid))
+logger.info("Notifying responder and deleting appointment.",
+            justice_txid=justice_txid, locator=locator, uuid=uuid)
 self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
                             self.appointments[uuid].end_time)
@@ -113,4 +114,4 @@ class Watcher:
 self.asleep = True
 self.zmq_subscriber.terminate = True
-logging.error("[Watcher] no more pending appointments, going back to sleep")
+logger.error("No more pending appointments, going back to sleep")


@@ -183,8 +183,3 @@ def test_get_all_appointments_responder():
 assert (set(responder_jobs) == set(local_locators))
 assert (len(received_appointments["watcher_appointments"]) == 0)


@@ -87,5 +87,3 @@ def test_encrypt():
 encrypted_blob2 = blob.encrypt(key)
 assert(encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2))


@@ -73,4 +73,3 @@ def test_potential_matches_random_data(locator_uuid_map):
 # None of the txids should match
 assert len(potential_matches) == 0


@@ -78,4 +78,3 @@ def test_delete_completed_jobs():
 Cleaner.delete_completed_jobs(jobs, tx_job_map, completed_jobs, 0)
 assert not set(completed_jobs).issubset(jobs.keys())


@@ -34,6 +34,3 @@ def test_decrypt():
 encrypted_blob = EncryptedBlob(encrypted_data)
 assert(encrypted_blob.decrypt(key) == data)


@@ -230,4 +230,3 @@ def test_inspect():
 and appointment.end_time == end_time and appointment.dispute_delta == dispute_delta and
 appointment.encrypted_blob.data == encrypted_blob and appointment.cipher == cipher and
 appointment.hash_function == hash_function)

test/unit/test_tools.py (new file, 17 lines)

@@ -0,0 +1,17 @@
+from pisa.tools import check_txid_format
+from pisa import logging
+
+logging.getLogger().disabled = True
+
+
+def test_check_txid_format():
+    assert(check_txid_format(None) is False)
+    assert(check_txid_format("") is False)
+    assert(check_txid_format(0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef) is False)  # wrong type
+    assert(check_txid_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True)  # lowercase
+    assert(check_txid_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True)  # uppercase
+    assert(check_txid_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True)  # mixed case
+    assert(check_txid_format("0123456789012345678901234567890123456789012345678901234567890123") is True)  # only nums
+    assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False)  # too short
+    assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False)  # too long
+    assert(check_txid_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False)  # non-hex