teosd- First bits of locator cache

This commit is contained in:
Sergi Delgado Segura
2020-05-18 18:23:49 +02:00
parent 01bf857161
commit 9c10f7964f
3 changed files with 50 additions and 12 deletions

View File

@@ -21,6 +21,7 @@ DEFAULT_CONF = {
"DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int}, "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int},
"EXPIRY_DELTA": {"value": 6, "type": int}, "EXPIRY_DELTA": {"value": 6, "type": int},
"MIN_TO_SELF_DELAY": {"value": 20, "type": int}, "MIN_TO_SELF_DELAY": {"value": 20, "type": int},
"BLOCK_CACHE_SIZE": {"value": 6, "type": int},
"LOG_FILE": {"value": "teos.log", "type": str, "path": True}, "LOG_FILE": {"value": "teos.log", "type": str, "path": True},
"TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True}, "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True},
"APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True}, "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True},

View File

@@ -84,7 +84,13 @@ def main(command_line_conf):
db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH")) db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH"))
responder = Responder(db_manager, gatekeeper, carrier, block_processor) responder = Responder(db_manager, gatekeeper, carrier, block_processor)
watcher = Watcher( watcher = Watcher(
db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS") db_manager,
gatekeeper,
block_processor,
responder,
secret_key_der,
config.get("MAX_APPOINTMENTS"),
config.get("BLOCK_CACHE_SIZE"),
) )
# Create the chain monitor and start monitoring the chain # Create the chain monitor and start monitoring the chain

View File

@@ -1,5 +1,6 @@
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from collections import OrderedDict
from common.logger import Logger from common.logger import Logger
from common.tools import compute_locator from common.tools import compute_locator
@@ -19,6 +20,23 @@ class AppointmentLimitReached(BasicException):
"""Raised when the tower maximum appointment count has been reached""" """Raised when the tower maximum appointment count has been reached"""
class LocatorCache:
def __init__(self, blocks_in_cache):
self.cache = dict()
self.blocks = OrderedDict()
self.cache_size = blocks_in_cache
def is_full(self):
return len(self.blocks) > self.cache_size
def remove_older_block(self):
block_hash, locator_map = self.blocks.popitem(last=False)
for locator, txid in locator_map.items():
del self.cache[locator]
logger.debug("Block removed from cache", block_hash=block_hash)
class Watcher: class Watcher:
""" """
The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower. The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower.
@@ -41,6 +59,8 @@ class Watcher:
responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance. responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be
covered.
Attributes: Attributes:
appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment
@@ -60,13 +80,14 @@ class Watcher:
signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments. signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
last_known_block (:obj:`str`): the last block known by the ``Watcher``. last_known_block (:obj:`str`): the last block known by the ``Watcher``.
last_known_block (:obj:`LocatorCache`): a cache of locators from the last ``blocks_in_cache`` blocks.
Raises: Raises:
:obj:`InvalidKey <common.exceptions.InvalidKey>`: if teos sk cannot be loaded. :obj:`InvalidKey <common.exceptions.InvalidKey>`: if teos sk cannot be loaded.
""" """
def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments): def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict() self.locator_uuid_map = dict()
self.block_queue = Queue() self.block_queue = Queue()
@@ -77,6 +98,7 @@ class Watcher:
self.max_appointments = max_appointments self.max_appointments = max_appointments
self.signing_key = Cryptographer.load_private_key_der(sk_der) self.signing_key = Cryptographer.load_private_key_der(sk_der)
self.last_known_block = db_manager.load_last_block_hash_watcher() self.last_known_block = db_manager.load_last_block_hash_watcher()
self.locator_cache = LocatorCache(blocks_in_cache)
def awake(self): def awake(self):
"""Starts a new thread to monitor the blockchain for channel breaches""" """Starts a new thread to monitor the blockchain for channel breaches"""
@@ -132,6 +154,7 @@ class Watcher:
# Add the appointment to the Gatekeeper # Add the appointment to the Gatekeeper
available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)
self.appointments[uuid] = appointment.get_summary() self.appointments[uuid] = appointment.get_summary()
if appointment.locator in self.locator_uuid_map: if appointment.locator in self.locator_uuid_map:
@@ -180,9 +203,15 @@ class Watcher:
block = self.block_processor.get_block(block_hash) block = self.block_processor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if len(self.appointments) > 0 and block is not None: txids = block.get("tx")
txids = block.get("tx") # Compute the locator for every transaction in the block and add them to the cache
locators = {compute_locator(txid): txid for txid in txids}
self.locator_cache.cache.update(locators)
self.locator_cache.blocks[block_hash] = locators
logger.debug("Block added to cache", block_hash=block_hash)
# FIXME: change txids for locators?
if len(self.appointments) > 0 and txids:
expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) expired_appointments = self.gatekeeper.get_expired_appointments(block["height"])
# Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
@@ -196,7 +225,7 @@ class Watcher:
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
) )
valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids)) valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators))
triggered_flags = [] triggered_flags = []
appointments_to_delete = [] appointments_to_delete = []
@@ -246,28 +275,30 @@ class Watcher:
if len(self.appointments) != 0: if len(self.appointments) != 0:
logger.info("No more pending appointments") logger.info("No more pending appointments")
# Remove a block from the cache if the cache has reached its maximum size
if self.locator_cache.is_full():
self.locator_cache.remove_older_block()
# Register the last processed block for the watcher # Register the last processed block for the watcher
self.db_manager.store_last_block_hash_watcher(block_hash) self.db_manager.store_last_block_hash_watcher(block_hash)
self.last_known_block = block.get("hash") self.last_known_block = block.get("hash")
self.block_queue.task_done() self.block_queue.task_done()
def get_breaches(self, txids): def get_breaches(self, locators):
""" """
Gets a list of channel breaches given the list of transaction ids. Gets a dictionary of channel breaches given a dictionary of locators.
Args: Args:
txids (:obj:`list`): the list of transaction ids included in the last received block. locators (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids.
Returns: Returns:
:obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are
found. found.
""" """
potential_locators = {compute_locator(txid): txid for txid in txids}
# Check is any of the tx_ids in the received block is an actual match # Check is any of the tx_ids in the received block is an actual match
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) intersection = set(self.locator_uuid_map.keys()).intersection(locators.keys())
breaches = {locator: potential_locators[locator] for locator in intersection} breaches = {locator: locators[locator] for locator in intersection}
if len(breaches) > 0: if len(breaches) > 0:
logger.info("List of breaches", breaches=breaches) logger.info("List of breaches", breaches=breaches)