Code clean up

Deletes the debug/logging pair. Defines logging and bitcoin_cli as system-wide variables.
This commit is contained in:
Sergi Delgado Segura
2019-10-02 17:03:43 +01:00
parent 9bb3b38b3f
commit 93e23e769f
10 changed files with 158 additions and 214 deletions

View File

@@ -1,2 +1,18 @@
from pisa.utils.auth_proxy import AuthServiceProxy
import pisa.conf as conf
import logging
HOST = 'localhost' HOST = 'localhost'
PORT = 9814 PORT = 9814
# Configure logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(conf.SERVER_LOG_FILE),
logging.StreamHandler()
])
# Create RPC connection with bitcoind
# TODO: Check if a long lived connection like this may create problems (timeouts)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
conf.BTC_RPC_PORT))

View File

@@ -1,15 +1,10 @@
from pisa import * from pisa import HOST, PORT, logging, bitcoin_cli
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.inspector import Inspector from pisa.inspector import Inspector
from pisa.appointment import Appointment from pisa.appointment import Appointment
from flask import Flask, request, Response, abort, jsonify from flask import Flask, request, Response, abort, jsonify
import json import json
# FIXME: HERE FOR TESTING (get_block_count). REMOVE WHEN REMOVING THE FUNCTION
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
# ToDo: #5-add-async-to-api # ToDo: #5-add-async-to-api
app = Flask(__name__) app = Flask(__name__)
HTTP_OK = 200 HTTP_OK = 200
@@ -22,15 +17,14 @@ def add_appointment():
remote_addr = request.environ.get('REMOTE_ADDR') remote_addr = request.environ.get('REMOTE_ADDR')
remote_port = request.environ.get('REMOTE_PORT') remote_port = request.environ.get('REMOTE_PORT')
if debug: logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
# Check content type once if properly defined # Check content type once if properly defined
request_data = json.loads(request.get_json()) request_data = json.loads(request.get_json())
appointment = inspector.inspect(request_data) appointment = inspector.inspect(request_data)
if type(appointment) == Appointment: if type(appointment) == Appointment:
appointment_added = watcher.add_appointment(appointment, debug, logging) appointment_added = watcher.add_appointment(appointment)
# ToDo: #13-create-server-side-signature-receipt # ToDo: #13-create-server-side-signature-receipt
if appointment_added: if appointment_added:
@@ -49,9 +43,7 @@ def add_appointment():
rcode = HTTP_BAD_REQUEST rcode = HTTP_BAD_REQUEST
response = "appointment rejected. Request does not match the standard" response = "appointment rejected. Request does not match the standard"
if debug: logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port))
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr,
remote_port))
return Response(response, status=rcode, mimetype='text/plain') return Response(response, status=rcode, mimetype='text/plain')
@@ -115,21 +107,16 @@ def get_all_appointments():
@app.route('/get_block_count', methods=['GET']) @app.route('/get_block_count', methods=['GET'])
def get_block_count(): def get_block_count():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
return jsonify({"block_count": bitcoin_cli.getblockcount()}) return jsonify({"block_count": bitcoin_cli.getblockcount()})
def start_api(d, l): def start_api():
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment # FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
global debug, logging, watcher, inspector global watcher, inspector
debug = d
logging = l
# ToDo: #18-separate-api-from-watcher # ToDo: #18-separate-api-from-watcher
watcher = Watcher() watcher = Watcher()
inspector = Inspector(debug, logging) inspector = Inspector()
# Setting Flask log to ERROR only so it does not mess with our logging # Setting Flask log to ERROR only so it does not mess with our logging
logging.getLogger('werkzeug').setLevel(logging.ERROR) logging.getLogger('werkzeug').setLevel(logging.ERROR)

View File

@@ -19,7 +19,7 @@ class Appointment:
return appointment return appointment
# ToDO: #3-improve-appointment-strcuture # ToDO: #3-improve-appointment-structure

View File

@@ -1,16 +1,12 @@
import re import re
from pisa.appointment import Appointment import pisa.conf as conf
from pisa import errors from pisa import errors
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException from pisa import logging, bitcoin_cli
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MIN_DISPUTE_DELTA, \ from pisa.appointment import Appointment
SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS from pisa.utils.auth_proxy import JSONRPCException
class Inspector: class Inspector:
def __init__(self, debug=False, logging=None):
self.debug = debug
self.logging = logging
def inspect(self, data): def inspect(self, data):
locator = data.get('locator') locator = data.get('locator')
start_time = data.get('start_time') start_time = data.get('start_time')
@@ -20,8 +16,6 @@ class Inspector:
cipher = data.get('cipher') cipher = data.get('cipher')
hash_function = data.get('hash_function') hash_function = data.get('hash_function')
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
try: try:
block_height = bitcoin_cli.getblockcount() block_height = bitcoin_cli.getblockcount()
@@ -45,8 +39,7 @@ class Inspector:
r = (rcode, message) r = (rcode, message)
except JSONRPCException as e: except JSONRPCException as e:
if self.debug: logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
# In case of an unknown exception, assign a special rcode and reason. # In case of an unknown exception, assign a special rcode and reason.
r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred") r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred")
@@ -71,8 +64,7 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong locator format ({})".format(locator) message = "wrong locator format ({})".format(locator)
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -95,8 +87,7 @@ class Inspector:
else: else:
message = "start_time too close to current height" message = "start_time too close to current height"
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -122,8 +113,7 @@ class Inspector:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = 'end_time is in the past' message = 'end_time is in the past'
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -139,13 +129,12 @@ class Inspector:
elif t != int: elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong dispute_delta data type ({})".format(t) message = "wrong dispute_delta data type ({})".format(t)
elif dispute_delta < MIN_DISPUTE_DELTA: elif dispute_delta < conf.MIN_DISPUTE_DELTA:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format( message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
MIN_DISPUTE_DELTA, dispute_delta) conf.MIN_DISPUTE_DELTA, dispute_delta)
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -166,8 +155,8 @@ class Inspector:
# ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using # ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using
rcode = errors.APPOINTMENT_WRONG_FIELD rcode = errors.APPOINTMENT_WRONG_FIELD
message = "wrong encrypted_blob" message = "wrong encrypted_blob"
if self.debug and message:
self.logging.error("[Inspector] {}".format(message)) logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -183,12 +172,11 @@ class Inspector:
elif t != str: elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong cipher data type ({})".format(t) message = "wrong cipher data type ({})".format(t)
elif cipher not in SUPPORTED_CIPHERS: elif cipher not in conf.SUPPORTED_CIPHERS:
rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
message = "cipher not supported: {}".format(cipher) message = "cipher not supported: {}".format(cipher)
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message
@@ -204,11 +192,10 @@ class Inspector:
elif t != str: elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong hash_function data type ({})".format(t) message = "wrong hash_function data type ({})".format(t)
elif hash_function not in SUPPORTED_HASH_FUNCTIONS: elif hash_function not in conf.SUPPORTED_HASH_FUNCTIONS:
rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
message = "hash_function not supported {}".format(hash_function) message = "hash_function not supported {}".format(hash_function)
if self.debug and message: logging.error("[Inspector] {}".format(message))
self.logging.error("[Inspector] {}".format(message))
return rcode, message return rcode, message

View File

@@ -1,36 +1,25 @@
import logging
from sys import argv from sys import argv
from getopt import getopt from getopt import getopt
from threading import Thread from pisa import logging
from pisa.api import start_api from pisa.api import start_api
from pisa.tools import can_connect_to_bitcoind, in_correct_network from pisa.tools import can_connect_to_bitcoind, in_correct_network
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, BTC_NETWORK, SERVER_LOG_FILE
if __name__ == '__main__': if __name__ == '__main__':
debug = False debug = False
opts, _ = getopt(argv[1:], 'd', ['debug']) opts, _ = getopt(argv[1:], 'd', ['debug'])
for opt, arg in opts: for opt, arg in opts:
if opt in ['-d', '--debug']: # FIXME: Leaving this here for future option/arguments
debug = True pass
# Configure logging if can_connect_to_bitcoind():
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[ if in_correct_network():
logging.FileHandler(SERVER_LOG_FILE), # Fire the api
logging.StreamHandler() start_api()
])
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
if can_connect_to_bitcoind(bitcoin_cli):
if in_correct_network(bitcoin_cli, BTC_NETWORK):
# ToDo: This may not have to be a thread. The main thread only creates this and terminates.
api_thread = Thread(target=start_api, args=[debug, logging])
api_thread.start()
else: else:
logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. " logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
"Shutting down") "Shutting down")
else: else:
logging.error("[Pisad] can't connect to bitcoind. Shutting down") logging.error("[Pisad] can't connect to bitcoind. Shutting down")

View File

@@ -2,11 +2,11 @@ from queue import Queue
from threading import Thread from threading import Thread
from hashlib import sha256 from hashlib import sha256
from binascii import unhexlify from binascii import unhexlify
from pisa.zmq_subscriber import ZMQHandler from pisa import logging, bitcoin_cli
from pisa.rpc_errors import * from pisa.rpc_errors import *
from pisa.tools import check_tx_in_chain from pisa.tools import check_tx_in_chain
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT from pisa.utils.auth_proxy import JSONRPCException
CONFIRMATIONS_BEFORE_RETRY = 6 CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6 MIN_CONFIRMATIONS = 6
@@ -42,31 +42,24 @@ class Responder:
self.asleep = True self.asleep = True
self.zmq_subscriber = None self.zmq_subscriber = None
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
retry=False):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
try: try:
if debug: if self.asleep:
if self.asleep: logging.info("[Responder] waking up!")
logging.info("[Responder] waking up!") logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
bitcoin_cli.sendrawtransaction(justice_rawtx) bitcoin_cli.sendrawtransaction(justice_rawtx)
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds such information. # retry holds such information.
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
retry=retry)
except JSONRPCException as e: except JSONRPCException as e:
self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, self.handle_send_failures(e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry)
debug, logging, retry)
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
confirmations=0, retry=False): retry=False):
# ToDo: #23-define-behaviour-approaching-end # ToDo: #23-define-behaviour-approaching-end
if retry: if retry:
@@ -81,25 +74,22 @@ class Responder:
else: else:
self.tx_job_map[justice_txid] = [uuid] self.tx_job_map[justice_txid] = [uuid]
if debug: logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'. .format(dispute_txid, justice_txid, appointment_end))
format(dispute_txid, justice_txid, appointment_end))
if self.asleep: if self.asleep:
self.asleep = False self.asleep = False
self.block_queue = Queue() self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
responder = Thread(target=self.handle_responses, args=[debug, logging]) responder = Thread(target=self.handle_responses)
zmq_thread.start() zmq_thread.start()
responder.start() responder.start()
def do_subscribe(self, block_queue, debug, logging): def do_subscribe(self, block_queue):
self.zmq_subscriber = ZMQHandler(parent='Responder') self.zmq_subscriber = ZMQHandler(parent='Responder')
self.zmq_subscriber.handle(block_queue, debug, logging) self.zmq_subscriber.handle(block_queue)
def handle_responses(self, debug, logging): def handle_responses(self):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
prev_block_hash = 0 prev_block_hash = 0
while len(self.jobs) > 0: while len(self.jobs) > 0:
# We get notified for every new received block # We get notified for every new received block
@@ -110,14 +100,12 @@ class Responder:
txs = block.get('tx') txs = block.get('tx')
height = block.get('height') height = block.get('height')
if debug: logging.info("[Responder] new block received {}".format(block_hash))
logging.info("[Responder] new block received {}".format(block_hash)) logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) logging.info("[Responder] list of transactions: {}".format(txs))
logging.info("[Responder] list of transactions: {}".format(txs))
except JSONRPCException as e: except JSONRPCException as e:
if debug: logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
continue continue
@@ -129,20 +117,19 @@ class Responder:
if justice_txid in txs or self.jobs[uuid].confirmations > 0: if justice_txid in txs or self.jobs[uuid].confirmations > 0:
self.jobs[uuid].confirmations += 1 self.jobs[uuid].confirmations += 1
if debug: logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
logging.info("[Responder] new confirmation received for job = {}, txid = {}".format( uuid, justice_txid))
uuid, justice_txid))
elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
# If a transaction has missed too many confirmations for a while we'll try to rebroadcast # If a transaction has missed too many confirmations for a while we'll try to rebroadcast
# ToDO: #22-discuss-confirmations-before-retry # ToDO: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end # ToDo: #23-define-behaviour-approaching-end
self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid, self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug, self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end,
logging, retry=True) retry=True)
if debug:
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
.format(justice_txid, CONFIRMATIONS_BEFORE_RETRY)) .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
else: else:
# Otherwise we increase the number of missed confirmations # Otherwise we increase the number of missed confirmations
@@ -153,14 +140,13 @@ class Responder:
# The end of the appointment has been reached # The end of the appointment has been reached
completed_jobs.append(uuid) completed_jobs.append(uuid)
self.remove_completed_jobs(completed_jobs, height, debug, logging) self.remove_completed_jobs(completed_jobs, height)
else: else:
if debug: logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" .format(prev_block_hash, block.get('previousblockhash')))
.format(prev_block_hash, block.get('previousblockhash')))
self.handle_reorgs(bitcoin_cli, debug, logging) self.handle_reorgs()
prev_block_hash = block.get('hash') prev_block_hash = block.get('hash')
@@ -168,11 +154,9 @@ class Responder:
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True
if debug: logging.info("[Responder] no more pending jobs, going back to sleep")
logging.info("[Responder] no more pending jobs, going back to sleep")
def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry):
debug, logging, retry):
# Since we're pushing a raw transaction to the network we can get two kind of rejections: # Since we're pushing a raw transaction to the network we can get two kind of rejections:
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
# due to network rules, whereas the latter implies that the transaction is already in the blockchain. # due to network rules, whereas the latter implies that the transaction is already in the blockchain.
@@ -185,38 +169,36 @@ class Responder:
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN: elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
try: try:
if debug: logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and start "
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and " "monitoring the transaction".format(justice_txid))
"start monitoring the transaction".format(justice_txid))
# If the transaction is already in the chain, we get the number of confirmations and watch the job # If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment # until the end of the appointment
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1) tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
confirmations = int(tx_info.get("confirmations")) confirmations = int(tx_info.get("confirmations"))
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
retry=retry, confirmations=confirmations) confirmations=confirmations)
except JSONRPCException as e: except JSONRPCException as e:
# While it's quite unlikely, the transaction that was already in the blockchain could have been # While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just # reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job # restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
logging, retry=retry)
elif debug: else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases # If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e)) logging.error("[Responder] JSONRPCException. Error {}".format(e))
elif debug: else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases # If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e)) logging.error("[Responder] JSONRPCException. Error {}".format(e))
def remove_completed_jobs(self, completed_jobs, height, debug, logging): def remove_completed_jobs(self, completed_jobs, height):
for uuid in completed_jobs: for uuid in completed_jobs:
if debug: logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at " "block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height, self.jobs[uuid].confirmations))
self.jobs[uuid].confirmations))
# ToDo: #9-add-data-persistency # ToDo: #9-add-data-persistency
justice_txid = self.jobs[uuid].justice_txid justice_txid = self.jobs[uuid].justice_txid
@@ -225,30 +207,25 @@ class Responder:
if len(self.tx_job_map[justice_txid]) == 1: if len(self.tx_job_map[justice_txid]) == 1:
self.tx_job_map.pop(justice_txid) self.tx_job_map.pop(justice_txid)
if debug: logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
else: else:
self.tx_job_map[justice_txid].remove(uuid) self.tx_job_map[justice_txid].remove(uuid)
def handle_reorgs(self, bitcoin_cli, debug, logging): def handle_reorgs(self):
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
# there either, so we'll need to call the reorg manager straight away # there either, so we'll need to call the reorg manager straight away
dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx')
parent='Responder',
tx_label='dispute tx')
# If the dispute is there, we can check the justice tx # If the dispute is there, we can check the justice tx
if dispute_in_chain: if dispute_in_chain:
justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug, justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder',
logging, parent='Responder',
tx_label='justice tx') tx_label='justice tx')
# If both transactions are there, we only need to update the justice tx confirmation count # If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain: if justice_in_chain:
if debug: logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
job.justice_txid, job.confirmations, justice_confirmations)) job.justice_txid, job.confirmations, justice_confirmations))
job.confirmations = justice_confirmations job.confirmations = justice_confirmations
@@ -258,9 +235,7 @@ class Responder:
# DISCUSS: Adding job back, should we flag it as retried? # DISCUSS: Adding job back, should we flag it as retried?
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be # FIXME: Whether we decide to increase the retried counter or not, the current counter should be
# maintained. There is no way of doing so with the current approach. Update if required # maintained. There is no way of doing so with the current approach. Update if required
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
job.appointment_end,
debug, logging)
else: else:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs

View File

@@ -1,10 +1,12 @@
import re import re
from pisa.utils.authproxy import JSONRPCException import pisa.conf as conf
from pisa import logging, bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
from http.client import HTTPException from http.client import HTTPException
def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='transaction'): def check_tx_in_chain(tx_id, parent='', tx_label='transaction'):
tx_in_chain = False tx_in_chain = False
confirmations = 0 confirmations = 0
@@ -14,22 +16,23 @@ def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='t
if tx_info.get("confirmations"): if tx_info.get("confirmations"):
confirmations = int(tx_info.get("confirmations")) confirmations = int(tx_info.get("confirmations"))
tx_in_chain = True tx_in_chain = True
if debug: logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug: else:
logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id)) logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id))
except JSONRPCException as e: except JSONRPCException as e:
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
if debug: logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug: else:
# ToDO: Unhandled errors, check this properly # ToDO: Unhandled errors, check this properly
logging.error("[{}] JSONRPCException. Error code {}".format(parent, e)) logging.error("[{}] JSONRPCException. Error code {}".format(parent, e))
return tx_in_chain, confirmations return tx_in_chain, confirmations
def can_connect_to_bitcoind(bitcoin_cli): def can_connect_to_bitcoind():
can_connect = True can_connect = True
try: try:
@@ -40,18 +43,18 @@ def can_connect_to_bitcoind(bitcoin_cli):
return can_connect return can_connect
def in_correct_network(bitcoin_cli, network): def in_correct_network():
mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f" mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
correct_network = False correct_network = False
genesis_block_hash = bitcoin_cli.getblockhash(0) genesis_block_hash = bitcoin_cli.getblockhash(0)
if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash: if conf.BTC_NETWORK == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
correct_network = True correct_network = True
elif network == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash: elif conf.BTC_NETWORK == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash:
correct_network = True correct_network = True
elif network == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]: elif conf.BTC_NETWORK == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]:
correct_network = True correct_network = True
return correct_network return correct_network

View File

@@ -1,7 +1,9 @@
import zmq import zmq
import binascii import binascii
from pisa import logging
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
# ToDo: #7-add-async-back-to-zmq # ToDo: #7-add-async-back-to-zmq
class ZMQHandler: class ZMQHandler:
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
@@ -14,7 +16,7 @@ class ZMQHandler:
self.parent = parent self.parent = parent
self.terminate = False self.terminate = False
def handle(self, block_queue, debug, logging): def handle(self, block_queue):
while not self.terminate: while not self.terminate:
msg = self.zmqSubSocket.recv_multipart() msg = self.zmqSubSocket.recv_multipart()
@@ -27,5 +29,4 @@ class ZMQHandler:
block_hash = binascii.hexlify(body).decode('UTF-8') block_hash = binascii.hexlify(body).decode('UTF-8')
block_queue.put(block_hash) block_queue.put(block_hash)
if debug: logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))
logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))

View File

@@ -1,12 +1,13 @@
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from pisa import logging, bitcoin_cli
from pisa.responder import Responder from pisa.responder import Responder
from pisa.zmq_subscriber import ZMQHandler from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException from pisa.utils.auth_proxy import JSONRPCException
from hashlib import sha256 from hashlib import sha256
from uuid import uuid4 from uuid import uuid4
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA
class Watcher: class Watcher:
@@ -19,7 +20,7 @@ class Watcher:
self.zmq_subscriber = None self.zmq_subscriber = None
self.responder = Responder() self.responder = Responder()
def add_appointment(self, appointment, debug, logging): def add_appointment(self, appointment):
# Rationale: # Rationale:
# The Watcher will analyze every received block looking for appointment matches. If there is no work # The Watcher will analyze every received block looking for appointment matches. If there is no work
# to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block # to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block
@@ -45,36 +46,30 @@ class Watcher:
if self.asleep: if self.asleep:
self.asleep = False self.asleep = False
self.block_queue = Queue() self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
watcher = Thread(target=self.do_watch, args=[debug, logging]) watcher = Thread(target=self.do_watch)
zmq_thread.start() zmq_thread.start()
watcher.start() watcher.start()
if debug: logging.info("[Watcher] waking up!")
logging.info("[Watcher] waking up!")
appointment_added = True appointment_added = True
if debug: logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
else: else:
appointment_added = False appointment_added = False
if debug: logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format(
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})' appointment.locator))
.format(appointment.locator))
return appointment_added return appointment_added
def do_subscribe(self, block_queue, debug, logging): def do_subscribe(self, block_queue):
self.zmq_subscriber = ZMQHandler(parent='Watcher') self.zmq_subscriber = ZMQHandler(parent='Watcher')
self.zmq_subscriber.handle(block_queue, debug, logging) self.zmq_subscriber.handle(block_queue)
def do_watch(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
def do_watch(self):
while len(self.appointments) > 0: while len(self.appointments) > 0:
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
@@ -82,11 +77,10 @@ class Watcher:
block = bitcoin_cli.getblock(block_hash) block = bitcoin_cli.getblock(block_hash)
txids = block.get('tx') txids = block.get('tx')
if debug: logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] new block received {}".format(block_hash)) logging.info("[Watcher] list of transactions: {}".format(txids))
logging.info("[Watcher] list of transactions: {}".format(txids))
self.delete_expired_appointment(block, debug, logging) self.delete_expired_appointment(block)
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
@@ -95,21 +89,19 @@ class Watcher:
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
potential_matches = {locator: potential_locators[locator] for locator in intersection} potential_matches = {locator: potential_locators[locator] for locator in intersection}
if debug: if len(potential_matches) > 0:
if len(potential_matches) > 0: logging.info("[Watcher] list of potential matches: {}".format(potential_matches))
logging.info("[Watcher] list of potential matches: {}".format(potential_matches)) else:
else: logging.info("[Watcher] no potential matches found")
logging.info("[Watcher] no potential matches found")
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging) matches = self.check_potential_matches(potential_matches)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
if debug: logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" .format(justice_txid, locator, uuid))
.format(justice_txid, locator, uuid))
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[uuid].end_time, debug, logging) self.appointments[uuid].end_time)
# Delete the appointment # Delete the appointment
self.appointments.pop(uuid) self.appointments.pop(uuid)
@@ -124,17 +116,15 @@ class Watcher:
self.locator_uuid_map[locator].remove(uuid) self.locator_uuid_map[locator].remove(uuid)
except JSONRPCException as e: except JSONRPCException as e:
if debug: logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
# Go back to sleep if there are no more appointments # Go back to sleep if there are no more appointments
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True
if debug: logging.error("[Watcher] no more pending appointments, going back to sleep")
logging.error("[Watcher] no more pending appointments, going back to sleep")
def delete_expired_appointment(self, block, debug, logging): def delete_expired_appointment(self, block):
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+ EXPIRY_DELTA] + EXPIRY_DELTA]
@@ -150,30 +140,26 @@ class Watcher:
else: else:
self.locator_uuid_map[locator].remove(uuid) self.locator_uuid_map[locator].remove(uuid)
if debug: logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator,
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})" uuid))
.format(locator, uuid))
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging): def check_potential_matches(self, potential_matches):
matches = [] matches = []
for locator, dispute_txid in potential_matches.items(): for locator, dispute_txid in potential_matches.items():
for uuid in self.locator_uuid_map[locator]: for uuid in self.locator_uuid_map[locator]:
try: try:
# ToDo: #20-test-tx-decrypting-edge-cases # ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug, justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid))
logging)
justice_rawtx = hexlify(justice_rawtx).decode() justice_rawtx = hexlify(justice_rawtx).decode()
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx)) matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
if debug: logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid, justice_txid))
justice_txid))
except JSONRPCException as e: except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC # for the POC
if debug: logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
return matches return matches