From 17128edadaed2db77be222592db0ca5ab93f5998 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Thu, 21 May 2020 16:40:36 +0200 Subject: [PATCH] teos - Renames get_locator to check_locator, adds LocatorCache.init and docstrings --- teos/watcher.py | 100 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 23 deletions(-) diff --git a/teos/watcher.py b/teos/watcher.py index f74e405..c699d31 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -26,15 +26,58 @@ class AppointmentAlreadyTriggered(BasicException): class LocatorCache: + """ + The LocatorCache keeps the data about the last ``cache_size`` blocks around so appointments can be checked against + it. The data is indexed by locator and it's mainly built during the normal ``Watcher`` operation so no extra steps + are normally needed. + + Args: + blocks_in_cache (:obj:`int`): the numbers of blocks to keep in the cache. + + Attributes: + cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pair that received appointments are checked + against. + blocks (:obj:`OrderedDict`): An ordered dictionary of the last ``blocks_in_cache`` blocks (block_hash:locators). + Used to keep track of what data belongs to what block, so data can be pruned accordingly. Also needed to + rebuilt the cache in case of a reorgs. + cache_size (:obj:`int`): the size of the cache in blocks. + """ + def __init__(self, blocks_in_cache): self.cache = dict() self.blocks = OrderedDict() self.cache_size = blocks_in_cache + def init(self, last_known_block, block_processor): + """ + Sets the initial state of the block cache. + + Args: + last_known_block (:obj:`str`): the last known block of the ``Watcher``. + block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance. + """ + + # This is needed as a separate method from __init__ since it has to be initialized right before start watching. + # Not doing so implies store temporary variables in the Watcher and initialising the cache as None. + target_block_hash = last_known_block + for _ in range(self.cache_size): + target_block = block_processor.get_block(target_block_hash) + + # In some setups, like regtest, it could be the case that there are no enough previous blocks. + if target_block: + locators = {compute_locator(txid): txid for txid in target_block.get("tx")} + self.cache.update(locators) + self.blocks[target_block_hash] = locators + target_block_hash = target_block.get("previousblockhash") + + self.blocks = OrderedDict(reversed((list(self.blocks.items())))) + def is_full(self): + """ Returns whether the cache is full or not """ return len(self.blocks) > self.cache_size def remove_older_block(self): + """ Removes the older block from the cache """ block_hash, locator_map = self.blocks.popitem(last=False) for locator, txid in locator_map.items(): del self.cache[locator] @@ -169,14 +212,14 @@ class Watcher: raise AppointmentAlreadyTriggered(message) try: - breach = self.filter_breach(uuid, appointment, self.locator_cache.cache[appointment.locator]) + breach = self.check_breach(uuid, appointment, self.locator_cache.cache[appointment.locator]) receipt = self.responder.handle_breach( uuid, breach["locator"], breach["dispute_txid"], breach["penalty_txid"], breach["penalty_rawtx"], - self.appointments[uuid].get("user_id"), + user_id, self.last_known_block, ) @@ -239,15 +282,7 @@ class Watcher: self.db_manager.store_last_block_hash_watcher(self.last_known_block) # Initialise the locator cache with the last ``cache_size`` blocks. - target_block_hash = self.last_known_block - for _ in range(self.locator_cache.cache_size): - target_block = self.block_processor.get_block(target_block_hash) - locators = {compute_locator(txid): txid for txid in target_block.get("tx")} - self.locator_cache.cache.update(locators) - self.locator_cache.blocks[target_block_hash] = locators - target_block_hash = target_block.get("previousblockhash") - - self.locator_cache.blocks = OrderedDict(reversed((list(self.locator_cache.blocks.items())))) + self.locator_cache.init(self.last_known_block, self.block_processor) while True: block_hash = self.block_queue.get() @@ -256,12 +291,12 @@ class Watcher: txids = block.get("tx") # Compute the locator for every transaction in the block and add them to the cache - locators = {compute_locator(txid): txid for txid in txids} - self.locator_cache.cache.update(locators) - self.locator_cache.blocks[block_hash] = locators + locators_txid_map = {compute_locator(txid): txid for txid in txids} + self.locator_cache.cache.update(locators_txid_map) + self.locator_cache.blocks[block_hash] = locators_txid_map logger.debug("Block added to cache", block_hash=block_hash) - if len(self.appointments) > 0 and locators: + if len(self.appointments) > 0 and locators_txid_map: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -275,7 +310,7 @@ class Watcher: expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager ) - valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators)) + valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators_txid_map)) triggered_flags = [] appointments_to_delete = [] @@ -334,12 +369,13 @@ class Watcher: self.last_known_block = block.get("hash") self.block_queue.task_done() - def get_breaches(self, locators): + def get_breaches(self, locators_txid_map): """ - Gets a dictionary of channel breaches given a dictionary of locators. + Gets a dictionary of channel breaches given a map of locator:dispute_txid. Args: - locators (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids. + locators_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of + transaction ids. Returns: :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are @@ -347,8 +383,8 @@ class Watcher: """ # Check is any of the tx_ids in the received block is an actual match - intersection = set(self.locator_uuid_map.keys()).intersection(locators.keys()) - breaches = {locator: locators[locator] for locator in intersection} + intersection = set(self.locator_uuid_map.keys()).intersection(locators_txid_map.keys()) + breaches = {locator: locators_txid_map[locator] for locator in intersection} if len(breaches) > 0: logger.info("List of breaches", breaches=breaches) @@ -358,7 +394,25 @@ class Watcher: return breaches - def filter_breach(self, uuid, appointment, dispute_txid): + def check_breach(self, uuid, appointment, dispute_txid): + """ + Checks if a breach is valid. Valid breaches should decrypt to a valid transaction. + + Args: + uuid (:obj:`str`): the uuid of the appointment that was triggered by the breach. + appointment (:obj:`teos.extended_appointment.ExtendedAppointment`): the appointment data. + dispute_txid (:obj:`str`): the id of the transaction that triggered the breach. + + Returns: + :obj:`dic`: The breach data in a dictionary (locator, dispute_txid, penalty_txid, penalty_rawtx), if the + breach is correct. + + Raises: + :obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the + key derived from the breach transaction id. + :obj:`InvalidTransactionFormat`: If the decrypted data does not have a valid transaction format. + """ + try: penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) @@ -422,7 +476,7 @@ class Watcher: else: try: - valid_breach = self.filter_breach(uuid, appointment, dispute_txid) + valid_breach = self.check_breach(uuid, appointment, dispute_txid) valid_breaches[uuid] = valid_breach decrypted_blobs[appointment.encrypted_blob] = ( valid_breach["penalty_txid"],