Merge pull request #29 from sr-gi/25-change-appointment-structure

Improves internal data structures
This commit is contained in:
Sergi Delgado Segura
2019-08-27 10:43:45 +02:00
committed by GitHub
6 changed files with 345 additions and 232 deletions

View File

@@ -65,22 +65,21 @@ def get_appointment():
# ToDo: #15-add-system-monitor # ToDo: #15-add-system-monitor
appointment_in_watcher = watcher.appointments.get(locator) appointment_in_watcher = watcher.locator_uuid_map.get(locator)
if appointment_in_watcher: if appointment_in_watcher:
for appointment in appointment_in_watcher: for uuid in appointment_in_watcher:
appointment_data = appointment.to_json() appointment_data = watcher.appointments[uuid].to_json()
appointment_data['status'] = "being_watched" appointment_data['status'] = "being_watched"
response.append(appointment_data) response.append(appointment_data)
if watcher.responder: if watcher.responder:
responder_jobs = watcher.responder.jobs responder_jobs = watcher.responder.jobs
for job_id, job in responder_jobs.items(): for job in responder_jobs.values():
if job.locator == locator: if job.locator == locator:
job_data = job.to_json() job_data = job.to_json()
job_data['status'] = "dispute_responded" job_data['status'] = "dispute_responded"
job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
response.append(job_data) response.append(job_data)
if not response: if not response:
@@ -93,22 +92,18 @@ def get_appointment():
@app.route('/get_all_appointments', methods=['GET']) @app.route('/get_all_appointments', methods=['GET'])
def get_all_appointments(): def get_all_appointments():
watcher_appointments = [] watcher_appointments = {}
responder_jobs = [] responder_jobs = {}
# ToDo: #15-add-system-monitor # ToDo: #15-add-system-monitor
if request.remote_addr in request.host or request.remote_addr == '127.0.0.1': if request.remote_addr in request.host or request.remote_addr == '127.0.0.1':
for app_id, appointment in watcher.appointments.items(): for uuid, appointment in watcher.appointments.items():
jobs_data = [job.to_json() for job in appointment] watcher_appointments[uuid] = appointment.to_json()
watcher_appointments.append({app_id: jobs_data})
if watcher.responder: if watcher.responder:
for job_id, job in watcher.responder.jobs.items(): for uuid, job in watcher.responder.jobs.items():
job_data = job.to_json() responder_jobs[uuid] = job.to_json()
job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
responder_jobs.append({job_id: job_data})
response = jsonify({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs}) response = jsonify({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs})

View File

@@ -13,18 +13,22 @@ MIN_CONFIRMATIONS = 6
class Job: class Job:
def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0): def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0):
self.dispute_txid = dispute_txid self.dispute_txid = dispute_txid
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info self.justice_txid = justice_txid
# can be directly got from DB
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
self.justice_rawtx = justice_rawtx self.justice_rawtx = justice_rawtx
self.appointment_end = appointment_end self.appointment_end = appointment_end
self.confirmations = confirmations
self.missed_confirmations = 0 self.missed_confirmations = 0
self.retry_counter = retry_counter self.retry_counter = retry_counter
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
# can be directly got from DB
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
def to_json(self): def to_json(self):
job = {"locator": self.dispute_txid, "justice_rawtx": self.justice_rawtx, job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations,
"appointment_end": self.appointment_end} "appointment_end": self.appointment_end}
return job return job
@@ -33,45 +37,17 @@ class Job:
class Responder: class Responder:
def __init__(self): def __init__(self):
self.jobs = dict() self.jobs = dict()
self.confirmation_counter = dict() self.tx_job_map = dict()
self.block_queue = None self.block_queue = None
self.asleep = True self.asleep = True
self.zmq_subscriber = None self.zmq_subscriber = None
def do_subscribe(self, block_queue, debug, logging): def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
self.zmq_subscriber = ZMQHandler(parent='Responder') retry=False):
self.zmq_subscriber.handle(block_queue, debug, logging)
def create_job(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, conf_counter=0,
retry=False):
# ToDo: #23-define-behaviour-approaching-end
if retry:
self.jobs[justice_txid].retry_counter += 1
self.jobs[justice_txid].missed_confirmations = 0
else:
self.confirmation_counter[justice_txid] = conf_counter
self.jobs[justice_txid] = Job(dispute_txid, justice_rawtx, appointment_end)
if debug:
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
format(dispute_txid, justice_txid, appointment_end))
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
# ToDo: This may not have to be a thead. The main thread only creates this and terminates.
responder = Thread(target=self.handle_responses, args=[debug, logging])
zmq_thread.start()
responder.start()
def add_response(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=False):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT)) BTC_RPC_PORT))
# ToDo: Moving the sending functionality to a separate function would improve readability. Also try to use
# check_tx_in_chain if possible.
try: try:
if debug: if debug:
if self.asleep: if self.asleep:
@@ -82,44 +58,44 @@ class Responder:
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations # handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds such information. # retry holds such information.
self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=retry) self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry)
except JSONRPCException as e: except JSONRPCException as e:
# Since we're pushing a raw transaction to the network we can get two kind of rejections: self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected debug, logging, retry)
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
if e.error.get('code') == RPC_VERIFY_REJECTED:
# DISCUSS: what to do in this case
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
# DISCUSS: check errors -9 and -10
pass
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
try:
if debug:
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
"start monitoring the transaction".format(justice_txid))
# If the transaction is already in the chain, we get the number of confirmations and watch the job def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
# until the end of the appointment confirmations=0, retry=False):
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
confirmations = int(tx_info.get("confirmations"))
self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry, conf_counter=confirmations)
except JSONRPCException as e: # ToDo: #23-define-behaviour-approaching-end
# While it's quite unlikely, the transaction that was already in the blockchain could have been if retry:
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just self.jobs[uuid].retry_counter += 1
# restart the job self.jobs[uuid].missed_confirmations = 0
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY: else:
self.add_response(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
retry=retry)
elif debug: if justice_txid in self.tx_job_map:
# If something else happens (unlikely but possible) log it so we can treat it in future releases self.tx_job_map[justice_txid].append(uuid)
logging.error("[Responder] JSONRPCException. Error code {}".format(e))
elif debug: else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases self.tx_job_map[justice_txid] = [uuid]
logging.error("[Responder] JSONRPCException. Error code {}".format(e))
if debug:
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
format(dispute_txid, justice_txid, appointment_end))
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
responder = Thread(target=self.handle_responses, args=[debug, logging])
zmq_thread.start()
responder.start()
def do_subscribe(self, block_queue, debug, logging):
self.zmq_subscriber = ZMQHandler(parent='Responder')
self.zmq_subscriber.handle(block_queue, debug, logging)
def handle_responses(self, debug, logging): def handle_responses(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
@@ -145,48 +121,39 @@ class Responder:
continue continue
jobs_to_delete = [] completed_jobs = []
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
# Keep count of the confirmations each tx gets # Keep count of the confirmations each tx gets
for job_id, confirmations in self.confirmation_counter.items(): for justice_txid, jobs in self.tx_job_map.items():
# If we see the transaction for the first time, or appointment_end & MIN_CONFIRMATIONS hasn't been for uuid in jobs:
# reached if justice_txid in txs or self.jobs[uuid].confirmations > 0:
if job_id in txs or confirmations > 0: self.jobs[uuid].confirmations += 1
self.confirmation_counter[job_id] += 1
if debug: if debug:
logging.info("[Responder] new confirmation received for txid = {}".format(job_id)) logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
uuid, justice_txid))
elif self.jobs[job_id].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
# If a transactions has missed too many confirmations for a while we'll try to rebroadcast # If a transactions has missed too many confirmations for a while we'll try to rebroadcast
# ToDO: #22-discuss-confirmations-before-retry # ToDO: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end # ToDo: #23-define-behaviour-approaching-end
self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].justice_rawtx, self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
self.jobs[job_id].appointment_end, debug, logging, retry=True) self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug,
if debug: logging, retry=True)
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting" if debug:
.format(job_id, CONFIRMATIONS_BEFORE_RETRY)) logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
else: .format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
# Otherwise we increase the number of missed confirmations
self.jobs[job_id].missed_confirmations += 1
for job_id, job in self.jobs.items(): else:
if job.appointment_end <= height and self.confirmation_counter[job_id] >= MIN_CONFIRMATIONS: # Otherwise we increase the number of missed confirmations
# The end of the appointment has been reached self.jobs[uuid].missed_confirmations += 1
jobs_to_delete.append(job_id)
for job_id in jobs_to_delete: if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \
# ToDo: Find a better way to solve this. Deepcopy of the keys maybe? MIN_CONFIRMATIONS:
# Trying to delete directly when iterating the last for causes dictionary changed size error during # The end of the appointment has been reached
# iteration in Python3 (can not be solved iterating only trough keys in Python3 either) completed_jobs.append(uuid)
if debug: self.remove_completed_jobs(completed_jobs, height, debug, logging)
logging.info("[Responder] {} completed. Appointment ended at block {} after {} confirmations"
.format(job_id, height, self.confirmation_counter[job_id]))
# ToDo: #9-add-data-persistency
del self.jobs[job_id]
del self.confirmation_counter[job_id]
else: else:
if debug: if debug:
@@ -204,32 +171,96 @@ class Responder:
if debug: if debug:
logging.info("[Responder] no more pending jobs, going back to sleep") logging.info("[Responder] no more pending jobs, going back to sleep")
def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
debug, logging, retry):
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
if e.error.get('code') == RPC_VERIFY_REJECTED:
# DISCUSS: what to do in this case
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
# DISCUSS: check errors -9 and -10
pass
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
try:
if debug:
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
"start monitoring the transaction".format(justice_txid))
# If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
confirmations = int(tx_info.get("confirmations"))
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry, confirmations=confirmations)
except JSONRPCException as e:
# While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug,
logging, retry=retry)
elif debug:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
elif debug:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
def remove_completed_jobs(self, completed_jobs, height, debug, logging):
for uuid in completed_jobs:
if debug:
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
self.jobs[uuid].confirmations))
# ToDo: #9-add-data-persistency
justice_txid = self.jobs[uuid].justice_txid
self.jobs.pop(uuid)
if len(self.tx_job_map[justice_txid]) == 1:
self.tx_job_map.pop(justice_txid)
if debug:
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
else:
self.tx_job_map[justice_txid].remove(uuid)
def handle_reorgs(self, bitcoin_cli, debug, logging): def handle_reorgs(self, bitcoin_cli, debug, logging):
for job_id, job in self.jobs.items(): for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
# there either, so we'll need to call the reorg manager straight away # there either, so we'll need to call the reorg manager straight away
dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging, parent='Responder', dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging,
parent='Responder',
tx_label='dispute tx') tx_label='dispute tx')
# If the dispute is there, we can check the justice tx # If the dispute is there, we can check the justice tx
if dispute_in_chain: if dispute_in_chain:
justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job_id, debug, logging, justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug,
parent='Responder', tx_label='justice tx') logging, parent='Responder',
tx_label='justice tx')
# If both transactions are there, we only need to update the justice tx confirmation count # If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain: if justice_in_chain:
if debug: if debug:
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format( logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
job_id, self.confirmation_counter[job_id], justice_confirmations)) job.justice_txid, job.confirmations, justice_confirmations))
self.confirmation_counter[job_id] = justice_confirmations job.confirmations = justice_confirmations
else: else:
# Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again # Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again
# DISCUSS: Adding job back, should we flag it as retried? # DISCUSS: Adding job back, should we flag it as retried?
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be # FIXME: Whether we decide to increase the retried counter or not, the current counter should be
# maintained. There is no way of doing so with the current approach. Update if required # maintained. There is no way of doing so with the current approach. Update if required
self.add_response(job.dispute_txid, job_id, job.justice_rawtx, job.appointment_end, debug, logging) self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
job.appointment_end,
debug, logging)
else: else:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs

View File

@@ -5,12 +5,14 @@ from pisa.responder import Responder
from pisa.zmq_subscriber import ZMQHandler from pisa.zmq_subscriber import ZMQHandler
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from hashlib import sha256 from hashlib import sha256
from uuid import uuid4
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA
class Watcher: class Watcher:
def __init__(self, max_appointments=MAX_APPOINTMENTS): def __init__(self, max_appointments=MAX_APPOINTMENTS):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict()
self.block_queue = None self.block_queue = None
self.asleep = True self.asleep = True
self.max_appointments = max_appointments self.max_appointments = max_appointments
@@ -28,11 +30,17 @@ class Watcher:
if len(self.appointments) < self.max_appointments: if len(self.appointments) < self.max_appointments:
# Appointments are identified by the locator: the sha256 of commitment txid (H(tx_id)). # Appointments are identified by the locator: the sha256 of commitment txid (H(tx_id)).
# Two different nodes may ask for appointments using the same commitment txid, what will result in a # Two different nodes may ask for appointments using the same commitment txid, what will result in a
# collision in our appointments structure (and may be an attack surface), we use lists to avoid that. # collision in our appointments structure (and may be an attack surface). In order to avoid such collisions
if not self.appointments.get(appointment.locator): # we will identify every appointment with a uuid
self.appointments[appointment.locator] = []
self.appointments[appointment.locator].append(appointment) uuid = uuid4().hex
self.appointments[uuid] = appointment
if appointment.locator in self.locator_uuid_map:
self.locator_uuid_map[appointment.locator].append(uuid)
else:
self.locator_uuid_map[appointment.locator] = [uuid]
if self.asleep: if self.asleep:
self.asleep = False self.asleep = False
@@ -78,46 +86,22 @@ class Watcher:
logging.info("[Watcher] new block received {}".format(block_hash)) logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] list of transactions: {}".format(txids)) logging.info("[Watcher] list of transactions: {}".format(txids))
# Delete expired appointments self.delete_expired_appointment(block, debug, logging)
# ToDo: #9: also move this to a function
to_delete = {}
for locator in self.appointments:
for appointment in self.appointments[locator]:
if block["height"] > appointment.end_time + EXPIRY_DELTA:
# Get the appointment index and add the appointment to the deletion list
appointment_pos = self.appointments[locator].index(appointment)
if locator in to_delete:
to_delete[locator].append(appointment_pos)
else:
to_delete[locator] = [appointment_pos]
for locator, indexes in to_delete.items():
if len(indexes) == len(self.appointments[locator]):
if debug:
logging.info("[Watcher] end time reached with no match! Deleting appointment {}"
.format(locator))
del self.appointments[locator]
# ToDo: #9-add-data-persistency
else:
for i in indexes:
if debug:
logging.info("[Watcher] end time reached with no match! Deleting appointment {}:{}"
.format(locator, i))
del self.appointments[locator][i]
# ToDo: #9-add-data-persistency
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
# Check is any of the tx_ids in the received block is an actual match # Check is any of the tx_ids in the received block is an actual match
potential_matches = {}
# ToDo: set intersection should be a more optimal solution # ToDo: set intersection should be a more optimal solution
for locator in self.appointments.keys(): # Get the locators that are both in the map and in the potential locators dict.
if locator in potential_locators: intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
# This is locator:txid potential_matches = {locator: potential_locators[locator] for locator in intersection}
potential_matches[locator] = potential_locators[locator]
# potential_matches = {}
# for locator in self.locator_uuid_map.keys():
# if locator in potential_locators:
# # This is locator:txid
# potential_matches[locator] = potential_locators[locator]
if debug: if debug:
if len(potential_matches) > 0: if len(potential_matches) > 0:
@@ -127,22 +111,25 @@ class Watcher:
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging) matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
for locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx in matches: for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
if debug: if debug:
logging.info("[Watcher] notifying responder about {} and deleting appointment {}:{}".format( logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
justice_txid, locator, appointment_pos)) .format(justice_txid, locator, uuid))
self.responder.add_response(dispute_txid, justice_txid, justice_rawtx, self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[locator][appointment_pos].end_time, debug, logging) self.appointments[uuid].end_time, debug, logging)
# Delete the appointment
self.appointments.pop(uuid)
# If there was only one appointment that matches the locator we can delete the whole list # If there was only one appointment that matches the locator we can delete the whole list
if len(self.appointments[locator]) == 1: if len(self.locator_uuid_map[locator]) == 1:
# ToDo: #9-add-data-persistency # ToDo: #9-add-data-persistency
del self.appointments[locator] self.locator_uuid_map.pop(locator)
else: else:
# Otherwise we just delete the appointment that matches locator:appointment_pos # Otherwise we just delete the appointment that matches locator:appointment_pos
# ToDo: #9-add-data-persistency # ToDo: #9-add-data-persistency
del self.appointments[locator][appointment_pos] self.locator_uuid_map[locator].remove(uuid)
except JSONRPCException as e: except JSONRPCException as e:
if debug: if debug:
@@ -155,21 +142,49 @@ class Watcher:
if debug: if debug:
logging.error("[Watcher] no more pending appointments, going back to sleep") logging.error("[Watcher] no more pending appointments, going back to sleep")
def delete_expired_appointment(self, block, debug, logging):
# to_delete = []
#
# for uuid, appointment in self.appointments.items():
# if block["height"] > appointment.end_time + EXPIRY_DELTA:
# # Add the appointment to the deletion list
# to_delete.append(uuid)
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+ EXPIRY_DELTA]
for uuid in to_delete:
# ToDo: #9-add-data-persistency
locator = self.appointments[uuid].locator
self.appointments.pop(uuid)
if len(self.locator_uuid_map[locator]) == 1:
self.locator_uuid_map.pop(locator)
else:
self.locator_uuid_map[locator].remove(uuid)
if debug:
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
.format(locator, uuid))
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging): def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
matches = [] matches = []
for locator, dispute_txid in potential_matches.items(): for locator, dispute_txid in potential_matches.items():
for appointment_pos, appointment in enumerate(self.appointments.get(locator)): for uuid in self.locator_uuid_map[locator]:
try: try:
# ToDo: #20-test-tx-decrypting-edge-cases # ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging) justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug,
logging)
justice_rawtx = hexlify(justice_rawtx).decode() justice_rawtx = hexlify(justice_rawtx).decode()
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx)) matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
if debug: if debug:
logging.info("[Watcher] match found for {}:{}! {}".format(locator, appointment_pos, logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
justice_txid)) justice_txid))
except JSONRPCException as e: except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple # Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC # for the POC

View File

@@ -0,0 +1,121 @@
import os
import json
import requests
import time
from copy import deepcopy
from hashlib import sha256
from binascii import hexlify, unhexlify
from apps.cli.blob import Blob
from pisa import HOST, PORT
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
PISA_API = "http://{}:{}".format(HOST, PORT)
def generate_dummy_appointment(dispute_txid):
r = requests.get(url=PISA_API + '/get_block_count', timeout=5)
current_height = r.json().get("block_count")
dummy_appointment_data = {"tx": hexlify(os.urandom(32)).decode('utf-8'),
"tx_id": dispute_txid, "start_time": current_height + 5,
"end_time": current_height + 10, "dispute_delta": 20}
cipher = "AES-GCM-128"
hash_function = "SHA256"
locator = sha256(unhexlify(dummy_appointment_data.get("tx_id"))).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")), debug=False, logging=False)
appointment = {"locator": locator, "start_time": dummy_appointment_data.get("start_time"),
"end_time": dummy_appointment_data.get("end_time"),
"dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function}
return appointment
def test_add_appointment(appointment=None):
if not appointment:
dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
appointment = generate_dummy_appointment(dispute_txid)
print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
assert (r.status_code == 200 and r.reason == 'OK')
print(r.content.decode())
print("Requesting it back from PISA")
r = requests.get(url=PISA_API + "/get_appointment?locator=" + appointment["locator"])
assert (r.status_code == 200 and r.reason == 'OK')
received_appointments = json.loads(r.content)
# Take the status out and leave the received appointments ready to compare
appointment_status = [appointment.pop("status") for appointment in received_appointments]
# Check that the appointment is within the received appoints
assert (appointment in received_appointments)
# Check that all the appointments are being watched
assert (all([status == "being_watched" for status in appointment_status]))
def test_same_locator_multiple_appointments():
dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
appointment = generate_dummy_appointment(dispute_txid)
# Send it once
test_add_appointment(appointment)
time.sleep(0.5)
# Try again with the same data
print("Sending it again")
test_add_appointment(appointment)
time.sleep(0.5)
# Try again with the same data but increasing the end time
print("Sending once more")
dup_appointment = deepcopy(appointment)
dup_appointment["end_time"] += 1
test_add_appointment(dup_appointment)
print("Sleeping 5 sec")
time.sleep(5)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
print("Triggering PISA with dispute tx")
bitcoin_cli.sendrawtransaction(dispute_txid)
print("Sleeping 10 sec (waiting for a new block)")
time.sleep(10)
print("Getting all appointments")
r = requests.get(url=PISA_API + "/get_all_appointments")
assert (r.status_code == 200 and r.reason == 'OK')
received_appointments = json.loads(r.content)
# Make sure there is not pending instance of the locator in the watcher
watcher_locators = [appointment["locator"] for appointment in received_appointments["watcher_appointments"]]
assert(appointment["locator"] not in watcher_locators)
# Make sure all the appointments went trough
target_jobs = [v for k, v in received_appointments["responder_jobs"].items() if v["locator"] ==
appointment["locator"]]
assert (len(target_jobs) == 3)
if __name__ == '__main__':
test_same_locator_multiple_appointments()
print("All good!")

View File

@@ -1,52 +0,0 @@
import os
import json
import requests
import time
from hashlib import sha256
from binascii import hexlify, unhexlify
from apps.cli.blob import Blob
from pisa import HOST, PORT
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
PISA_API = "http://{}:{}".format(HOST, PORT)
def generate_dummy_appointment(dispute_txid):
r = requests.get(url=PISA_API+'/get_block_count', timeout=5)
current_height = r.json().get("block_count")
dummy_appointment_data = {"tx": hexlify(os.urandom(32)).decode('utf-8'),
"tx_id": dispute_txid, "start_time": current_height + 5,
"end_time": current_height + 10, "dispute_delta": 20}
cipher = "AES-GCM-128"
hash_function = "SHA256"
locator = sha256(unhexlify(dummy_appointment_data.get("tx_id"))).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")), debug=False, logging=False)
appointment = {"locator": locator, "start_time": dummy_appointment_data.get("start_time"),
"end_time": dummy_appointment_data.get("end_time"),
"dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function}
return appointment
dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
appointment = generate_dummy_appointment(dispute_txid)
print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
print(r, r.reason)
print("Sleeping 10 sec")
time.sleep(10)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
print("Triggering PISA with dispute tx")
bitcoin_cli.sendrawtransaction(dispute_txid)

View File

@@ -77,7 +77,7 @@ def process_request():
if isinstance(txid, str): if isinstance(txid, str):
if check_txid_format(txid): if check_txid_format(txid):
if txid not in mempool and txid not in list(mined_transactions.keys()): if txid not in list(mined_transactions.keys()):
mempool.append(txid) mempool.append(txid)
else: else:
@@ -100,6 +100,9 @@ def process_request():
if block: if block:
response["result"] = {"confirmations": len(blockchain) - block.get('height')} response["result"] = {"confirmations": len(blockchain) - block.get('height')}
elif txid in mempool:
response["result"] = {"confirmations": 0}
else: else:
response["error"] = {'code': RPC_INVALID_ADDRESS_OR_KEY, response["error"] = {'code': RPC_INVALID_ADDRESS_OR_KEY,
'message': 'No such mempool or blockchain transaction. Use gettransaction for ' 'message': 'No such mempool or blockchain transaction. Use gettransaction for '