From 9c10f7964fbd31502a19d85986a89a2b99ab3c1d Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Mon, 18 May 2020 18:23:49 +0200
Subject: [PATCH 01/21] teosd - First bits of locator cache

---
 teos/__init__.py |  1 +
 teos/teosd.py    |  8 +++++++-
 teos/watcher.py  | 53 ++++++++++++++++++++++++++++++++++++++----------
 3 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/teos/__init__.py b/teos/__init__.py
index 1a29ddb..7c986bf 100644
--- a/teos/__init__.py
+++ b/teos/__init__.py
@@ -21,6 +21,7 @@ DEFAULT_CONF = {
     "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int},
     "EXPIRY_DELTA": {"value": 6, "type": int},
     "MIN_TO_SELF_DELAY": {"value": 20, "type": int},
+    "BLOCK_CACHE_SIZE": {"value": 6, "type": int},
     "LOG_FILE": {"value": "teos.log", "type": str, "path": True},
     "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True},
     "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True},
diff --git a/teos/teosd.py b/teos/teosd.py
index 8e1a694..c9f0a02 100644
--- a/teos/teosd.py
+++ b/teos/teosd.py
@@ -84,7 +84,13 @@ def main(command_line_conf):
     db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH"))
     responder = Responder(db_manager, gatekeeper, carrier, block_processor)
     watcher = Watcher(
-        db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS")
+        db_manager,
+        gatekeeper,
+        block_processor,
+        responder,
+        secret_key_der,
+        config.get("MAX_APPOINTMENTS"),
+        config.get("BLOCK_CACHE_SIZE"),
     )

     # Create the chain monitor and start monitoring the chain
diff --git a/teos/watcher.py b/teos/watcher.py
index ff5ff40..8d6aeba 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -1,5 +1,6 @@
 from queue import Queue
 from threading import Thread
+from collections import OrderedDict

 from common.logger import Logger
 from common.tools import compute_locator
@@ -19,6 +20,23 @@ class AppointmentLimitReached(BasicException):
     """Raised when the tower maximum appointment count has been reached"""


+class LocatorCache:
+    def __init__(self, blocks_in_cache):
+        self.cache = dict()
+        self.blocks = OrderedDict()
+        self.cache_size = blocks_in_cache
+
+    def is_full(self):
+        return len(self.blocks) > self.cache_size
+
+    def remove_older_block(self):
+        block_hash, locator_map = self.blocks.popitem(last=False)
+        for locator, txid in locator_map.items():
+            del self.cache[locator]
+
+        logger.debug("Block removed from cache", block_hash=block_hash)
+
+
 class Watcher:
     """
     The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower.
@@ -41,6 +59,8 @@ class Watcher:
         responder (:obj:`Responder `): a ``Responder`` instance.
         sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
         max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
+        blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be
+            covered.

     Attributes:
         appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment
@@ -60,13 +80,14 @@ class Watcher:
         signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
         max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
         last_known_block (:obj:`str`): the last block known by the ``Watcher``.
+        locator_cache (:obj:`LocatorCache`): a cache of locators from the last ``blocks_in_cache`` blocks.
Raises: :obj:`InvalidKey `: if teos sk cannot be loaded. """ - def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments): + def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache): self.appointments = dict() self.locator_uuid_map = dict() self.block_queue = Queue() @@ -77,6 +98,7 @@ class Watcher: self.max_appointments = max_appointments self.signing_key = Cryptographer.load_private_key_der(sk_der) self.last_known_block = db_manager.load_last_block_hash_watcher() + self.locator_cache = LocatorCache(blocks_in_cache) def awake(self): """Starts a new thread to monitor the blockchain for channel breaches""" @@ -132,6 +154,7 @@ class Watcher: # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) + self.appointments[uuid] = appointment.get_summary() if appointment.locator in self.locator_uuid_map: @@ -180,9 +203,15 @@ class Watcher: block = self.block_processor.get_block(block_hash) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if len(self.appointments) > 0 and block is not None: - txids = block.get("tx") + txids = block.get("tx") + # Compute the locator for every transaction in the block and add them to the cache + locators = {compute_locator(txid): txid for txid in txids} + self.locator_cache.cache.update(locators) + self.locator_cache.blocks[block_hash] = locators + logger.debug("Block added to cache", block_hash=block_hash) + # FIXME: change txids for locators? + if len(self.appointments) > 0 and txids: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -196,7 +225,7 @@ class Watcher: expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager ) - valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids)) + valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators)) triggered_flags = [] appointments_to_delete = [] @@ -246,28 +275,30 @@ class Watcher: if len(self.appointments) != 0: logger.info("No more pending appointments") + # Remove a block from the cache if the cache has reached its maximum size + if self.locator_cache.is_full(): + self.locator_cache.remove_older_block() + # Register the last processed block for the watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") self.block_queue.task_done() - def get_breaches(self, txids): + def get_breaches(self, locators): """ - Gets a list of channel breaches given the list of transaction ids. + Gets a dictionary of channel breaches given a dictionary of locators. Args: - txids (:obj:`list`): the list of transaction ids included in the last received block. + locators (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids. Returns: :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are found. 
""" - potential_locators = {compute_locator(txid): txid for txid in txids} - # Check is any of the tx_ids in the received block is an actual match - intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) - breaches = {locator: potential_locators[locator] for locator in intersection} + intersection = set(self.locator_uuid_map.keys()).intersection(locators.keys()) + breaches = {locator: locators[locator] for locator in intersection} if len(breaches) > 0: logger.info("List of breaches", breaches=breaches) From 699da54aa01de9265e82a9c1aec5e25a8057e137 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 19 May 2020 18:24:40 +0200 Subject: [PATCH 02/21] teos - Adds LocatorCache --- common/errors.py | 1 + teos/api.py | 9 ++- teos/block_processor.py | 15 +++-- teos/watcher.py | 133 ++++++++++++++++++++++++++++++---------- 4 files changed, 121 insertions(+), 37 deletions(-) diff --git a/common/errors.py b/common/errors.py index 786a9d7..c115a31 100644 --- a/common/errors.py +++ b/common/errors.py @@ -7,6 +7,7 @@ APPOINTMENT_FIELD_TOO_SMALL = -5 APPOINTMENT_FIELD_TOO_BIG = -6 APPOINTMENT_WRONG_FIELD = -7 APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8 +APPOINTMENT_ALREADY_TRIGGERED = -9 # Registration errors [-33, -64] REGISTRATION_MISSING_FIELD = -33 diff --git a/teos/api.py b/teos/api.py index 0bcbddb..b1c90ec 100644 --- a/teos/api.py +++ b/teos/api.py @@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify from teos import LOG_PREFIX import common.errors as errors from teos.inspector import InspectionFailed -from teos.watcher import AppointmentLimitReached +from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure from common.logger import Logger @@ -192,6 +192,13 @@ class API: rcode = HTTP_SERVICE_UNAVAILABLE response = {"error": "appointment rejected"} + except AppointmentAlreadyTriggered: + rcode = HTTP_BAD_REQUEST + response = { + "error": "appointment rejected. The provided appointment has already been triggered", + "error_code": errors.APPOINTMENT_ALREADY_TRIGGERED, + } + logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response) return jsonify(response), rcode diff --git a/teos/block_processor.py b/teos/block_processor.py index 136e9f7..73b8370 100644 --- a/teos/block_processor.py +++ b/teos/block_processor.py @@ -1,4 +1,5 @@ from common.logger import Logger +from common.exceptions import BasicException from teos import LOG_PREFIX from teos.tools import bitcoin_cli @@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX) +class InvalidTransactionFormat(BasicException): + """Raised when a transaction is not properly formatted""" + + class BlockProcessor: """ The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication @@ -89,17 +94,19 @@ class BlockProcessor: raw_tx (:obj:`str`): the hex representation of the transaction. Returns: - :obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted. + :obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted. - Returns ``None`` otherwise. + Raises: + :obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has invalid format. 
""" try: tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx) except JSONRPCException as e: - tx = None - logger.error("Cannot build transaction from decoded data", error=e.error) + msg = "Cannot build transaction from decoded data" + logger.error(msg, error=e.error) + raise InvalidTransactionFormat(msg) return tx diff --git a/teos/watcher.py b/teos/watcher.py index 8d6aeba..f74e405 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -12,6 +12,7 @@ from common.exceptions import InvalidParameter, SignatureError from teos import LOG_PREFIX from teos.cleaner import Cleaner from teos.extended_appointment import ExtendedAppointment +from teos.block_processor import InvalidTransactionFormat logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -20,6 +21,10 @@ class AppointmentLimitReached(BasicException): """Raised when the tower maximum appointment count has been reached""" +class AppointmentAlreadyTriggered(BasicException): + """Raised an appointment is sent to the Watcher but that same data has already been sent to the Responder""" + + class LocatorCache: def __init__(self, blocks_in_cache): self.cache = dict() @@ -148,25 +153,60 @@ class Watcher: # The user_id needs to be added to the ExtendedAppointment once the former has been authenticated appointment.user_id = user_id - # The uuids are generated as the RIPMED160(locator||user_pubkey). + # The uuids are generated as the RIPEMD160(locator||user_pubkey). # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). uuid = hash_160("{}{}".format(appointment.locator, user_id)) # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) - self.appointments[uuid] = appointment.get_summary() + # Appointments that were triggered in blocks hold in the cache + if appointment.locator in self.locator_cache.cache: + # If this is a copy of an appointment we've already reacted to, the new appointment is rejected. + if uuid in self.responder.trackers: + message = "Appointment already in Responder" + logger.info(message) + raise AppointmentAlreadyTriggered(message) - if appointment.locator in self.locator_uuid_map: - # If the uuid is already in the map it means this is an update. - if uuid not in self.locator_uuid_map[appointment.locator]: - self.locator_uuid_map[appointment.locator].append(uuid) + try: + breach = self.filter_breach(uuid, appointment, self.locator_cache.cache[appointment.locator]) + receipt = self.responder.handle_breach( + uuid, + breach["locator"], + breach["dispute_txid"], + breach["penalty_txid"], + breach["penalty_rawtx"], + self.appointments[uuid].get("user_id"), + self.last_known_block, + ) + + # At this point the appointment is accepted but data is only kept if it goes through the Responder + # otherwise it is dropped. + if receipt.delivered: + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) + self.db_manager.create_triggered_appointment_flag(uuid) + + except (EncryptionError, InvalidTransactionFormat): + # If data inside the encrypted blob is invalid, the appointment is accepted but the data is dropped. + # (same as with data that bounces in the Responder). This reduces the appointment slot count so it + # could be used to discourage user misbehaviour. 
+ pass + + # Regular appointments that have not been triggered (or not recently at least) else: - # Otherwise two users have sent an appointment with the same locator, so we need to store both. - self.locator_uuid_map[appointment.locator] = [uuid] + self.appointments[uuid] = appointment.get_summary() - self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) - self.db_manager.create_append_locator_map(appointment.locator, uuid) + if appointment.locator in self.locator_uuid_map: + # If the uuid is already in the map it means this is an update. + if uuid not in self.locator_uuid_map[appointment.locator]: + self.locator_uuid_map[appointment.locator].append(uuid) + else: + # Otherwise two users have sent an appointment with the same locator, so we need to store both. + self.locator_uuid_map[appointment.locator] = [uuid] + + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) try: signature = Cryptographer.sign(appointment.serialize(), self.signing_key) @@ -198,6 +238,17 @@ class Watcher: self.last_known_block = self.block_processor.get_best_block_hash() self.db_manager.store_last_block_hash_watcher(self.last_known_block) + # Initialise the locator cache with the last ``cache_size`` blocks. + target_block_hash = self.last_known_block + for _ in range(self.locator_cache.cache_size): + target_block = self.block_processor.get_block(target_block_hash) + locators = {compute_locator(txid): txid for txid in target_block.get("tx")} + self.locator_cache.cache.update(locators) + self.locator_cache.blocks[target_block_hash] = locators + target_block_hash = target_block.get("previousblockhash") + + self.locator_cache.blocks = OrderedDict(reversed((list(self.locator_cache.blocks.items())))) + while True: block_hash = self.block_queue.get() block = self.block_processor.get_block(block_hash) @@ -210,8 +261,7 @@ class Watcher: self.locator_cache.blocks[block_hash] = locators logger.debug("Block added to cache", block_hash=block_hash) - # FIXME: change txids for locators? 
- if len(self.appointments) > 0 and txids: + if len(self.appointments) > 0 and locators: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -279,7 +329,7 @@ class Watcher: if self.locator_cache.is_full(): self.locator_cache.remove_older_block() - # Register the last processed block for the watcher + # Register the last processed block for the Watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") self.block_queue.task_done() @@ -308,11 +358,37 @@ class Watcher: return breaches + def filter_breach(self, uuid, appointment, dispute_txid): + try: + penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) + penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) + + except EncryptionError as e: + logger.info("Transaction cannot be decrypted", uuid=uuid) + raise e + + except InvalidTransactionFormat as e: + logger.info("The breach contained an invalid transaction") + raise e + + valid_breach = { + "locator": appointment.locator, + "dispute_txid": dispute_txid, + "penalty_txid": penalty_tx.get("txid"), + "penalty_rawtx": penalty_rawtx, + } + + logger.info( + "Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") + ) + + return valid_breach + def filter_breaches(self, breaches): """ Filters the valid from the invalid channel breaches. - The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen. + The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen. Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder `. 
     Args:
@@ -337,30 +413,23 @@ class Watcher:

             if appointment.encrypted_blob in decrypted_blobs:
-                penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
-
-            else:
-                try:
-                    penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
-
-                except EncryptionError:
-                    penalty_rawtx = None
-
-                penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
-                decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx)
-
-            if penalty_tx is not None:
+                penalty_txid, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
                 valid_breaches[uuid] = {
-                    "locator": locator,
+                    "locator": appointment.locator,
                     "dispute_txid": dispute_txid,
-                    "penalty_txid": penalty_tx.get("txid"),
+                    "penalty_txid": penalty_txid,
                     "penalty_rawtx": penalty_rawtx,
                 }
-                logger.info(
-                    "Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
-                )
-            else:
-                invalid_breaches.append(uuid)
+
+            else:
+                try:
+                    valid_breach = self.filter_breach(uuid, appointment, dispute_txid)
+                    valid_breaches[uuid] = valid_breach
+                    decrypted_blobs[appointment.encrypted_blob] = (
+                        valid_breach["penalty_txid"],
+                        valid_breach["penalty_rawtx"],
+                    )
+
+                except (EncryptionError, InvalidTransactionFormat):
+                    invalid_breaches.append(uuid)

         return valid_breaches, invalid_breaches

From 17128edadaed2db77be222592db0ca5ab93f5998 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Thu, 21 May 2020 16:40:36 +0200
Subject: [PATCH 03/21] teos - Renames filter_breach to check_breach, adds LocatorCache.init and docstrings

---
 teos/watcher.py | 100 +++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 77 insertions(+), 23 deletions(-)

diff --git a/teos/watcher.py b/teos/watcher.py
index f74e405..c699d31 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -26,15 +26,58 @@ class AppointmentAlreadyTriggered(BasicException):


 class LocatorCache:
+    """
+    The LocatorCache keeps the data about the last ``cache_size`` blocks around so appointments can be checked against
+    it. The data is indexed by locator and it's mainly built during the normal ``Watcher`` operation so no extra steps
+    are normally needed.
+
+    Args:
+        blocks_in_cache (:obj:`int`): the number of blocks to keep in the cache.
+
+    Attributes:
+        cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pairs that received appointments are checked
+            against.
+        blocks (:obj:`OrderedDict`): an ordered dictionary of the last ``blocks_in_cache`` blocks (block_hash:locators).
+            Used to keep track of what data belongs to what block, so data can be pruned accordingly. Also needed to
+            rebuild the cache in case of a reorg.
+        cache_size (:obj:`int`): the size of the cache in blocks.
+    """
+
     def __init__(self, blocks_in_cache):
         self.cache = dict()
         self.blocks = OrderedDict()
         self.cache_size = blocks_in_cache

+    def init(self, last_known_block, block_processor):
+        """
+        Sets the initial state of the block cache.
+
+        Args:
+            last_known_block (:obj:`str`): the last known block of the ``Watcher``.
+            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
+        """
+
+        # This is needed as a separate method from __init__ since it has to be run right before we start watching.
+        # Not doing so would imply storing temporary variables in the Watcher and initialising the cache as None.
+        target_block_hash = last_known_block
+        for _ in range(self.cache_size):
+            target_block = block_processor.get_block(target_block_hash)
+
+            # In some setups, like regtest, it could be the case that there are not enough previous blocks.
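+            # get_block returns None in that case, so we simply stop adding blocks to the cache early.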
+            if target_block:
+                locators = {compute_locator(txid): txid for txid in target_block.get("tx")}
+                self.cache.update(locators)
+                self.blocks[target_block_hash] = locators
+                target_block_hash = target_block.get("previousblockhash")
+
+        self.blocks = OrderedDict(reversed(list(self.blocks.items())))
+
     def is_full(self):
+        """ Returns whether the cache is full or not """
         return len(self.blocks) > self.cache_size

     def remove_older_block(self):
+        """ Removes the oldest block from the cache """
         block_hash, locator_map = self.blocks.popitem(last=False)
         for locator, txid in locator_map.items():
             del self.cache[locator]
@@ -169,14 +212,14 @@ class Watcher:
                 raise AppointmentAlreadyTriggered(message)

             try:
-                breach = self.filter_breach(uuid, appointment, self.locator_cache.cache[appointment.locator])
+                breach = self.check_breach(uuid, appointment, self.locator_cache.cache[appointment.locator])
                 receipt = self.responder.handle_breach(
                     uuid,
                     breach["locator"],
                     breach["dispute_txid"],
                     breach["penalty_txid"],
                     breach["penalty_rawtx"],
-                    self.appointments[uuid].get("user_id"),
+                    user_id,
                     self.last_known_block,
                 )
@@ -239,15 +282,7 @@ class Watcher:
         self.last_known_block = self.block_processor.get_best_block_hash()
         self.db_manager.store_last_block_hash_watcher(self.last_known_block)

         # Initialise the locator cache with the last ``cache_size`` blocks.
-        target_block_hash = self.last_known_block
-        for _ in range(self.locator_cache.cache_size):
-            target_block = self.block_processor.get_block(target_block_hash)
-            locators = {compute_locator(txid): txid for txid in target_block.get("tx")}
-            self.locator_cache.cache.update(locators)
-            self.locator_cache.blocks[target_block_hash] = locators
-            target_block_hash = target_block.get("previousblockhash")
-
-        self.locator_cache.blocks = OrderedDict(reversed((list(self.locator_cache.blocks.items()))))
+        self.locator_cache.init(self.last_known_block, self.block_processor)

         while True:
             block_hash = self.block_queue.get()
@@ -256,12 +291,12 @@ class Watcher:

             txids = block.get("tx")
             # Compute the locator for every transaction in the block and add them to the cache
-            locators = {compute_locator(txid): txid for txid in txids}
-            self.locator_cache.cache.update(locators)
-            self.locator_cache.blocks[block_hash] = locators
+            locators_txid_map = {compute_locator(txid): txid for txid in txids}
+            self.locator_cache.cache.update(locators_txid_map)
+            self.locator_cache.blocks[block_hash] = locators_txid_map
             logger.debug("Block added to cache", block_hash=block_hash)

-            if len(self.appointments) > 0 and locators:
+            if len(self.appointments) > 0 and locators_txid_map:
                 expired_appointments = self.gatekeeper.get_expired_appointments(block["height"])
                 # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
                 expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
@@ -275,7 +310,7 @@ class Watcher:
                     expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
                 )

-                valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators))
+                valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators_txid_map))

                 triggered_flags = []
                 appointments_to_delete = []
@@ -334,12 +369,13 @@ class Watcher:
         self.last_known_block = block.get("hash")
         self.block_queue.task_done()

-    def get_breaches(self, locators):
+    def get_breaches(self, locators_txid_map):
         """
-        Gets a dictionary of channel breaches given a dictionary of locators.
+        Gets a dictionary of channel breaches given a map of locator:dispute_txid.
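+
+        The breaches are computed as the set intersection between the given locators and the ones being watched by
+        the tower.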
         Args:
-            locators (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids.
+            locators_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of
+                transaction ids.

         Returns:
             :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are
                 found.
         """

         # Check if any of the tx_ids in the received block is an actual match
-        intersection = set(self.locator_uuid_map.keys()).intersection(locators.keys())
-        breaches = {locator: locators[locator] for locator in intersection}
+        intersection = set(self.locator_uuid_map.keys()).intersection(locators_txid_map.keys())
+        breaches = {locator: locators_txid_map[locator] for locator in intersection}

         if len(breaches) > 0:
             logger.info("List of breaches", breaches=breaches)
@@ -358,7 +394,25 @@ class Watcher:

         return breaches

-    def filter_breach(self, uuid, appointment, dispute_txid):
+    def check_breach(self, uuid, appointment, dispute_txid):
+        """
+        Checks if a breach is valid. Valid breaches should decrypt to a valid transaction.
+
+        Args:
+            uuid (:obj:`str`): the uuid of the appointment that was triggered by the breach.
+            appointment (:obj:`teos.extended_appointment.ExtendedAppointment`): the appointment data.
+            dispute_txid (:obj:`str`): the id of the transaction that triggered the breach.
+
+        Returns:
+            :obj:`dict`: The breach data in a dictionary (locator, dispute_txid, penalty_txid, penalty_rawtx), if the
+                breach is correct.
+
+        Raises:
+            :obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the
+                key derived from the breach transaction id.
+            :obj:`InvalidTransactionFormat`: If the decrypted data does not have a valid transaction format.
+ """ + try: penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) @@ -422,7 +476,7 @@ class Watcher: else: try: - valid_breach = self.filter_breach(uuid, appointment, dispute_txid) + valid_breach = self.check_breach(uuid, appointment, dispute_txid) valid_breaches[uuid] = valid_breach decrypted_blobs[appointment.encrypted_blob] = ( valid_breach["penalty_txid"], From 3228eeac6b336f981a3995caf599a040e371cc74 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Thu, 21 May 2020 16:41:44 +0200 Subject: [PATCH 04/21] tests - Adds LocatorCache unittests and updates existing ones to match --- test/teos/unit/test_api.py | 90 +++++++- test/teos/unit/test_block_processor.py | 5 +- test/teos/unit/test_builder.py | 3 + test/teos/unit/test_watcher.py | 307 +++++++++++++++++++++++-- 4 files changed, 376 insertions(+), 29 deletions(-) diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py index daac059..ff9ff0f 100644 --- a/test/teos/unit/test_api.py +++ b/test/teos/unit/test_api.py @@ -4,13 +4,21 @@ from binascii import hexlify from teos.api import API import common.errors as errors -from teos.watcher import Watcher from teos.inspector import Inspector from teos.gatekeeper import UserInfo from teos.appointments_dbm import AppointmentsDBM from teos.responder import Responder, TransactionTracker +from teos.extended_appointment import ExtendedAppointment +from teos.watcher import Watcher, AppointmentAlreadyTriggered -from test.teos.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_keypair, get_config +from test.teos.unit.conftest import ( + get_random_value_hex, + generate_dummy_appointment, + generate_keypair, + get_config, + create_dummy_transaction, + compute_locator, +) from common.cryptographer import Cryptographer, hash_160 from common.constants import ( @@ -60,7 +68,15 @@ def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind): sk, pk = generate_keypair() responder = Responder(db_manager, gatekeeper, carrier, block_processor) - watcher = Watcher(db_manager, gatekeeper, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS) + watcher = Watcher( + db_manager, + gatekeeper, + block_processor, + responder, + sk.to_der(), + MAX_APPOINTMENTS, + config.get("BLOCK_CACHE_SIZE"), + ) inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")) api = API(config.get("API_HOST"), config.get("API_PORT"), inspector, watcher) @@ -323,6 +339,74 @@ def test_add_appointment_update_smaller(api, client, appointment): assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1 +def test_add_appointment_in_cache(api, client): + api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0) + appointment, dispute_tx = generate_dummy_appointment() + appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) + + # Add the data to the cache + dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + api.watcher.locator_cache.cache[appointment.locator] = dispute_txid + + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + + # Trying to add it again should fail, since it is already in the Responder + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert 
r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED
+
+    # The appointment should simply be accepted if the data is not in the cache, since it cannot be triggered again
+    del api.watcher.locator_cache.cache[appointment.locator]
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+
+
+def test_add_appointment_in_cache_cannot_decrypt(api, client):
+    api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
+    appointment, dispute_tx = generate_dummy_appointment()
+    appointment.encrypted_blob = appointment.encrypted_blob[::-1]
+    appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
+
+    # Add the data to the cache
+    dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
+    api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
+
+    # The appointment should be accepted
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+
+
+def test_add_appointment_in_cache_invalid_transaction(api, client):
+    api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
+
+    # We need to create the appointment manually
+    dispute_tx = create_dummy_transaction()
+    dispute_txid = dispute_tx.tx_id.hex()
+    penalty_tx = create_dummy_transaction(dispute_txid)
+
+    locator = compute_locator(dispute_txid)
+    dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20}
+    encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id"))
+
+    appointment_data = {
+        "locator": locator,
+        "to_self_delay": dummy_appointment_data.get("to_self_delay"),
+        "encrypted_blob": encrypted_blob,
+        "user_id": get_random_value_hex(16),
+    }
+
+    appointment = ExtendedAppointment.from_dict(appointment_data)
+    appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
+
+    # Add the data to the cache
+    api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
+
+    # The appointment should be accepted
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+
+
 def test_add_too_many_appointment(api, client):
     # Give slots to the user
     api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=200, subscription_expiry=0)
diff --git a/test/teos/unit/test_block_processor.py b/test/teos/unit/test_block_processor.py
index f61082c..4a0bf88 100644
--- a/test/teos/unit/test_block_processor.py
+++ b/test/teos/unit/test_block_processor.py
@@ -1,3 +1,5 @@
+import pytest
+from teos.watcher import InvalidTransactionFormat
 from test.teos.unit.conftest import get_random_value_hex, generate_block, generate_blocks, fork
@@ -46,7 +48,8 @@ def test_decode_raw_transaction(block_processor):

 def test_decode_raw_transaction_invalid(block_processor):
     # Same but with an invalid one
-    assert block_processor.decode_raw_transaction(hex_tx[::-1]) is None
+    with pytest.raises(InvalidTransactionFormat):
+        block_processor.decode_raw_transaction(hex_tx[::-1])


 def 
test_get_missed_blocks(block_processor): diff --git a/test/teos/unit/test_builder.py b/test/teos/unit/test_builder.py index f804e54..81f7b5c 100644 --- a/test/teos/unit/test_builder.py +++ b/test/teos/unit/test_builder.py @@ -102,6 +102,7 @@ def test_update_states_empty_list(db_manager, gatekeeper, carrier, block_process responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("BLOCK_CACHE_SIZE"), ) missed_blocks_watcher = [] @@ -123,6 +124,7 @@ def test_update_states_responder_misses_more(run_bitcoind, db_manager, gatekeepe responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("BLOCK_CACHE_SIZE"), ) blocks = [] @@ -148,6 +150,7 @@ def test_update_states_watcher_misses_more(db_manager, gatekeeper, carrier, bloc responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("BLOCK_CACHE_SIZE"), ) blocks = [] diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index 25f84e5..f2e0d3b 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -9,22 +9,32 @@ from teos.tools import bitcoin_cli from teos.responder import Responder from teos.gatekeeper import UserInfo from teos.chain_monitor import ChainMonitor -from teos.appointments_dbm import AppointmentsDBM from teos.block_processor import BlockProcessor -from teos.watcher import Watcher, AppointmentLimitReached +from teos.appointments_dbm import AppointmentsDBM +from teos.extended_appointment import ExtendedAppointment from teos.gatekeeper import Gatekeeper, AuthenticationFailure, NotEnoughSlots +from teos.watcher import ( + Watcher, + AppointmentLimitReached, + LocatorCache, + EncryptionError, + InvalidTransactionFormat, + AppointmentAlreadyTriggered, +) from common.tools import compute_locator from common.cryptographer import Cryptographer from test.teos.unit.conftest import ( generate_blocks_w_delay, + generate_blocks, generate_dummy_appointment, get_random_value_hex, generate_keypair, get_config, bitcoind_feed_params, bitcoind_connect_params, + create_dummy_transaction, ) APPOINTMENTS = 5 @@ -55,7 +65,15 @@ def watcher(db_manager, gatekeeper): carrier = Carrier(bitcoind_connect_params) responder = Responder(db_manager, gatekeeper, carrier, block_processor) - watcher = Watcher(db_manager, gatekeeper, block_processor, responder, signing_key.to_der(), MAX_APPOINTMENTS) + watcher = Watcher( + db_manager, + gatekeeper, + block_processor, + responder, + signing_key.to_der(), + MAX_APPOINTMENTS, + config.get("BLOCK_CACHE_SIZE"), + ) chain_monitor = ChainMonitor( watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params @@ -91,7 +109,7 @@ def create_appointments(n): return appointments, locator_uuid_map, dispute_txs -def test_init(run_bitcoind, watcher): +def test_watcher_init(watcher, run_bitcoind): assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert watcher.block_queue.empty() @@ -101,6 +119,76 @@ def test_init(run_bitcoind, watcher): assert isinstance(watcher.responder, Responder) assert isinstance(watcher.max_appointments, int) assert 
isinstance(watcher.signing_key, PrivateKey) + assert isinstance(watcher.locator_cache, LocatorCache) + + +def test_locator_cache_init_not_enough_blocks(watcher): + # Make sure there are at least 3 blocks + block_count = watcher.block_processor.get_block_count() + if block_count < 3: + generate_blocks_w_delay(3 - block_count) + + # Simulate there are only 3 blocks + third_block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(2) + watcher.locator_cache.init(third_block_hash, watcher.block_processor) + assert len(watcher.locator_cache.blocks) == 3 + for k, v in watcher.locator_cache.blocks.items(): + assert watcher.block_processor.get_block(k) + + +def test_locator_cache_init(watcher): + # Empty cache + watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) + + # Generate enough blocks so the cache can start full + generate_blocks(2 * watcher.locator_cache.cache_size) + + watcher.locator_cache.init(watcher.block_processor.get_best_block_hash(), watcher.block_processor) + assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size + for k, v in watcher.locator_cache.blocks.items(): + assert watcher.block_processor.get_block(k) + + +def test_locator_cache_is_full(watcher): + # Empty cache + watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) + + for _ in range(watcher.locator_cache.cache_size): + watcher.locator_cache.blocks[uuid4().hex] = 0 + assert not watcher.locator_cache.is_full() + + watcher.locator_cache.blocks[uuid4().hex] = 0 + assert watcher.locator_cache.is_full() + + # Remove the data + watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) + + +def test_locator_remove_older_block(watcher): + # Add some blocks to the cache if it is empty + if not len(watcher.locator_cache.blocks): + for _ in range(watcher.locator_cache.cache_size): + txid = get_random_value_hex(32) + locator = txid[:16] + watcher.locator_cache.blocks[get_random_value_hex(32)] = {locator: txid} + watcher.locator_cache.cache[locator] = txid + + blocks_in_cache = watcher.locator_cache.blocks + oldest_block_hash = list(blocks_in_cache.keys())[0] + oldest_block_data = blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + watcher.locator_cache.remove_older_block() + + # Oldest block data is not in the cache + assert oldest_block_hash not in watcher.locator_cache.blocks + for locator in oldest_block_data: + assert locator not in watcher.locator_cache.cache + + # The rest of data is in the cache + assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks) + for block_hash in rest_of_blocks: + for locator in watcher.locator_cache.blocks[block_hash]: + assert locator in watcher.locator_cache.cache def test_add_appointment_non_registered(watcher): @@ -171,6 +259,103 @@ def test_add_appointment(watcher): assert len(watcher.locator_uuid_map[appointment.locator]) == 2 +# WIP: ADD appointment with the different uses of the cache +def test_add_appointment_in_cache(watcher): + # Generate an appointment and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + watcher.locator_cache.cache[appointment.locator] = dispute_txid + + # Try to add the appointment + response = 
watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but it's not in the Watcher + assert ( + response + and response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + assert not watcher.locator_uuid_map.get(appointment.locator) + + # It went to the Responder straightaway + assert appointment.locator in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + # Trying to send it again should fail since it is already in the Responder + with pytest.raises(AppointmentAlreadyTriggered): + watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + +def test_add_appointment_in_cache_invalid_blob(watcher): + # Generate an appointment with an invalid transaction and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + # We need to create the appointment manually + dispute_tx = create_dummy_transaction() + dispute_txid = dispute_tx.tx_id.hex() + penalty_tx = create_dummy_transaction(dispute_txid) + + locator = compute_locator(dispute_txid) + dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20} + encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id")) + + appointment_data = { + "locator": locator, + "to_self_delay": dummy_appointment_data.get("to_self_delay"), + "encrypted_blob": encrypted_blob, + "user_id": get_random_value_hex(16), + } + + appointment = ExtendedAppointment.from_dict(appointment_data) + watcher.locator_cache.cache[appointment.locator] = dispute_tx.tx_id.hex() + + # Try to add the appointment + response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but dropped (same as an invalid appointment that gets triggered) + assert ( + response + and response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + + assert not watcher.locator_uuid_map.get(appointment.locator) + assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + +def test_add_appointment_in_cache_invalid_transaction(watcher): + # Generate an appointment that cannot be decrypted and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + appointment, dispute_tx = generate_dummy_appointment() + appointment.encrypted_blob = appointment.encrypted_blob[::-1] + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + watcher.locator_cache.cache[appointment.locator] = dispute_txid + + # Try to add the appointment + response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but dropped (same as an invalid appointment that gets triggered) + assert ( + response + and 
response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + + assert not watcher.locator_uuid_map.get(appointment.locator) + assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + def test_add_too_many_appointments(watcher): # Simulate the user is registered user_sk, user_pk = generate_keypair() @@ -200,6 +385,7 @@ def test_add_too_many_appointments(watcher): def test_do_watch(watcher, temp_db_manager): watcher.db_manager = temp_db_manager + watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) # We will wipe all the previous data and add 5 appointments appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) @@ -246,9 +432,37 @@ def test_do_watch(watcher, temp_db_manager): # FIXME: We should also add cases where the transactions are invalid. bitcoind_mock needs to be extended for this. +def test_do_watch_cache_update(watcher): + # Test that data is properly added/remove to/from the cache + + for _ in range(10): + blocks_in_cache = watcher.locator_cache.blocks + oldest_block_hash = list(blocks_in_cache.keys())[0] + oldest_block_data = blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size + + generate_blocks_w_delay(1) + + # The last oldest block is gone but the rest remain + assert oldest_block_hash not in watcher.locator_cache.blocks + assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks.keys()) + + # The locators of the older block are gone but the rest remain + for locator in oldest_block_data: + assert locator not in watcher.locator_cache.cache + for block_hash in rest_of_blocks: + for locator in watcher.locator_cache.blocks[block_hash]: + assert locator in watcher.locator_cache.cache + + # The size of the cache is the same + assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size + + def test_get_breaches(watcher, txids, locator_uuid_map): watcher.locator_uuid_map = locator_uuid_map - potential_breaches = watcher.get_breaches(txids) + locators_txid_map = {compute_locator(txid): txid for txid in txids} + potential_breaches = watcher.get_breaches(locators_txid_map) # All the txids must breach assert locator_uuid_map.keys() == potential_breaches.keys() @@ -258,38 +472,54 @@ def test_get_breaches_random_data(watcher, locator_uuid_map): # The likelihood of finding a potential breach with random data should be negligible watcher.locator_uuid_map = locator_uuid_map txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)] + locators_txid_map = {compute_locator(txid): txid for txid in txids} - potential_breaches = watcher.get_breaches(txids) + potential_breaches = watcher.get_breaches(locators_txid_map) # None of the txids should breach assert len(potential_breaches) == 0 -def test_filter_breaches_random_data(watcher): - appointments = {} - locator_uuid_map = {} - breaches = {} +def test_check_breach(watcher): + # A breach will be flagged as valid only if the encrypted blob can be properly decrypted and the resulting data + # matches a transaction format. 
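+    # generate_dummy_appointment builds a blob that decrypts to a proper penalty transaction, so this must not raise.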
+ uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") - for i in range(TEST_SET_SIZE): - dummy_appointment, _ = generate_dummy_appointment() - uuid = uuid4().hex - appointments[uuid] = {"locator": dummy_appointment.locator, "user_id": dummy_appointment.user_id} - watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_dict()) - watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid) + valid_breach = watcher.check_breach(uuid, appointment, dispute_txid) + assert ( + valid_breach + and valid_breach.get("locator") == appointment.locator + and valid_breach.get("dispute_txid") == dispute_txid + ) - locator_uuid_map[dummy_appointment.locator] = [uuid] - if i % 2: - dispute_txid = get_random_value_hex(32) - breaches[dummy_appointment.locator] = dispute_txid +def test_check_breach_random_data(watcher): + # If a breach triggers an appointment with random data as encrypted blob, the check should fail. + uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") - watcher.locator_uuid_map = locator_uuid_map - watcher.appointments = appointments + # Set the blob to something "random" + appointment.encrypted_blob = get_random_value_hex(200) - valid_breaches, invalid_breaches = watcher.filter_breaches(breaches) + with pytest.raises(EncryptionError): + watcher.check_breach(uuid, appointment, dispute_txid) - # We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid. - assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2 + +def test_check_breach_invalid_transaction(watcher): + # If the breach triggers an appointment with data that can be decrypted but does not match a transaction, it should + # fail + uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + + # Set the blob to something "random" + appointment.encrypted_blob = Cryptographer.encrypt(get_random_value_hex(200), dispute_txid) + + with pytest.raises(InvalidTransactionFormat): + watcher.check_breach(uuid, appointment, dispute_txid) def test_filter_valid_breaches(watcher): @@ -323,3 +553,30 @@ def test_filter_valid_breaches(watcher): # We have "triggered" a single breach and it was valid. assert len(invalid_breaches) == 0 and len(valid_breaches) == 1 + + +def test_filter_breaches_random_data(watcher): + appointments = {} + locator_uuid_map = {} + breaches = {} + + for i in range(TEST_SET_SIZE): + dummy_appointment, _ = generate_dummy_appointment() + uuid = uuid4().hex + appointments[uuid] = {"locator": dummy_appointment.locator, "user_id": dummy_appointment.user_id} + watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_dict()) + watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid) + + locator_uuid_map[dummy_appointment.locator] = [uuid] + + if i % 2: + dispute_txid = get_random_value_hex(32) + breaches[dummy_appointment.locator] = dispute_txid + + watcher.locator_uuid_map = locator_uuid_map + watcher.appointments = appointments + + valid_breaches, invalid_breaches = watcher.filter_breaches(breaches) + + # We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid. 
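+    # The blobs were encrypted under the real dispute txids, so decryption with the random ones used here fails.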
+    assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2

From 708f2e1f3bc37b2e12a7c8ba0a4cc4f6bc3b9bb3 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Thu, 21 May 2020 19:37:33 +0200
Subject: [PATCH 05/21] teos - Changes the behavior for updates of already triggered appointments

If an update for an already triggered appointment is received, it will be dropped no matter if the locator is in the cache or not. This tries to prevent inconsistencies between the Watcher and the Responder, especially in the case of a reorg when data may have to flow backwards.
---
 teos/watcher.py                | 12 ++++++------
 test/teos/unit/test_api.py     |  4 ++--
 test/teos/unit/test_watcher.py |  1 -
 3 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/teos/watcher.py b/teos/watcher.py
index c699d31..1727ef8 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -200,17 +200,17 @@ class Watcher:
         # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
         uuid = hash_160("{}{}".format(appointment.locator, user_id))

+        # If this is a copy of an appointment we've already reacted to, the new appointment is rejected.
+        if uuid in self.responder.trackers:
+            message = "Appointment already in Responder"
+            logger.info(message)
+            raise AppointmentAlreadyTriggered(message)
+
         # Add the appointment to the Gatekeeper
         available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)

         # Appointments that were triggered in blocks held in the cache
         if appointment.locator in self.locator_cache.cache:
-            # If this is a copy of an appointment we've already reacted to, the new appointment is rejected.
-            if uuid in self.responder.trackers:
-                message = "Appointment already in Responder"
-                logger.info(message)
-                raise AppointmentAlreadyTriggered(message)
-
             try:
                 breach = self.check_breach(uuid, appointment, self.locator_cache.cache[appointment.locator])
                 receipt = self.responder.handle_breach(
diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py
index ff9ff0f..38732af 100644
--- a/test/teos/unit/test_api.py
+++ b/test/teos/unit/test_api.py
@@ -355,10 +355,10 @@ def test_add_appointment_in_cache(api, client):
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
     assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED

-    # The appointment should simply be accepted if the data is not in the cache, since it cannot be triggered again
+    # The appointment would be rejected even if the data is not in the cache, given it has already been triggered
     del api.watcher.locator_cache.cache[appointment.locator]
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+    assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED
diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py
index f2e0d3b..c43b9b2 100644
--- a/test/teos/unit/test_watcher.py
+++ b/test/teos/unit/test_watcher.py
@@ -259,7 +259,6 @@ def test_add_appointment(watcher):
     assert len(watcher.locator_uuid_map[appointment.locator]) == 2


-# WIP: ADD appointment with the different uses of the cache
 def test_add_appointment_in_cache(watcher):
     # Generate an appointment and add the dispute txid to the cache
     user_sk, user_pk = 
generate_keypair() From 76e4780f4c64320ea098348e7df73e524ddf4b8a Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Thu, 21 May 2020 19:39:42 +0200 Subject: [PATCH 06/21] teos - Adds start_block to add_appointment response --- teos/watcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/teos/watcher.py b/teos/watcher.py index 1727ef8..ea899eb 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -263,6 +263,7 @@ class Watcher: return { "locator": appointment.locator, + "start_block": self.last_known_block, "signature": signature, "available_slots": available_slots, "subscription_expiry": self.gatekeeper.registered_users[user_id].subscription_expiry, From 386642ff4238185417f613c9247cc0ab27cea13e Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 22 May 2020 14:18:37 +0200 Subject: [PATCH 07/21] Adds checks of add_appointment returning current block --- test/teos/unit/test_api.py | 53 +++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py index 38732af..6f3840b 100644 --- a/test/teos/unit/test_api.py +++ b/test/teos/unit/test_api.py @@ -173,6 +173,7 @@ def test_add_appointment(api, client, appointment): r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == 0 + assert r.json.get("start_block") == api.watcher.last_known_block def test_add_appointment_no_json(api, client, appointment): @@ -258,6 +259,7 @@ def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MU r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == n - 1 + assert r.json.get("start_block") == api.watcher.last_known_block # Since all updates came from the same user, only the last one is stored assert len(api.watcher.locator_uuid_map[appointment.locator]) == 1 @@ -280,6 +282,7 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": signature}, compressed_pk) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == 1 + assert r.json.get("start_block") == api.watcher.last_known_block # Check that all the appointments have been added and that there are no duplicates assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n @@ -291,14 +294,22 @@ def test_add_appointment_update_same_size(api, client, appointment): appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # The user has no additional slots, but it should be able to update # Let's just reverse the encrypted blob for example appointment.encrypted_blob = appointment.encrypted_blob[::-1] appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and 
r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) def test_add_appointment_update_bigger(api, client, appointment): @@ -313,7 +324,11 @@ def test_add_appointment_update_bigger(api, client, appointment): appointment.encrypted_blob = TWO_SLOTS_BLOTS appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # Check that it'll fail if no enough slots are available # Double the size from before @@ -330,13 +345,21 @@ def test_add_appointment_update_smaller(api, client, appointment): appointment.encrypted_blob = TWO_SLOTS_BLOTS appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # Let's update with one just small enough appointment.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2) appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 1 + and r.json.get("start_block") == api.watcher.last_known_block + ) def test_add_appointment_in_cache(api, client): @@ -349,7 +372,11 @@ def test_add_appointment_in_cache(api, client): api.watcher.locator_cache.cache[appointment.locator] = dispute_txid r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # Trying to add it again should fail, since it is already in the Responder r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) @@ -373,7 +400,11 @@ def test_add_appointment_in_cache_cannot_decrypt(api, client): # The appointment should be accepted r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) def test_add_appointment_in_cache_invalid_transaction(api, client): @@ -404,7 +435,11 @@ def test_add_appointment_in_cache_invalid_transaction(api, client): # The appointment should be accepted r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) def 
test_add_too_many_appointment(api, client): @@ -421,7 +456,7 @@ def test_add_too_many_appointment(api, client): r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) if i < free_appointment_slots: - assert r.status_code == HTTP_OK + assert r.status_code == HTTP_OK and r.json.get("start_block") == api.watcher.last_known_block else: assert r.status_code == HTTP_SERVICE_UNAVAILABLE From c05f96b738999bd31f54f7bc742c8e27df42c9d3 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 22 May 2020 18:25:08 +0200 Subject: [PATCH 08/21] tests - Adds e2e tests to cover the cache --- test/teos/e2e/test_basic_e2e.py | 67 +++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/teos/e2e/test_basic_e2e.py b/test/teos/e2e/test_basic_e2e.py index dbcfa0f..e611c4d 100644 --- a/test/teos/e2e/test_basic_e2e.py +++ b/test/teos/e2e/test_basic_e2e.py @@ -389,6 +389,73 @@ def test_two_appointment_same_locator_different_penalty_different_users(bitcoin_ assert appointment_info.get("appointment").get("penalty_tx") == appointment1_data.get("penalty_tx") +def test_add_appointment_trigger_on_cache(bitcoin_cli): + # This tests sending an appointment which trigger is in the cache + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid") + appointment_data = build_appointment_data(commitment_tx_id, penalty_tx) + locator = compute_locator(commitment_tx_id) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + + # Send the data to the tower and request it back. It should have gone straightaway to the Responder + add_appointment(appointment_data) + assert get_appointment_info(locator).get("status") == "dispute_responded" + + +def test_add_appointment_invalid_trigger_on_cache(bitcoin_cli): + # This tests sending an invalid appointment which trigger is in the cache + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid") + + # We can just flip the justice tx so it is invalid + appointment_data = build_appointment_data(commitment_tx_id, penalty_tx[::-1]) + locator = compute_locator(commitment_tx_id) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + sleep(1) + + # Send the data to the tower and request it back. It should get accepted but the data will be dropped. + add_appointment(appointment_data) + with pytest.raises(TowerResponseError): + get_appointment_info(locator) + + +def test_add_appointment_trigger_on_cache_cannot_decrypt(bitcoin_cli): + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + sleep(1) + + # The appointment data is built using a random 32-byte value. + appointment_data = build_appointment_data(get_random_value_hex(32), penalty_tx) + + # We cannot use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually. 
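+    # The locator, though, is still computed from the mined commitment so the trigger is found in the cache,
+    # while the blob is encrypted under a random key rather than the dispute txid. The tower should therefore
+    # accept the appointment, fail to decrypt it, and drop it.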
+ appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")) + appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32)) + appointment = Appointment.from_dict(appointment_data) + + signature = Cryptographer.sign(appointment.serialize(), user_sk) + data = {"appointment": appointment.to_dict(), "signature": signature} + + # Send appointment to the server. + response = teos_cli.post_request(data, teos_add_appointment_endpoint) + response_json = teos_cli.process_post_response(response) + + # Check that the server has accepted the appointment + signature = response_json.get("signature") + rpk = Cryptographer.recover_pk(appointment.serialize(), signature) + assert teos_id == Cryptographer.get_compressed_pk(rpk) + assert response_json.get("locator") == appointment.locator + + # The appointment should should have been inmediately dropped + with pytest.raises(TowerResponseError): + get_appointment_info(appointment_data["locator"]) + + def test_appointment_shutdown_teos_trigger_back_online(bitcoin_cli): global teosd_process From 77d1ea21c8cba6b0ab620c39711543b842f1d081 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 22 May 2020 19:14:43 +0200 Subject: [PATCH 09/21] teos - BLOCK_CACHE_SIZE -> LOCATOR_CACHE_SIZE --- teos/__init__.py | 2 +- teos/teosd.py | 2 +- test/teos/unit/test_api.py | 2 +- test/teos/unit/test_builder.py | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/teos/__init__.py b/teos/__init__.py index 7c986bf..b65b7e7 100644 --- a/teos/__init__.py +++ b/teos/__init__.py @@ -21,7 +21,7 @@ DEFAULT_CONF = { "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int}, "EXPIRY_DELTA": {"value": 6, "type": int}, "MIN_TO_SELF_DELAY": {"value": 20, "type": int}, - "BLOCK_CACHE_SIZE": {"value": 6, "type": int}, + "LOCATOR_CACHE_SIZE": {"value": 6, "type": int}, "LOG_FILE": {"value": "teos.log", "type": str, "path": True}, "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True}, "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True}, diff --git a/teos/teosd.py b/teos/teosd.py index c9f0a02..f1fa1df 100644 --- a/teos/teosd.py +++ b/teos/teosd.py @@ -90,7 +90,7 @@ def main(command_line_conf): responder, secret_key_der, config.get("MAX_APPOINTMENTS"), - config.get("BLOCK_CACHE_SIZE"), + config.get("LOCATOR_CACHE_SIZE"), ) # Create the chain monitor and start monitoring the chain diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py index 6f3840b..145fa8a 100644 --- a/test/teos/unit/test_api.py +++ b/test/teos/unit/test_api.py @@ -75,7 +75,7 @@ def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind): responder, sk.to_der(), MAX_APPOINTMENTS, - config.get("BLOCK_CACHE_SIZE"), + config.get("LOCATOR_CACHE_SIZE"), ) inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")) api = API(config.get("API_HOST"), config.get("API_PORT"), inspector, watcher) diff --git a/test/teos/unit/test_builder.py b/test/teos/unit/test_builder.py index 81f7b5c..581581e 100644 --- a/test/teos/unit/test_builder.py +++ b/test/teos/unit/test_builder.py @@ -102,7 +102,7 @@ def test_update_states_empty_list(db_manager, gatekeeper, carrier, block_process responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), - blocks_in_cache=config.get("BLOCK_CACHE_SIZE"), + blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"), ) 
     missed_blocks_watcher = []
@@ -124,7 +124,7 @@ def test_update_states_responder_misses_more(run_bitcoind, db_manager, gatekeepe
         responder=Responder(db_manager, gatekeeper, carrier, block_processor),
         sk_der=generate_keypair()[0].to_der(),
         max_appointments=config.get("MAX_APPOINTMENTS"),
-        blocks_in_cache=config.get("BLOCK_CACHE_SIZE"),
+        blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"),
     )

     blocks = []
@@ -150,7 +150,7 @@ def test_update_states_watcher_misses_more(db_manager, gatekeeper, carrier, bloc
         responder=Responder(db_manager, gatekeeper, carrier, block_processor),
         sk_der=generate_keypair()[0].to_der(),
         max_appointments=config.get("MAX_APPOINTMENTS"),
-        blocks_in_cache=config.get("BLOCK_CACHE_SIZE"),
+        blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"),
     )

     blocks = []

From 2a52006c0303539d120e2660bfbebc57d7c518bd Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Fri, 22 May 2020 19:15:32 +0200
Subject: [PATCH 10/21] tests - Removes Watcher from LocatorCache tests

---
 test/teos/unit/test_watcher.py | 142 ++++++++++++++++++++----------------
 1 file changed, 70 insertions(+), 72 deletions(-)

diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py
index c43b9b2..cc347f6 100644
--- a/test/teos/unit/test_watcher.py
+++ b/test/teos/unit/test_watcher.py
@@ -72,7 +72,7 @@ def watcher(db_manager, gatekeeper):
         responder,
         signing_key.to_der(),
         MAX_APPOINTMENTS,
-        config.get("BLOCK_CACHE_SIZE"),
+        config.get("LOCATOR_CACHE_SIZE"),
     )

     chain_monitor = ChainMonitor(
@@ -109,7 +109,75 @@ def create_appointments(n):
     return appointments, locator_uuid_map, dispute_txs


-def test_watcher_init(watcher, run_bitcoind):
+def test_locator_cache_init_not_enough_blocks(run_bitcoind, block_processor):
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+    # Make sure there are at least 3 blocks
+    block_count = block_processor.get_block_count()
+    if block_count < 3:
+        generate_blocks_w_delay(3 - block_count)
+
+    # Simulate there are only 3 blocks
+    third_block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(2)
+    locator_cache.init(third_block_hash, block_processor)
+    assert len(locator_cache.blocks) == 3
+    for k, v in locator_cache.blocks.items():
+        assert block_processor.get_block(k)
+
+
+def test_locator_cache_init(block_processor):
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+
+    # Generate enough blocks so the cache can start full
+    generate_blocks(2 * locator_cache.cache_size)
+
+    locator_cache.init(block_processor.get_best_block_hash(), block_processor)
+    assert len(locator_cache.blocks) == locator_cache.cache_size
+    for k, v in locator_cache.blocks.items():
+        assert block_processor.get_block(k)
+
+
+def test_locator_cache_is_full(block_processor):
+    # Empty cache
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+
+    for _ in range(locator_cache.cache_size):
+        locator_cache.blocks[uuid4().hex] = 0
+    assert not locator_cache.is_full()
+
+    locator_cache.blocks[uuid4().hex] = 0
+    assert locator_cache.is_full()
+
+
+def test_locator_remove_older_block(block_processor):
+    # Empty cache
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+
+    # Add some blocks to the cache
+    for _ in range(locator_cache.cache_size):
+        txid = get_random_value_hex(32)
+        locator = txid[:16]
+        locator_cache.blocks[get_random_value_hex(32)] = {locator: txid}
+        locator_cache.cache[locator] = txid
+
+    blocks_in_cache = locator_cache.blocks
+    oldest_block_hash = list(blocks_in_cache.keys())[0]
+    oldest_block_data =
blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + locator_cache.remove_older_block() + + # Oldest block data is not in the cache + assert oldest_block_hash not in locator_cache.blocks + for locator in oldest_block_data: + assert locator not in locator_cache.cache + + # The rest of data is in the cache + assert set(rest_of_blocks).issubset(locator_cache.blocks) + for block_hash in rest_of_blocks: + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache + + +def test_watcher_init(watcher): assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert watcher.block_queue.empty() @@ -122,75 +190,6 @@ def test_watcher_init(watcher, run_bitcoind): assert isinstance(watcher.locator_cache, LocatorCache) -def test_locator_cache_init_not_enough_blocks(watcher): - # Make sure there are at least 3 blocks - block_count = watcher.block_processor.get_block_count() - if block_count < 3: - generate_blocks_w_delay(3 - block_count) - - # Simulate there are only 3 blocks - third_block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(2) - watcher.locator_cache.init(third_block_hash, watcher.block_processor) - assert len(watcher.locator_cache.blocks) == 3 - for k, v in watcher.locator_cache.blocks.items(): - assert watcher.block_processor.get_block(k) - - -def test_locator_cache_init(watcher): - # Empty cache - watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) - - # Generate enough blocks so the cache can start full - generate_blocks(2 * watcher.locator_cache.cache_size) - - watcher.locator_cache.init(watcher.block_processor.get_best_block_hash(), watcher.block_processor) - assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size - for k, v in watcher.locator_cache.blocks.items(): - assert watcher.block_processor.get_block(k) - - -def test_locator_cache_is_full(watcher): - # Empty cache - watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) - - for _ in range(watcher.locator_cache.cache_size): - watcher.locator_cache.blocks[uuid4().hex] = 0 - assert not watcher.locator_cache.is_full() - - watcher.locator_cache.blocks[uuid4().hex] = 0 - assert watcher.locator_cache.is_full() - - # Remove the data - watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) - - -def test_locator_remove_older_block(watcher): - # Add some blocks to the cache if it is empty - if not len(watcher.locator_cache.blocks): - for _ in range(watcher.locator_cache.cache_size): - txid = get_random_value_hex(32) - locator = txid[:16] - watcher.locator_cache.blocks[get_random_value_hex(32)] = {locator: txid} - watcher.locator_cache.cache[locator] = txid - - blocks_in_cache = watcher.locator_cache.blocks - oldest_block_hash = list(blocks_in_cache.keys())[0] - oldest_block_data = blocks_in_cache.get(oldest_block_hash) - rest_of_blocks = list(blocks_in_cache.keys())[1:] - watcher.locator_cache.remove_older_block() - - # Oldest block data is not in the cache - assert oldest_block_hash not in watcher.locator_cache.blocks - for locator in oldest_block_data: - assert locator not in watcher.locator_cache.cache - - # The rest of data is in the cache - assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks) - for block_hash in rest_of_blocks: - for locator in watcher.locator_cache.blocks[block_hash]: - assert locator in watcher.locator_cache.cache - - def 
test_add_appointment_non_registered(watcher): # Appointments from non-registered users should fail user_sk, user_pk = generate_keypair() @@ -384,7 +383,6 @@ def test_add_too_many_appointments(watcher): def test_do_watch(watcher, temp_db_manager): watcher.db_manager = temp_db_manager - watcher.locator_cache = LocatorCache(watcher.locator_cache.cache_size) # We will wipe all the previous data and add 5 appointments appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) From 5af0658720a04d9954c22aa122dd6bab57183f97 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 22 May 2020 19:57:00 +0200 Subject: [PATCH 11/21] watcher - Adds reorg protection for the LocatorCache --- teos/watcher.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/teos/watcher.py b/teos/watcher.py index ea899eb..de16500 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -72,6 +72,26 @@ class LocatorCache: self.blocks = OrderedDict(reversed((list(self.blocks.items())))) + def fix_cache(self, last_known_block, block_processor): + tmp_cache = LocatorCache(self.cache_size) + + target_block_hash = last_known_block + for _ in range(self.cache_size): + target_block = block_processor.get_block(target_block_hash) + if target_block: + if target_block_hash in self.blocks: + tmp_cache.cache.update(self.blocks[target_block_hash]) + tmp_cache.blocks[target_block_hash] = self.blocks[target_block_hash] + else: + locators = {compute_locator(txid): txid for txid in target_block.get("tx")} + tmp_cache.cache.update(locators) + tmp_cache.blocks[target_block_hash] = locators + + target_block_hash = target_block.get("previousblockhash") + + self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) + self.cache = tmp_cache.cache + def is_full(self): """ Returns whether the cache is full or not """ return len(self.blocks) > self.cache_size @@ -290,6 +310,10 @@ class Watcher: block = self.block_processor.get_block(block_hash) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) + # If a reorg is detected, the cache is fixed to cover the las `cache_size` blocks of the new chain + if self.last_known_block != block.get("previousblockhash"): + self.locator_cache.fix_cache(block_hash, self.block_processor) + txids = block.get("tx") # Compute the locator for every transaction in the block and add them to the cache locators_txid_map = {compute_locator(txid): txid for txid in txids} From e1aab63940884499a82c81b0777b0767dae48f62 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 22 May 2020 19:57:48 +0200 Subject: [PATCH 12/21] tests - unit tests LocatorCache reorg protection --- test/teos/unit/test_watcher.py | 49 ++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index cc347f6..44e75d2 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -1,6 +1,7 @@ import pytest from uuid import uuid4 from shutil import rmtree +from copy import deepcopy from threading import Thread from coincurve import PrivateKey @@ -136,6 +137,54 @@ def test_locator_cache_init(block_processor): assert block_processor.get_block(k) +def test_fix_cache(block_processor): + # This tests how a reorg will create a new version of the cache + # Let's start setting a full cache. 
We'll mine ``cache_size`` blocks to be sure it's full
+    generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE")))
+
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+    locator_cache.init(block_processor.get_best_block_hash(), block_processor)
+    assert len(locator_cache.blocks) == locator_cache.cache_size
+
+    # Now let's fake a reorg of less than ``cache_size``. We'll go two blocks into the past.
+    current_tip = block_processor.get_best_block_hash()
+    current_tip_locators = list(locator_cache.blocks[current_tip].keys())
+    current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash")
+    current_tip_parent_locators = list(locator_cache.blocks[current_tip_parent].keys())
+    fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash")
+    locator_cache.fix_cache(fake_tip, block_processor)
+
+    # The last two blocks are not in the cache nor are any of their locators
+    assert current_tip not in locator_cache.blocks and current_tip_parent not in locator_cache.blocks
+    for locator in current_tip_parent_locators + current_tip_locators:
+        assert locator not in locator_cache.cache
+
+    # The fake tip is the new tip, and two additional blocks are at the bottom
+    assert fake_tip in locator_cache.blocks and list(locator_cache.blocks.keys())[-1] == fake_tip
+    assert len(locator_cache.blocks) == locator_cache.cache_size
+
+    # Test the same for a full cache reorg. We can simulate this by adding more blocks than the cache can fit and
+    # trigger a fix. We'll use a new cache to compare with the old
+    new_cache = deepcopy(locator_cache)
+
+    generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE") * 2))
+    new_cache.fix_cache(block_processor.get_best_block_hash(), block_processor)
+
+    # None of the data from the old cache is in the new cache
+    for block_hash, data in locator_cache.blocks.items():
+        assert block_hash not in new_cache.blocks
+        for locator, txid in data.items():
+            assert locator not in new_cache.cache
+
+    # The data in the new cache corresponds to the last ``cache_size`` blocks.
+    block_count = block_processor.get_block_count()
+    for i in range(block_count, block_count - locator_cache.cache_size, -1):
+        block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1)
+        assert block_hash in new_cache.blocks
+        for locator, _ in new_cache.blocks[block_hash].items():
+            assert locator in new_cache.cache
+
+
 def test_locator_cache_is_full(block_processor):
     # Empty cache
     locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))

From 837f7d428a7f21c1bdfbc76be57c4b38ae841fcd Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Tue, 2 Jun 2020 19:49:50 +0200
Subject: [PATCH 13/21] watcher - Fixes cache setup for regtest

Cache setup in regtest was raising some warnings due to the blocks not being
found.

---
 teos/watcher.py | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/teos/watcher.py b/teos/watcher.py
index de16500..3d7120d 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -61,14 +61,19 @@ class LocatorCache:
         # Not doing so implies store temporary variables in the Watcher and initialising the cache as None.
         target_block_hash = last_known_block
         for _ in range(self.cache_size):
-            target_block = block_processor.get_block(target_block_hash)
             # In some setups, like regtest, it could be the case that there are no enough previous blocks.
-            if target_block:
-                locators = {compute_locator(txid): txid for txid in target_block.get("tx")}
-                self.cache.update(locators)
-                self.blocks[target_block_hash] = locators
-                target_block_hash = target_block.get("previousblockhash")
+            # In those cases we pull as many as we can.
+            if target_block_hash:
+                target_block = block_processor.get_block(target_block_hash)
+                if not target_block:
+                    break
+            else:
+                break
+
+            locators = {compute_locator(txid): txid for txid in target_block.get("tx")}
+            self.cache.update(locators)
+            self.blocks[target_block_hash] = locators
+            target_block_hash = target_block.get("previousblockhash")

         self.blocks = OrderedDict(reversed((list(self.blocks.items()))))

From 37d1bd9b126f459f1e7a6163bc271fd8b542f93f Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Tue, 2 Jun 2020 19:49:55 +0200
Subject: [PATCH 14/21] testing - fixes e2e test utxo management

e2e tests were reusing utxos for transactions that were not confirmed, meaning
that sometimes we were actually sending the same appointment over and over.

---
 test/teos/e2e/conftest.py       |  7 +++++--
 test/teos/e2e/test_basic_e2e.py | 27 +++++++++++++++++++++++----
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/test/teos/e2e/conftest.py b/test/teos/e2e/conftest.py
index d4bb314..67e8a38 100644
--- a/test/teos/e2e/conftest.py
+++ b/test/teos/e2e/conftest.py
@@ -11,6 +11,7 @@ from common.config_loader import ConfigLoader

 getcontext().prec = 10

+utxos = []


 @pytest.fixture(scope="session")
 def prng_seed():

 def setup_node(bitcoin_cli):
     # This method will create a new address a mine bitcoin so the node can be used for testing
     new_addr = bitcoin_cli.getnewaddress()
-    bitcoin_cli.generatetoaddress(106, new_addr)
+    bitcoin_cli.generatetoaddress(200, new_addr)


 def create_txs(bitcoin_cli, n=1):
-    utxos = bitcoin_cli.listunspent()
+    global utxos
+    if not utxos:
+        utxos = bitcoin_cli.listunspent()

     if len(utxos) < n:
         raise ValueError("There're no enough UTXOs.")
diff --git a/test/teos/e2e/test_basic_e2e.py b/test/teos/e2e/test_basic_e2e.py
index e611c4d..da6d5be 100644
--- a/test/teos/e2e/test_basic_e2e.py
+++ b/test/teos/e2e/test_basic_e2e.py
@@ -40,6 +40,9 @@ teosd_process = run_teosd()

 teos_id, user_sk, user_id = teos_cli.load_keys(cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY"))

+appointment_in_watcher = 0
+appointment_in_responder = 0


 def broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, addr):
     # Broadcast the commitment transaction and mine a block
@@ -78,6 +81,8 @@ def test_commands_non_registered(bitcoin_cli):


 def test_commands_registered(bitcoin_cli):
+    global appointment_in_watcher
+
     # Test registering and trying again
     teos_cli.register(user_id, teos_base_endpoint)

@@ -93,9 +98,12 @@
     r = get_appointment_info(appointment_data.get("locator"))
     assert r.get("locator") == appointment.locator
     assert r.get("appointment") == appointment.to_dict()
+    appointment_in_watcher += 1


 def test_appointment_life_cycle(bitcoin_cli):
+    global appointment_in_watcher, appointment_in_responder
+
     # First of all we need to register
     response = teos_cli.register(user_id, teos_base_endpoint)
     available_slots = response.get("available_slots")
@@ -106,6 +114,7 @@
     appointment_data = build_appointment_data(commitment_tx_id, penalty_tx)
     locator = compute_locator(commitment_tx_id)
     appointment, signature = add_appointment(appointment_data)
+    appointment_in_watcher += 1

     # Get the information from the
tower to check that it matches appointment_info = get_appointment_info(locator) @@ -117,7 +126,7 @@ def test_appointment_life_cycle(bitcoin_cli): all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 1 and len(responding) == 0 + assert len(watching) == appointment_in_watcher and len(responding) == 0 # Trigger a breach and check again new_addr = bitcoin_cli.getnewaddress() @@ -125,11 +134,13 @@ def test_appointment_life_cycle(bitcoin_cli): appointment_info = get_appointment_info(locator) assert appointment_info.get("status") == "dispute_responded" assert appointment_info.get("locator") == locator + appointment_in_watcher -= 1 + appointment_in_responder += 1 all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 0 and len(responding) == 1 + assert len(watching) == appointment_in_watcher and len(responding) == appointment_in_responder # It can be also checked by ensuring that the penalty transaction made it to the network penalty_tx_id = bitcoin_cli.decoderawtransaction(penalty_tx).get("txid") @@ -144,6 +155,7 @@ def test_appointment_life_cycle(bitcoin_cli): # Now let's mine some blocks so the appointment reaches its end. We need 100 + EXPIRY_DELTA -1 bitcoin_cli.generatetoaddress(100 + teos_config.get("EXPIRY_DELTA") - 1, new_addr) + appointment_in_responder -= 1 # The appointment is no longer in the tower with pytest.raises(TowerResponseError): @@ -152,10 +164,14 @@ def test_appointment_life_cycle(bitcoin_cli): # Check that the appointment is not in the Gatekeeper by checking the available slots (should have increase by 1) # We can do so by topping up the subscription (FIXME: find a better way to check this). response = teos_cli.register(user_id, teos_base_endpoint) - assert response.get("available_slots") == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 + assert ( + response.get("available_slots") + == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 - appointment_in_watcher - appointment_in_responder + ) def test_multiple_appointments_life_cycle(bitcoin_cli): + global appointment_in_watcher, appointment_in_responder # Tests that get_all_appointments returns all the appointments the tower is storing at various stages in the # appointment lifecycle. appointments = [] @@ -180,6 +196,7 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): # Send all of them to watchtower. for appt in appointments: add_appointment(appt.get("appointment_data")) + appointment_in_watcher += 1 # Two of these appointments are breached, and the watchtower responds to them. breached_appointments = [] @@ -188,13 +205,15 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): broadcast_transaction_and_mine_block(bitcoin_cli, appointments[i]["commitment_tx"], new_addr) bitcoin_cli.generatetoaddress(1, new_addr) breached_appointments.append(appointments[i]["locator"]) + appointment_in_watcher -= 1 + appointment_in_responder += 1 sleep(1) # Test that they all show up in get_all_appointments at the correct stages. 
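     # The module-level appointment_in_watcher and appointment_in_responder counters track how many
     # appointments each stage is expected to be holding at this point in the run.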
all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 3 and len(responding) == 2 + assert len(watching) == appointment_in_watcher and len(responding) == appointment_in_responder responder_locators = [appointment["locator"] for uuid, appointment in responding.items()] assert set(responder_locators) == set(breached_appointments) From 6ea3e8e3ff62a35ea03d8c1dea6946ff473afab8 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 3 Jun 2020 17:21:21 +0200 Subject: [PATCH 15/21] watcher - simplifies locator_cache and check_breach locator_cache.blocks was storing a dictionary with both the locator and txid pair, when only the locators were actually necessary. Reworks the code a bit to only use locators. check_breach was returning some values that could be worked out from the unaltered inputs. Also fixes some comments and docs. --- teos/watcher.py | 108 ++++++++++++++------------------- test/teos/unit/test_watcher.py | 18 +++--- 2 files changed, 54 insertions(+), 72 deletions(-) diff --git a/teos/watcher.py b/teos/watcher.py index 3d7120d..a2ee8a2 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -22,7 +22,7 @@ class AppointmentLimitReached(BasicException): class AppointmentAlreadyTriggered(BasicException): - """Raised an appointment is sent to the Watcher but that same data has already been sent to the Responder""" + """Raised when an appointment is sent to the Watcher but that same data has already been sent to the Responder""" class LocatorCache: @@ -35,11 +35,11 @@ class LocatorCache: blocks_in_cache (:obj:`int`): the numbers of blocks to keep in the cache. Attributes: - cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pair that received appointments are checked + cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pairs that received appointments are checked against. blocks (:obj:`OrderedDict`): An ordered dictionary of the last ``blocks_in_cache`` blocks (block_hash:locators). Used to keep track of what data belongs to what block, so data can be pruned accordingly. Also needed to - rebuilt the cache in case of a reorgs. + rebuild the cache in case of reorgs. cache_size (:obj:`int`): the size of the cache in blocks. """ @@ -50,10 +50,10 @@ class LocatorCache: def init(self, last_known_block, block_processor): """ - Sets the initial state of the block cache. + Sets the initial state of the locator cache. Args: - last_known_block (:obj:`str`): the last known block of the ``Watcher``. + last_known_block (:obj:`str`): the last known block by the ``Watcher``. block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance. """ @@ -62,7 +62,7 @@ class LocatorCache: target_block_hash = last_known_block for _ in range(self.cache_size): # In some setups, like regtest, it could be the case that there are no enough previous blocks. - # In those cases we pull as many as we can. + # In those cases we pull as many as we can (up to ``cache_size``). 
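+            # Note that the cache is built backwards from the tip by following the previousblockhash pointers,
+            # so blocks are collected newest-first and the OrderedDict is reversed at the end to leave them
+            # oldest-first.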
if target_block_hash: target_block = block_processor.get_block(target_block_hash) if not target_block: @@ -70,9 +70,9 @@ class LocatorCache: else: break - locators = {compute_locator(txid): txid for txid in target_block.get("tx")} - self.cache.update(locators) - self.blocks[target_block_hash] = locators + locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} + self.cache.update(locator_txid_map) + self.blocks[target_block_hash] = list(locator_txid_map.keys()) target_block_hash = target_block.get("previousblockhash") self.blocks = OrderedDict(reversed((list(self.blocks.items())))) @@ -80,18 +80,15 @@ class LocatorCache: def fix_cache(self, last_known_block, block_processor): tmp_cache = LocatorCache(self.cache_size) + # We assume there are no reorgs back to genesis. If so, this would raise some log warnings. And the cache will + # be filled with less than ``cache_size`` blocks.` target_block_hash = last_known_block - for _ in range(self.cache_size): + for _ in range(tmp_cache.cache_size): target_block = block_processor.get_block(target_block_hash) if target_block: - if target_block_hash in self.blocks: - tmp_cache.cache.update(self.blocks[target_block_hash]) - tmp_cache.blocks[target_block_hash] = self.blocks[target_block_hash] - else: - locators = {compute_locator(txid): txid for txid in target_block.get("tx")} - tmp_cache.cache.update(locators) - tmp_cache.blocks[target_block_hash] = locators - + locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} + tmp_cache.cache.update(locator_txid_map) + tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys()) target_block_hash = target_block.get("previousblockhash") self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) @@ -103,8 +100,8 @@ class LocatorCache: def remove_older_block(self): """ Removes the older block from the cache """ - block_hash, locator_map = self.blocks.popitem(last=False) - for locator, txid in locator_map.items(): + block_hash, locators = self.blocks.popitem(last=False) + for locator in locators: del self.cache[locator] logger.debug("Block removed from cache", block_hash=block_hash) @@ -153,7 +150,7 @@ class Watcher: signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. last_known_block (:obj:`str`): the last block known by the ``Watcher``. - last_known_block (:obj:`LocatorCache`): a cache of locators from the last ``blocks_in_cache`` blocks. + locator_cache (:obj:`LocatorCache`): a cache of locators for the last ``blocks_in_cache`` blocks. Raises: :obj:`InvalidKey `: if teos sk cannot be loaded. @@ -237,19 +234,14 @@ class Watcher: # Appointments that were triggered in blocks hold in the cache if appointment.locator in self.locator_cache.cache: try: - breach = self.check_breach(uuid, appointment, self.locator_cache.cache[appointment.locator]) + dispute_txid = self.locator_cache.cache[appointment.locator] + penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid) receipt = self.responder.handle_breach( - uuid, - breach["locator"], - breach["dispute_txid"], - breach["penalty_txid"], - breach["penalty_rawtx"], - user_id, - self.last_known_block, + uuid, appointment.locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, self.last_known_block ) - # At this point the appointment is accepted but data is only kept if it goes through the Responder - # otherwise it is dropped. 
+ # At this point the appointment is accepted but data is only kept if it goes through the Responder. + # Otherwise it is dropped. if receipt.delivered: self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) self.db_manager.create_append_locator_map(appointment.locator, uuid) @@ -261,7 +253,7 @@ class Watcher: # could be used to discourage user misbehaviour. pass - # Regular appointments that have not been triggered (or not recently at least) + # Regular appointments that have not been triggered (or, at least, not recently) else: self.appointments[uuid] = appointment.get_summary() @@ -321,12 +313,12 @@ class Watcher: txids = block.get("tx") # Compute the locator for every transaction in the block and add them to the cache - locators_txid_map = {compute_locator(txid): txid for txid in txids} - self.locator_cache.cache.update(locators_txid_map) - self.locator_cache.blocks[block_hash] = locators_txid_map + locator_txid_map = {compute_locator(txid): txid for txid in txids} + self.locator_cache.cache.update(locator_txid_map) + self.locator_cache.blocks[block_hash] = list(locator_txid_map.keys()) logger.debug("Block added to cache", block_hash=block_hash) - if len(self.appointments) > 0 and locators_txid_map: + if len(self.appointments) > 0 and locator_txid_map: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -340,7 +332,7 @@ class Watcher: expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager ) - valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locators_txid_map)) + valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locator_txid_map)) triggered_flags = [] appointments_to_delete = [] @@ -399,12 +391,12 @@ class Watcher: self.last_known_block = block.get("hash") self.block_queue.task_done() - def get_breaches(self, locators_txid_map): + def get_breaches(self, locator_txid_map): """ Gets a dictionary of channel breaches given a map of locator:dispute_txid. Args: - locators_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of + locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction ids. Returns: @@ -413,8 +405,8 @@ class Watcher: """ # Check is any of the tx_ids in the received block is an actual match - intersection = set(self.locator_uuid_map.keys()).intersection(locators_txid_map.keys()) - breaches = {locator: locators_txid_map[locator] for locator in intersection} + intersection = set(self.locator_uuid_map.keys()).intersection(locator_txid_map.keys()) + breaches = {locator: locator_txid_map[locator] for locator in intersection} if len(breaches) > 0: logger.info("List of breaches", breaches=breaches) @@ -434,8 +426,7 @@ class Watcher: dispute_txid (:obj:`str`): the id of the transaction that triggered the breach. Returns: - :obj:`dic`: The breach data in a dictionary (locator, dispute_txid, penalty_txid, penalty_rawtx), if the - breach is correct. + :obj:`tuple`: A tuple containing the penalty txid and the raw penalty tx. 
Raises: :obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the @@ -452,21 +443,14 @@ class Watcher: raise e except InvalidTransactionFormat as e: - logger.info("The breach contained an invalid transaction") + logger.info("The breach contained an invalid transaction", uuid=uuid) raise e - valid_breach = { - "locator": appointment.locator, - "dispute_txid": dispute_txid, - "penalty_txid": penalty_tx.get("txid"), - "penalty_rawtx": penalty_rawtx, - } - logger.info( "Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") ) - return valid_breach + return penalty_tx.get("txid"), penalty_rawtx def filter_breaches(self, breaches): """ @@ -482,7 +466,7 @@ class Watcher: :obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid. The structure is as follows: - ``{locator, dispute_txid, penalty_txid, penalty_rawtx, valid_breach}`` + ``{locator, dispute_txid, penalty_txid, penalty_rawtx}`` """ valid_breaches = {} @@ -496,22 +480,24 @@ class Watcher: appointment = ExtendedAppointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) if appointment.encrypted_blob in decrypted_blobs: - penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob] + penalty_txid, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob] valid_breaches[uuid] = { "locator": appointment.locator, "dispute_txid": dispute_txid, - "penalty_txid": penalty_tx.get("txid"), + "penalty_txid": penalty_txid, "penalty_rawtx": penalty_rawtx, } else: try: - valid_breach = self.check_breach(uuid, appointment, dispute_txid) - valid_breaches[uuid] = valid_breach - decrypted_blobs[appointment.encrypted_blob] = ( - valid_breach["penalty_txid"], - valid_breach["penalty_rawtx"], - ) + penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid) + valid_breaches[uuid] = { + "locator": appointment.locator, + "dispute_txid": dispute_txid, + "penalty_txid": penalty_txid, + "penalty_rawtx": penalty_rawtx, + } + decrypted_blobs[appointment.encrypted_blob] = (penalty_txid, penalty_rawtx) except (EncryptionError, InvalidTransactionFormat): invalid_breaches.append(uuid) diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index 44e75d2..1c54cc9 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -148,9 +148,9 @@ def test_fix_cache(block_processor): # Now let's fake a reorg of less than ``cache_size``. We'll go two blocks into the past. 
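     # fix_cache rebuilds the cache from the given tip backwards, so the two blocks past the fake tip should
     # be dropped and two older ancestors pulled in to keep the cache at ``cache_size`` blocks.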
current_tip = block_processor.get_best_block_hash() - current_tip_locators = list(locator_cache.blocks[current_tip].keys()) + current_tip_locators = locator_cache.blocks[current_tip] current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash") - current_tip_parent_locators = list(locator_cache.blocks[current_tip_parent].keys()) + current_tip_parent_locators = locator_cache.blocks[current_tip_parent] fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash") locator_cache.fix_cache(fake_tip, block_processor) @@ -171,9 +171,9 @@ def test_fix_cache(block_processor): new_cache.fix_cache(block_processor.get_best_block_hash(), block_processor) # None of the data from the old cache is in the new cache - for block_hash, data in locator_cache.blocks.items(): + for block_hash, locators in locator_cache.blocks.items(): assert block_hash not in new_cache.blocks - for locator, txid in data.items(): + for locator in locators: assert locator not in new_cache.cache # The data in the new cache corresponds to the last ``cache_size`` blocks. @@ -181,7 +181,7 @@ def test_fix_cache(block_processor): for i in range(block_count, block_count - locator_cache.cache_size, -1): block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1) assert block_hash in new_cache.blocks - for locator, _ in new_cache.blocks[block_hash].items(): + for locator in new_cache.blocks[block_hash]: assert locator in new_cache.cache @@ -533,12 +533,8 @@ def test_check_breach(watcher): appointment, dispute_tx = generate_dummy_appointment() dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") - valid_breach = watcher.check_breach(uuid, appointment, dispute_txid) - assert ( - valid_breach - and valid_breach.get("locator") == appointment.locator - and valid_breach.get("dispute_txid") == dispute_txid - ) + penalty_txid, penalty_rawtx = watcher.check_breach(uuid, appointment, dispute_txid) + assert Cryptographer.encrypt(penalty_rawtx, dispute_txid) == appointment.encrypted_blob def test_check_breach_random_data(watcher): From 08c794590cb31bb8f5daa7f7a17ed5448202f9ef Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 3 Jun 2020 17:39:27 +0200 Subject: [PATCH 16/21] teos - additional docs --- teos/watcher.py | 18 ++++++++++++++---- test/teos/unit/test_api.py | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/teos/watcher.py b/teos/watcher.py index a2ee8a2..366c7f8 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -62,7 +62,7 @@ class LocatorCache: target_block_hash = last_known_block for _ in range(self.cache_size): # In some setups, like regtest, it could be the case that there are no enough previous blocks. - # In those cases we pull as many as we can (up to ``cache_size``). + # In those cases we pull as many as we can (up to cache_size). if target_block_hash: target_block = block_processor.get_block(target_block_hash) if not target_block: @@ -78,10 +78,18 @@ class LocatorCache: self.blocks = OrderedDict(reversed((list(self.blocks.items())))) def fix_cache(self, last_known_block, block_processor): + """ + Fixes an existing cache after a reorg has been detected by feeding the last ``cache_size`` blocks to it. + + Args: + last_known_block (:obj:`str`): the last known block hash after the reorg. + block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance. + """ + tmp_cache = LocatorCache(self.cache_size) # We assume there are no reorgs back to genesis. 
If so, this would raise some log warnings. And the cache will
-        # be filled with less than ``cache_size`` blocks.`
+        # be filled with less than cache_size blocks.
         target_block_hash = last_known_block
         for _ in range(tmp_cache.cache_size):
             target_block = block_processor.get_block(target_block_hash)
             if target_block:
@@ -463,8 +471,10 @@
             breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``).

         Returns:
-            :obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid.
-            The structure is as follows:
+            :obj:`tuple`: A dictionary and a list. The former contains the valid breaches, while the latter contains
+            the invalid ones.
+
+            The valid breaches dictionary has the following structure:

             ``{locator, dispute_txid, penalty_txid, penalty_rawtx}``
         """
diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py
index 145fa8a..4f3930a 100644
--- a/test/teos/unit/test_api.py
+++ b/test/teos/unit/test_api.py
@@ -382,7 +382,7 @@ def test_add_appointment_in_cache(api, client):
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
     assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED

-    # The appointment would be rejected even if the data is not in the cache provided we've it has been triggered
+    # The appointment would be rejected even if the data is not in the cache provided it has been triggered
     del api.watcher.locator_cache.cache[appointment.locator]
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
     assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED

From 2facd61f6cbf7fa8a32449238fe4584fb8caaf47 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Tue, 9 Jun 2020 16:59:01 +0200
Subject: [PATCH 17/21] teos - addresses minor comments from #149

---
 teos/watcher.py                 | 18 +++++++++--------
 test/teos/e2e/test_basic_e2e.py | 36 ++++++++++++++++-----------------
 2 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/teos/watcher.py b/teos/watcher.py
index 366c7f8..66c3c5d 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -63,11 +63,11 @@ class LocatorCache:
         for _ in range(self.cache_size):
             # In some setups, like regtest, it could be the case that there are no enough previous blocks.
             # In those cases we pull as many as we can (up to cache_size).
-            if target_block_hash:
-                target_block = block_processor.get_block(target_block_hash)
-                if not target_block:
-                    break
-            else:
+            if not target_block_hash:
+                break
+
+            target_block = block_processor.get_block(target_block_hash)
+            if not target_block:
                 break

             locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
@@ -79,7 +79,7 @@ class LocatorCache:

     def fix_cache(self, last_known_block, block_processor):
         """
-        Fixes an existing cache after a reorg has been detected by feeding the last ``cache_size`` blocks to it.
+        Fixes an existing cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it.

         Args:
             last_known_block (:obj:`str`): the last known block hash after the reorg.
             block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
         """
@@ -94,6 +94,8 @@ class LocatorCache:
         for _ in range(tmp_cache.cache_size):
             target_block = block_processor.get_block(target_block_hash)
             if target_block:
+                # Compute the locator:txid pair for every transaction in the block and update both the cache and
+                # the block mapping.
locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} tmp_cache.cache.update(locator_txid_map) tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys()) @@ -239,7 +241,7 @@ class Watcher: # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) - # Appointments that were triggered in blocks hold in the cache + # Appointments that were triggered in blocks held in the cache if appointment.locator in self.locator_cache.cache: try: dispute_txid = self.locator_cache.cache[appointment.locator] @@ -315,7 +317,7 @@ class Watcher: block = self.block_processor.get_block(block_hash) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - # If a reorg is detected, the cache is fixed to cover the las `cache_size` blocks of the new chain + # If a reorg is detected, the cache is fixed to cover the last `cache_size` blocks of the new chain if self.last_known_block != block.get("previousblockhash"): self.locator_cache.fix_cache(block_hash, self.block_processor) diff --git a/test/teos/e2e/test_basic_e2e.py b/test/teos/e2e/test_basic_e2e.py index da6d5be..d2781bc 100644 --- a/test/teos/e2e/test_basic_e2e.py +++ b/test/teos/e2e/test_basic_e2e.py @@ -40,8 +40,8 @@ teosd_process = run_teosd() teos_id, user_sk, user_id = teos_cli.load_keys(cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY")) -appointment_in_watcher = 0 -appointment_in_responder = 0 +appointments_in_watcher = 0 +appointments_in_responder = 0 def broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, addr): @@ -81,7 +81,7 @@ def test_commands_non_registered(bitcoin_cli): def test_commands_registered(bitcoin_cli): - global appointment_in_watcher + global appointments_in_watcher # Test registering and trying again teos_cli.register(user_id, teos_base_endpoint) @@ -98,11 +98,11 @@ def test_commands_registered(bitcoin_cli): r = get_appointment_info(appointment_data.get("locator")) assert r.get("locator") == appointment.locator assert r.get("appointment") == appointment.to_dict() - appointment_in_watcher += 1 + appointments_in_watcher += 1 def test_appointment_life_cycle(bitcoin_cli): - global appointment_in_watcher, appointment_in_responder + global appointments_in_watcher, appointments_in_responder # First of all we need to register response = teos_cli.register(user_id, teos_base_endpoint) @@ -114,7 +114,7 @@ def test_appointment_life_cycle(bitcoin_cli): appointment_data = build_appointment_data(commitment_tx_id, penalty_tx) locator = compute_locator(commitment_tx_id) appointment, signature = add_appointment(appointment_data) - appointment_in_watcher += 1 + appointments_in_watcher += 1 # Get the information from the tower to check that it matches appointment_info = get_appointment_info(locator) @@ -126,7 +126,7 @@ def test_appointment_life_cycle(bitcoin_cli): all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == appointment_in_watcher and len(responding) == 0 + assert len(watching) == appointments_in_watcher and len(responding) == 0 # Trigger a breach and check again new_addr = bitcoin_cli.getnewaddress() @@ -134,13 +134,13 @@ def test_appointment_life_cycle(bitcoin_cli): appointment_info = get_appointment_info(locator) assert appointment_info.get("status") == "dispute_responded" assert appointment_info.get("locator") == locator - 
appointment_in_watcher -= 1 - appointment_in_responder += 1 + appointments_in_watcher -= 1 + appointments_in_responder += 1 all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == appointment_in_watcher and len(responding) == appointment_in_responder + assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder # It can be also checked by ensuring that the penalty transaction made it to the network penalty_tx_id = bitcoin_cli.decoderawtransaction(penalty_tx).get("txid") @@ -155,7 +155,7 @@ def test_appointment_life_cycle(bitcoin_cli): # Now let's mine some blocks so the appointment reaches its end. We need 100 + EXPIRY_DELTA -1 bitcoin_cli.generatetoaddress(100 + teos_config.get("EXPIRY_DELTA") - 1, new_addr) - appointment_in_responder -= 1 + appointments_in_responder -= 1 # The appointment is no longer in the tower with pytest.raises(TowerResponseError): @@ -166,12 +166,12 @@ def test_appointment_life_cycle(bitcoin_cli): response = teos_cli.register(user_id, teos_base_endpoint) assert ( response.get("available_slots") - == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 - appointment_in_watcher - appointment_in_responder + == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 - appointments_in_watcher - appointments_in_responder ) def test_multiple_appointments_life_cycle(bitcoin_cli): - global appointment_in_watcher, appointment_in_responder + global appointments_in_watcher, appointments_in_responder # Tests that get_all_appointments returns all the appointments the tower is storing at various stages in the # appointment lifecycle. appointments = [] @@ -196,7 +196,7 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): # Send all of them to watchtower. for appt in appointments: add_appointment(appt.get("appointment_data")) - appointment_in_watcher += 1 + appointments_in_watcher += 1 # Two of these appointments are breached, and the watchtower responds to them. breached_appointments = [] @@ -205,15 +205,15 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): broadcast_transaction_and_mine_block(bitcoin_cli, appointments[i]["commitment_tx"], new_addr) bitcoin_cli.generatetoaddress(1, new_addr) breached_appointments.append(appointments[i]["locator"]) - appointment_in_watcher -= 1 - appointment_in_responder += 1 + appointments_in_watcher -= 1 + appointments_in_responder += 1 sleep(1) # Test that they all show up in get_all_appointments at the correct stages. 
     all_appointments = get_all_appointments()
     watching = all_appointments.get("watcher_appointments")
     responding = all_appointments.get("responder_trackers")
-    assert len(watching) == appointment_in_watcher and len(responding) == appointment_in_responder
+    assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder

     responder_locators = [appointment["locator"] for uuid, appointment in responding.items()]
     assert set(responder_locators) == set(breached_appointments)
@@ -409,7 +409,7 @@ def test_two_appointment_same_locator_different_penalty_different_users(bitcoin_


 def test_add_appointment_trigger_on_cache(bitcoin_cli):
-    # This tests sending an appointment which trigger is in the cache
+    # This tests sending an appointment whose trigger is in the cache
     commitment_tx, penalty_tx = create_txs(bitcoin_cli)
     commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")
     appointment_data = build_appointment_data(commitment_tx_id, penalty_tx)
     locator = compute_locator(commitment_tx_id)

From df326465a765ef3dcb538fd13d91a4b4574e6964 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Wed, 10 Jun 2020 12:30:11 +0200
Subject: [PATCH 18/21] watcher - Updates the LocatorCache so it is
 self-handled

With this approach the cache handles deletion itself and provides getters and
setters so consumers do not directly access its internals.

---
 teos/watcher.py                |  71 +++++++++-----
 test/teos/unit/test_watcher.py | 163 ++++++++++++++++++++++++---------
 2 files changed, 166 insertions(+), 68 deletions(-)

diff --git a/teos/watcher.py b/teos/watcher.py
index 66c3c5d..00d3d81 100644
--- a/teos/watcher.py
+++ b/teos/watcher.py
@@ -77,9 +77,50 @@ class LocatorCache:

         self.blocks = OrderedDict(reversed((list(self.blocks.items()))))

-    def fix_cache(self, last_known_block, block_processor):
+    def get_txid(self, locator):
         """
-        Fixes an existing cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it.
+        Gets a txid from the locator cache.
+
+        Args:
+            locator (:obj:`str`): the locator to lookup in the cache.
+
+        Returns:
+            :obj:`str` or :obj:`None`: The txid linked to the given locator if found. None otherwise.
+        """
+        return self.cache.get(locator)
+
+    def update(self, block_hash, locator_txid_map):
+        """
+        Updates the cache with data from a new block. Removes the oldest block if the cache is full after the addition.
+
+        Args:
+            block_hash (:obj:`str`): the hash of the new block.
+            locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction
+                ids.
+        """
+
+        self.cache.update(locator_txid_map)
+        self.blocks[block_hash] = list(locator_txid_map.keys())
+        logger.debug("Block added to cache", block_hash=block_hash)
+
+        if self.is_full():
+            self.remove_oldest_block()
+
+    def is_full(self):
+        """ Returns whether the cache is full or not """
+        return len(self.blocks) > self.cache_size
+
+    def remove_oldest_block(self):
+        """ Removes the oldest block from the cache """
+        block_hash, locators = self.blocks.popitem(last=False)
+        for locator in locators:
+            del self.cache[locator]
+
+        logger.debug("Block removed from cache", block_hash=block_hash)
+
+    def fix(self, last_known_block, block_processor):
+        """
+        Fixes the cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it.
+
+        Args:
+            last_known_block (:obj:`str`): the last known block hash after the reorg.
+            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
+        """
@@ -104,18 +145,6 @@ class LocatorCache: self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) self.cache = tmp_cache.cache - def is_full(self): - """ Returns whether the cache is full or not """ - return len(self.blocks) > self.cache_size - - def remove_older_block(self): - """ Removes the older block from the cache """ - block_hash, locators = self.blocks.popitem(last=False) - for locator in locators: - del self.cache[locator] - - logger.debug("Block removed from cache", block_hash=block_hash) - class Watcher: """ @@ -242,9 +271,9 @@ class Watcher: available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) # Appointments that were triggered in blocks held in the cache - if appointment.locator in self.locator_cache.cache: + dispute_txid = self.locator_cache.get_txid(appointment.locator) + if dispute_txid: try: - dispute_txid = self.locator_cache.cache[appointment.locator] penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid) receipt = self.responder.handle_breach( uuid, appointment.locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, self.last_known_block @@ -319,14 +348,12 @@ class Watcher: # If a reorg is detected, the cache is fixed to cover the last `cache_size` blocks of the new chain if self.last_known_block != block.get("previousblockhash"): - self.locator_cache.fix_cache(block_hash, self.block_processor) + self.locator_cache.fix(block_hash, self.block_processor) txids = block.get("tx") # Compute the locator for every transaction in the block and add them to the cache locator_txid_map = {compute_locator(txid): txid for txid in txids} - self.locator_cache.cache.update(locator_txid_map) - self.locator_cache.blocks[block_hash] = list(locator_txid_map.keys()) - logger.debug("Block added to cache", block_hash=block_hash) + self.locator_cache.update(block_hash, locator_txid_map) if len(self.appointments) > 0 and locator_txid_map: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) @@ -392,10 +419,6 @@ class Watcher: if len(self.appointments) != 0: logger.info("No more pending appointments") - # Remove a block from the cache if the cache has reached its maximum size - if self.locator_cache.is_full(): - self.locator_cache.remove_older_block() - # Register the last processed block for the Watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index 1c54cc9..d6ef90b 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -137,6 +137,122 @@ def test_locator_cache_init(block_processor): assert block_processor.get_block(k) +def test_get_txid(): + # Not much to test here, this is shadowing dict.get + locator = get_random_value_hex(16) + txid = get_random_value_hex(32) + + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + locator_cache.cache[locator] = txid + + assert locator_cache.get_txid(locator) == txid + + # A random locator should fail + assert locator_cache.get_txid(get_random_value_hex(16)) is None + + +def test_update_cache(): + # Update should add data about a new block in the cache. If the cache is full, the oldest block is dropped. 
+ locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + + # Cache is empty + assert block_hash not in locator_cache.blocks + for locator in locator_txid_map.keys(): + assert locator not in locator_cache.cache + + # The data has been added to the cache + locator_cache.update(block_hash, locator_txid_map) + assert block_hash in locator_cache.blocks + for locator in locator_txid_map.keys(): + assert locator in locator_cache.cache + + +def test_update_cache_full(): + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + block_hashes = [] + big_map = {} + + for i in range(locator_cache.cache_size): + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + locator_cache.update(block_hash, locator_txid_map) + + if i == 0: + first_block_hash = block_hash + first_locator_txid_map = locator_txid_map + else: + block_hashes.append(block_hash) + big_map.update(locator_txid_map) + + # The cache is now full. + assert first_block_hash in locator_cache.blocks + for locator in first_locator_txid_map.keys(): + assert locator in locator_cache.cache + + # Add one more + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + locator_cache.update(block_hash, locator_txid_map) + + # The first block is not there anymore, but the rest are there + assert first_block_hash not in locator_cache.blocks + for locator in first_locator_txid_map.keys(): + assert locator not in locator_cache.cache + + for block_hash in block_hashes: + assert block_hash in locator_cache.blocks + + for locator in big_map.keys(): + assert locator in locator_cache.cache + + +def test_locator_cache_is_full(block_processor): + # Empty cache + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + for _ in range(locator_cache.cache_size): + locator_cache.blocks[uuid4().hex] = 0 + assert not locator_cache.is_full() + + locator_cache.blocks[uuid4().hex] = 0 + assert locator_cache.is_full() + + +def test_locator_remove_oldest_block(block_processor): + # Empty cache + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + # Add some blocks to the cache + for _ in range(locator_cache.cache_size): + txid = get_random_value_hex(32) + locator = txid[:16] + locator_cache.blocks[get_random_value_hex(32)] = {locator: txid} + locator_cache.cache[locator] = txid + + blocks_in_cache = locator_cache.blocks + oldest_block_hash = list(blocks_in_cache.keys())[0] + oldest_block_data = blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + locator_cache.remove_oldest_block() + + # Oldest block data is not in the cache + assert oldest_block_hash not in locator_cache.blocks + for locator in oldest_block_data: + assert locator not in locator_cache.cache + + # The rest of data is in the cache + assert set(rest_of_blocks).issubset(locator_cache.blocks) + for block_hash in rest_of_blocks: + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache + + def test_fix_cache(block_processor): # This tests how a reorg will create a new version of the cache # Let's start setting a full cache. 
We'll mine ``cache_size`` blocks to be sure it's full
@@ -152,7 +268,7 @@ def test_fix_cache(block_processor):
     current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash")
     current_tip_parent_locators = locator_cache.blocks[current_tip_parent]
     fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash")
-    locator_cache.fix_cache(fake_tip, block_processor)
+    locator_cache.fix(fake_tip, block_processor)

     # The last two blocks are not in the cache nor are any of their locators
     assert current_tip not in locator_cache.blocks and current_tip_parent not in locator_cache.blocks
@@ -168,7 +284,7 @@
     new_cache = deepcopy(locator_cache)
     generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE") * 2))
-    new_cache.fix_cache(block_processor.get_best_block_hash(), block_processor)
+    new_cache.fix(block_processor.get_best_block_hash(), block_processor)

     # None of the data from the old cache is in the new cache
     for block_hash, locators in locator_cache.blocks.items():
         assert block_hash not in new_cache.blocks
         for locator in locators:
             assert locator not in new_cache.cache

     # The data in the new cache corresponds to the last ``cache_size`` blocks.
     block_count = block_processor.get_block_count()
     for i in range(block_count, block_count - locator_cache.cache_size, -1):
         block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1)
         assert block_hash in new_cache.blocks
         for locator in new_cache.blocks[block_hash]:
             assert locator in new_cache.cache
@@ -185,47 +301,6 @@
-def test_locator_cache_is_full(block_processor):
-    # Empty cache
-    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
-
-    for _ in range(locator_cache.cache_size):
-        locator_cache.blocks[uuid4().hex] = 0
-        assert not locator_cache.is_full()
-
-    locator_cache.blocks[uuid4().hex] = 0
-    assert locator_cache.is_full()
-
-
-def test_locator_remove_older_block(block_processor):
-    # Empty cache
-    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
-
-    # Add some blocks to the cache
-    for _ in range(locator_cache.cache_size):
-        txid = get_random_value_hex(32)
-        locator = txid[:16]
-        locator_cache.blocks[get_random_value_hex(32)] = {locator: txid}
-        locator_cache.cache[locator] = txid
-
-    blocks_in_cache = locator_cache.blocks
-    oldest_block_hash = list(blocks_in_cache.keys())[0]
-    oldest_block_data = blocks_in_cache.get(oldest_block_hash)
-    rest_of_blocks = list(blocks_in_cache.keys())[1:]
-    locator_cache.remove_older_block()
-
-    # Oldest block data is not in the cache
-    assert oldest_block_hash not in locator_cache.blocks
-    for locator in oldest_block_data:
-        assert locator not in locator_cache.cache
-
-    # The rest of data is in the cache
-    assert set(rest_of_blocks).issubset(locator_cache.blocks)
-    for block_hash in rest_of_blocks:
-        for locator in locator_cache.blocks[block_hash]:
-            assert locator in locator_cache.cache
-
-
 def test_watcher_init(watcher):
     assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0
     assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
@@ -494,7 +569,7 @@ def test_do_watch_cache_update(watcher):
     assert oldest_block_hash not in watcher.locator_cache.blocks
     assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks.keys())

-    # The locators of the older block are gone but the rest remain
+    # The locators of the oldest block are gone but the rest remain
     for locator in oldest_block_data:
         assert locator not in watcher.locator_cache.cache
     for block_hash in rest_of_blocks:

From 4c66d6c2cd13eda99c18e53f55cb12b117453c34 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Fri, 12 Jun 2020 12:10:45 +0200
Subject: [PATCH 19/21] cli - Adds start_block log after an accepted appointment

---
 cli/teos_cli.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/cli/teos_cli.py b/cli/teos_cli.py
index 5c8b56b..aee10c3 100644
--- a/cli/teos_cli.py
+++ b/cli/teos_cli.py
@@ -113,19 +113,20 @@ def add_appointment(appointment_data, user_sk, teos_id, teos_url):
     add_appointment_endpoint = "{}/add_appointment".format(teos_url)
     response = process_post_response(post_request(data, add_appointment_endpoint))

-    signature = response.get("signature")
+    tower_signature = response.get("signature")

     # Check that the server signed the appointment as it should.
-    if not signature:
+    if not tower_signature:
         raise TowerResponseError("The response does not contain the signature of the appointment")

-    rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
+    rpk = Cryptographer.recover_pk(appointment.serialize(), tower_signature)
     if teos_id != Cryptographer.get_compressed_pk(rpk):
         raise TowerResponseError("The returned appointment's signature is invalid")

     logger.info("Appointment accepted and signed by the Eye of Satoshi")
     logger.info("Remaining slots: {}".format(response.get("available_slots")))
+    logger.info("Start block: {}".format(response.get("start_block")))

-    return appointment, signature
+    return appointment, tower_signature


 def get_appointment(locator, user_sk, teos_id, teos_url):

From 5748c73da3a2679817893e70029ecfe06968905c Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Fri, 12 Jun 2020 12:10:57 +0200
Subject: [PATCH 20/21] plugin - Adds start_block log after an accepted appointment

---
 watchtower-plugin/net/http.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/watchtower-plugin/net/http.py b/watchtower-plugin/net/http.py
index ea31833..aff661c 100644
--- a/watchtower-plugin/net/http.py
+++ b/watchtower-plugin/net/http.py
@@ -18,6 +18,7 @@ def add_appointment(plugin, tower_id, tower, appointment_dict, signature):
         response = send_appointment(tower_id, tower, appointment_dict, signature)
         plugin.log(f"Appointment accepted and signed by {tower_id}")
         plugin.log(f"Remaining slots: {response.get('available_slots')}")
+        plugin.log(f"Start block: {response.get('start_block')}")

         # # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
         # # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.

From a49e587d7c8288e55713d16f17a6576489b9a2df Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura
Date: Fri, 12 Jun 2020 12:48:28 +0200
Subject: [PATCH 21/21] watcher - adds RWLocks for the LocatorCache

Threads should avoid reading the cache while it is being updated/fixed. The
latter is especially relevant since, during a reorg, most of the cache may
change.
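
To make the pattern concrete, here is a minimal, self-contained sketch of the
locking scheme this patch applies (illustrative only: TinyCache and its fields
are hypothetical stand-ins, not tower code). Readers share the lock while a
writer holds it exclusively, so a fix() running during a reorg can never
interleave with a concurrent get_txid():

    # Sketch of the reader-writer locking pattern, assuming the
    # readerwriterlock package this patch depends on.
    from collections import OrderedDict
    from readerwriterlock import rwlock

    class TinyCache:
        def __init__(self, size):
            self.blocks = OrderedDict()
            self.size = size
            self.rw_lock = rwlock.RWLockWrite()  # writer-priority lock

        def get(self, block_hash):
            # Any number of readers may hold the lock at the same time
            with self.rw_lock.gen_rlock():
                return self.blocks.get(block_hash)

        def add(self, block_hash, data):
            # A writer gets exclusive access, so readers never observe a
            # half-updated cache
            with self.rw_lock.gen_wlock():
                self.blocks[block_hash] = data
                if len(self.blocks) > self.size:
                    self.blocks.popitem(last=False)  # evict the oldest block

Unlike this sketch, which evicts while still holding the write lock, the patch
below releases the lock and lets is_full/remove_oldest_block re-acquire it;
both approaches avoid deadlock because no method takes the lock while already
holding it.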
--- requirements.txt | 3 ++- teos/watcher.py | 32 +++++++++++++++++++++----------- test/teos/unit/test_watcher.py | 16 ++++++++-------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5ea83ae..20406f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ cryptography==2.8 coincurve pyzbase32 requests -plyvel \ No newline at end of file +plyvel +readerwriterlock \ No newline at end of file diff --git a/teos/watcher.py b/teos/watcher.py index 00d3d81..644bc79 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -1,6 +1,7 @@ from queue import Queue from threading import Thread from collections import OrderedDict +from readerwriterlock import rwlock from common.logger import Logger from common.tools import compute_locator @@ -47,6 +48,7 @@ class LocatorCache: self.cache = dict() self.blocks = OrderedDict() self.cache_size = blocks_in_cache + self.rw_lock = rwlock.RWLockWrite() def init(self, last_known_block, block_processor): """ @@ -87,7 +89,10 @@ class LocatorCache: Returns: :obj:`str` or :obj:`None`: The txid linked to the given locator if found. None otherwise. """ - return self.cache.get(locator) + + with self.rw_lock.gen_rlock(): + locator = self.cache.get(locator) + return locator def update(self, block_hash, locator_txid_map): """ @@ -99,22 +104,26 @@ class LocatorCache: ids. """ - self.cache.update(locator_txid_map) - self.blocks[block_hash] = list(locator_txid_map.keys()) - logger.debug("Block added to cache", block_hash=block_hash) + with self.rw_lock.gen_wlock(): + self.cache.update(locator_txid_map) + self.blocks[block_hash] = list(locator_txid_map.keys()) + logger.debug("Block added to cache", block_hash=block_hash) if self.is_full(): self.remove_oldest_block() def is_full(self): """ Returns whether the cache is full or not """ - return len(self.blocks) > self.cache_size + with self.rw_lock.gen_rlock(): + full = len(self.blocks) > self.cache_size + return full def remove_oldest_block(self): """ Removes the oldest block from the cache """ - block_hash, locators = self.blocks.popitem(last=False) - for locator in locators: - del self.cache[locator] + with self.rw_lock.gen_wlock(): + block_hash, locators = self.blocks.popitem(last=False) + for locator in locators: + del self.cache[locator] logger.debug("Block removed from cache", block_hash=block_hash) @@ -135,15 +144,16 @@ class LocatorCache: for _ in range(tmp_cache.cache_size): target_block = block_processor.get_block(target_block_hash) if target_block: - # Compute the locator:txid par for every transaction in the block and update both the cache and + # Compute the locator:txid pair for every transaction in the block and update both the cache and # the block mapping. locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} tmp_cache.cache.update(locator_txid_map) tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys()) target_block_hash = target_block.get("previousblockhash") - self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) - self.cache = tmp_cache.cache + with self.rw_lock.gen_wlock(): + self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) + self.cache = tmp_cache.cache class Watcher: diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index d6ef90b..79d7a93 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -281,24 +281,24 @@ def test_fix_cache(block_processor): # Test the same for a full cache reorg. 
We can simulate this by adding more blocks than the cache can fit and # trigger a fix. We'll use a new cache to compare with the old - new_cache = deepcopy(locator_cache) + old_cache_blocks = deepcopy(locator_cache.blocks) generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE") * 2)) - new_cache.fix(block_processor.get_best_block_hash(), block_processor) + locator_cache.fix(block_processor.get_best_block_hash(), block_processor) # None of the data from the old cache is in the new cache - for block_hash, locators in locator_cache.blocks.items(): - assert block_hash not in new_cache.blocks + for block_hash, locators in old_cache_blocks.items(): + assert block_hash not in locator_cache.blocks for locator in locators: - assert locator not in new_cache.cache + assert locator not in locator_cache.cache # The data in the new cache corresponds to the last ``cache_size`` blocks. block_count = block_processor.get_block_count() for i in range(block_count, block_count - locator_cache.cache_size, -1): block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1) - assert block_hash in new_cache.blocks - for locator in new_cache.blocks[block_hash]: - assert locator in new_cache.cache + assert block_hash in locator_cache.blocks + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache def test_watcher_init(watcher):