Code clean up

Deletes the debug/logging pair. Defines logging and bitcoin_cli as system-wide variables.
This commit is contained in:
Sergi Delgado Segura
2019-10-02 17:03:43 +01:00
parent 9bb3b38b3f
commit 93e23e769f
10 changed files with 158 additions and 214 deletions

View File

@@ -1,2 +1,18 @@
from pisa.utils.auth_proxy import AuthServiceProxy
import pisa.conf as conf
import logging
HOST = 'localhost'
PORT = 9814
# Configure logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(conf.SERVER_LOG_FILE),
logging.StreamHandler()
])
# Create RPC connection with bitcoind
# TODO: Check if a long lived connection like this may create problems (timeouts)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
conf.BTC_RPC_PORT))

View File

@@ -1,15 +1,10 @@
from pisa import *
from pisa import HOST, PORT, logging, bitcoin_cli
from pisa.watcher import Watcher
from pisa.inspector import Inspector
from pisa.appointment import Appointment
from flask import Flask, request, Response, abort, jsonify
import json
# FIXME: HERE FOR TESTING (get_block_count). REMOVE WHEN REMOVING THE FUNCTION
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
# ToDo: #5-add-async-to-api
app = Flask(__name__)
HTTP_OK = 200
@@ -22,7 +17,6 @@ def add_appointment():
remote_addr = request.environ.get('REMOTE_ADDR')
remote_port = request.environ.get('REMOTE_PORT')
if debug:
logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
# Check content type once if properly defined
@@ -30,7 +24,7 @@ def add_appointment():
appointment = inspector.inspect(request_data)
if type(appointment) == Appointment:
appointment_added = watcher.add_appointment(appointment, debug, logging)
appointment_added = watcher.add_appointment(appointment)
# ToDo: #13-create-server-side-signature-receipt
if appointment_added:
@@ -49,9 +43,7 @@ def add_appointment():
rcode = HTTP_BAD_REQUEST
response = "appointment rejected. Request does not match the standard"
if debug:
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr,
remote_port))
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port))
return Response(response, status=rcode, mimetype='text/plain')
@@ -115,21 +107,16 @@ def get_all_appointments():
@app.route('/get_block_count', methods=['GET'])
def get_block_count():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
return jsonify({"block_count": bitcoin_cli.getblockcount()})
def start_api(d, l):
def start_api():
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
global debug, logging, watcher, inspector
debug = d
logging = l
global watcher, inspector
# ToDo: #18-separate-api-from-watcher
watcher = Watcher()
inspector = Inspector(debug, logging)
inspector = Inspector()
# Setting Flask log to ERROR only so it does not mess with our logging
logging.getLogger('werkzeug').setLevel(logging.ERROR)

View File

@@ -19,7 +19,7 @@ class Appointment:
return appointment
# ToDO: #3-improve-appointment-strcuture
# ToDO: #3-improve-appointment-structure

View File

@@ -1,16 +1,12 @@
import re
from pisa.appointment import Appointment
import pisa.conf as conf
from pisa import errors
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MIN_DISPUTE_DELTA, \
SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
from pisa import logging, bitcoin_cli
from pisa.appointment import Appointment
from pisa.utils.auth_proxy import JSONRPCException
class Inspector:
def __init__(self, debug=False, logging=None):
self.debug = debug
self.logging = logging
def inspect(self, data):
locator = data.get('locator')
start_time = data.get('start_time')
@@ -20,8 +16,6 @@ class Inspector:
cipher = data.get('cipher')
hash_function = data.get('hash_function')
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
try:
block_height = bitcoin_cli.getblockcount()
@@ -45,8 +39,7 @@ class Inspector:
r = (rcode, message)
except JSONRPCException as e:
if self.debug:
self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
# In case of an unknown exception, assign a special rcode and reason.
r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred")
@@ -71,8 +64,7 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong locator format ({})".format(locator)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -95,8 +87,7 @@ class Inspector:
else:
message = "start_time too close to current height"
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -122,8 +113,7 @@ class Inspector:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = 'end_time is in the past'
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -139,13 +129,12 @@ class Inspector:
elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong dispute_delta data type ({})".format(t)
elif dispute_delta < MIN_DISPUTE_DELTA:
elif dispute_delta < conf.MIN_DISPUTE_DELTA:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
MIN_DISPUTE_DELTA, dispute_delta)
conf.MIN_DISPUTE_DELTA, dispute_delta)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -166,8 +155,8 @@ class Inspector:
# ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using
rcode = errors.APPOINTMENT_WRONG_FIELD
message = "wrong encrypted_blob"
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -183,12 +172,11 @@ class Inspector:
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong cipher data type ({})".format(t)
elif cipher not in SUPPORTED_CIPHERS:
elif cipher not in conf.SUPPORTED_CIPHERS:
rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
message = "cipher not supported: {}".format(cipher)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message
@@ -204,11 +192,10 @@ class Inspector:
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong hash_function data type ({})".format(t)
elif hash_function not in SUPPORTED_HASH_FUNCTIONS:
elif hash_function not in conf.SUPPORTED_HASH_FUNCTIONS:
rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
message = "hash_function not supported {}".format(hash_function)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
logging.error("[Inspector] {}".format(message))
return rcode, message

View File

@@ -1,36 +1,25 @@
import logging
from sys import argv
from getopt import getopt
from threading import Thread
from pisa import logging
from pisa.api import start_api
from pisa.tools import can_connect_to_bitcoind, in_correct_network
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, BTC_NETWORK, SERVER_LOG_FILE
if __name__ == '__main__':
debug = False
opts, _ = getopt(argv[1:], 'd', ['debug'])
for opt, arg in opts:
if opt in ['-d', '--debug']:
debug = True
# FIXME: Leaving this here for future option/arguments
pass
# Configure logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(SERVER_LOG_FILE),
logging.StreamHandler()
])
if can_connect_to_bitcoind():
if in_correct_network():
# Fire the api
start_api()
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
if can_connect_to_bitcoind(bitcoin_cli):
if in_correct_network(bitcoin_cli, BTC_NETWORK):
# ToDo: This may not have to be a thread. The main thread only creates this and terminates.
api_thread = Thread(target=start_api, args=[debug, logging])
api_thread.start()
else:
logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
"Shutting down")
else:
logging.error("[Pisad] can't connect to bitcoind. Shutting down")

View File

@@ -2,11 +2,11 @@ from queue import Queue
from threading import Thread
from hashlib import sha256
from binascii import unhexlify
from pisa.zmq_subscriber import ZMQHandler
from pisa import logging, bitcoin_cli
from pisa.rpc_errors import *
from pisa.tools import check_tx_in_chain
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.utils.auth_proxy import JSONRPCException
CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6
@@ -42,14 +42,9 @@ class Responder:
self.asleep = True
self.zmq_subscriber = None
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=False):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
try:
if debug:
if self.asleep:
logging.info("[Responder] waking up!")
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
@@ -58,15 +53,13 @@ class Responder:
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds such information.
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry)
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
except JSONRPCException as e:
self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
debug, logging, retry)
self.handle_send_failures(e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry)
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
confirmations=0, retry=False):
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
retry=False):
# ToDo: #23-define-behaviour-approaching-end
if retry:
@@ -81,25 +74,22 @@ class Responder:
else:
self.tx_job_map[justice_txid] = [uuid]
if debug:
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
format(dispute_txid, justice_txid, appointment_end))
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'
.format(dispute_txid, justice_txid, appointment_end))
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
responder = Thread(target=self.handle_responses, args=[debug, logging])
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
responder = Thread(target=self.handle_responses)
zmq_thread.start()
responder.start()
def do_subscribe(self, block_queue, debug, logging):
def do_subscribe(self, block_queue):
self.zmq_subscriber = ZMQHandler(parent='Responder')
self.zmq_subscriber.handle(block_queue, debug, logging)
self.zmq_subscriber.handle(block_queue)
def handle_responses(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
def handle_responses(self):
prev_block_hash = 0
while len(self.jobs) > 0:
# We get notified for every new received block
@@ -110,13 +100,11 @@ class Responder:
txs = block.get('tx')
height = block.get('height')
if debug:
logging.info("[Responder] new block received {}".format(block_hash))
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
logging.info("[Responder] list of transactions: {}".format(txs))
except JSONRPCException as e:
if debug:
logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
continue
@@ -129,7 +117,6 @@ class Responder:
if justice_txid in txs or self.jobs[uuid].confirmations > 0:
self.jobs[uuid].confirmations += 1
if debug:
logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
uuid, justice_txid))
@@ -138,9 +125,9 @@ class Responder:
# ToDO: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end
self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug,
logging, retry=True)
if debug:
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end,
retry=True)
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
.format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
@@ -153,14 +140,13 @@ class Responder:
# The end of the appointment has been reached
completed_jobs.append(uuid)
self.remove_completed_jobs(completed_jobs, height, debug, logging)
self.remove_completed_jobs(completed_jobs, height)
else:
if debug:
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
.format(prev_block_hash, block.get('previousblockhash')))
self.handle_reorgs(bitcoin_cli, debug, logging)
self.handle_reorgs()
prev_block_hash = block.get('hash')
@@ -168,11 +154,9 @@ class Responder:
self.asleep = True
self.zmq_subscriber.terminate = True
if debug:
logging.info("[Responder] no more pending jobs, going back to sleep")
def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
debug, logging, retry):
def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry):
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
# due to network rules, whereas the latter implies that the transaction is already in the blockchain.
@@ -185,35 +169,33 @@ class Responder:
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
try:
if debug:
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
"start monitoring the transaction".format(justice_txid))
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and start "
"monitoring the transaction".format(justice_txid))
# If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
confirmations = int(tx_info.get("confirmations"))
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry, confirmations=confirmations)
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
confirmations=confirmations)
except JSONRPCException as e:
# While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug,
logging, retry=retry)
elif debug:
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
elif debug:
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
def remove_completed_jobs(self, completed_jobs, height, debug, logging):
def remove_completed_jobs(self, completed_jobs, height):
for uuid in completed_jobs:
if debug:
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
self.jobs[uuid].confirmations))
@@ -225,29 +207,24 @@ class Responder:
if len(self.tx_job_map[justice_txid]) == 1:
self.tx_job_map.pop(justice_txid)
if debug:
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
else:
self.tx_job_map[justice_txid].remove(uuid)
def handle_reorgs(self, bitcoin_cli, debug, logging):
def handle_reorgs(self):
for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
# there either, so we'll need to call the reorg manager straight away
dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging,
parent='Responder',
tx_label='dispute tx')
dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx')
# If the dispute is there, we can check the justice tx
if dispute_in_chain:
justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug,
logging, parent='Responder',
justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder',
tx_label='justice tx')
# If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain:
if debug:
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
job.justice_txid, job.confirmations, justice_confirmations))
@@ -258,9 +235,7 @@ class Responder:
# DISCUSS: Adding job back, should we flag it as retried?
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
# maintained. There is no way of doing so with the current approach. Update if required
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
job.appointment_end,
debug, logging)
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
else:
# ToDo: #24-properly-handle-reorgs

View File

@@ -1,10 +1,12 @@
import re
from pisa.utils.authproxy import JSONRPCException
import pisa.conf as conf
from pisa import logging, bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
from http.client import HTTPException
def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='transaction'):
def check_tx_in_chain(tx_id, parent='', tx_label='transaction'):
tx_in_chain = False
confirmations = 0
@@ -14,22 +16,23 @@ def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='t
if tx_info.get("confirmations"):
confirmations = int(tx_info.get("confirmations"))
tx_in_chain = True
if debug:
logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug:
else:
logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id))
except JSONRPCException as e:
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
if debug:
logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug:
else:
# ToDO: Unhandled errors, check this properly
logging.error("[{}] JSONRPCException. Error code {}".format(parent, e))
return tx_in_chain, confirmations
def can_connect_to_bitcoind(bitcoin_cli):
def can_connect_to_bitcoind():
can_connect = True
try:
@@ -40,18 +43,18 @@ def can_connect_to_bitcoind(bitcoin_cli):
return can_connect
def in_correct_network(bitcoin_cli, network):
def in_correct_network():
mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
correct_network = False
genesis_block_hash = bitcoin_cli.getblockhash(0)
if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
if conf.BTC_NETWORK == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
correct_network = True
elif network == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash:
elif conf.BTC_NETWORK == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash:
correct_network = True
elif network == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]:
elif conf.BTC_NETWORK == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]:
correct_network = True
return correct_network

View File

@@ -1,7 +1,9 @@
import zmq
import binascii
from pisa import logging
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
# ToDo: #7-add-async-back-to-zmq
class ZMQHandler:
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
@@ -14,7 +16,7 @@ class ZMQHandler:
self.parent = parent
self.terminate = False
def handle(self, block_queue, debug, logging):
def handle(self, block_queue):
while not self.terminate:
msg = self.zmqSubSocket.recv_multipart()
@@ -27,5 +29,4 @@ class ZMQHandler:
block_hash = binascii.hexlify(body).decode('UTF-8')
block_queue.put(block_hash)
if debug:
logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))

View File

@@ -1,12 +1,13 @@
from binascii import hexlify, unhexlify
from queue import Queue
from threading import Thread
from pisa import logging, bitcoin_cli
from pisa.responder import Responder
from pisa.zmq_subscriber import ZMQHandler
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.utils.auth_proxy import JSONRPCException
from hashlib import sha256
from uuid import uuid4
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA
from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA
class Watcher:
@@ -19,7 +20,7 @@ class Watcher:
self.zmq_subscriber = None
self.responder = Responder()
def add_appointment(self, appointment, debug, logging):
def add_appointment(self, appointment):
# Rationale:
# The Watcher will analyze every received block looking for appointment matches. If there is no work
# to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block
@@ -45,36 +46,30 @@ class Watcher:
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
watcher = Thread(target=self.do_watch, args=[debug, logging])
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
watcher = Thread(target=self.do_watch)
zmq_thread.start()
watcher.start()
if debug:
logging.info("[Watcher] waking up!")
appointment_added = True
if debug:
logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
else:
appointment_added = False
if debug:
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'
.format(appointment.locator))
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format(
appointment.locator))
return appointment_added
def do_subscribe(self, block_queue, debug, logging):
def do_subscribe(self, block_queue):
self.zmq_subscriber = ZMQHandler(parent='Watcher')
self.zmq_subscriber.handle(block_queue, debug, logging)
def do_watch(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
self.zmq_subscriber.handle(block_queue)
def do_watch(self):
while len(self.appointments) > 0:
block_hash = self.block_queue.get()
@@ -82,11 +77,10 @@ class Watcher:
block = bitcoin_cli.getblock(block_hash)
txids = block.get('tx')
if debug:
logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] list of transactions: {}".format(txids))
self.delete_expired_appointment(block, debug, logging)
self.delete_expired_appointment(block)
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
@@ -95,21 +89,19 @@ class Watcher:
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
potential_matches = {locator: potential_locators[locator] for locator in intersection}
if debug:
if len(potential_matches) > 0:
logging.info("[Watcher] list of potential matches: {}".format(potential_matches))
else:
logging.info("[Watcher] no potential matches found")
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
matches = self.check_potential_matches(potential_matches)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
if debug:
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
.format(justice_txid, locator, uuid))
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[uuid].end_time, debug, logging)
self.appointments[uuid].end_time)
# Delete the appointment
self.appointments.pop(uuid)
@@ -124,17 +116,15 @@ class Watcher:
self.locator_uuid_map[locator].remove(uuid)
except JSONRPCException as e:
if debug:
logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
# Go back to sleep if there are no more appointments
self.asleep = True
self.zmq_subscriber.terminate = True
if debug:
logging.error("[Watcher] no more pending appointments, going back to sleep")
def delete_expired_appointment(self, block, debug, logging):
def delete_expired_appointment(self, block):
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+ EXPIRY_DELTA]
@@ -150,30 +140,26 @@ class Watcher:
else:
self.locator_uuid_map[locator].remove(uuid)
if debug:
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
.format(locator, uuid))
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator,
uuid))
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
def check_potential_matches(self, potential_matches):
matches = []
for locator, dispute_txid in potential_matches.items():
for uuid in self.locator_uuid_map[locator]:
try:
# ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug,
logging)
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid))
justice_rawtx = hexlify(justice_rawtx).decode()
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
if debug:
logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
justice_txid))
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
if debug:
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
return matches