mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Adds Responder docstrings fixes get_txs_to_rebroadcast and renames some methods
get_txs_to_rebroadcast was beinf triggered based on received transactions indstead of stored txs. Fixing that. Some of the names in the Responder were poorly picked (not descriptibe enough). Tries to fix that. ``Job`` class has been renames to ``TransactionTracker``. ``add_response`` has been renamed to ``handle_breach`` and ``create_job`` to ``add_tracker``. All the variables that has `job` on it have already been updated.
This commit is contained in:
@@ -6,7 +6,7 @@ from pisa.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.carrier import Carrier
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
from pisa.utils.zmq_subscriber import ZMQSubscriber
|
||||
|
||||
CONFIRMATIONS_BEFORE_RETRY = 6
|
||||
MIN_CONFIRMATIONS = 6
|
||||
@@ -14,7 +14,24 @@ MIN_CONFIRMATIONS = 6
|
||||
logger = Logger("Responder")
|
||||
|
||||
|
||||
class Job:
|
||||
class TransactionTracker:
|
||||
"""
|
||||
A ``TransactionTracker`` is used to monitor a ``penalty_tx``. Once the dispute is seen by the
|
||||
:mod:`Watcher <pisa.watcher>` the penalty transaction is decrypted and the relevant appointment data is passed
|
||||
along to the ``Responder``.
|
||||
|
||||
Once the ``Responder`` has succeeded on broadcasting the penalty transaction it will create a ``TransactionTracker``
|
||||
and monitor the blockchain until the end of the appointment.
|
||||
|
||||
Args:
|
||||
locator (str): A 16-byte hex-encoded value used by the tower to detect channel breaches. It serves as a trigger
|
||||
for the tower to decrypt and broadcast the penalty transaction.
|
||||
dispute_txid (str): the id of the transaction that created the channel breach and triggered the penalty.
|
||||
penalty_txid (str): the id of the transaction that was encrypted under ``dispute_txid``.
|
||||
penalty_rawtx (str): the raw transaction that was broadcast as a consequence of the channel breach.
|
||||
appointment_end (int): the block at which the tower will stop monitoring the blockchain for this appointment.
|
||||
"""
|
||||
|
||||
def __init__(self, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end):
|
||||
self.locator = locator
|
||||
self.dispute_txid = dispute_txid
|
||||
@@ -23,23 +40,47 @@ class Job:
|
||||
self.appointment_end = appointment_end
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, job_data):
|
||||
locator = job_data.get("locator")
|
||||
dispute_txid = job_data.get("dispute_txid")
|
||||
penalty_txid = job_data.get("penalty_txid")
|
||||
penalty_rawtx = job_data.get("penalty_rawtx")
|
||||
appointment_end = job_data.get("appointment_end")
|
||||
def from_dict(cls, tx_tracker_data):
|
||||
"""
|
||||
Constructs a ``TransactionTracker`` instance from a dictionary. Requires that all the fields are populated
|
||||
(``not None``).
|
||||
|
||||
Useful to load data from the database.
|
||||
|
||||
Args:
|
||||
tx_tracker_data (dict): a dictionary with an entry per each field required to create the
|
||||
``TransactionTracker``.
|
||||
|
||||
Returns:
|
||||
``TransactionTracker``: A ``TransactionTracker`` instantiated with the provided data.
|
||||
|
||||
Raises:
|
||||
ValueError: if any of the required fields is missing.
|
||||
"""
|
||||
|
||||
locator = tx_tracker_data.get("locator")
|
||||
dispute_txid = tx_tracker_data.get("dispute_txid")
|
||||
penalty_txid = tx_tracker_data.get("penalty_txid")
|
||||
penalty_rawtx = tx_tracker_data.get("penalty_rawtx")
|
||||
appointment_end = tx_tracker_data.get("appointment_end")
|
||||
|
||||
if any(v is None for v in [locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end]):
|
||||
raise ValueError("Wrong job data, some fields are missing")
|
||||
raise ValueError("Wrong transaction tracker data, some fields are missing")
|
||||
|
||||
else:
|
||||
job = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
tx_tracker = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
|
||||
return job
|
||||
return tx_tracker
|
||||
|
||||
def to_dict(self):
|
||||
job = {
|
||||
"""
|
||||
Exports a ``TransactionTracker`` as a dictionary.
|
||||
|
||||
Returns:
|
||||
``dict``: A dictionary containing the ``TransactionTracker`` data.
|
||||
"""
|
||||
|
||||
tx_tracker = {
|
||||
"locator": self.locator,
|
||||
"dispute_txid": self.dispute_txid,
|
||||
"penalty_txid": self.penalty_txid,
|
||||
@@ -47,16 +88,50 @@ class Job:
|
||||
"appointment_end": self.appointment_end,
|
||||
}
|
||||
|
||||
return job
|
||||
return tx_tracker
|
||||
|
||||
def to_json(self):
|
||||
"""
|
||||
Exports a ``TransactionTracker`` as a json-encoded dictionary.
|
||||
|
||||
Returns:
|
||||
``str``: A json-encoded dictionary containing the ``TransactionTracker`` data.
|
||||
"""
|
||||
|
||||
return json.dumps(self.to_dict())
|
||||
|
||||
|
||||
class Responder:
|
||||
"""
|
||||
The ``Responder`` is the class in charge of ensuring that channel breaches are dealt with. It does so handling the
|
||||
decrypted ``penalty_txs`` handed by the :mod:`Watcher <pisa.watcher>` and ensuring the they make it to the
|
||||
blockchain.
|
||||
|
||||
The ``Responder`` can be in two states:
|
||||
- Asleep (``self.asleep = True)`` when there are no trackers to take care of (``self.trackers`` is empty).
|
||||
- Awake (``self.asleep = False)`` when there are trackers to take care of (actively monitoring the blockchain).
|
||||
|
||||
Args:
|
||||
db_manager (DBManager): a :mod:`DBManager <pisa.db_manager>` instance to interact with the database.
|
||||
|
||||
Attributes:
|
||||
trackers (``dict``): A dictionary containing all the ``TransactionTrackers`` handled by the ``Responder``.
|
||||
Each entry is identified by a ``uuid``.
|
||||
tx_tracker_map (``dict``): A ``penalty_txid:uuid`` map used to allow the ``Responder`` to deal with several
|
||||
trackers triggered by the same ``penalty_txid``.
|
||||
unconfirmed_txs (``list``): A list that keeps track of all unconfirmed ``penalty_txs``.
|
||||
missed_confirmations (``dict``): a dictionary that keeps count of how many confirmations each ``penalty_tx`` has
|
||||
missed. Used to trigger rebroadcast if needed.
|
||||
asleep (``bool``): A flag that signals whether the ``Responder`` is asleep or awake.
|
||||
block_queue (``Queue``): A queue used by the ``Responder`` to receive block hashes from ``bitcoind``. It is
|
||||
populated by the ``ZMQSubscriber``.
|
||||
db_manager (``DBManager``): A :mod:`DBManager <pisa.db_manager>` instance to interact with the database.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, db_manager):
|
||||
self.jobs = dict()
|
||||
self.tx_job_map = dict()
|
||||
self.trackers = dict()
|
||||
self.tx_tracker_map = dict()
|
||||
self.unconfirmed_txs = []
|
||||
self.missed_confirmations = dict()
|
||||
self.asleep = True
|
||||
@@ -66,6 +141,25 @@ class Responder:
|
||||
|
||||
@staticmethod
|
||||
def on_sync(block_hash):
|
||||
"""
|
||||
Whether the ``Responder`` is on sync with ``bitcoind`` or not. Used when recovering from a crash.
|
||||
|
||||
The Watchtower can be instantiated with fresh or with backed up data. In the later, some triggers may have been
|
||||
missed. In order to go back on sync both the ``Watcher`` and the ``Responder`` need to perform the state
|
||||
transitions until they catch up.
|
||||
|
||||
If a transaction is broadcast by the ``Responder`` and it is rejected (due to a double-spending for example)
|
||||
and the ``Responder`` is off-sync then the ``TransactionTracker`` is abandoned.
|
||||
|
||||
This method helps making that decision.
|
||||
|
||||
Args:
|
||||
block_hash (str): the block hash passed to the ``Responder`` in the ``handle_breach`` request.
|
||||
|
||||
Returns:
|
||||
``bool``: Whether or not the ``Responder`` and ``bitcoind`` are on sync.
|
||||
"""
|
||||
|
||||
block_processor = BlockProcessor()
|
||||
distance_from_tip = block_processor.get_distance_to_tip(block_hash)
|
||||
|
||||
@@ -77,49 +171,83 @@ class Responder:
|
||||
|
||||
return synchronized
|
||||
|
||||
def add_response(
|
||||
self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, block_hash, retry=False
|
||||
):
|
||||
def handle_breach(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, block_hash):
|
||||
"""
|
||||
Requests the ``Responder`` to handle a channel breach. This is the entry point of the ``Responder``.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the appointment.
|
||||
locator (str): the appointment locator provided by the user (16-byte hex-encoded).
|
||||
dispute_txid (str): the id of the transaction that created the channel breach.
|
||||
penalty_txid (str): the id of the decrypted transaction included in the appointment.
|
||||
penalty_rawtx (str): the raw transaction to be broadcast in response of the breach.
|
||||
appointment_end (int): the block height at which the ``Responder`` will stop monitoring for this penalty
|
||||
transaction.
|
||||
block_hash (str): the block hash at which the breach was seen (used to see if we are on sync).
|
||||
|
||||
Returns:
|
||||
``Receipt``: A :mod:`Receipt <pisa.carrier.Receipt>` indicating whether or not the ``penalty_tx`` made it
|
||||
into the blockchain.
|
||||
"""
|
||||
|
||||
if self.asleep:
|
||||
logger.info("Waking up")
|
||||
|
||||
carrier = Carrier()
|
||||
receipt = carrier.send_transaction(penalty_rawtx, penalty_txid)
|
||||
|
||||
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
# retry holds that information. If retry is true the job already exists
|
||||
if receipt.delivered:
|
||||
if not retry:
|
||||
self.create_job(
|
||||
uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, receipt.confirmations
|
||||
)
|
||||
self.add_tracker(
|
||||
uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, receipt.confirmations
|
||||
)
|
||||
|
||||
else:
|
||||
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
|
||||
# TODO: Use self.on_sync(block_hash) to check whether or not we failed because we are out of sync
|
||||
logger.warning("Job failed.", uuid=uuid, on_sync=self.on_sync(block_hash))
|
||||
logger.warning(
|
||||
"Tracker cannot be created.", reason=receipt.reason, uuid=uuid, on_sync=self.on_sync(block_hash)
|
||||
)
|
||||
pass
|
||||
|
||||
return receipt
|
||||
|
||||
def create_job(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations=0):
|
||||
job = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
self.jobs[uuid] = job
|
||||
def add_tracker(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations=0):
|
||||
"""
|
||||
Creates a ``TransactionTracker`` after successfully broadcasting a ``penalty_tx``.
|
||||
|
||||
if penalty_txid in self.tx_job_map:
|
||||
self.tx_job_map[penalty_txid].append(uuid)
|
||||
The ``TransactionTracker`` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to
|
||||
``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database.
|
||||
|
||||
``add_tracker`` awakes the ``Responder`` and creates a connection with the ``ZMQSubscriber`` if he is asleep.
|
||||
|
||||
Args:
|
||||
uuid (str): a unique identifier for the appointment.
|
||||
locator (str): the appointment locator provided by the user (16-byte hex-encoded).
|
||||
dispute_txid (str): the id of the transaction that created the channel breach.
|
||||
penalty_txid (str): the id of the decrypted transaction included in the appointment.
|
||||
penalty_rawtx (str): the raw transaction to be broadcast.
|
||||
appointment_end (int): the block height at which the ``Responder`` will stop monitoring for the tracker.
|
||||
confirmations (int): the confirmation count of the ``penalty_tx``. In normal conditions it will be zero, but
|
||||
if the transaction is already on the blockchain this won't be the case.
|
||||
"""
|
||||
|
||||
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
self.trackers[uuid] = tracker
|
||||
|
||||
if penalty_txid in self.tx_tracker_map:
|
||||
self.tx_tracker_map[penalty_txid].append(uuid)
|
||||
|
||||
else:
|
||||
self.tx_job_map[penalty_txid] = [uuid]
|
||||
self.tx_tracker_map[penalty_txid] = [uuid]
|
||||
|
||||
# In the case we receive two jobs with the same penalty txid we only add it to the unconfirmed txs list once
|
||||
# In the case we receive two trackers with the same penalty txid we only add it to the unconfirmed txs list once
|
||||
if penalty_txid not in self.unconfirmed_txs and confirmations == 0:
|
||||
self.unconfirmed_txs.append(penalty_txid)
|
||||
|
||||
self.db_manager.store_responder_job(uuid, job.to_json())
|
||||
self.db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||
|
||||
logger.info(
|
||||
"New job added.", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end
|
||||
"New tracker added.", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end
|
||||
)
|
||||
|
||||
if self.asleep:
|
||||
@@ -130,14 +258,25 @@ class Responder:
|
||||
responder.start()
|
||||
|
||||
def do_subscribe(self):
|
||||
self.zmq_subscriber = ZMQHandler(parent="Responder")
|
||||
"""
|
||||
Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received
|
||||
trough the ``block_queue``.
|
||||
"""
|
||||
|
||||
self.zmq_subscriber = ZMQSubscriber(parent="Responder")
|
||||
self.zmq_subscriber.handle(self.block_queue)
|
||||
|
||||
def do_watch(self):
|
||||
"""
|
||||
Monitors the blockchain whilst there are pending trackers.
|
||||
|
||||
This is the main method of the ``Responder`` and triggers tracker cleaning, rebroadcasting, reorg managing, etc.
|
||||
"""
|
||||
|
||||
# ToDo: change prev_block_hash to the last known tip when bootstrapping
|
||||
prev_block_hash = BlockProcessor.get_best_block_hash()
|
||||
|
||||
while len(self.jobs) > 0:
|
||||
while len(self.trackers) > 0:
|
||||
# We get notified for every new received block
|
||||
block_hash = self.block_queue.get()
|
||||
block = BlockProcessor.get_block(block_hash)
|
||||
@@ -153,11 +292,13 @@ class Responder:
|
||||
self.check_confirmations(txs)
|
||||
|
||||
height = block.get("height")
|
||||
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
|
||||
completed_jobs = self.get_completed_jobs(height)
|
||||
completed_trackers = self.get_completed_trackers(height)
|
||||
Cleaner.delete_completed_trackers(
|
||||
completed_trackers, height, self.trackers, self.tx_tracker_map, self.db_manager
|
||||
)
|
||||
|
||||
Cleaner.delete_completed_jobs(completed_jobs, height, self.jobs, self.tx_job_map, self.db_manager)
|
||||
self.rebroadcast(txs_to_rebroadcast, block_hash)
|
||||
txs_to_rebroadcast = self.get_txs_to_rebroadcast()
|
||||
self.rebroadcast(txs_to_rebroadcast)
|
||||
|
||||
# NOTCOVERED
|
||||
else:
|
||||
@@ -175,17 +316,26 @@ class Responder:
|
||||
|
||||
prev_block_hash = block.get("hash")
|
||||
|
||||
# Go back to sleep if there are no more jobs
|
||||
# Go back to sleep if there are no more pendong trackers
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
self.block_queue = Queue()
|
||||
|
||||
logger.info("No more pending jobs, going back to sleep")
|
||||
logger.info("No more pending trackers, going back to sleep")
|
||||
|
||||
def check_confirmations(self, txs):
|
||||
"""
|
||||
Checks if any of the monitored ``penalty_txs`` has received it's first confirmation or keeps missing them.
|
||||
|
||||
This method manages ``unconfirmed_txs`` and ``missed_confirmations``.
|
||||
|
||||
Args:
|
||||
txs (list): A list of confirmed tx ids (the list of transactions included in the last received block).
|
||||
"""
|
||||
|
||||
# If a new confirmed tx matches a tx we are watching, then we remove it from the unconfirmed txs map
|
||||
for tx in txs:
|
||||
if tx in self.tx_job_map and tx in self.unconfirmed_txs:
|
||||
if tx in self.tx_tracker_map and tx in self.unconfirmed_txs:
|
||||
self.unconfirmed_txs.remove(tx)
|
||||
|
||||
logger.info("Confirmation received for transaction", tx=tx)
|
||||
@@ -201,22 +351,40 @@ class Responder:
|
||||
|
||||
logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=self.missed_confirmations[tx])
|
||||
|
||||
def get_txs_to_rebroadcast(self, txs):
|
||||
def get_txs_to_rebroadcast(self):
|
||||
"""
|
||||
Gets the transactions to be rebroadcast based on their ``missed_confirmation`` count.
|
||||
|
||||
Returns:
|
||||
``list``: A list with all the ids of the transaction that have to be rebroadcast.
|
||||
"""
|
||||
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
for tx in txs:
|
||||
if tx in self.missed_confirmations and self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
for tx, missed_conf in self.missed_confirmations.items():
|
||||
if missed_conf >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
# If a transactions has missed too many confirmations we add it to the rebroadcast list
|
||||
txs_to_rebroadcast.append(tx)
|
||||
|
||||
return txs_to_rebroadcast
|
||||
|
||||
def get_completed_jobs(self, height):
|
||||
completed_jobs = []
|
||||
def get_completed_trackers(self, height):
|
||||
"""
|
||||
Gets the trackers that has already been fulfilled based on a given height (``end_time`` was reached with a
|
||||
minimum confirmation count).
|
||||
|
||||
for uuid, job in self.jobs.items():
|
||||
if job.appointment_end <= height and job.penalty_txid not in self.unconfirmed_txs:
|
||||
tx = Carrier.get_transaction(job.penalty_txid)
|
||||
Args:
|
||||
height (int): the height of the last received block.
|
||||
|
||||
Returns:
|
||||
``list``: a list of tuples ``uuid:confirmations`` for the completed trackers.
|
||||
"""
|
||||
|
||||
completed_trackers = []
|
||||
|
||||
for uuid, tracker in self.trackers.items():
|
||||
if tracker.appointment_end <= height and tracker.penalty_txid not in self.unconfirmed_txs:
|
||||
tx = Carrier.get_transaction(tracker.penalty_txid)
|
||||
|
||||
# FIXME: Should be improved with the librarian
|
||||
if tx is not None:
|
||||
@@ -224,81 +392,101 @@ class Responder:
|
||||
|
||||
if confirmations >= MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
completed_jobs.append((uuid, confirmations))
|
||||
completed_trackers.append((uuid, confirmations))
|
||||
|
||||
return completed_jobs
|
||||
return completed_trackers
|
||||
|
||||
def rebroadcast(self, txs_to_rebroadcast):
|
||||
"""
|
||||
Rebroadcasts a ``penalty_tx`` that has missed too many confirmations. In the current approach this would loop
|
||||
forever si the transaction keeps not getting it.
|
||||
|
||||
Potentially the fees could be bumped here if the transaction has some tower dedicated outputs (or allows it
|
||||
trough ``ANYONECANSPEND`` or something similar).
|
||||
|
||||
Args:
|
||||
txs_to_rebroadcast (list): a list of transactions to be rebroadcast.
|
||||
|
||||
Returns:
|
||||
``list``: A list of ``Receipts`` with information about whether or not every transaction made it trough the
|
||||
network.
|
||||
"""
|
||||
|
||||
def rebroadcast(self, txs_to_rebroadcast, block_hash):
|
||||
# DISCUSS: #22-discuss-confirmations-before-retry
|
||||
# ToDo: #23-define-behaviour-approaching-end
|
||||
|
||||
receipts = []
|
||||
carrier = Carrier()
|
||||
|
||||
for txid in txs_to_rebroadcast:
|
||||
self.missed_confirmations[txid] = 0
|
||||
|
||||
for uuid in self.tx_job_map[txid]:
|
||||
job = self.jobs[uuid]
|
||||
receipt = self.add_response(
|
||||
job.locator,
|
||||
uuid,
|
||||
job.dispute_txid,
|
||||
job.penalty_txid,
|
||||
job.penalty_rawtx,
|
||||
job.appointment_end,
|
||||
block_hash,
|
||||
retry=True,
|
||||
)
|
||||
|
||||
# FIXME: This would potentially grab multiple instances of the same transaction and try to send them.
|
||||
# should we do it only once?
|
||||
for uuid in self.tx_tracker_map[txid]:
|
||||
tracker = self.trackers[uuid]
|
||||
logger.warning(
|
||||
"Transaction has missed many confirmations. Rebroadcasting.",
|
||||
penalty_txid=job.penalty_txid,
|
||||
confirmations_missed=CONFIRMATIONS_BEFORE_RETRY,
|
||||
"Transaction has missed many confirmations. Rebroadcasting.", penalty_txid=tracker.penalty_txid
|
||||
)
|
||||
|
||||
receipt = carrier.send_transaction(tracker.penalty_rawtx, tracker.penalty_txid)
|
||||
receipts.append((txid, receipt))
|
||||
|
||||
if not receipt.delivered:
|
||||
# FIXME: Can this actually happen?
|
||||
logger.warning("Transaction failed.", penalty_txid=tracker.penalty_txid)
|
||||
|
||||
return receipts
|
||||
|
||||
# NOTCOVERED
|
||||
def handle_reorgs(self, block_hash):
|
||||
"""
|
||||
Basic reorg handle. It deals with situations where a reorg has been found but the ``dispute_tx`` is still
|
||||
on the chain. If the ``dispute_tx`` is reverted, it need to call the ``ReorgManager`` (not implemented yet).
|
||||
|
||||
Args:
|
||||
block_hash (str): the hash of the last block received (which triggered the reorg).
|
||||
|
||||
"""
|
||||
carrier = Carrier()
|
||||
|
||||
for uuid, job in self.jobs.items():
|
||||
for uuid, tracker in self.trackers.items():
|
||||
# First we check if the dispute transaction is known (exists either in mempool or blockchain)
|
||||
dispute_tx = carrier.get_transaction(job.dispute_txid)
|
||||
dispute_tx = carrier.get_transaction(tracker.dispute_txid)
|
||||
|
||||
if dispute_tx is not None:
|
||||
# If the dispute is there, we check the penalty
|
||||
penalty_tx = carrier.get_transaction(job.penalty_txid)
|
||||
penalty_tx = carrier.get_transaction(tracker.penalty_txid)
|
||||
|
||||
if penalty_tx is not None:
|
||||
# If the penalty exists we need to check is it's on the blockchain or not so we can update the
|
||||
# unconfirmed transactions list accordingly.
|
||||
if penalty_tx.get("confirmations") is None:
|
||||
self.unconfirmed_txs.append(job.penalty_txid)
|
||||
self.unconfirmed_txs.append(tracker.penalty_txid)
|
||||
|
||||
logger.info(
|
||||
"Penalty transaction back in mempool. Updating unconfirmed transactions.",
|
||||
penalty_txid=job.penalty_txid,
|
||||
penalty_txid=tracker.penalty_txid,
|
||||
)
|
||||
|
||||
else:
|
||||
# If the penalty transaction is missing, we need to reset the job.
|
||||
# DISCUSS: Adding job back, should we flag it as retried?
|
||||
# If the penalty transaction is missing, we need to reset the tracker.
|
||||
# DISCUSS: Adding tracker back, should we flag it as retried?
|
||||
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
|
||||
# maintained. There is no way of doing so with the current approach. Update if required
|
||||
self.add_response(
|
||||
job.locator,
|
||||
self.handle_breach(
|
||||
tracker.locator,
|
||||
uuid,
|
||||
job.dispute_txid,
|
||||
job.penalty_txid,
|
||||
job.penalty_rawtx,
|
||||
job.appointment_end,
|
||||
tracker.dispute_txid,
|
||||
tracker.penalty_txid,
|
||||
tracker.penalty_rawtx,
|
||||
tracker.appointment_end,
|
||||
block_hash,
|
||||
)
|
||||
|
||||
logger.warning("Penalty transaction banished. Resetting the job", penalty_tx=job.penalty_txid)
|
||||
logger.warning(
|
||||
"Penalty transaction banished. Resetting the tracker", penalty_tx=tracker.penalty_txid
|
||||
)
|
||||
|
||||
else:
|
||||
# ToDo: #24-properly-handle-reorgs
|
||||
|
||||
@@ -9,7 +9,7 @@ from queue import Queue, Empty
|
||||
|
||||
from pisa import c_logger
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.responder import Responder, Job
|
||||
from pisa.responder import Responder, TransactionTracker
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.tools import bitcoin_cli
|
||||
|
||||
@@ -38,7 +38,7 @@ def temp_db_manager():
|
||||
rmtree(db_name)
|
||||
|
||||
|
||||
def create_dummy_job_data(random_txid=False, penalty_rawtx=None):
|
||||
def create_dummy_tracker_data(random_txid=False, penalty_rawtx=None):
|
||||
# The following transaction data corresponds to a valid transaction. For some test it may be interesting to have
|
||||
# some valid data, but for others we may need multiple different penalty_txids.
|
||||
|
||||
@@ -67,22 +67,22 @@ def create_dummy_job_data(random_txid=False, penalty_rawtx=None):
|
||||
return locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end
|
||||
|
||||
|
||||
def create_dummy_job(random_txid=False, penalty_rawtx=None):
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(
|
||||
def create_dummy_tracker(random_txid=False, penalty_rawtx=None):
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(
|
||||
random_txid, penalty_rawtx
|
||||
)
|
||||
return Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
return TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
|
||||
|
||||
def test_job_init(run_bitcoind):
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data()
|
||||
job = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
def test_tracker_init(run_bitcoind):
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data()
|
||||
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
|
||||
assert (
|
||||
job.dispute_txid == dispute_txid
|
||||
and job.penalty_txid == penalty_txid
|
||||
and job.penalty_rawtx == penalty_rawtx
|
||||
and job.appointment_end == appointment_end
|
||||
tracker.dispute_txid == dispute_txid
|
||||
and tracker.penalty_txid == penalty_txid
|
||||
and tracker.penalty_rawtx == penalty_rawtx
|
||||
and tracker.appointment_end == appointment_end
|
||||
)
|
||||
|
||||
|
||||
@@ -103,44 +103,44 @@ def test_on_sync_fail(responder):
|
||||
assert Responder.on_sync(chain_tip) is False
|
||||
|
||||
|
||||
def test_job_to_dict():
|
||||
job = create_dummy_job()
|
||||
job_dict = job.to_dict()
|
||||
def test_tracker_to_dict():
|
||||
tracker = create_dummy_tracker()
|
||||
tracker_dict = tracker.to_dict()
|
||||
|
||||
assert (
|
||||
job.locator == job_dict["locator"]
|
||||
and job.penalty_rawtx == job_dict["penalty_rawtx"]
|
||||
and job.appointment_end == job_dict["appointment_end"]
|
||||
tracker.locator == tracker_dict["locator"]
|
||||
and tracker.penalty_rawtx == tracker_dict["penalty_rawtx"]
|
||||
and tracker.appointment_end == tracker_dict["appointment_end"]
|
||||
)
|
||||
|
||||
|
||||
def test_job_to_json():
|
||||
job = create_dummy_job()
|
||||
job_dict = json.loads(job.to_json())
|
||||
def test_tracker_to_json():
|
||||
tracker = create_dummy_tracker()
|
||||
tracker_dict = json.loads(tracker.to_json())
|
||||
|
||||
assert (
|
||||
job.locator == job_dict["locator"]
|
||||
and job.penalty_rawtx == job_dict["penalty_rawtx"]
|
||||
and job.appointment_end == job_dict["appointment_end"]
|
||||
tracker.locator == tracker_dict["locator"]
|
||||
and tracker.penalty_rawtx == tracker_dict["penalty_rawtx"]
|
||||
and tracker.appointment_end == tracker_dict["appointment_end"]
|
||||
)
|
||||
|
||||
|
||||
def test_job_from_dict():
|
||||
job_dict = create_dummy_job().to_dict()
|
||||
new_job = Job.from_dict(job_dict)
|
||||
def test_tracker_from_dict():
|
||||
tracker_dict = create_dummy_tracker().to_dict()
|
||||
new_tracker = TransactionTracker.from_dict(tracker_dict)
|
||||
|
||||
assert job_dict == new_job.to_dict()
|
||||
assert tracker_dict == new_tracker.to_dict()
|
||||
|
||||
|
||||
def test_job_from_dict_invalid_data():
|
||||
job_dict = create_dummy_job().to_dict()
|
||||
def test_tracker_from_dict_invalid_data():
|
||||
tracker_dict = create_dummy_tracker().to_dict()
|
||||
|
||||
for value in ["dispute_txid", "penalty_txid", "penalty_rawtx", "appointment_end"]:
|
||||
job_dict_copy = deepcopy(job_dict)
|
||||
job_dict_copy[value] = None
|
||||
tracker_dict_copy = deepcopy(tracker_dict)
|
||||
tracker_dict_copy[value] = None
|
||||
|
||||
try:
|
||||
Job.from_dict(job_dict_copy)
|
||||
TransactionTracker.from_dict(tracker_dict_copy)
|
||||
assert False
|
||||
|
||||
except ValueError:
|
||||
@@ -148,8 +148,8 @@ def test_job_from_dict_invalid_data():
|
||||
|
||||
|
||||
def test_init_responder(responder):
|
||||
assert type(responder.jobs) is dict and len(responder.jobs) == 0
|
||||
assert type(responder.tx_job_map) is dict and len(responder.tx_job_map) == 0
|
||||
assert type(responder.trackers) is dict and len(responder.trackers) == 0
|
||||
assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0
|
||||
assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0
|
||||
assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0
|
||||
assert responder.block_queue.empty()
|
||||
@@ -157,124 +157,126 @@ def test_init_responder(responder):
|
||||
assert responder.zmq_subscriber is None
|
||||
|
||||
|
||||
def test_add_response(db_manager):
|
||||
def test_handle_breach(db_manager):
|
||||
responder = Responder(db_manager)
|
||||
uuid = uuid4().hex
|
||||
job = create_dummy_job()
|
||||
tracker = create_dummy_tracker()
|
||||
|
||||
# The block_hash passed to add_response does not matter much now. It will in the future to deal with errors
|
||||
receipt = responder.add_response(
|
||||
job.locator,
|
||||
receipt = responder.handle_breach(
|
||||
tracker.locator,
|
||||
uuid,
|
||||
job.dispute_txid,
|
||||
job.penalty_txid,
|
||||
job.penalty_rawtx,
|
||||
job.appointment_end,
|
||||
tracker.dispute_txid,
|
||||
tracker.penalty_txid,
|
||||
tracker.penalty_rawtx,
|
||||
tracker.appointment_end,
|
||||
block_hash=get_random_value_hex(32),
|
||||
)
|
||||
|
||||
assert receipt.delivered is True
|
||||
|
||||
# The responder automatically fires create_job on adding a job if it is asleep. We need to stop the processes now.
|
||||
# To do so we delete all the jobs, stop the zmq and create a new fake block to unblock the queue.get method
|
||||
responder.jobs = dict()
|
||||
# The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now.
|
||||
# To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method
|
||||
responder.trackers = dict()
|
||||
responder.zmq_subscriber.terminate = True
|
||||
responder.block_queue.put(get_random_value_hex(32))
|
||||
|
||||
|
||||
def test_add_bad_response(responder):
|
||||
uuid = uuid4().hex
|
||||
job = create_dummy_job()
|
||||
tracker = create_dummy_tracker()
|
||||
|
||||
# Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting
|
||||
# to awake. That will prevent the zmq thread to be launched again.
|
||||
responder.asleep = False
|
||||
|
||||
# A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though.
|
||||
job.penalty_rawtx = job.penalty_txid
|
||||
tracker.penalty_rawtx = tracker.penalty_txid
|
||||
|
||||
# The block_hash passed to add_response does not matter much now. It will in the future to deal with errors
|
||||
receipt = responder.add_response(
|
||||
job.locator,
|
||||
receipt = responder.handle_breach(
|
||||
tracker.locator,
|
||||
uuid,
|
||||
job.dispute_txid,
|
||||
job.penalty_txid,
|
||||
job.penalty_rawtx,
|
||||
job.appointment_end,
|
||||
tracker.dispute_txid,
|
||||
tracker.penalty_txid,
|
||||
tracker.penalty_rawtx,
|
||||
tracker.appointment_end,
|
||||
block_hash=get_random_value_hex(32),
|
||||
)
|
||||
|
||||
assert receipt.delivered is False
|
||||
|
||||
|
||||
def test_create_job(responder):
|
||||
def test_add_tracker(responder):
|
||||
responder.asleep = False
|
||||
|
||||
for _ in range(20):
|
||||
uuid = uuid4().hex
|
||||
confirmations = 0
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(random_txid=True)
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(
|
||||
random_txid=True
|
||||
)
|
||||
|
||||
# Check the job is not within the responder jobs before adding it
|
||||
assert uuid not in responder.jobs
|
||||
assert penalty_txid not in responder.tx_job_map
|
||||
# Check the tracker is not within the responder trackers before adding it
|
||||
assert uuid not in responder.trackers
|
||||
assert penalty_txid not in responder.tx_tracker_map
|
||||
assert penalty_txid not in responder.unconfirmed_txs
|
||||
|
||||
# And that it is afterwards
|
||||
responder.create_job(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
assert uuid in responder.jobs
|
||||
assert penalty_txid in responder.tx_job_map
|
||||
responder.add_tracker(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
assert uuid in responder.trackers
|
||||
assert penalty_txid in responder.tx_tracker_map
|
||||
assert penalty_txid in responder.unconfirmed_txs
|
||||
|
||||
# Check that the rest of job data also matches
|
||||
job = responder.jobs[uuid]
|
||||
# Check that the rest of tracker data also matches
|
||||
tracker = responder.trackers[uuid]
|
||||
assert (
|
||||
job.dispute_txid == dispute_txid
|
||||
and job.penalty_txid == penalty_txid
|
||||
and job.penalty_rawtx == penalty_rawtx
|
||||
and job.appointment_end == appointment_end
|
||||
and job.appointment_end == appointment_end
|
||||
tracker.dispute_txid == dispute_txid
|
||||
and tracker.penalty_txid == penalty_txid
|
||||
and tracker.penalty_rawtx == penalty_rawtx
|
||||
and tracker.appointment_end == appointment_end
|
||||
and tracker.appointment_end == appointment_end
|
||||
)
|
||||
|
||||
|
||||
def test_create_job_same_penalty_txid(responder):
|
||||
# Create the same job using two different uuids
|
||||
def test_add_tracker_same_penalty_txid(responder):
|
||||
# Create the same tracker using two different uuids
|
||||
confirmations = 0
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(random_txid=True)
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True)
|
||||
uuid_1 = uuid4().hex
|
||||
uuid_2 = uuid4().hex
|
||||
|
||||
responder.create_job(uuid_1, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
responder.create_job(uuid_2, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
responder.add_tracker(uuid_1, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
responder.add_tracker(uuid_2, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
|
||||
# Check that both jobs have been added
|
||||
assert uuid_1 in responder.jobs and uuid_2 in responder.jobs
|
||||
assert penalty_txid in responder.tx_job_map
|
||||
# Check that both trackers have been added
|
||||
assert uuid_1 in responder.trackers and uuid_2 in responder.trackers
|
||||
assert penalty_txid in responder.tx_tracker_map
|
||||
assert penalty_txid in responder.unconfirmed_txs
|
||||
|
||||
# Check that the rest of job data also matches
|
||||
# Check that the rest of tracker data also matches
|
||||
for uuid in [uuid_1, uuid_2]:
|
||||
job = responder.jobs[uuid]
|
||||
tracker = responder.trackers[uuid]
|
||||
assert (
|
||||
job.dispute_txid == dispute_txid
|
||||
and job.penalty_txid == penalty_txid
|
||||
and job.penalty_rawtx == penalty_rawtx
|
||||
and job.appointment_end == appointment_end
|
||||
and job.appointment_end == appointment_end
|
||||
tracker.dispute_txid == dispute_txid
|
||||
and tracker.penalty_txid == penalty_txid
|
||||
and tracker.penalty_rawtx == penalty_rawtx
|
||||
and tracker.appointment_end == appointment_end
|
||||
and tracker.appointment_end == appointment_end
|
||||
)
|
||||
|
||||
|
||||
def test_create_job_already_confirmed(responder):
|
||||
def test_add_tracker_already_confirmed(responder):
|
||||
responder.asleep = False
|
||||
|
||||
for i in range(20):
|
||||
uuid = uuid4().hex
|
||||
confirmations = i + 1
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(
|
||||
penalty_rawtx=TX.create_dummy_transaction()
|
||||
)
|
||||
|
||||
responder.create_job(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
responder.add_tracker(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations)
|
||||
|
||||
assert penalty_txid not in responder.unconfirmed_txs
|
||||
|
||||
@@ -303,16 +305,16 @@ def test_do_watch(temp_db_manager):
|
||||
zmq_thread.daemon = True
|
||||
zmq_thread.start()
|
||||
|
||||
jobs = [create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)]
|
||||
trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)]
|
||||
|
||||
# Let's set up the jobs first
|
||||
for job in jobs:
|
||||
# Let's set up the trackers first
|
||||
for tracker in trackers:
|
||||
uuid = uuid4().hex
|
||||
|
||||
responder.jobs[uuid] = job
|
||||
responder.tx_job_map[job.penalty_txid] = [uuid]
|
||||
responder.missed_confirmations[job.penalty_txid] = 0
|
||||
responder.unconfirmed_txs.append(job.penalty_txid)
|
||||
responder.trackers[uuid] = tracker
|
||||
responder.tx_tracker_map[tracker.penalty_txid] = [uuid]
|
||||
responder.missed_confirmations[tracker.penalty_txid] = 0
|
||||
responder.unconfirmed_txs.append(tracker.penalty_txid)
|
||||
|
||||
# Let's start to watch
|
||||
watch_thread = Thread(target=responder.do_watch)
|
||||
@@ -321,9 +323,9 @@ def test_do_watch(temp_db_manager):
|
||||
|
||||
# And broadcast some of the transactions
|
||||
broadcast_txs = []
|
||||
for job in jobs[:5]:
|
||||
bitcoin_cli().sendrawtransaction(job.penalty_rawtx)
|
||||
broadcast_txs.append(job.penalty_txid)
|
||||
for tracker in trackers[:5]:
|
||||
bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx)
|
||||
broadcast_txs.append(tracker.penalty_txid)
|
||||
|
||||
# Mine a block
|
||||
generate_block()
|
||||
@@ -333,21 +335,21 @@ def test_do_watch(temp_db_manager):
|
||||
|
||||
# TODO: test that reorgs can be detected once data persistence is merged (new version of the simulator)
|
||||
|
||||
# Generating 5 additional blocks should complete the 5 jobs
|
||||
# Generating 5 additional blocks should complete the 5 trackers
|
||||
generate_blocks(5)
|
||||
|
||||
assert not set(broadcast_txs).issubset(responder.tx_job_map)
|
||||
assert not set(broadcast_txs).issubset(responder.tx_tracker_map)
|
||||
|
||||
# Do the rest
|
||||
broadcast_txs = []
|
||||
for job in jobs[5:]:
|
||||
bitcoin_cli().sendrawtransaction(job.penalty_rawtx)
|
||||
broadcast_txs.append(job.penalty_txid)
|
||||
for tracker in trackers[5:]:
|
||||
bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx)
|
||||
broadcast_txs.append(tracker.penalty_txid)
|
||||
|
||||
# Mine a block
|
||||
generate_blocks(6)
|
||||
|
||||
assert len(responder.tx_job_map) == 0
|
||||
assert len(responder.tx_tracker_map) == 0
|
||||
assert responder.asleep is True
|
||||
|
||||
|
||||
@@ -368,9 +370,9 @@ def test_check_confirmations(temp_db_manager):
|
||||
txs_subset = random.sample(txs, k=10)
|
||||
responder.unconfirmed_txs.extend(txs_subset)
|
||||
|
||||
# We also need to add them to the tx_job_map since they would be there in normal conditions
|
||||
responder.tx_job_map = {
|
||||
txid: Job(txid[:LOCATOR_LEN_HEX], txid, None, None, None) for txid in responder.unconfirmed_txs
|
||||
# We also need to add them to the tx_tracker_map since they would be there in normal conditions
|
||||
responder.tx_tracker_map = {
|
||||
txid: TransactionTracker(txid[:LOCATOR_LEN_HEX], txid, None, None, None) for txid in responder.unconfirmed_txs
|
||||
}
|
||||
|
||||
# Let's make sure that there are no txs with missed confirmations yet
|
||||
@@ -387,6 +389,7 @@ def test_check_confirmations(temp_db_manager):
|
||||
assert responder.missed_confirmations[tx] == 1
|
||||
|
||||
|
||||
# WIP: Check this properly, a bug pass unnoticed!
|
||||
def test_get_txs_to_rebroadcast(responder):
|
||||
# Let's create a few fake txids and assign at least 6 missing confirmations to each
|
||||
txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)}
|
||||
@@ -396,67 +399,69 @@ def test_get_txs_to_rebroadcast(responder):
|
||||
|
||||
# All the txs in the first dict should be flagged as to_rebroadcast
|
||||
responder.missed_confirmations = txs_missing_too_many_conf
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_too_many_conf)
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast()
|
||||
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
|
||||
|
||||
# Non of the txs in the second dict should be flagged
|
||||
responder.missed_confirmations = txs_missing_some_conf
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf)
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast()
|
||||
assert txs_to_rebroadcast == []
|
||||
|
||||
# Let's check that it also works with a mixed dict
|
||||
responder.missed_confirmations.update(txs_missing_too_many_conf)
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf)
|
||||
txs_to_rebroadcast = responder.get_txs_to_rebroadcast()
|
||||
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
|
||||
|
||||
|
||||
def test_get_completed_jobs(db_manager):
|
||||
def test_get_completed_trackers(db_manager):
|
||||
initial_height = bitcoin_cli().getblockcount()
|
||||
|
||||
# Let's use a fresh responder for this to make it easier to compare the results
|
||||
responder = Responder(db_manager)
|
||||
|
||||
# A complete job is a job that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS)
|
||||
# A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS)
|
||||
# We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached
|
||||
jobs_end_conf = {uuid4().hex: create_dummy_job(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10)}
|
||||
trackers_end_conf = {
|
||||
uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10)
|
||||
}
|
||||
|
||||
jobs_end_no_conf = {}
|
||||
trackers_end_no_conf = {}
|
||||
for _ in range(10):
|
||||
job = create_dummy_job(penalty_rawtx=TX.create_dummy_transaction())
|
||||
responder.unconfirmed_txs.append(job.penalty_txid)
|
||||
jobs_end_no_conf[uuid4().hex] = job
|
||||
tracker = create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction())
|
||||
responder.unconfirmed_txs.append(tracker.penalty_txid)
|
||||
trackers_end_no_conf[uuid4().hex] = tracker
|
||||
|
||||
jobs_no_end = {}
|
||||
trackers_no_end = {}
|
||||
for _ in range(10):
|
||||
job = create_dummy_job(penalty_rawtx=TX.create_dummy_transaction())
|
||||
job.appointment_end += 10
|
||||
jobs_no_end[uuid4().hex] = job
|
||||
tracker = create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction())
|
||||
tracker.appointment_end += 10
|
||||
trackers_no_end[uuid4().hex] = tracker
|
||||
|
||||
# Let's add all to the responder
|
||||
responder.jobs.update(jobs_end_conf)
|
||||
responder.jobs.update(jobs_end_no_conf)
|
||||
responder.jobs.update(jobs_no_end)
|
||||
responder.trackers.update(trackers_end_conf)
|
||||
responder.trackers.update(trackers_end_no_conf)
|
||||
responder.trackers.update(trackers_no_end)
|
||||
|
||||
for uuid, job in responder.jobs.items():
|
||||
bitcoin_cli().sendrawtransaction(job.penalty_rawtx)
|
||||
for uuid, tracker in responder.trackers.items():
|
||||
bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx)
|
||||
|
||||
# The dummy appointments have a end_appointment time of current + 2, but jobs need at least 6 confs by default
|
||||
# The dummy appointments have a end_appointment time of current + 2, but trackers need at least 6 confs by default
|
||||
generate_blocks(6)
|
||||
|
||||
# And now let's check
|
||||
completed_jobs = responder.get_completed_jobs(initial_height + 6)
|
||||
completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs]
|
||||
ended_jobs_keys = list(jobs_end_conf.keys())
|
||||
assert set(completed_jobs_ids) == set(ended_jobs_keys)
|
||||
completed_trackers = responder.get_completed_trackers(initial_height + 6)
|
||||
completed_trackers_ids = [tracker_id for tracker_id, confirmations in completed_trackers]
|
||||
ended_trackers_keys = list(trackers_end_conf.keys())
|
||||
assert set(completed_trackers_ids) == set(ended_trackers_keys)
|
||||
|
||||
# Generating 6 additional blocks should also confirm jobs_no_end
|
||||
# Generating 6 additional blocks should also confirm trackers_no_end
|
||||
generate_blocks(6)
|
||||
|
||||
completed_jobs = responder.get_completed_jobs(initial_height + 12)
|
||||
completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs]
|
||||
ended_jobs_keys.extend(list(jobs_no_end.keys()))
|
||||
completed_trackers = responder.get_completed_trackers(initial_height + 12)
|
||||
completed_trackers_ids = [tracker_id for tracker_id, confirmations in completed_trackers]
|
||||
ended_trackers_keys.extend(list(trackers_no_end.keys()))
|
||||
|
||||
assert set(completed_jobs_ids) == set(ended_jobs_keys)
|
||||
assert set(completed_trackers_ids) == set(ended_trackers_keys)
|
||||
|
||||
|
||||
def test_rebroadcast(db_manager):
|
||||
@@ -465,15 +470,17 @@ def test_rebroadcast(db_manager):
|
||||
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
# Rebroadcast calls add_response with retry=True. The job data is already in jobs.
|
||||
# Rebroadcast calls add_response with retry=True. The tracker data is already in trackers.
|
||||
for i in range(20):
|
||||
uuid = uuid4().hex
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_job_data(
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(
|
||||
penalty_rawtx=TX.create_dummy_transaction()
|
||||
)
|
||||
|
||||
responder.jobs[uuid] = Job(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||
responder.tx_job_map[penalty_txid] = [uuid]
|
||||
responder.trackers[uuid] = TransactionTracker(
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end
|
||||
)
|
||||
responder.tx_tracker_map[penalty_txid] = [uuid]
|
||||
responder.unconfirmed_txs.append(penalty_txid)
|
||||
|
||||
# Let's add some of the txs in the rebroadcast list
|
||||
@@ -481,7 +488,7 @@ def test_rebroadcast(db_manager):
|
||||
txs_to_rebroadcast.append(penalty_txid)
|
||||
|
||||
# The block_hash passed to rebroadcast does not matter much now. It will in the future to deal with errors
|
||||
receipts = responder.rebroadcast(txs_to_rebroadcast, get_random_value_hex(32))
|
||||
receipts = responder.rebroadcast(txs_to_rebroadcast)
|
||||
|
||||
# All txs should have been delivered and the missed confirmation reset
|
||||
for txid, receipt in receipts:
|
||||
|
||||
Reference in New Issue
Block a user