From 9c10f7964fbd31502a19d85986a89a2b99ab3c1d Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 18 May 2020 18:23:49 +0200 Subject: [PATCH] teosd- First bits of locator cache --- teos/__init__.py | 1 + teos/teosd.py | 8 +++++++- teos/watcher.py | 53 ++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/teos/__init__.py b/teos/__init__.py index 1a29ddb..7c986bf 100644 --- a/teos/__init__.py +++ b/teos/__init__.py @@ -21,6 +21,7 @@ DEFAULT_CONF = { "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int}, "EXPIRY_DELTA": {"value": 6, "type": int}, "MIN_TO_SELF_DELAY": {"value": 20, "type": int}, + "BLOCK_CACHE_SIZE": {"value": 6, "type": int}, "LOG_FILE": {"value": "teos.log", "type": str, "path": True}, "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True}, "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True}, diff --git a/teos/teosd.py b/teos/teosd.py index 8e1a694..c9f0a02 100644 --- a/teos/teosd.py +++ b/teos/teosd.py @@ -84,7 +84,13 @@ def main(command_line_conf): db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH")) responder = Responder(db_manager, gatekeeper, carrier, block_processor) watcher = Watcher( - db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS") + db_manager, + gatekeeper, + block_processor, + responder, + secret_key_der, + config.get("MAX_APPOINTMENTS"), + config.get("BLOCK_CACHE_SIZE"), ) # Create the chain monitor and start monitoring the chain diff --git a/teos/watcher.py b/teos/watcher.py index ff5ff40..8d6aeba 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -1,5 +1,6 @@ from queue import Queue from threading import Thread +from collections import OrderedDict from common.logger import Logger from common.tools import compute_locator @@ -19,6 +20,23 @@ class AppointmentLimitReached(BasicException): """Raised when the tower maximum appointment count has been reached""" +class LocatorCache: + def __init__(self, blocks_in_cache): + self.cache = dict() + self.blocks = OrderedDict() + self.cache_size = blocks_in_cache + + def is_full(self): + return len(self.blocks) > self.cache_size + + def remove_older_block(self): + block_hash, locator_map = self.blocks.popitem(last=False) + for locator, txid in locator_map.items(): + del self.cache[locator] + + logger.debug("Block removed from cache", block_hash=block_hash) + + class Watcher: """ The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower. @@ -41,6 +59,8 @@ class Watcher: responder (:obj:`Responder `): a ``Responder`` instance. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. + blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be + covered. Attributes: appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment @@ -60,13 +80,14 @@ class Watcher: signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. last_known_block (:obj:`str`): the last block known by the ``Watcher``. + last_known_block (:obj:`LocatorCache`): a cache of locators from the last ``blocks_in_cache`` blocks. Raises: :obj:`InvalidKey `: if teos sk cannot be loaded. """ - def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments): + def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache): self.appointments = dict() self.locator_uuid_map = dict() self.block_queue = Queue() @@ -77,6 +98,7 @@ class Watcher: self.max_appointments = max_appointments self.signing_key = Cryptographer.load_private_key_der(sk_der) self.last_known_block = db_manager.load_last_block_hash_watcher() + self.locator_cache = LocatorCache(blocks_in_cache) def awake(self): """Starts a new thread to monitor the blockchain for channel breaches""" @@ -132,6 +154,7 @@ class Watcher: # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) + self.appointments[uuid] = appointment.get_summary() if appointment.locator in self.locator_uuid_map: @@ -180,9 +203,15 @@ class Watcher: block = self.block_processor.get_block(block_hash) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if len(self.appointments) > 0 and block is not None: - txids = block.get("tx") + txids = block.get("tx") + # Compute the locator for every transaction in the block and add them to the cache + locators = {compute_locator(txid): txid for txid in txids} + self.locator_cache.cache.update(locators) + self.locator_cache.blocks[block_hash] = locators + logger.debug("Block added to cache", block_hash=block_hash) + # FIXME: change txids for locators? + if len(self.appointments) > 0 and txids: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -196,7 +225,7 @@ class Watcher: expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager ) - valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids)) + valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators)) triggered_flags = [] appointments_to_delete = [] @@ -246,28 +275,30 @@ class Watcher: if len(self.appointments) != 0: logger.info("No more pending appointments") + # Remove a block from the cache if the cache has reached its maximum size + if self.locator_cache.is_full(): + self.locator_cache.remove_older_block() + # Register the last processed block for the watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") self.block_queue.task_done() - def get_breaches(self, txids): + def get_breaches(self, locators): """ - Gets a list of channel breaches given the list of transaction ids. + Gets a dictionary of channel breaches given a dictionary of locators. Args: - txids (:obj:`list`): the list of transaction ids included in the last received block. + locators (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids. Returns: :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are found. """ - potential_locators = {compute_locator(txid): txid for txid in txids} - # Check is any of the tx_ids in the received block is an actual match - intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) - breaches = {locator: potential_locators[locator] for locator in intersection} + intersection = set(self.locator_uuid_map.keys()).intersection(locators.keys()) + breaches = {locator: locators[locator] for locator in intersection} if len(breaches) > 0: logger.info("List of breaches", breaches=breaches)