Builds responder

Builds the responder and adapts the rest of the classes to integrate with it. The code has not been tested yet.
This commit is contained in:
Sergi Delgado
2019-04-27 18:52:49 +01:00
committed by Sergi Delgado Segura
parent 90a1dc70e8
commit dea4a78edc
5 changed files with 197 additions and 25 deletions

View File

@@ -39,6 +39,7 @@ def manage_request(conn, remote_addr, remote_port, inspector, watcher, debug, lo
if appointment:
appointment_added = watcher.add_appointment(appointment, debug, logging)
# FIXME: Response should be signed receipt (created and signed by the API)
if appointment_added:
response = "Appointment accepted"
else:

View File

@@ -1,9 +1,13 @@
# Basic appointment structure
# DISCUSS: about the field the appointment will have
class Appointment:
def __init__(self, locator, start_time, end_time, encrypted_blob, cypher):
def __init__(self, locator, start_time, end_time, dispute_delta, encrypted_blob, cypher):
self.locator = locator
self.start_time = start_time
self.end_time = end_time
self.dispute_delta = dispute_delta
self.encrypted_blob = encrypted_blob
self.cypher = cypher

View File

@@ -1,9 +1,11 @@
from pisa.appointment import Appointment
# FIXME: Implement a proper inspector
class Inspector:
def __init__(self):
pass
def inspect(self, appointment, debug):
return Appointment(appointment, None, None, None, None)
# Return Appointment if success, None otherwise
return Appointment(appointment, None, None, None, None, None)

160
pisa-btc/pisa/responder.py Normal file
View File

@@ -0,0 +1,160 @@
from queue import Queue
from threading import Thread
from pisa.zmq_subscriber import ZMQHandler
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6
class Job:
    """A single response job: a justice transaction the Responder has
    broadcast and is tracking until the end of its appointment.
    """

    def __init__(self, dispute_txid, rawtx, appointment_end, retry_counter=0):
        # State discovered while watching the chain.
        self.in_block_height = None
        self.missed_confirmations = 0

        # Data fixed at creation time.
        self.dispute_txid = dispute_txid
        self.rawtx = rawtx
        self.appointment_end = appointment_end
        self.retry_counter = retry_counter
class Responder:
    """Broadcasts justice transactions and tracks them, block by block, until the
    end of the appointment they respond to, rebroadcasting transactions that miss
    too many confirmations and detecting chain forks.
    """

    def __init__(self):
        # txid -> Job for every transaction we are currently responsible for.
        self.jobs = dict()
        # txid -> confirmation count for jobs that have not reached MIN_CONFIRMATIONS yet.
        self.confirmation_counter = dict()
        # Block hashes pushed by the ZMQ subscriber thread, consumed by handle_responses.
        self.block_queue = Queue()
        # True while there are no pending jobs and the worker threads are stopped.
        self.asleep = True

    def add_response(self, dispute_txid, txid, rawtx, appointment_end, debug, logging, retry=False):
        """Broadcast ``rawtx`` and register a Job so it is tracked until ``appointment_end``.

        ``retry`` is set when handle_responses rebroadcasts an already-known transaction
        that missed too many confirmations; in that case only its retry counter is bumped.
        Wakes up the subscriber/handler threads if the responder was asleep.
        """
        if self.asleep:
            self.asleep = False
            zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
            responder = Thread(target=self.handle_responses, args=[debug, logging])
            zmq_subscriber.start()
            responder.start()

        bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
                                                               BTC_RPC_PORT))

        try:
            # ToDo: All errors should be handled as JSONRPCException, check that holds (if so, the return
            #       value of sendrawtransaction is not needed).
            bitcoin_cli.sendrawtransaction(rawtx)

            # handle_responses can call add_response recursively if a broadcast transaction does not get
            # confirmations; retry holds such information.
            # DISCUSS: Check what to do if the retry counter gets too big
            if retry:
                self.jobs[txid].retry_counter += 1
            else:
                self.confirmation_counter[txid] = 0
                self.jobs[txid] = Job(dispute_txid, rawtx, appointment_end)

            if debug:
                logging.info('[Responder] new job added (dispute txid = {}, txid = {}, appointment end = {})'.format(
                    dispute_txid, txid, appointment_end))

        except JSONRPCException as e:
            if debug:
                # ToDo: Check type of error if transaction does not get through
                logging.error("[Responder] JSONRPCException. Error code {}".format(e))

    def handle_responses(self, debug, logging):
        """Worker loop: for every new block, retire jobs whose appointment ended, count
        confirmations of young jobs, rebroadcast stalled transactions, and hand forks
        over to handle_reorgs. Exits (and puts the responder back to sleep) once there
        are no jobs left.
        """
        bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
                                                               BTC_RPC_PORT))
        prev_block_hash = None
        while len(self.jobs) > 0:
            # We get notified for every new received block.
            block_hash = self.block_queue.get()

            try:
                block = bitcoin_cli.getblock(block_hash)
                txs = block.get('tx')
                height = block.get('height')

                if debug:
                    logging.info("[Responder] new block received {}".format(block_hash))
                    logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
                    logging.info("[Responder] list of transactions: {}".format(txs))

            except JSONRPCException as e:
                if debug:
                    logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
                continue

            # The chain is consistent if the new block builds on the last one we saw
            # (prev_block_hash is None on the very first block, which is trivially fine).
            if prev_block_hash is None or prev_block_hash == block.get('previousblockhash'):
                # Iterate over a snapshot: completed jobs are deleted as we go and mutating
                # a dict while iterating it raises RuntimeError.
                for job_id, job in list(self.jobs.items()):
                    if job.appointment_end <= height:
                        # The end of the appointment has been reached.
                        # ToDo: record job in DB
                        del self.jobs[job_id]
                        # Drop any confirmation-tracking entry as well, otherwise the loop
                        # below would hit a stale key and raise KeyError on self.jobs[job_id].
                        self.confirmation_counter.pop(job_id, None)

                        if debug:
                            logging.info("[Responder] job completed (txid = {}). Appointment ended at height {}"
                                         .format(job_id, height))

                # Handling new jobs (aka jobs with not enough confirmations). When a job reaches
                # MIN_CONFIRMATIONS it stays in jobs and we simply keep checking for chain forks.
                # Snapshot the keys: the rebroadcast path below calls add_response, which
                # touches these dicts.
                for job_id in list(self.confirmation_counter):
                    confirmations = self.confirmation_counter[job_id]

                    # If we see the transaction for the first time, or MIN_CONFIRMATIONS hasn't been reached.
                    if job_id in txs or (0 < confirmations < MIN_CONFIRMATIONS):
                        # Write the updated count back to the dict (a bare local increment
                        # would be lost and the counter would never advance).
                        self.confirmation_counter[job_id] = confirmations + 1

                        if debug:
                            logging.info("[Responder] new confirmation received for txid = {}".format(job_id))

                    elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
                        # If a transaction has missed too many confirmations we try to rebroadcast it.
                        # DISCUSS: How many confirmations before retry
                        # DISCUSS: recursion vs setting confirmations to 0 and rebroadcast here
                        # DISCUSS: how many max retries and what to do if the cap is reached
                        self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].rawtx,
                                          self.jobs[job_id].appointment_end, debug, logging, retry=True)
                        if debug:
                            logging.info("[Responder] txid = {} has missed {} confirmations. Rebroadcast"
                                         .format(job_id, CONFIRMATIONS_BEFORE_RETRY))
                    else:
                        # Otherwise we increase the number of missed confirmations.
                        self.jobs[job_id].missed_confirmations += 1
            else:
                # ToDo: REORG!!
                if debug:
                    logging.error("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
                                  .format(prev_block_hash, block.get('previousblockhash')))

                self.handle_reorgs(bitcoin_cli, debug, logging)

            # Remember the hash of the block we just processed: the next block's
            # 'previousblockhash' must match it (not this block's own parent).
            prev_block_hash = block_hash

        # Go back to sleep if there are no more jobs.
        self.asleep = True

        if debug:
            logging.info("[Responder] no more pending jobs, going back to sleep.")

    def handle_reorgs(self, bitcoin_cli, debug, logging):
        """After a fork, check every justice transaction is still known to bitcoind.
        If a justice tx vanished, rebroadcast it as long as its dispute tx still
        exists; otherwise escalate (ToDo: reorg manager).
        """
        # Snapshot the items: add_response below can mutate self.jobs mid-iteration.
        for job_id, job in list(self.jobs.items()):
            try:
                tx_info = bitcoin_cli.gettransaction(job_id)
                # NOTE(review): sets an attribute Job.__init__ does not declare — presumably
                # consumed by the future reorg handling; confirm when that lands.
                job.confirmations = int(tx_info.get("confirmations"))

            except JSONRPCException:
                # FIXME: It should be safe but check Exception code anyway
                if debug:
                    logging.error("[Responder] justice transaction (txid = {}) not found!".format(job_id))

                try:
                    bitcoin_cli.gettransaction(job.dispute_txid)
                    # DISCUSS: Add job back, should we flag it as retried?
                    self.add_response(job.dispute_txid, job_id, job.rawtx, job.appointment_end, debug, logging)

                except JSONRPCException:
                    # FIXME: It should be safe but check Exception code anyway
                    # ToDo: Dispute transaction is not there either, call reorg manager
                    if debug:
                        logging.error("[Responder] dispute transaction (txid = {}) not found either!"
                                      .format(job.dispute_txid))

    def do_subscribe(self, block_queue, debug, logging):
        """Run the ZMQ block subscriber, feeding new block hashes into block_queue."""
        daemon = ZMQHandler()
        daemon.handle(block_queue, debug, logging)

View File

@@ -1,6 +1,7 @@
from queue import Queue
from threading import Thread
from pisa.tools import decrypt_tx
from pisa.responder import Responder
from pisa.zmq_subscriber import ZMQHandler
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
@@ -14,7 +15,7 @@ class Watcher:
self.max_appointments = max_appointments
def add_appointment(self, appointment, debug, logging):
# ToDo: Discuss about validation of input data
# DISCUSS: about validation of input data
# Rationale:
# The Watcher will analyze every received block looking for appointment matches. If there is no work
@@ -23,8 +24,6 @@ class Watcher:
# If the watcher is awake, every new appointment will just be added to the appointment list until
# max_appointments is reached.
# ToDo: Check how to handle appointment completion
if len(self.appointments) < self.max_appointments:
# Appointments are identified by the locator: the most significant 16 bytes of the commitment txid.
# While 16-byte hash collisions are not likely, they are possible, so we will store appointments in lists
@@ -37,7 +36,8 @@ class Watcher:
if self.asleep:
self.asleep = False
zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
watcher = Thread(target=self.do_watch, args=[debug, logging])
responder = Responder()
watcher = Thread(target=self.do_watch, args=[responder, debug, logging])
zmq_subscriber.start()
watcher.start()
@@ -59,7 +59,7 @@ class Watcher:
daemon = ZMQHandler()
daemon.handle(block_queue, debug, logging)
def do_watch(self, debug, logging):
def do_watch(self, responder, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
@@ -68,13 +68,10 @@ class Watcher:
try:
block = bitcoin_cli.getblock(block_hash)
# ToDo: prev_block_id will be used to store chain state and handle reorgs
prev_block_id = block.get('previousblockhash')
txs = block.get('tx')
if debug:
logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] prev. block hash {}".format(prev_block_id))
logging.info("[Watcher] list of transactions: {}".format(txs))
potential_matches = []
@@ -90,12 +87,12 @@ class Watcher:
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
for locator, appointment_pos, transaction in matches:
# ToDo: Notify responder with every match.
# notify_responder(transaction)
for locator, appointment_pos, txid, tx in matches:
# FIXME: Notify responder with every match.
responder.add_response(txid, tx, self.appointments[locator].end_time, debug, logging)
# If there was only one appointment that matches the locator we can delete the whole list
# ToDo: We may want to use locks before adding / removing appointment
# DISCUSS: We may want to use locks before adding / removing appointment
if len(self.appointments[locator]) == 1:
del self.appointments[locator]
else:
@@ -103,34 +100,42 @@ class Watcher:
del self.appointments[locator][appointment_pos]
if debug:
logging.error("[Watcher] Notifying responder about {}:{} and deleting appointment"
logging.error("[Watcher] notifying responder about {}:{} and deleting appointment"
.format(locator, appointment_pos))
except JSONRPCException as e:
logging.error("[Watcher] JSONRPCException. Error code {}".format(e))
if debug:
logging.error("[Watcher] JSONRPCException. Error code {}".format(e))
continue
# Go back to sleep if there are no more appointments
self.asleep = True
if debug:
logging.error("[Watcher] no more pending appointments, going back to sleep.")
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
matches = []
for locator, k in potential_matches:
for appointment_pos, appointment in enumerate(self.appointments.get(locator)):
try:
# ToDo: Put this back
# decrypted_data = decrypt_tx(appointment.encrypted_blob, k, appointment.cypher)
# ToDo: Remove this. Temporary hack, since we are not working with blobs but with ids for now
# ToDo: just get the raw transaction that matches both parts of the id
decrypted_data = bitcoin_cli.getrawtransaction(locator + k)
txid = locator + k
# FIXME: Put this back
# tx = decrypt_tx(appointment.encrypted_blob, k, appointment.cypher)
# FIXME: Remove this. Temporary hack, since we are not working with blobs but with ids for now
# FIXME: just get the raw transaction that matches both parts of the id
tx = bitcoin_cli.getrawtransaction(txid)
bitcoin_cli.decoderawtransaction(decrypted_data)
matches.append((locator, appointment_pos, decrypted_data))
bitcoin_cli.decoderawtransaction(tx)
matches.append((locator, appointment_pos, txid, tx))
if debug:
logging.error("[Watcher] Match found for {}:{}! {}".format(locator, appointment_pos, locator+k))
logging.error("[Watcher] match found for {}:{}! {}".format(locator, appointment_pos, locator+k))
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
if debug:
logging.error("[Watcher] Can't build transaction from decoded data. Error code {}".format(e))
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
continue
return matches