Fixes from PR review

This commit is contained in:
Salvatore Ingala
2019-10-10 10:34:03 +07:00
parent 831545ef2c
commit aea1d1f1e0
11 changed files with 43 additions and 44 deletions

View File

@@ -3,7 +3,6 @@ from hashlib import sha256
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from apps.cli import logging
from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS
from pisa.logger import Logger from pisa.logger import Logger
@@ -53,7 +52,7 @@ class Blob:
encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None) encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None)
encrypted_blob = hexlify(encrypted_blob).decode() encrypted_blob = hexlify(encrypted_blob).decode()
logger.info("creating new blob", logger.info("Creating new blob",
master_key=hexlify(master_key).decode(), master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(), sk=hexlify(sk).decode(),
nonce=hexlify(nonce).decode(), nonce=hexlify(nonce).decode(),

View File

@@ -24,7 +24,7 @@ def add_appointment():
remote_addr = request.environ.get('REMOTE_ADDR') remote_addr = request.environ.get('REMOTE_ADDR')
remote_port = request.environ.get('REMOTE_PORT') remote_port = request.environ.get('REMOTE_PORT')
logger.info('connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port)) logger.info('Connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))
# Check content type once if properly defined # Check content type once if properly defined
request_data = json.loads(request.get_json()) request_data = json.loads(request.get_json())
@@ -50,7 +50,7 @@ def add_appointment():
rcode = HTTP_BAD_REQUEST rcode = HTTP_BAD_REQUEST
response = "appointment rejected. Request does not match the standard" response = "appointment rejected. Request does not match the standard"
logger.info('sending response and disconnecting', logger.info('Sending response and disconnecting',
from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response) from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)
return Response(response, status=rcode, mimetype='text/plain') return Response(response, status=rcode, mimetype='text/plain')

View File

@@ -17,7 +17,7 @@ class BlockProcessor:
except JSONRPCException as e: except JSONRPCException as e:
block = None block = None
logger.error("couldn't get block from bitcoind.", error_code=e) logger.error("Couldn't get block from bitcoind.", error_code=e)
return block return block
@@ -29,7 +29,7 @@ class BlockProcessor:
except JSONRPCException as e: except JSONRPCException as e:
block_hash = None block_hash = None
logger.error("couldn't get block hash.", error_code=e) logger.error("Couldn't get block hash.", error_code=e)
return block_hash return block_hash
@@ -41,7 +41,7 @@ class BlockProcessor:
except JSONRPCException as e: except JSONRPCException as e:
block_count = None block_count = None
logger.error("couldn't get block block count", error_code=e) logger.error("Couldn't get block count", error_code=e)
return block_count return block_count
@@ -57,10 +57,10 @@ class BlockProcessor:
potential_matches = {locator: potential_locators[locator] for locator in intersection} potential_matches = {locator: potential_locators[locator] for locator in intersection}
if len(potential_matches) > 0: if len(potential_matches) > 0:
logger.info("list of potential matches", potential_matches=potential_matches) logger.info("List of potential matches", potential_matches=potential_matches)
else: else:
logger.info("no potential matches found") logger.info("No potential matches found")
return potential_matches return potential_matches
@@ -76,12 +76,12 @@ class BlockProcessor:
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
logger.info("match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid) logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
except JSONRPCException as e: except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC # for the POC
logger.error("can't build transaction from decoded data.", error_code=e) logger.error("Can't build transaction from decoded data.", error_code=e)
return matches return matches
@@ -93,7 +93,7 @@ class BlockProcessor:
if tx in tx_job_map and tx in unconfirmed_txs: if tx in tx_job_map and tx in unconfirmed_txs:
unconfirmed_txs.remove(tx) unconfirmed_txs.remove(tx)
logger.info("confirmation received for transaction", tx=tx) logger.info("Confirmation received for transaction", tx=tx)
elif tx in unconfirmed_txs: elif tx in unconfirmed_txs:
if tx in missed_confirmations: if tx in missed_confirmations:
@@ -102,6 +102,6 @@ class BlockProcessor:
else: else:
missed_confirmations[tx] = 1 missed_confirmations[tx] = 1
logger.info("transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx]) logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])
return unconfirmed_txs, missed_confirmations return unconfirmed_txs, missed_confirmations

View File

@@ -16,7 +16,7 @@ class Carrier:
def send_transaction(self, rawtx, txid): def send_transaction(self, rawtx, txid):
try: try:
logger.info("pushing transaction to the network", txid=txid, rawtx=rawtx) logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx)
bitcoin_cli.sendrawtransaction(rawtx) bitcoin_cli.sendrawtransaction(rawtx)
receipt = self.Receipt(delivered=True) receipt = self.Receipt(delivered=True)
@@ -69,7 +69,7 @@ class Carrier:
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just # reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job # restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
logger.info("transaction got reorged before obtaining information", txid=txid) logger.info("Transaction got reorged before obtaining information", txid=txid)
# TODO: Check RPC methods to see possible returns and avoid general else # TODO: Check RPC methods to see possible returns and avoid general else
# else: # else:

View File

@@ -20,12 +20,12 @@ class Cleaner:
else: else:
locator_uuid_map[locator].remove(uuid) locator_uuid_map[locator].remove(uuid)
logger.info("end time reached with no match! Deleting appointment.", locator=locator, uuid=uuid) logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid)
@staticmethod @staticmethod
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height):
for uuid, confirmations in completed_jobs: for uuid, confirmations in completed_jobs:
logger.info("job completed. Appointment ended after reaching enough confirmations.", logger.info("Job completed. Appointment ended after reaching enough confirmations.",
uuid=uuid, height=height, confirmations=confirmations) uuid=uuid, height=height, confirmations=confirmations)
# ToDo: #9-add-data-persistence # ToDo: #9-add-data-persistence
@@ -35,7 +35,7 @@ class Cleaner:
if len(tx_job_map[justice_txid]) == 1: if len(tx_job_map[justice_txid]) == 1:
tx_job_map.pop(justice_txid) tx_job_map.pop(justice_txid)
logger.info("no more jobs for justice transaction.", justice_txid=justice_txid) logger.info("No more jobs for justice transaction.", justice_txid=justice_txid)
else: else:
tx_job_map[justice_txid].remove(uuid) tx_job_map[justice_txid].remove(uuid)

View File

@@ -24,7 +24,7 @@ class EncryptedBlob:
sk = master_key[:16] sk = master_key[:16]
nonce = master_key[16:] nonce = master_key[16:]
logger.info("[Watcher] creating new blob.", logger.info("Creating new blob.",
master_key=hexlify(master_key).decode(), master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(), sk=hexlify(sk).decode(),
nonce=hexlify(sk).decode(), nonce=hexlify(sk).decode(),

View File

@@ -1,7 +1,6 @@
from sys import argv from sys import argv
from getopt import getopt from getopt import getopt
from pisa import logging
from pisa.logger import Logger from pisa.logger import Logger
from pisa.api import start_api from pisa.api import start_api
from pisa.tools import can_connect_to_bitcoind, in_correct_network from pisa.tools import can_connect_to_bitcoind, in_correct_network

View File

@@ -47,7 +47,7 @@ class Responder:
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
if self.asleep: if self.asleep:
logger.info("waking up!") logger.info("Waking up")
carrier = Carrier() carrier = Carrier()
receipt = carrier.send_transaction(justice_rawtx, justice_txid) receipt = carrier.send_transaction(justice_rawtx, justice_txid)
@@ -82,7 +82,7 @@ class Responder:
if confirmations == 0: if confirmations == 0:
self.unconfirmed_txs.append(justice_txid) self.unconfirmed_txs.append(justice_txid)
logger.info("new job added.", logger.info("New job added.",
dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end) dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end)
if self.asleep: if self.asleep:
@@ -111,7 +111,7 @@ class Responder:
txs = block.get('tx') txs = block.get('tx')
height = block.get('height') height = block.get('height')
logger.info("new block received", logger.info("New block received",
block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs) block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
# ToDo: #9-add-data-persistence # ToDo: #9-add-data-persistence
@@ -126,9 +126,9 @@ class Responder:
self.rebroadcast(txs_to_rebroadcast) self.rebroadcast(txs_to_rebroadcast)
else: else:
logger.warning("reorg found!", logger.warn("Reorg found",
local_prev_block_hash=prev_block_hash, local_prev_block_hash=prev_block_hash,
remote_prev_block_hash=block.get('previousblockhash')) remote_prev_block_hash=block.get('previousblockhash'))
self.handle_reorgs() self.handle_reorgs()
@@ -138,7 +138,7 @@ class Responder:
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True
logger.info("no more pending jobs, going back to sleep") logger.info("No more pending jobs, going back to sleep")
def get_txs_to_rebroadcast(self, txs): def get_txs_to_rebroadcast(self, txs):
txs_to_rebroadcast = [] txs_to_rebroadcast = []
@@ -174,25 +174,25 @@ class Responder:
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid, self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True) self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
logger.warning("Transaction has missed many confirmations. Rebroadcasting.", logger.warn("Transaction has missed many confirmations. Rebroadcasting.",
justice_txid=self.jobs[uuid].justice_txid, justice_txid=self.jobs[uuid].justice_txid,
confirmations_missed=CONFIRMATIONS_BEFORE_RETRY) confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
# FIXME: Legacy code, must be checked and updated/fixed # FIXME: Legacy code, must be checked and updated/fixed
def handle_reorgs(self): def handle_reorgs(self):
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
# there either, so we'll need to call the reorg manager straight away # there either, so we'll need to call the reorg manager straight away
dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label='dispute tx') dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label='Dispute tx')
# If the dispute is there, we can check the justice tx # If the dispute is there, we can check the justice tx
if dispute_in_chain: if dispute_in_chain:
justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, logger=logger, justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, logger=logger,
tx_label='justice tx') tx_label='Justice tx')
# If both transactions are there, we only need to update the justice tx confirmation count # If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain: if justice_in_chain:
logger.info("updating confirmation count for transaction.", logger.info("Updating confirmation count for transaction.",
justice_txid=job.justice_txid, justice_txid=job.justice_txid,
prev_count=job.confirmations, prev_count=job.confirmations,
curr_count=justice_confirmations) curr_count=justice_confirmations)
@@ -210,5 +210,5 @@ class Responder:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs
# FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the # FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the
# reorg manager # reorg manager
logger.warning("dispute and justice transaction missing. Calling the reorg manager") logger.warn("Dispute and justice transaction missing. Calling the reorg manager")
logger.error("reorg manager not yet implemented") logger.error("Reorg manager not yet implemented")

View File

@@ -8,7 +8,8 @@ from pisa.utils.auth_proxy import JSONRPCException
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
def check_tx_in_chain(tx_id, logger=Logger(), tx_label='transaction'): # TODO: currently only used in the Responder; might move there or in the BlockProcessor
def check_tx_in_chain(tx_id, logger=Logger(), tx_label='Transaction'):
tx_in_chain = False tx_in_chain = False
confirmations = 0 confirmations = 0

View File

@@ -30,4 +30,4 @@ class ZMQHandler:
block_hash = binascii.hexlify(body).decode('UTF-8') block_hash = binascii.hexlify(body).decode('UTF-8')
block_queue.put(block_hash) block_queue.put(block_hash)
self.logger.info("new block received via ZMQ", block_hash=block_hash) self.logger.info("New block received via ZMQ", block_hash=block_hash)

View File

@@ -53,16 +53,16 @@ class Watcher:
zmq_thread.start() zmq_thread.start()
watcher.start() watcher.start()
logger.info("waking up!") logger.info("Waking up")
appointment_added = True appointment_added = True
logger.info("new appointment accepted.", locator=appointment.locator) logger.info("New appointment accepted.", locator=appointment.locator)
else: else:
appointment_added = False appointment_added = False
logger.info("maximum appointments reached, appointment rejected.", locator=appointment.locator) logger.info("Maximum appointments reached, appointment rejected.", locator=appointment.locator)
return appointment_added return appointment_added
@@ -73,14 +73,14 @@ class Watcher:
def do_watch(self): def do_watch(self):
while len(self.appointments) > 0: while len(self.appointments) > 0:
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
logger.info("new block received", block_hash=block_hash) logger.info("New block received", block_hash=block_hash)
block = BlockProcessor.get_block(block_hash) block = BlockProcessor.get_block(block_hash)
if block is not None: if block is not None:
txids = block.get('tx') txids = block.get('tx')
logger.info("list of transactions.", txids=txids) logger.info("List of transactions.", txids=txids)
expired_appointments = [uuid for uuid, appointment in self.appointments.items() expired_appointments = [uuid for uuid, appointment in self.appointments.items()
if block["height"] > appointment.end_time + EXPIRY_DELTA] if block["height"] > appointment.end_time + EXPIRY_DELTA]
@@ -91,7 +91,7 @@ class Watcher:
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
logger.info("notifying responder and deleting appointment.", logger.info("Notifying responder and deleting appointment.",
justice_txid=justice_txid, locator=locator, uuid=uuid) justice_txid=justice_txid, locator=locator, uuid=uuid)
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
@@ -113,4 +113,4 @@ class Watcher:
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True
logger.error("no more pending appointments, going back to sleep") logger.error("No more pending appointments, going back to sleep")