Clean up + higher modular design

This commit is contained in:
Sergi Delgado Segura
2019-10-03 11:49:49 +01:00
parent 5ba6fcb9ef
commit 3e0cca14d7
2 changed files with 50 additions and 101 deletions

View File

@@ -2,9 +2,12 @@ from queue import Queue
from threading import Thread from threading import Thread
from hashlib import sha256 from hashlib import sha256
from binascii import unhexlify from binascii import unhexlify
from pisa import logging, bitcoin_cli
from pisa.rpc_errors import * from pisa.rpc_errors import *
from pisa.cleaner import Cleaner
from pisa import logging, bitcoin_cli
from pisa.tools import check_tx_in_chain from pisa.tools import check_tx_in_chain
from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQHandler from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.utils.auth_proxy import JSONRPCException from pisa.utils.auth_proxy import JSONRPCException
@@ -90,12 +93,16 @@ class Responder:
self.zmq_subscriber.handle(block_queue) self.zmq_subscriber.handle(block_queue)
def handle_responses(self): def handle_responses(self):
# ToDo: #9-add-data-persistence
# change prev_block_hash to the last known tip when bootstrapping
prev_block_hash = 0 prev_block_hash = 0
while len(self.jobs) > 0: while len(self.jobs) > 0:
# We get notified for every new received block # We get notified for every new received block
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
block = BlockProcessor.getblock(block_hash)
try: if block is not None:
block = bitcoin_cli.getblock(block_hash) block = bitcoin_cli.getblock(block_hash)
txs = block.get('tx') txs = block.get('tx')
height = block.get('height') height = block.get('height')
@@ -104,12 +111,13 @@ class Responder:
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash'))) logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
logging.info("[Responder] list of transactions: {}".format(txs)) logging.info("[Responder] list of transactions: {}".format(txs))
except JSONRPCException as e: else:
logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
continue continue
completed_jobs = [] completed_jobs = []
jobs_to_rebroadcast = []
# ToDo: #9-add-data-persistence
# change prev_block_hash condition
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0: if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
# Keep count of the confirmations each tx gets # Keep count of the confirmations each tx gets
for justice_txid, jobs in self.tx_job_map.items(): for justice_txid, jobs in self.tx_job_map.items():
@@ -121,15 +129,8 @@ class Responder:
uuid, justice_txid)) uuid, justice_txid))
elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY: elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
# If a transactions has missed too many confirmations for a while we'll try to rebroadcast # If a transactions has missed too many confirmations we add it to the rebroadcast list
# ToDO: #22-discuss-confirmations-before-retry jobs_to_rebroadcast.append(uuid)
# ToDo: #23-define-behaviour-approaching-end
self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end,
retry=True)
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
.format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
else: else:
# Otherwise we increase the number of missed confirmations # Otherwise we increase the number of missed confirmations
@@ -140,7 +141,10 @@ class Responder:
# The end of the appointment has been reached # The end of the appointment has been reached
completed_jobs.append(uuid) completed_jobs.append(uuid)
self.remove_completed_jobs(completed_jobs, height) self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs,
height)
self.rebroadcast(jobs_to_rebroadcast)
else: else:
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}" logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
@@ -156,6 +160,17 @@ class Responder:
logging.info("[Responder] no more pending jobs, going back to sleep") logging.info("[Responder] no more pending jobs, going back to sleep")
def rebroadcast(self, jobs_to_rebroadcast):
# ToDO: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end
for uuid in jobs_to_rebroadcast:
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
.format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY))
def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry): def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry):
# Since we're pushing a raw transaction to the network we can get two kind of rejections: # Since we're pushing a raw transaction to the network we can get two kind of rejections:
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected # RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
@@ -194,24 +209,6 @@ class Responder:
# If something else happens (unlikely but possible) log it so we can treat it in future releases # If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e)) logging.error("[Responder] JSONRPCException. Error {}".format(e))
def remove_completed_jobs(self, completed_jobs, height):
for uuid in completed_jobs:
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
self.jobs[uuid].confirmations))
# ToDo: #9-add-data-persistency
justice_txid = self.jobs[uuid].justice_txid
self.jobs.pop(uuid)
if len(self.tx_job_map[justice_txid]) == 1:
self.tx_job_map.pop(justice_txid)
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
else:
self.tx_job_map[justice_txid].remove(uuid)
def handle_reorgs(self): def handle_reorgs(self):
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be # First we check if the dispute transaction is still in the blockchain. If not, the justice can not be

View File

@@ -1,13 +1,17 @@
from binascii import hexlify, unhexlify from uuid import uuid4
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from pisa import logging, bitcoin_cli
from pisa import logging
from pisa.responder import Responder from pisa.responder import Responder
from pisa.conf import MAX_APPOINTMENTS
from pisa.block_processor import BlockProcessor
from pisa.cleaner import Cleaner
from pisa.utils.zmq_subscriber import ZMQHandler from pisa.utils.zmq_subscriber import ZMQHandler
from pisa.utils.auth_proxy import JSONRPCException
from hashlib import sha256
from uuid import uuid4 # WIP: MOVED BLOCKCHAIN RELATED TASKS TO BLOCK PROCESSOR IN AN AIM TO MAKE THE CODE MORE MODULAR. THIS SHOULD HELP
from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA # WITH CODE REUSE WHEN MERGING THE DATA PERSISTENCE PART.
class Watcher: class Watcher:
@@ -72,29 +76,20 @@ class Watcher:
def do_watch(self): def do_watch(self):
while len(self.appointments) > 0: while len(self.appointments) > 0:
block_hash = self.block_queue.get() block_hash = self.block_queue.get()
logging.info("[Watcher] new block received {}".format(block_hash))
try: block = BlockProcessor.getblock(block_hash)
block = bitcoin_cli.getblock(block_hash)
if block is not None:
txids = block.get('tx') txids = block.get('tx')
logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] list of transactions: {}".format(txids)) logging.info("[Watcher] list of transactions: {}".format(txids))
self.delete_expired_appointment(block) self.appointments, self.locator_uuid_map = Cleaner.delete_expired_appointment(
block, self.appointments, self.locator_uuid_map)
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids} potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map)
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
# Check is any of the tx_ids in the received block is an actual match
# Get the locators that are both in the map and in the potential locators dict.
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
potential_matches = {locator: potential_locators[locator] for locator in intersection}
if len(potential_matches) > 0:
logging.info("[Watcher] list of potential matches: {}".format(potential_matches))
else:
logging.info("[Watcher] no potential matches found")
matches = self.check_potential_matches(potential_matches)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches: for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})" logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
@@ -108,58 +103,15 @@ class Watcher:
# If there was only one appointment that matches the locator we can delete the whole list # If there was only one appointment that matches the locator we can delete the whole list
if len(self.locator_uuid_map[locator]) == 1: if len(self.locator_uuid_map[locator]) == 1:
# ToDo: #9-add-data-persistency # ToDo: #9-add-data-persistence
self.locator_uuid_map.pop(locator) self.locator_uuid_map.pop(locator)
else: else:
# Otherwise we just delete the appointment that matches locator:appointment_pos # Otherwise we just delete the appointment that matches locator:appointment_pos
# ToDo: #9-add-data-persistency # ToDo: #9-add-data-persistence
self.locator_uuid_map[locator].remove(uuid) self.locator_uuid_map[locator].remove(uuid)
except JSONRPCException as e:
logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
# Go back to sleep if there are no more appointments # Go back to sleep if there are no more appointments
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True
logging.error("[Watcher] no more pending appointments, going back to sleep") logging.error("[Watcher] no more pending appointments, going back to sleep")
def delete_expired_appointment(self, block):
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+ EXPIRY_DELTA]
for uuid in to_delete:
# ToDo: #9-add-data-persistency
locator = self.appointments[uuid].locator
self.appointments.pop(uuid)
if len(self.locator_uuid_map[locator]) == 1:
self.locator_uuid_map.pop(locator)
else:
self.locator_uuid_map[locator].remove(uuid)
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator,
uuid))
def check_potential_matches(self, potential_matches):
matches = []
for locator, dispute_txid in potential_matches.items():
for uuid in self.locator_uuid_map[locator]:
try:
# ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid))
justice_rawtx = hexlify(justice_rawtx).decode()
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
justice_txid))
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
return matches