mirror of
https://github.com/aljazceru/python-teos.git
synced 2026-02-01 04:34:31 +01:00
Increases responder modularity
The responder had way too complex functions. Separate them into smaller / more specific ones to increse modularity and code reuse.
This commit is contained in:
@@ -3,27 +3,24 @@ from threading import Thread
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
|
||||
from pisa.rpc_errors import *
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa import logging, bitcoin_cli
|
||||
from pisa.carrier import Carrier
|
||||
from pisa import logging
|
||||
from pisa.tools import check_tx_in_chain
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
|
||||
CONFIRMATIONS_BEFORE_RETRY = 6
|
||||
MIN_CONFIRMATIONS = 6
|
||||
|
||||
|
||||
class Job:
|
||||
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0):
|
||||
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0):
|
||||
self.dispute_txid = dispute_txid
|
||||
self.justice_txid = justice_txid
|
||||
self.justice_rawtx = justice_rawtx
|
||||
self.appointment_end = appointment_end
|
||||
self.confirmations = confirmations
|
||||
|
||||
self.missed_confirmations = 0
|
||||
self.retry_counter = retry_counter
|
||||
|
||||
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
|
||||
@@ -31,8 +28,7 @@ class Job:
|
||||
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
|
||||
|
||||
def to_json(self):
|
||||
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations,
|
||||
"appointment_end": self.appointment_end}
|
||||
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "appointment_end": self.appointment_end}
|
||||
|
||||
return job
|
||||
|
||||
@@ -41,25 +37,28 @@ class Responder:
|
||||
def __init__(self):
|
||||
self.jobs = dict()
|
||||
self.tx_job_map = dict()
|
||||
self.unconfirmed_txs = []
|
||||
self.missed_confirmations = dict()
|
||||
self.block_queue = None
|
||||
self.asleep = True
|
||||
self.zmq_subscriber = None
|
||||
|
||||
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
|
||||
if self.asleep:
|
||||
logging.info("[Responder] waking up!")
|
||||
|
||||
try:
|
||||
if self.asleep:
|
||||
logging.info("[Responder] waking up!")
|
||||
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
|
||||
carrier = Carrier()
|
||||
receipt = carrier.send_transaction(justice_rawtx, justice_txid)
|
||||
|
||||
bitcoin_cli.sendrawtransaction(justice_rawtx)
|
||||
|
||||
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
if receipt.delivered:
|
||||
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
# retry holds such information.
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
|
||||
confirmations=receipt.confirmations)
|
||||
|
||||
except JSONRPCException as e:
|
||||
self.handle_send_failures(e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry)
|
||||
else:
|
||||
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
|
||||
pass
|
||||
|
||||
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
|
||||
retry=False):
|
||||
@@ -68,6 +67,7 @@ class Responder:
|
||||
if retry:
|
||||
self.jobs[uuid].retry_counter += 1
|
||||
self.jobs[uuid].missed_confirmations = 0
|
||||
|
||||
else:
|
||||
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
|
||||
|
||||
@@ -77,6 +77,9 @@ class Responder:
|
||||
else:
|
||||
self.tx_job_map[justice_txid] = [uuid]
|
||||
|
||||
if confirmations == 0:
|
||||
self.unconfirmed_txs.append(justice_txid)
|
||||
|
||||
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'
|
||||
.format(dispute_txid, justice_txid, appointment_end))
|
||||
|
||||
@@ -84,7 +87,7 @@ class Responder:
|
||||
self.asleep = False
|
||||
self.block_queue = Queue()
|
||||
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
|
||||
responder = Thread(target=self.handle_responses)
|
||||
responder = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
responder.start()
|
||||
|
||||
@@ -92,7 +95,7 @@ class Responder:
|
||||
self.zmq_subscriber = ZMQHandler(parent='Responder')
|
||||
self.zmq_subscriber.handle(block_queue)
|
||||
|
||||
def handle_responses(self):
|
||||
def do_watch(self):
|
||||
# ToDo: #9-add-data-persistence
|
||||
# change prev_block_hash to the last known tip when bootstrapping
|
||||
prev_block_hash = 0
|
||||
@@ -100,10 +103,9 @@ class Responder:
|
||||
while len(self.jobs) > 0:
|
||||
# We get notified for every new received block
|
||||
block_hash = self.block_queue.get()
|
||||
block = BlockProcessor.getblock(block_hash)
|
||||
block = BlockProcessor.get_block(block_hash)
|
||||
|
||||
if block is not None:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
txs = block.get('tx')
|
||||
height = block.get('height')
|
||||
|
||||
@@ -111,48 +113,25 @@ class Responder:
|
||||
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
|
||||
logging.info("[Responder] list of transactions: {}".format(txs))
|
||||
|
||||
else:
|
||||
continue
|
||||
# ToDo: #9-add-data-persistence
|
||||
# change prev_block_hash condition
|
||||
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
|
||||
self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
|
||||
txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations)
|
||||
|
||||
completed_jobs = []
|
||||
jobs_to_rebroadcast = []
|
||||
# ToDo: #9-add-data-persistence
|
||||
# change prev_block_hash condition
|
||||
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
|
||||
# Keep count of the confirmations each tx gets
|
||||
for justice_txid, jobs in self.tx_job_map.items():
|
||||
for uuid in jobs:
|
||||
if justice_txid in txs or self.jobs[uuid].confirmations > 0:
|
||||
self.jobs[uuid].confirmations += 1
|
||||
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
|
||||
self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map,
|
||||
self.get_completed_jobs(height), height)
|
||||
|
||||
logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
|
||||
uuid, justice_txid))
|
||||
self.rebroadcast(txs_to_rebroadcast)
|
||||
|
||||
elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
# If a transactions has missed too many confirmations we add it to the rebroadcast list
|
||||
jobs_to_rebroadcast.append(uuid)
|
||||
else:
|
||||
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
|
||||
.format(prev_block_hash, block.get('previousblockhash')))
|
||||
|
||||
else:
|
||||
# Otherwise we increase the number of missed confirmations
|
||||
self.jobs[uuid].missed_confirmations += 1
|
||||
self.handle_reorgs()
|
||||
|
||||
if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \
|
||||
MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
completed_jobs.append(uuid)
|
||||
|
||||
self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs,
|
||||
height)
|
||||
|
||||
self.rebroadcast(jobs_to_rebroadcast)
|
||||
|
||||
else:
|
||||
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
|
||||
.format(prev_block_hash, block.get('previousblockhash')))
|
||||
|
||||
self.handle_reorgs()
|
||||
|
||||
prev_block_hash = block.get('hash')
|
||||
prev_block_hash = block.get('hash')
|
||||
|
||||
# Go back to sleep if there are no more jobs
|
||||
self.asleep = True
|
||||
@@ -160,55 +139,43 @@ class Responder:
|
||||
|
||||
logging.info("[Responder] no more pending jobs, going back to sleep")
|
||||
|
||||
def get_txs_to_rebroadcast(self, txs):
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
for tx in txs:
|
||||
if self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
# If a transactions has missed too many confirmations we add it to the rebroadcast list
|
||||
txs_to_rebroadcast.append(tx)
|
||||
|
||||
return txs_to_rebroadcast
|
||||
|
||||
def get_completed_jobs(self, height):
|
||||
completed_jobs = []
|
||||
|
||||
for uuid, job in self.jobs:
|
||||
if job.appointment_end <= height:
|
||||
tx = Carrier.get_transaction(job.dispute_txid)
|
||||
|
||||
# FIXME: Should be improved with the librarian
|
||||
if tx is not None and tx.get('confirmations') > MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
completed_jobs.append(uuid)
|
||||
|
||||
return completed_jobs
|
||||
|
||||
def rebroadcast(self, jobs_to_rebroadcast):
|
||||
# ToDO: #22-discuss-confirmations-before-retry
|
||||
# ToDo: #23-define-behaviour-approaching-end
|
||||
|
||||
for uuid in jobs_to_rebroadcast:
|
||||
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
|
||||
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
|
||||
for tx in jobs_to_rebroadcast:
|
||||
for uuid in self.tx_job_map[tx]:
|
||||
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
|
||||
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
|
||||
|
||||
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
|
||||
.format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY))
|
||||
|
||||
def handle_send_failures(self, e, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry):
|
||||
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
|
||||
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
|
||||
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
|
||||
if e.error.get('code') == RPC_VERIFY_REJECTED:
|
||||
# DISCUSS: what to do in this case
|
||||
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
|
||||
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
|
||||
# DISCUSS: check errors -9 and -10
|
||||
pass
|
||||
|
||||
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
|
||||
try:
|
||||
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and start "
|
||||
"monitoring the transaction".format(justice_txid))
|
||||
|
||||
# If the transaction is already in the chain, we get the number of confirmations and watch the job
|
||||
# until the end of the appointment
|
||||
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
|
||||
confirmations=confirmations)
|
||||
|
||||
except JSONRPCException as e:
|
||||
# While it's quite unlikely, the transaction that was already in the blockchain could have been
|
||||
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
|
||||
# restart the job
|
||||
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry)
|
||||
|
||||
else:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
|
||||
else:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting"
|
||||
.format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY))
|
||||
|
||||
# FIXME: Legacy code, must be checked and updated/fixed
|
||||
def handle_reorgs(self):
|
||||
for uuid, job in self.jobs.items():
|
||||
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
|
||||
@@ -240,4 +207,3 @@ class Responder:
|
||||
# reorg manager
|
||||
logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager")
|
||||
logging.error("[Responder] reorg manager not yet implemented")
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user