mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Renames matches to breaches to be consistent with the Responder. Also adds docstrings
This commit is contained in:
176
pisa/watcher.py
176
pisa/watcher.py
@@ -16,6 +16,47 @@ logger = Logger("Watcher")
|
|||||||
|
|
||||||
|
|
||||||
class Watcher:
|
class Watcher:
|
||||||
|
"""
|
||||||
|
The ``Watcher`` is the class in charge to watch for channel breaches for the appointments accepted by the tower.
|
||||||
|
|
||||||
|
The ``Watcher`` keeps track of the accepted appointments in ``appointments`` and, for new received block, checks
|
||||||
|
if any breach has happened by comparing the txids with the appointment locators. If a breach is seen, the
|
||||||
|
:mod:`EncryptedBlob <pisa.encrypted_blob>` of the corresponding appointment is decrypted and the data is passed
|
||||||
|
to the :mod:`Responder <pisa.responder>`.
|
||||||
|
|
||||||
|
If an appointment reaches its end with no breach, the data is simply deleted.
|
||||||
|
|
||||||
|
The ``Watcher`` receives information about new received blocks via the ``block_queue`` that is populated by the
|
||||||
|
:mod:`ZMQSubscriber <pisa.utils.zmq_subscriber>`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database.
|
||||||
|
pisa_sk_file (:obj:`str`): a path to the private key used to sign appointment receipts (signaling acceptance).
|
||||||
|
responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new
|
||||||
|
instance is created. Populated instances are useful when bootstrapping the system from backed-up data.
|
||||||
|
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given
|
||||||
|
time. Defaults to ``MAX_APPOINTMENTS``.
|
||||||
|
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
appointments (:obj:`dict`): a dictionary containing all the appointments (:obj:`Appointment
|
||||||
|
<pisa.appointment.Appointment>` instances) accepted by the tower. It's populated trough ``add_appointment``.
|
||||||
|
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
|
||||||
|
appointments with the same ``locator``.
|
||||||
|
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
|
||||||
|
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
|
||||||
|
populated by the :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`.
|
||||||
|
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given
|
||||||
|
time.
|
||||||
|
zmq_subscriber (:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`): a ZMQSubscriber instance used
|
||||||
|
to receive new block notifications from ``bitcoind``.
|
||||||
|
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: if `pisa_sk_file` is not found.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, db_manager, pisa_sk_file=PISA_SECRET_KEY, responder=None, max_appointments=MAX_APPOINTMENTS):
|
def __init__(self, db_manager, pisa_sk_file=PISA_SECRET_KEY, responder=None, max_appointments=MAX_APPOINTMENTS):
|
||||||
self.appointments = dict()
|
self.appointments = dict()
|
||||||
self.locator_uuid_map = dict()
|
self.locator_uuid_map = dict()
|
||||||
@@ -37,22 +78,48 @@ class Watcher:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def compute_locator(tx_id):
|
def compute_locator(tx_id):
|
||||||
|
"""
|
||||||
|
Computes an appointment locator given a transaction id.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tx_id (:obj:`str`): the transaction id used to compute the locator.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(:obj:`str`): The computed locator.
|
||||||
|
"""
|
||||||
|
|
||||||
return tx_id[:LOCATOR_LEN_HEX]
|
return tx_id[:LOCATOR_LEN_HEX]
|
||||||
|
|
||||||
def add_appointment(self, appointment):
|
def add_appointment(self, appointment):
|
||||||
# Rationale:
|
"""
|
||||||
# The Watcher will analyze every received block looking for appointment matches. If there is no work
|
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
|
||||||
# to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block
|
|
||||||
# the watcher will get the list of transactions and compare it with the list of appointments.
|
``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment, if the :obj:`Watcher`
|
||||||
# If the watcher is awake, every new appointment will just be added to the appointment list until
|
is asleep, it will be awaken and start monitoring the blockchain (``do_watch``) until ``appointments`` is empty.
|
||||||
# max_appointments is reached.
|
It will go back to sleep once there are no more pending appointments.
|
||||||
|
|
||||||
|
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding
|
||||||
|
:mod:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob` and pass the information to the
|
||||||
|
:mod:`Responder <pisa.responder.Responder>`.
|
||||||
|
|
||||||
|
The tower may store multiple appointments with the same ``locator`` to avoid DoS attacks based on data
|
||||||
|
rewriting. `locators`` should be derived from the ``dispute_txid``, but that task is performed by the user, and
|
||||||
|
the tower has no way of verifying whether or not they have been properly derived. Therefore, appointments are
|
||||||
|
identified by ``uuid`` and stored in ``appointments`` and ``locator_uuid_map``.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
appointment (:obj:`Appointment <pisa.appointment.Appointment>`): the appointment to be added to the
|
||||||
|
:obj:`Watcher`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`tuple`: A tuple signaling if the appointment has been added or not (based on ``max_appointments``).
|
||||||
|
The structure looks as follows:
|
||||||
|
- ``(True, signature)`` if the appointment has been accepted.
|
||||||
|
- ``(False, None)`` otherwise.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
if len(self.appointments) < self.max_appointments:
|
if len(self.appointments) < self.max_appointments:
|
||||||
# Appointments are identified by the locator: the sha256 of commitment txid (H(tx_id)).
|
|
||||||
# Two different nodes may ask for appointments using the same commitment txid, what will result in a
|
|
||||||
# collision in our appointments structure (and may be an attack surface). In order to avoid such collisions
|
|
||||||
# we will identify every appointment with a uuid
|
|
||||||
|
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
self.appointments[uuid] = appointment
|
self.appointments[uuid] = appointment
|
||||||
|
|
||||||
@@ -89,10 +156,22 @@ class Watcher:
|
|||||||
return appointment_added, signature
|
return appointment_added, signature
|
||||||
|
|
||||||
def do_subscribe(self):
|
def do_subscribe(self):
|
||||||
|
"""
|
||||||
|
Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received
|
||||||
|
trough the ``block_queue``.
|
||||||
|
"""
|
||||||
|
|
||||||
self.zmq_subscriber = ZMQSubscriber(parent="Watcher")
|
self.zmq_subscriber = ZMQSubscriber(parent="Watcher")
|
||||||
self.zmq_subscriber.handle(self.block_queue)
|
self.zmq_subscriber.handle(self.block_queue)
|
||||||
|
|
||||||
def do_watch(self):
|
def do_watch(self):
|
||||||
|
"""
|
||||||
|
Monitors the blockchain whilst there are pending appointments.
|
||||||
|
|
||||||
|
This is the main method of the :obj:`Watcher` and the one in charge to pass appointments to the
|
||||||
|
:obj:`Responder <pisa.responder.Responder>` upon detecting a breach.
|
||||||
|
"""
|
||||||
|
|
||||||
while len(self.appointments) > 0:
|
while len(self.appointments) > 0:
|
||||||
block_hash = self.block_queue.get()
|
block_hash = self.block_queue.get()
|
||||||
logger.info("New block received", block_hash=block_hash)
|
logger.info("New block received", block_hash=block_hash)
|
||||||
@@ -114,24 +193,24 @@ class Watcher:
|
|||||||
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
|
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
|
||||||
)
|
)
|
||||||
|
|
||||||
filtered_matches = self.filter_valid_matches(self.get_matches(txids))
|
filtered_breaches = self.filter_valid_breaches(self.get_breaches(txids))
|
||||||
|
|
||||||
for uuid, filtered_match in filtered_matches.items():
|
for uuid, filtered_breach in filtered_breaches.items():
|
||||||
# Errors decrypting the Blob will result in a None penalty_txid
|
# Errors decrypting the Blob will result in a None penalty_txid
|
||||||
if filtered_match["valid_match"] is True:
|
if filtered_breach["valid_breach"] is True:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Notifying responder and deleting appointment.",
|
"Notifying responder and deleting appointment.",
|
||||||
penalty_txid=filtered_match["penalty_txid"],
|
penalty_txid=filtered_breach["penalty_txid"],
|
||||||
locator=filtered_match["locator"],
|
locator=filtered_breach["locator"],
|
||||||
uuid=uuid,
|
uuid=uuid,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.responder.handle_breach(
|
self.responder.handle_breach(
|
||||||
uuid,
|
uuid,
|
||||||
filtered_match["locator"],
|
filtered_breach["locator"],
|
||||||
filtered_match["dispute_txid"],
|
filtered_breach["dispute_txid"],
|
||||||
filtered_match["penalty_txid"],
|
filtered_breach["penalty_txid"],
|
||||||
filtered_match["penalty_rawtx"],
|
filtered_breach["penalty_rawtx"],
|
||||||
self.appointments[uuid].end_time,
|
self.appointments[uuid].end_time,
|
||||||
block_hash,
|
block_hash,
|
||||||
)
|
)
|
||||||
@@ -151,25 +230,52 @@ class Watcher:
|
|||||||
|
|
||||||
logger.info("No more pending appointments, going back to sleep")
|
logger.info("No more pending appointments, going back to sleep")
|
||||||
|
|
||||||
def get_matches(self, txids):
|
def get_breaches(self, txids):
|
||||||
|
"""
|
||||||
|
Gets a list of channel breaches given the list of transaction ids.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txids (:obj:`list`): the list of transaction ids included in the last received block.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are
|
||||||
|
found.
|
||||||
|
"""
|
||||||
|
|
||||||
potential_locators = {Watcher.compute_locator(txid): txid for txid in txids}
|
potential_locators = {Watcher.compute_locator(txid): txid for txid in txids}
|
||||||
|
|
||||||
# Check is any of the tx_ids in the received block is an actual match
|
# Check is any of the tx_ids in the received block is an actual match
|
||||||
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
|
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
|
||||||
matches = {locator: potential_locators[locator] for locator in intersection}
|
breaches = {locator: potential_locators[locator] for locator in intersection}
|
||||||
|
|
||||||
if len(matches) > 0:
|
if len(breaches) > 0:
|
||||||
logger.info("List of matches", potential_matches=matches)
|
logger.info("List of breaches", breaches=breaches)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info("No matches found")
|
logger.info("No breaches found")
|
||||||
|
|
||||||
return matches
|
return breaches
|
||||||
|
|
||||||
def filter_valid_matches(self, matches):
|
def filter_valid_breaches(self, breaches):
|
||||||
filtered_matches = {}
|
"""
|
||||||
|
Filters what of the found breaches contain valid transaction data.
|
||||||
|
|
||||||
for locator, dispute_txid in matches.items():
|
The :obj:`Watcher` cannot if a given :obj:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` contains a valid
|
||||||
|
transaction until a breach if seen. Blobs that contain arbitrary data are dropped and not sent to the
|
||||||
|
:obj:`Responder <pisa.responder.Responder>`.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid.
|
||||||
|
The structure is as follows:
|
||||||
|
``{locator, dispute_txid, penalty_txid, penalty_rawtx, valid_breach}``
|
||||||
|
"""
|
||||||
|
|
||||||
|
filtered_breaches = {}
|
||||||
|
|
||||||
|
for locator, dispute_txid in breaches.items():
|
||||||
for uuid in self.locator_uuid_map[locator]:
|
for uuid in self.locator_uuid_map[locator]:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -182,20 +288,20 @@ class Watcher:
|
|||||||
|
|
||||||
if penalty_tx is not None:
|
if penalty_tx is not None:
|
||||||
penalty_txid = penalty_tx.get("txid")
|
penalty_txid = penalty_tx.get("txid")
|
||||||
valid_match = True
|
valid_breach = True
|
||||||
|
|
||||||
logger.info("Match found for locator.", locator=locator, uuid=uuid, penalty_txid=penalty_txid)
|
logger.info("Breach found for locator.", locator=locator, uuid=uuid, penalty_txid=penalty_txid)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
penalty_txid = None
|
penalty_txid = None
|
||||||
valid_match = False
|
valid_breach = False
|
||||||
|
|
||||||
filtered_matches[uuid] = {
|
filtered_breaches[uuid] = {
|
||||||
"locator": locator,
|
"locator": locator,
|
||||||
"dispute_txid": dispute_txid,
|
"dispute_txid": dispute_txid,
|
||||||
"penalty_txid": penalty_txid,
|
"penalty_txid": penalty_txid,
|
||||||
"penalty_rawtx": penalty_rawtx,
|
"penalty_rawtx": penalty_rawtx,
|
||||||
"valid_match": valid_match,
|
"valid_breach": valid_breach,
|
||||||
}
|
}
|
||||||
|
|
||||||
return filtered_matches
|
return filtered_breaches
|
||||||
|
|||||||
@@ -164,29 +164,29 @@ def test_do_watch(watcher):
|
|||||||
assert watcher.asleep is True
|
assert watcher.asleep is True
|
||||||
|
|
||||||
|
|
||||||
def test_get_matches(watcher, txids, locator_uuid_map):
|
def test_get_breaches(watcher, txids, locator_uuid_map):
|
||||||
watcher.locator_uuid_map = locator_uuid_map
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
potential_matches = watcher.get_matches(txids)
|
potential_breaches = watcher.get_breaches(txids)
|
||||||
|
|
||||||
# All the txids must match
|
# All the txids must breach
|
||||||
assert locator_uuid_map.keys() == potential_matches.keys()
|
assert locator_uuid_map.keys() == potential_breaches.keys()
|
||||||
|
|
||||||
|
|
||||||
def test_get_matches_random_data(watcher, locator_uuid_map):
|
def test_get_breaches_random_data(watcher, locator_uuid_map):
|
||||||
# The likelihood of finding a potential match with random data should be negligible
|
# The likelihood of finding a potential breach with random data should be negligible
|
||||||
watcher.locator_uuid_map = locator_uuid_map
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
|
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
|
||||||
|
|
||||||
potential_matches = watcher.get_matches(txids)
|
potential_breaches = watcher.get_breaches(txids)
|
||||||
|
|
||||||
# None of the txids should match
|
# None of the txids should breach
|
||||||
assert len(potential_matches) == 0
|
assert len(potential_breaches) == 0
|
||||||
|
|
||||||
|
|
||||||
def test_filter_valid_matches_random_data(watcher):
|
def test_filter_valid_breaches_random_data(watcher):
|
||||||
appointments = {}
|
appointments = {}
|
||||||
locator_uuid_map = {}
|
locator_uuid_map = {}
|
||||||
matches = {}
|
breaches = {}
|
||||||
|
|
||||||
for i in range(TEST_SET_SIZE):
|
for i in range(TEST_SET_SIZE):
|
||||||
dummy_appointment, _ = generate_dummy_appointment()
|
dummy_appointment, _ = generate_dummy_appointment()
|
||||||
@@ -197,17 +197,17 @@ def test_filter_valid_matches_random_data(watcher):
|
|||||||
|
|
||||||
if i % 2:
|
if i % 2:
|
||||||
dispute_txid = get_random_value_hex(32)
|
dispute_txid = get_random_value_hex(32)
|
||||||
matches[dummy_appointment.locator] = dispute_txid
|
breaches[dummy_appointment.locator] = dispute_txid
|
||||||
|
|
||||||
watcher.locator_uuid_map = locator_uuid_map
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
watcher.appointments = appointments
|
watcher.appointments = appointments
|
||||||
|
|
||||||
filtered_valid_matches = watcher.filter_valid_matches(matches)
|
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
||||||
|
|
||||||
assert not any([fil_match["valid_match"] for uuid, fil_match in filtered_valid_matches.items()])
|
assert not any([fil_breach["valid_breach"] for uuid, fil_breach in filtered_valid_breaches.items()])
|
||||||
|
|
||||||
|
|
||||||
def test_filter_valid_matches(watcher):
|
def test_filter_valid_breaches(watcher):
|
||||||
dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9"
|
dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9"
|
||||||
encrypted_blob = (
|
encrypted_blob = (
|
||||||
"a62aa9bb3c8591e4d5de10f1bd49db92432ce2341af55762cdc9242c08662f97f5f47da0a1aa88373508cd6e67e87eefddeca0cee98c1"
|
"a62aa9bb3c8591e4d5de10f1bd49db92432ce2341af55762cdc9242c08662f97f5f47da0a1aa88373508cd6e67e87eefddeca0cee98c1"
|
||||||
@@ -225,11 +225,11 @@ def test_filter_valid_matches(watcher):
|
|||||||
|
|
||||||
appointments = {uuid: dummy_appointment}
|
appointments = {uuid: dummy_appointment}
|
||||||
locator_uuid_map = {dummy_appointment.locator: [uuid]}
|
locator_uuid_map = {dummy_appointment.locator: [uuid]}
|
||||||
matches = {dummy_appointment.locator: dispute_txid}
|
breaches = {dummy_appointment.locator: dispute_txid}
|
||||||
|
|
||||||
watcher.appointments = appointments
|
watcher.appointments = appointments
|
||||||
watcher.locator_uuid_map = locator_uuid_map
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
|
|
||||||
filtered_valid_matches = watcher.filter_valid_matches(matches)
|
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
||||||
|
|
||||||
assert all([fil_match["valid_match"] for uuid, fil_match in filtered_valid_matches.items()])
|
assert all([fil_breach["valid_breach"] for uuid, fil_breach in filtered_valid_breaches.items()])
|
||||||
|
|||||||
Reference in New Issue
Block a user