diff --git a/cli/teos_cli.py b/cli/teos_cli.py index 5c8b56b..aee10c3 100644 --- a/cli/teos_cli.py +++ b/cli/teos_cli.py @@ -113,19 +113,20 @@ def add_appointment(appointment_data, user_sk, teos_id, teos_url): add_appointment_endpoint = "{}/add_appointment".format(teos_url) response = process_post_response(post_request(data, add_appointment_endpoint)) - signature = response.get("signature") + tower_signature = response.get("signature") # Check that the server signed the appointment as it should. - if not signature: + if not tower_signature: raise TowerResponseError("The response does not contain the signature of the appointment") - rpk = Cryptographer.recover_pk(appointment.serialize(), signature) + rpk = Cryptographer.recover_pk(appointment.serialize(), tower_signature) if teos_id != Cryptographer.get_compressed_pk(rpk): raise TowerResponseError("The returned appointment's signature is invalid") logger.info("Appointment accepted and signed by the Eye of Satoshi") logger.info("Remaining slots: {}".format(response.get("available_slots"))) + logger.info("Start block: {}".format(response.get("start_block"))) - return appointment, signature + return appointment, tower_signature def get_appointment(locator, user_sk, teos_id, teos_url): diff --git a/common/errors.py b/common/errors.py index 786a9d7..c115a31 100644 --- a/common/errors.py +++ b/common/errors.py @@ -7,6 +7,7 @@ APPOINTMENT_FIELD_TOO_SMALL = -5 APPOINTMENT_FIELD_TOO_BIG = -6 APPOINTMENT_WRONG_FIELD = -7 APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8 +APPOINTMENT_ALREADY_TRIGGERED = -9 # Registration errors [-33, -64] REGISTRATION_MISSING_FIELD = -33 diff --git a/requirements.txt b/requirements.txt index 5ea83ae..20406f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ cryptography==2.8 coincurve pyzbase32 requests -plyvel \ No newline at end of file +plyvel +readerwriterlock \ No newline at end of file diff --git a/teos/__init__.py b/teos/__init__.py index 1a29ddb..b65b7e7 100644 --- a/teos/__init__.py +++ b/teos/__init__.py @@ -21,6 +21,7 @@ DEFAULT_CONF = { "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int}, "EXPIRY_DELTA": {"value": 6, "type": int}, "MIN_TO_SELF_DELAY": {"value": 20, "type": int}, + "LOCATOR_CACHE_SIZE": {"value": 6, "type": int}, "LOG_FILE": {"value": "teos.log", "type": str, "path": True}, "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True}, "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True}, diff --git a/teos/api.py b/teos/api.py index 0bcbddb..b1c90ec 100644 --- a/teos/api.py +++ b/teos/api.py @@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify from teos import LOG_PREFIX import common.errors as errors from teos.inspector import InspectionFailed -from teos.watcher import AppointmentLimitReached +from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure from common.logger import Logger @@ -192,6 +192,13 @@ class API: rcode = HTTP_SERVICE_UNAVAILABLE response = {"error": "appointment rejected"} + except AppointmentAlreadyTriggered: + rcode = HTTP_BAD_REQUEST + response = { + "error": "appointment rejected. The provided appointment has already been triggered", + "error_code": errors.APPOINTMENT_ALREADY_TRIGGERED, + } + logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response) return jsonify(response), rcode diff --git a/teos/block_processor.py b/teos/block_processor.py index 136e9f7..73b8370 100644 --- a/teos/block_processor.py +++ b/teos/block_processor.py @@ -1,4 +1,5 @@ from common.logger import Logger +from common.exceptions import BasicException from teos import LOG_PREFIX from teos.tools import bitcoin_cli @@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX) +class InvalidTransactionFormat(BasicException): + """Raised when a transaction is not properly formatted""" + + class BlockProcessor: """ The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication @@ -89,17 +94,19 @@ class BlockProcessor: raw_tx (:obj:`str`): the hex representation of the transaction. Returns: - :obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted. + :obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted. - Returns ``None`` otherwise. + Raises: + :obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has invalid format. """ try: tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx) except JSONRPCException as e: - tx = None - logger.error("Cannot build transaction from decoded data", error=e.error) + msg = "Cannot build transaction from decoded data" + logger.error(msg, error=e.error) + raise InvalidTransactionFormat(msg) return tx diff --git a/teos/teosd.py b/teos/teosd.py index 8e1a694..f1fa1df 100644 --- a/teos/teosd.py +++ b/teos/teosd.py @@ -84,7 +84,13 @@ def main(command_line_conf): db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH")) responder = Responder(db_manager, gatekeeper, carrier, block_processor) watcher = Watcher( - db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS") + db_manager, + gatekeeper, + block_processor, + responder, + secret_key_der, + config.get("MAX_APPOINTMENTS"), + config.get("LOCATOR_CACHE_SIZE"), ) # Create the chain monitor and start monitoring the chain diff --git a/teos/watcher.py b/teos/watcher.py index ff5ff40..644bc79 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -1,5 +1,7 @@ from queue import Queue from threading import Thread +from collections import OrderedDict +from readerwriterlock import rwlock from common.logger import Logger from common.tools import compute_locator @@ -11,6 +13,7 @@ from common.exceptions import InvalidParameter, SignatureError from teos import LOG_PREFIX from teos.cleaner import Cleaner from teos.extended_appointment import ExtendedAppointment +from teos.block_processor import InvalidTransactionFormat logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -19,6 +22,140 @@ class AppointmentLimitReached(BasicException): """Raised when the tower maximum appointment count has been reached""" +class AppointmentAlreadyTriggered(BasicException): + """Raised when an appointment is sent to the Watcher but that same data has already been sent to the Responder""" + + +class LocatorCache: + """ + The LocatorCache keeps the data about the last ``cache_size`` blocks around so appointments can be checked against + it. The data is indexed by locator and it's mainly built during the normal ``Watcher`` operation so no extra steps + are normally needed. + + Args: + blocks_in_cache (:obj:`int`): the numbers of blocks to keep in the cache. + + Attributes: + cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pairs that received appointments are checked + against. + blocks (:obj:`OrderedDict`): An ordered dictionary of the last ``blocks_in_cache`` blocks (block_hash:locators). + Used to keep track of what data belongs to what block, so data can be pruned accordingly. Also needed to + rebuild the cache in case of reorgs. + cache_size (:obj:`int`): the size of the cache in blocks. + """ + + def __init__(self, blocks_in_cache): + self.cache = dict() + self.blocks = OrderedDict() + self.cache_size = blocks_in_cache + self.rw_lock = rwlock.RWLockWrite() + + def init(self, last_known_block, block_processor): + """ + Sets the initial state of the locator cache. + + Args: + last_known_block (:obj:`str`): the last known block by the ``Watcher``. + block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance. + """ + + # This is needed as a separate method from __init__ since it has to be initialized right before start watching. + # Not doing so implies store temporary variables in the Watcher and initialising the cache as None. + target_block_hash = last_known_block + for _ in range(self.cache_size): + # In some setups, like regtest, it could be the case that there are no enough previous blocks. + # In those cases we pull as many as we can (up to cache_size). + if not target_block_hash: + break + + target_block = block_processor.get_block(target_block_hash) + if not target_block: + break + + locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} + self.cache.update(locator_txid_map) + self.blocks[target_block_hash] = list(locator_txid_map.keys()) + target_block_hash = target_block.get("previousblockhash") + + self.blocks = OrderedDict(reversed((list(self.blocks.items())))) + + def get_txid(self, locator): + """ + Gets a txid from the locator cache. + + Args: + locator (:obj:`str`): the locator to lookup in the cache. + + Returns: + :obj:`str` or :obj:`None`: The txid linked to the given locator if found. None otherwise. + """ + + with self.rw_lock.gen_rlock(): + locator = self.cache.get(locator) + return locator + + def update(self, block_hash, locator_txid_map): + """ + Updates the cache with data from a new block. Removes the oldest block if the cache is full after the addition. + + Args: + block_hash (:obj:`str`): the hash of the new block. + locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of transaction + ids. + """ + + with self.rw_lock.gen_wlock(): + self.cache.update(locator_txid_map) + self.blocks[block_hash] = list(locator_txid_map.keys()) + logger.debug("Block added to cache", block_hash=block_hash) + + if self.is_full(): + self.remove_oldest_block() + + def is_full(self): + """ Returns whether the cache is full or not """ + with self.rw_lock.gen_rlock(): + full = len(self.blocks) > self.cache_size + return full + + def remove_oldest_block(self): + """ Removes the oldest block from the cache """ + with self.rw_lock.gen_wlock(): + block_hash, locators = self.blocks.popitem(last=False) + for locator in locators: + del self.cache[locator] + + logger.debug("Block removed from cache", block_hash=block_hash) + + def fix(self, last_known_block, block_processor): + """ + Fixes the cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it. + + Args: + last_known_block (:obj:`str`): the last known block hash after the reorg. + block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance. + """ + + tmp_cache = LocatorCache(self.cache_size) + + # We assume there are no reorgs back to genesis. If so, this would raise some log warnings. And the cache will + # be filled with less than cache_size blocks. + target_block_hash = last_known_block + for _ in range(tmp_cache.cache_size): + target_block = block_processor.get_block(target_block_hash) + if target_block: + # Compute the locator:txid pair for every transaction in the block and update both the cache and + # the block mapping. + locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} + tmp_cache.cache.update(locator_txid_map) + tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys()) + target_block_hash = target_block.get("previousblockhash") + + with self.rw_lock.gen_wlock(): + self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) + self.cache = tmp_cache.cache + + class Watcher: """ The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower. @@ -41,6 +178,8 @@ class Watcher: responder (:obj:`Responder `): a ``Responder`` instance. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. + blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be + covered. Attributes: appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment @@ -60,13 +199,14 @@ class Watcher: signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. last_known_block (:obj:`str`): the last block known by the ``Watcher``. + locator_cache (:obj:`LocatorCache`): a cache of locators for the last ``blocks_in_cache`` blocks. Raises: :obj:`InvalidKey `: if teos sk cannot be loaded. """ - def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments): + def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache): self.appointments = dict() self.locator_uuid_map = dict() self.block_queue = Queue() @@ -77,6 +217,7 @@ class Watcher: self.max_appointments = max_appointments self.signing_key = Cryptographer.load_private_key_der(sk_der) self.last_known_block = db_manager.load_last_block_hash_watcher() + self.locator_cache = LocatorCache(blocks_in_cache) def awake(self): """Starts a new thread to monitor the blockchain for channel breaches""" @@ -126,24 +267,55 @@ class Watcher: # The user_id needs to be added to the ExtendedAppointment once the former has been authenticated appointment.user_id = user_id - # The uuids are generated as the RIPMED160(locator||user_pubkey). + # The uuids are generated as the RIPEMD160(locator||user_pubkey). # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). uuid = hash_160("{}{}".format(appointment.locator, user_id)) + # If this is a copy of an appointment we've already reacted to, the new appointment is rejected. + if uuid in self.responder.trackers: + message = "Appointment already in Responder" + logger.info(message) + raise AppointmentAlreadyTriggered(message) + # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) - self.appointments[uuid] = appointment.get_summary() - if appointment.locator in self.locator_uuid_map: - # If the uuid is already in the map it means this is an update. - if uuid not in self.locator_uuid_map[appointment.locator]: - self.locator_uuid_map[appointment.locator].append(uuid) + # Appointments that were triggered in blocks held in the cache + dispute_txid = self.locator_cache.get_txid(appointment.locator) + if dispute_txid: + try: + penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid) + receipt = self.responder.handle_breach( + uuid, appointment.locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, self.last_known_block + ) + + # At this point the appointment is accepted but data is only kept if it goes through the Responder. + # Otherwise it is dropped. + if receipt.delivered: + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) + self.db_manager.create_triggered_appointment_flag(uuid) + + except (EncryptionError, InvalidTransactionFormat): + # If data inside the encrypted blob is invalid, the appointment is accepted but the data is dropped. + # (same as with data that bounces in the Responder). This reduces the appointment slot count so it + # could be used to discourage user misbehaviour. + pass + + # Regular appointments that have not been triggered (or, at least, not recently) else: - # Otherwise two users have sent an appointment with the same locator, so we need to store both. - self.locator_uuid_map[appointment.locator] = [uuid] + self.appointments[uuid] = appointment.get_summary() - self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) - self.db_manager.create_append_locator_map(appointment.locator, uuid) + if appointment.locator in self.locator_uuid_map: + # If the uuid is already in the map it means this is an update. + if uuid not in self.locator_uuid_map[appointment.locator]: + self.locator_uuid_map[appointment.locator].append(uuid) + else: + # Otherwise two users have sent an appointment with the same locator, so we need to store both. + self.locator_uuid_map[appointment.locator] = [uuid] + + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) try: signature = Cryptographer.sign(appointment.serialize(), self.signing_key) @@ -157,6 +329,7 @@ class Watcher: return { "locator": appointment.locator, + "start_block": self.last_known_block, "signature": signature, "available_slots": available_slots, "subscription_expiry": self.gatekeeper.registered_users[user_id].subscription_expiry, @@ -175,14 +348,24 @@ class Watcher: self.last_known_block = self.block_processor.get_best_block_hash() self.db_manager.store_last_block_hash_watcher(self.last_known_block) + # Initialise the locator cache with the last ``cache_size`` blocks. + self.locator_cache.init(self.last_known_block, self.block_processor) + while True: block_hash = self.block_queue.get() block = self.block_processor.get_block(block_hash) logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash")) - if len(self.appointments) > 0 and block is not None: - txids = block.get("tx") + # If a reorg is detected, the cache is fixed to cover the last `cache_size` blocks of the new chain + if self.last_known_block != block.get("previousblockhash"): + self.locator_cache.fix(block_hash, self.block_processor) + txids = block.get("tx") + # Compute the locator for every transaction in the block and add them to the cache + locator_txid_map = {compute_locator(txid): txid for txid in txids} + self.locator_cache.update(block_hash, locator_txid_map) + + if len(self.appointments) > 0 and locator_txid_map: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -196,7 +379,7 @@ class Watcher: expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager ) - valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids)) + valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locator_txid_map)) triggered_flags = [] appointments_to_delete = [] @@ -246,28 +429,27 @@ class Watcher: if len(self.appointments) != 0: logger.info("No more pending appointments") - # Register the last processed block for the watcher + # Register the last processed block for the Watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") self.block_queue.task_done() - def get_breaches(self, txids): + def get_breaches(self, locator_txid_map): """ - Gets a list of channel breaches given the list of transaction ids. + Gets a dictionary of channel breaches given a map of locator:dispute_txid. Args: - txids (:obj:`list`): the list of transaction ids included in the last received block. + locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of + transaction ids. Returns: :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are found. """ - potential_locators = {compute_locator(txid): txid for txid in txids} - # Check is any of the tx_ids in the received block is an actual match - intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) - breaches = {locator: potential_locators[locator] for locator in intersection} + intersection = set(self.locator_uuid_map.keys()).intersection(locator_txid_map.keys()) + breaches = {locator: locator_txid_map[locator] for locator in intersection} if len(breaches) > 0: logger.info("List of breaches", breaches=breaches) @@ -277,21 +459,59 @@ class Watcher: return breaches + def check_breach(self, uuid, appointment, dispute_txid): + """ + Checks if a breach is valid. Valid breaches should decrypt to a valid transaction. + + Args: + uuid (:obj:`str`): the uuid of the appointment that was triggered by the breach. + appointment (:obj:`teos.extended_appointment.ExtendedAppointment`): the appointment data. + dispute_txid (:obj:`str`): the id of the transaction that triggered the breach. + + Returns: + :obj:`tuple`: A tuple containing the penalty txid and the raw penalty tx. + + Raises: + :obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the + key derived from the breach transaction id. + :obj:`InvalidTransactionFormat`: If the decrypted data does not have a valid transaction format. + """ + + try: + penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) + penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) + + except EncryptionError as e: + logger.info("Transaction cannot be decrypted", uuid=uuid) + raise e + + except InvalidTransactionFormat as e: + logger.info("The breach contained an invalid transaction", uuid=uuid) + raise e + + logger.info( + "Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") + ) + + return penalty_tx.get("txid"), penalty_rawtx + def filter_breaches(self, breaches): """ Filters the valid from the invalid channel breaches. - The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen. + The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen. Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder `. Args: breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``). Returns: - :obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid. - The structure is as follows: + :obj:`tuple`: A dictionary and a list. The former contains the valid breaches, while the latter contain the + invalid ones. - ``{locator, dispute_txid, penalty_txid, penalty_rawtx, valid_breach}`` + The valid breaches dictionary has the following structure: + + ``{locator, dispute_txid, penalty_txid, penalty_rawtx}`` """ valid_breaches = {} @@ -305,31 +525,26 @@ class Watcher: appointment = ExtendedAppointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) if appointment.encrypted_blob in decrypted_blobs: - penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob] - - else: - try: - penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) - - except EncryptionError: - penalty_rawtx = None - - penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) - decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx) - - if penalty_tx is not None: + penalty_txid, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob] valid_breaches[uuid] = { - "locator": locator, + "locator": appointment.locator, "dispute_txid": dispute_txid, - "penalty_txid": penalty_tx.get("txid"), + "penalty_txid": penalty_txid, "penalty_rawtx": penalty_rawtx, } - logger.info( - "Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") - ) - else: - invalid_breaches.append(uuid) + try: + penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid) + valid_breaches[uuid] = { + "locator": appointment.locator, + "dispute_txid": dispute_txid, + "penalty_txid": penalty_txid, + "penalty_rawtx": penalty_rawtx, + } + decrypted_blobs[appointment.encrypted_blob] = (penalty_txid, penalty_rawtx) + + except (EncryptionError, InvalidTransactionFormat): + invalid_breaches.append(uuid) return valid_breaches, invalid_breaches diff --git a/test/teos/e2e/conftest.py b/test/teos/e2e/conftest.py index d4bb314..67e8a38 100644 --- a/test/teos/e2e/conftest.py +++ b/test/teos/e2e/conftest.py @@ -11,6 +11,7 @@ from common.config_loader import ConfigLoader getcontext().prec = 10 +utxos = [] @pytest.fixture(scope="session") @@ -37,11 +38,13 @@ def prng_seed(): def setup_node(bitcoin_cli): # This method will create a new address a mine bitcoin so the node can be used for testing new_addr = bitcoin_cli.getnewaddress() - bitcoin_cli.generatetoaddress(106, new_addr) + bitcoin_cli.generatetoaddress(200, new_addr) def create_txs(bitcoin_cli, n=1): - utxos = bitcoin_cli.listunspent() + global utxos + if not utxos: + utxos = bitcoin_cli.listunspent() if len(utxos) < n: raise ValueError("There're no enough UTXOs.") diff --git a/test/teos/e2e/test_basic_e2e.py b/test/teos/e2e/test_basic_e2e.py index dbcfa0f..d2781bc 100644 --- a/test/teos/e2e/test_basic_e2e.py +++ b/test/teos/e2e/test_basic_e2e.py @@ -40,6 +40,9 @@ teosd_process = run_teosd() teos_id, user_sk, user_id = teos_cli.load_keys(cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY")) +appointments_in_watcher = 0 +appointments_in_responder = 0 + def broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, addr): # Broadcast the commitment transaction and mine a block @@ -78,6 +81,8 @@ def test_commands_non_registered(bitcoin_cli): def test_commands_registered(bitcoin_cli): + global appointments_in_watcher + # Test registering and trying again teos_cli.register(user_id, teos_base_endpoint) @@ -93,9 +98,12 @@ def test_commands_registered(bitcoin_cli): r = get_appointment_info(appointment_data.get("locator")) assert r.get("locator") == appointment.locator assert r.get("appointment") == appointment.to_dict() + appointments_in_watcher += 1 def test_appointment_life_cycle(bitcoin_cli): + global appointments_in_watcher, appointments_in_responder + # First of all we need to register response = teos_cli.register(user_id, teos_base_endpoint) available_slots = response.get("available_slots") @@ -106,6 +114,7 @@ def test_appointment_life_cycle(bitcoin_cli): appointment_data = build_appointment_data(commitment_tx_id, penalty_tx) locator = compute_locator(commitment_tx_id) appointment, signature = add_appointment(appointment_data) + appointments_in_watcher += 1 # Get the information from the tower to check that it matches appointment_info = get_appointment_info(locator) @@ -117,7 +126,7 @@ def test_appointment_life_cycle(bitcoin_cli): all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 1 and len(responding) == 0 + assert len(watching) == appointments_in_watcher and len(responding) == 0 # Trigger a breach and check again new_addr = bitcoin_cli.getnewaddress() @@ -125,11 +134,13 @@ def test_appointment_life_cycle(bitcoin_cli): appointment_info = get_appointment_info(locator) assert appointment_info.get("status") == "dispute_responded" assert appointment_info.get("locator") == locator + appointments_in_watcher -= 1 + appointments_in_responder += 1 all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 0 and len(responding) == 1 + assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder # It can be also checked by ensuring that the penalty transaction made it to the network penalty_tx_id = bitcoin_cli.decoderawtransaction(penalty_tx).get("txid") @@ -144,6 +155,7 @@ def test_appointment_life_cycle(bitcoin_cli): # Now let's mine some blocks so the appointment reaches its end. We need 100 + EXPIRY_DELTA -1 bitcoin_cli.generatetoaddress(100 + teos_config.get("EXPIRY_DELTA") - 1, new_addr) + appointments_in_responder -= 1 # The appointment is no longer in the tower with pytest.raises(TowerResponseError): @@ -152,10 +164,14 @@ def test_appointment_life_cycle(bitcoin_cli): # Check that the appointment is not in the Gatekeeper by checking the available slots (should have increase by 1) # We can do so by topping up the subscription (FIXME: find a better way to check this). response = teos_cli.register(user_id, teos_base_endpoint) - assert response.get("available_slots") == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 + assert ( + response.get("available_slots") + == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 - appointments_in_watcher - appointments_in_responder + ) def test_multiple_appointments_life_cycle(bitcoin_cli): + global appointments_in_watcher, appointments_in_responder # Tests that get_all_appointments returns all the appointments the tower is storing at various stages in the # appointment lifecycle. appointments = [] @@ -180,6 +196,7 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): # Send all of them to watchtower. for appt in appointments: add_appointment(appt.get("appointment_data")) + appointments_in_watcher += 1 # Two of these appointments are breached, and the watchtower responds to them. breached_appointments = [] @@ -188,13 +205,15 @@ def test_multiple_appointments_life_cycle(bitcoin_cli): broadcast_transaction_and_mine_block(bitcoin_cli, appointments[i]["commitment_tx"], new_addr) bitcoin_cli.generatetoaddress(1, new_addr) breached_appointments.append(appointments[i]["locator"]) + appointments_in_watcher -= 1 + appointments_in_responder += 1 sleep(1) # Test that they all show up in get_all_appointments at the correct stages. all_appointments = get_all_appointments() watching = all_appointments.get("watcher_appointments") responding = all_appointments.get("responder_trackers") - assert len(watching) == 3 and len(responding) == 2 + assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder responder_locators = [appointment["locator"] for uuid, appointment in responding.items()] assert set(responder_locators) == set(breached_appointments) @@ -389,6 +408,73 @@ def test_two_appointment_same_locator_different_penalty_different_users(bitcoin_ assert appointment_info.get("appointment").get("penalty_tx") == appointment1_data.get("penalty_tx") +def test_add_appointment_trigger_on_cache(bitcoin_cli): + # This tests sending an appointment whose trigger is in the cache + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid") + appointment_data = build_appointment_data(commitment_tx_id, penalty_tx) + locator = compute_locator(commitment_tx_id) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + + # Send the data to the tower and request it back. It should have gone straightaway to the Responder + add_appointment(appointment_data) + assert get_appointment_info(locator).get("status") == "dispute_responded" + + +def test_add_appointment_invalid_trigger_on_cache(bitcoin_cli): + # This tests sending an invalid appointment which trigger is in the cache + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid") + + # We can just flip the justice tx so it is invalid + appointment_data = build_appointment_data(commitment_tx_id, penalty_tx[::-1]) + locator = compute_locator(commitment_tx_id) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + sleep(1) + + # Send the data to the tower and request it back. It should get accepted but the data will be dropped. + add_appointment(appointment_data) + with pytest.raises(TowerResponseError): + get_appointment_info(locator) + + +def test_add_appointment_trigger_on_cache_cannot_decrypt(bitcoin_cli): + commitment_tx, penalty_tx = create_txs(bitcoin_cli) + + # Let's send the commitment to the network and mine a block + broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress()) + sleep(1) + + # The appointment data is built using a random 32-byte value. + appointment_data = build_appointment_data(get_random_value_hex(32), penalty_tx) + + # We cannot use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually. + appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")) + appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32)) + appointment = Appointment.from_dict(appointment_data) + + signature = Cryptographer.sign(appointment.serialize(), user_sk) + data = {"appointment": appointment.to_dict(), "signature": signature} + + # Send appointment to the server. + response = teos_cli.post_request(data, teos_add_appointment_endpoint) + response_json = teos_cli.process_post_response(response) + + # Check that the server has accepted the appointment + signature = response_json.get("signature") + rpk = Cryptographer.recover_pk(appointment.serialize(), signature) + assert teos_id == Cryptographer.get_compressed_pk(rpk) + assert response_json.get("locator") == appointment.locator + + # The appointment should should have been inmediately dropped + with pytest.raises(TowerResponseError): + get_appointment_info(appointment_data["locator"]) + + def test_appointment_shutdown_teos_trigger_back_online(bitcoin_cli): global teosd_process diff --git a/test/teos/unit/test_api.py b/test/teos/unit/test_api.py index daac059..4f3930a 100644 --- a/test/teos/unit/test_api.py +++ b/test/teos/unit/test_api.py @@ -4,13 +4,21 @@ from binascii import hexlify from teos.api import API import common.errors as errors -from teos.watcher import Watcher from teos.inspector import Inspector from teos.gatekeeper import UserInfo from teos.appointments_dbm import AppointmentsDBM from teos.responder import Responder, TransactionTracker +from teos.extended_appointment import ExtendedAppointment +from teos.watcher import Watcher, AppointmentAlreadyTriggered -from test.teos.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_keypair, get_config +from test.teos.unit.conftest import ( + get_random_value_hex, + generate_dummy_appointment, + generate_keypair, + get_config, + create_dummy_transaction, + compute_locator, +) from common.cryptographer import Cryptographer, hash_160 from common.constants import ( @@ -60,7 +68,15 @@ def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind): sk, pk = generate_keypair() responder = Responder(db_manager, gatekeeper, carrier, block_processor) - watcher = Watcher(db_manager, gatekeeper, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS) + watcher = Watcher( + db_manager, + gatekeeper, + block_processor, + responder, + sk.to_der(), + MAX_APPOINTMENTS, + config.get("LOCATOR_CACHE_SIZE"), + ) inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")) api = API(config.get("API_HOST"), config.get("API_PORT"), inspector, watcher) @@ -157,6 +173,7 @@ def test_add_appointment(api, client, appointment): r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == 0 + assert r.json.get("start_block") == api.watcher.last_known_block def test_add_appointment_no_json(api, client, appointment): @@ -242,6 +259,7 @@ def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MU r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == n - 1 + assert r.json.get("start_block") == api.watcher.last_known_block # Since all updates came from the same user, only the last one is stored assert len(api.watcher.locator_uuid_map[appointment.locator]) == 1 @@ -264,6 +282,7 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": signature}, compressed_pk) assert r.status_code == HTTP_OK assert r.json.get("available_slots") == 1 + assert r.json.get("start_block") == api.watcher.last_known_block # Check that all the appointments have been added and that there are no duplicates assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n @@ -275,14 +294,22 @@ def test_add_appointment_update_same_size(api, client, appointment): appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # The user has no additional slots, but it should be able to update # Let's just reverse the encrypted blob for example appointment.encrypted_blob = appointment.encrypted_blob[::-1] appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) def test_add_appointment_update_bigger(api, client, appointment): @@ -297,7 +324,11 @@ def test_add_appointment_update_bigger(api, client, appointment): appointment.encrypted_blob = TWO_SLOTS_BLOTS appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # Check that it'll fail if no enough slots are available # Double the size from before @@ -314,13 +345,101 @@ def test_add_appointment_update_smaller(api, client, appointment): appointment.encrypted_blob = TWO_SLOTS_BLOTS appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) # Let's update with one just small enough appointment.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2) appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) - assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1 + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 1 + and r.json.get("start_block") == api.watcher.last_known_block + ) + + +def test_add_appointment_in_cache(api, client): + api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0) + appointment, dispute_tx = generate_dummy_appointment() + appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) + + # Add the data to the cache + dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + api.watcher.locator_cache.cache[appointment.locator] = dispute_txid + + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) + + # Trying to add it again should fail, since it is already in the Responder + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED + + # The appointment would be rejected even if the data is not in the cache provided it has been triggered + del api.watcher.locator_cache.cache[appointment.locator] + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED + + +def test_add_appointment_in_cache_cannot_decrypt(api, client): + api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0) + appointment, dispute_tx = generate_dummy_appointment() + appointment.encrypted_blob = appointment.encrypted_blob[::-1] + appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) + + # Add the data to the cache + dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + api.watcher.locator_cache.cache[dispute_txid] = appointment.locator + + # The appointment should be accepted + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) + + +def test_add_appointment_in_cache_invalid_transaction(api, client): + api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0) + + # We need to create the appointment manually + dispute_tx = create_dummy_transaction() + dispute_txid = dispute_tx.tx_id.hex() + penalty_tx = create_dummy_transaction(dispute_txid) + + locator = compute_locator(dispute_txid) + dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20} + encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id")) + + appointment_data = { + "locator": locator, + "to_self_delay": dummy_appointment_data.get("to_self_delay"), + "encrypted_blob": encrypted_blob, + "user_id": get_random_value_hex(16), + } + + appointment = ExtendedAppointment.from_dict(appointment_data) + api.watcher.locator_cache.cache[appointment.locator] = dispute_tx.tx_id.hex() + appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk) + + # Add the data to the cache + api.watcher.locator_cache.cache[dispute_txid] = appointment.locator + + # The appointment should be accepted + r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) + assert ( + r.status_code == HTTP_OK + and r.json.get("available_slots") == 0 + and r.json.get("start_block") == api.watcher.last_known_block + ) def test_add_too_many_appointment(api, client): @@ -337,7 +456,7 @@ def test_add_too_many_appointment(api, client): r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id) if i < free_appointment_slots: - assert r.status_code == HTTP_OK + assert r.status_code == HTTP_OK and r.json.get("start_block") == api.watcher.last_known_block else: assert r.status_code == HTTP_SERVICE_UNAVAILABLE diff --git a/test/teos/unit/test_block_processor.py b/test/teos/unit/test_block_processor.py index f61082c..4a0bf88 100644 --- a/test/teos/unit/test_block_processor.py +++ b/test/teos/unit/test_block_processor.py @@ -1,3 +1,5 @@ +import pytest +from teos.watcher import InvalidTransactionFormat from test.teos.unit.conftest import get_random_value_hex, generate_block, generate_blocks, fork @@ -46,7 +48,8 @@ def test_decode_raw_transaction(block_processor): def test_decode_raw_transaction_invalid(block_processor): # Same but with an invalid one - assert block_processor.decode_raw_transaction(hex_tx[::-1]) is None + with pytest.raises(InvalidTransactionFormat): + block_processor.decode_raw_transaction(hex_tx[::-1]) def test_get_missed_blocks(block_processor): diff --git a/test/teos/unit/test_builder.py b/test/teos/unit/test_builder.py index f804e54..581581e 100644 --- a/test/teos/unit/test_builder.py +++ b/test/teos/unit/test_builder.py @@ -102,6 +102,7 @@ def test_update_states_empty_list(db_manager, gatekeeper, carrier, block_process responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"), ) missed_blocks_watcher = [] @@ -123,6 +124,7 @@ def test_update_states_responder_misses_more(run_bitcoind, db_manager, gatekeepe responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"), ) blocks = [] @@ -148,6 +150,7 @@ def test_update_states_watcher_misses_more(db_manager, gatekeeper, carrier, bloc responder=Responder(db_manager, gatekeeper, carrier, block_processor), sk_der=generate_keypair()[0].to_der(), max_appointments=config.get("MAX_APPOINTMENTS"), + blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"), ) blocks = [] diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index 25f84e5..79d7a93 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -1,6 +1,7 @@ import pytest from uuid import uuid4 from shutil import rmtree +from copy import deepcopy from threading import Thread from coincurve import PrivateKey @@ -9,22 +10,32 @@ from teos.tools import bitcoin_cli from teos.responder import Responder from teos.gatekeeper import UserInfo from teos.chain_monitor import ChainMonitor -from teos.appointments_dbm import AppointmentsDBM from teos.block_processor import BlockProcessor -from teos.watcher import Watcher, AppointmentLimitReached +from teos.appointments_dbm import AppointmentsDBM +from teos.extended_appointment import ExtendedAppointment from teos.gatekeeper import Gatekeeper, AuthenticationFailure, NotEnoughSlots +from teos.watcher import ( + Watcher, + AppointmentLimitReached, + LocatorCache, + EncryptionError, + InvalidTransactionFormat, + AppointmentAlreadyTriggered, +) from common.tools import compute_locator from common.cryptographer import Cryptographer from test.teos.unit.conftest import ( generate_blocks_w_delay, + generate_blocks, generate_dummy_appointment, get_random_value_hex, generate_keypair, get_config, bitcoind_feed_params, bitcoind_connect_params, + create_dummy_transaction, ) APPOINTMENTS = 5 @@ -55,7 +66,15 @@ def watcher(db_manager, gatekeeper): carrier = Carrier(bitcoind_connect_params) responder = Responder(db_manager, gatekeeper, carrier, block_processor) - watcher = Watcher(db_manager, gatekeeper, block_processor, responder, signing_key.to_der(), MAX_APPOINTMENTS) + watcher = Watcher( + db_manager, + gatekeeper, + block_processor, + responder, + signing_key.to_der(), + MAX_APPOINTMENTS, + config.get("LOCATOR_CACHE_SIZE"), + ) chain_monitor = ChainMonitor( watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params @@ -91,7 +110,198 @@ def create_appointments(n): return appointments, locator_uuid_map, dispute_txs -def test_init(run_bitcoind, watcher): +def test_locator_cache_init_not_enough_blocks(run_bitcoind, block_processor): + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + # Make sure there are at least 3 blocks + block_count = block_processor.get_block_count() + if block_count < 3: + generate_blocks_w_delay(3 - block_count) + + # Simulate there are only 3 blocks + third_block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(2) + locator_cache.init(third_block_hash, block_processor) + assert len(locator_cache.blocks) == 3 + for k, v in locator_cache.blocks.items(): + assert block_processor.get_block(k) + + +def test_locator_cache_init(block_processor): + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + # Generate enough blocks so the cache can start full + generate_blocks(2 * locator_cache.cache_size) + + locator_cache.init(block_processor.get_best_block_hash(), block_processor) + assert len(locator_cache.blocks) == locator_cache.cache_size + for k, v in locator_cache.blocks.items(): + assert block_processor.get_block(k) + + +def test_get_txid(): + # Not much to test here, this is shadowing dict.get + locator = get_random_value_hex(16) + txid = get_random_value_hex(32) + + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + locator_cache.cache[locator] = txid + + assert locator_cache.get_txid(locator) == txid + + # A random locator should fail + assert locator_cache.get_txid(get_random_value_hex(16)) is None + + +def test_update_cache(): + # Update should add data about a new block in the cache. If the cache is full, the oldest block is dropped. + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + + # Cache is empty + assert block_hash not in locator_cache.blocks + for locator in locator_txid_map.keys(): + assert locator not in locator_cache.cache + + # The data has been added to the cache + locator_cache.update(block_hash, locator_txid_map) + assert block_hash in locator_cache.blocks + for locator in locator_txid_map.keys(): + assert locator in locator_cache.cache + + +def test_update_cache_full(): + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + block_hashes = [] + big_map = {} + + for i in range(locator_cache.cache_size): + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + locator_cache.update(block_hash, locator_txid_map) + + if i == 0: + first_block_hash = block_hash + first_locator_txid_map = locator_txid_map + else: + block_hashes.append(block_hash) + big_map.update(locator_txid_map) + + # The cache is now full. + assert first_block_hash in locator_cache.blocks + for locator in first_locator_txid_map.keys(): + assert locator in locator_cache.cache + + # Add one more + block_hash = get_random_value_hex(32) + txs = [get_random_value_hex(32) for _ in range(10)] + locator_txid_map = {compute_locator(txid): txid for txid in txs} + locator_cache.update(block_hash, locator_txid_map) + + # The first block is not there anymore, but the rest are there + assert first_block_hash not in locator_cache.blocks + for locator in first_locator_txid_map.keys(): + assert locator not in locator_cache.cache + + for block_hash in block_hashes: + assert block_hash in locator_cache.blocks + + for locator in big_map.keys(): + assert locator in locator_cache.cache + + +def test_locator_cache_is_full(block_processor): + # Empty cache + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + for _ in range(locator_cache.cache_size): + locator_cache.blocks[uuid4().hex] = 0 + assert not locator_cache.is_full() + + locator_cache.blocks[uuid4().hex] = 0 + assert locator_cache.is_full() + + +def test_locator_remove_oldest_block(block_processor): + # Empty cache + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + + # Add some blocks to the cache + for _ in range(locator_cache.cache_size): + txid = get_random_value_hex(32) + locator = txid[:16] + locator_cache.blocks[get_random_value_hex(32)] = {locator: txid} + locator_cache.cache[locator] = txid + + blocks_in_cache = locator_cache.blocks + oldest_block_hash = list(blocks_in_cache.keys())[0] + oldest_block_data = blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + locator_cache.remove_oldest_block() + + # Oldest block data is not in the cache + assert oldest_block_hash not in locator_cache.blocks + for locator in oldest_block_data: + assert locator not in locator_cache.cache + + # The rest of data is in the cache + assert set(rest_of_blocks).issubset(locator_cache.blocks) + for block_hash in rest_of_blocks: + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache + + +def test_fix_cache(block_processor): + # This tests how a reorg will create a new version of the cache + # Let's start setting a full cache. We'll mine ``cache_size`` bocks to be sure it's full + generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE"))) + + locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE")) + locator_cache.init(block_processor.get_best_block_hash(), block_processor) + assert len(locator_cache.blocks) == locator_cache.cache_size + + # Now let's fake a reorg of less than ``cache_size``. We'll go two blocks into the past. + current_tip = block_processor.get_best_block_hash() + current_tip_locators = locator_cache.blocks[current_tip] + current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash") + current_tip_parent_locators = locator_cache.blocks[current_tip_parent] + fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash") + locator_cache.fix(fake_tip, block_processor) + + # The last two blocks are not in the cache nor are the any of its locators + assert current_tip not in locator_cache.blocks and current_tip_parent not in locator_cache.blocks + for locator in current_tip_parent_locators + current_tip_locators: + assert locator not in locator_cache.cache + + # The fake tip is the new tip, and two additional blocks are at the bottom + assert fake_tip in locator_cache.blocks and list(locator_cache.blocks.keys())[-1] == fake_tip + assert len(locator_cache.blocks) == locator_cache.cache_size + + # Test the same for a full cache reorg. We can simulate this by adding more blocks than the cache can fit and + # trigger a fix. We'll use a new cache to compare with the old + old_cache_blocks = deepcopy(locator_cache.blocks) + + generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE") * 2)) + locator_cache.fix(block_processor.get_best_block_hash(), block_processor) + + # None of the data from the old cache is in the new cache + for block_hash, locators in old_cache_blocks.items(): + assert block_hash not in locator_cache.blocks + for locator in locators: + assert locator not in locator_cache.cache + + # The data in the new cache corresponds to the last ``cache_size`` blocks. + block_count = block_processor.get_block_count() + for i in range(block_count, block_count - locator_cache.cache_size, -1): + block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1) + assert block_hash in locator_cache.blocks + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache + + +def test_watcher_init(watcher): assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert watcher.block_queue.empty() @@ -101,6 +311,7 @@ def test_init(run_bitcoind, watcher): assert isinstance(watcher.responder, Responder) assert isinstance(watcher.max_appointments, int) assert isinstance(watcher.signing_key, PrivateKey) + assert isinstance(watcher.locator_cache, LocatorCache) def test_add_appointment_non_registered(watcher): @@ -171,6 +382,102 @@ def test_add_appointment(watcher): assert len(watcher.locator_uuid_map[appointment.locator]) == 2 +def test_add_appointment_in_cache(watcher): + # Generate an appointment and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + watcher.locator_cache.cache[appointment.locator] = dispute_txid + + # Try to add the appointment + response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but it's not in the Watcher + assert ( + response + and response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + assert not watcher.locator_uuid_map.get(appointment.locator) + + # It went to the Responder straightaway + assert appointment.locator in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + # Trying to send it again should fail since it is already in the Responder + with pytest.raises(AppointmentAlreadyTriggered): + watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + +def test_add_appointment_in_cache_invalid_blob(watcher): + # Generate an appointment with an invalid transaction and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + # We need to create the appointment manually + dispute_tx = create_dummy_transaction() + dispute_txid = dispute_tx.tx_id.hex() + penalty_tx = create_dummy_transaction(dispute_txid) + + locator = compute_locator(dispute_txid) + dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20} + encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id")) + + appointment_data = { + "locator": locator, + "to_self_delay": dummy_appointment_data.get("to_self_delay"), + "encrypted_blob": encrypted_blob, + "user_id": get_random_value_hex(16), + } + + appointment = ExtendedAppointment.from_dict(appointment_data) + watcher.locator_cache.cache[appointment.locator] = dispute_tx.tx_id.hex() + + # Try to add the appointment + response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but dropped (same as an invalid appointment that gets triggered) + assert ( + response + and response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + + assert not watcher.locator_uuid_map.get(appointment.locator) + assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + +def test_add_appointment_in_cache_invalid_transaction(watcher): + # Generate an appointment that cannot be decrypted and add the dispute txid to the cache + user_sk, user_pk = generate_keypair() + user_id = Cryptographer.get_compressed_pk(user_pk) + watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10) + + appointment, dispute_tx = generate_dummy_appointment() + appointment.encrypted_blob = appointment.encrypted_blob[::-1] + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + watcher.locator_cache.cache[appointment.locator] = dispute_txid + + # Try to add the appointment + response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk)) + + # The appointment is accepted but dropped (same as an invalid appointment that gets triggered) + assert ( + response + and response.get("locator") == appointment.locator + and Cryptographer.get_compressed_pk(watcher.signing_key.public_key) + == Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature"))) + ) + + assert not watcher.locator_uuid_map.get(appointment.locator) + assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()] + + def test_add_too_many_appointments(watcher): # Simulate the user is registered user_sk, user_pk = generate_keypair() @@ -246,9 +553,37 @@ def test_do_watch(watcher, temp_db_manager): # FIXME: We should also add cases where the transactions are invalid. bitcoind_mock needs to be extended for this. +def test_do_watch_cache_update(watcher): + # Test that data is properly added/remove to/from the cache + + for _ in range(10): + blocks_in_cache = watcher.locator_cache.blocks + oldest_block_hash = list(blocks_in_cache.keys())[0] + oldest_block_data = blocks_in_cache.get(oldest_block_hash) + rest_of_blocks = list(blocks_in_cache.keys())[1:] + assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size + + generate_blocks_w_delay(1) + + # The last oldest block is gone but the rest remain + assert oldest_block_hash not in watcher.locator_cache.blocks + assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks.keys()) + + # The locators of the oldest block are gone but the rest remain + for locator in oldest_block_data: + assert locator not in watcher.locator_cache.cache + for block_hash in rest_of_blocks: + for locator in watcher.locator_cache.blocks[block_hash]: + assert locator in watcher.locator_cache.cache + + # The size of the cache is the same + assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size + + def test_get_breaches(watcher, txids, locator_uuid_map): watcher.locator_uuid_map = locator_uuid_map - potential_breaches = watcher.get_breaches(txids) + locators_txid_map = {compute_locator(txid): txid for txid in txids} + potential_breaches = watcher.get_breaches(locators_txid_map) # All the txids must breach assert locator_uuid_map.keys() == potential_breaches.keys() @@ -258,38 +593,50 @@ def test_get_breaches_random_data(watcher, locator_uuid_map): # The likelihood of finding a potential breach with random data should be negligible watcher.locator_uuid_map = locator_uuid_map txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)] + locators_txid_map = {compute_locator(txid): txid for txid in txids} - potential_breaches = watcher.get_breaches(txids) + potential_breaches = watcher.get_breaches(locators_txid_map) # None of the txids should breach assert len(potential_breaches) == 0 -def test_filter_breaches_random_data(watcher): - appointments = {} - locator_uuid_map = {} - breaches = {} +def test_check_breach(watcher): + # A breach will be flagged as valid only if the encrypted blob can be properly decrypted and the resulting data + # matches a transaction format. + uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") - for i in range(TEST_SET_SIZE): - dummy_appointment, _ = generate_dummy_appointment() - uuid = uuid4().hex - appointments[uuid] = {"locator": dummy_appointment.locator, "user_id": dummy_appointment.user_id} - watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_dict()) - watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid) + penalty_txid, penalty_rawtx = watcher.check_breach(uuid, appointment, dispute_txid) + assert Cryptographer.encrypt(penalty_rawtx, dispute_txid) == appointment.encrypted_blob - locator_uuid_map[dummy_appointment.locator] = [uuid] - if i % 2: - dispute_txid = get_random_value_hex(32) - breaches[dummy_appointment.locator] = dispute_txid +def test_check_breach_random_data(watcher): + # If a breach triggers an appointment with random data as encrypted blob, the check should fail. + uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") - watcher.locator_uuid_map = locator_uuid_map - watcher.appointments = appointments + # Set the blob to something "random" + appointment.encrypted_blob = get_random_value_hex(200) - valid_breaches, invalid_breaches = watcher.filter_breaches(breaches) + with pytest.raises(EncryptionError): + watcher.check_breach(uuid, appointment, dispute_txid) - # We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid. - assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2 + +def test_check_breach_invalid_transaction(watcher): + # If the breach triggers an appointment with data that can be decrypted but does not match a transaction, it should + # fail + uuid = uuid4().hex + appointment, dispute_tx = generate_dummy_appointment() + dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid") + + # Set the blob to something "random" + appointment.encrypted_blob = Cryptographer.encrypt(get_random_value_hex(200), dispute_txid) + + with pytest.raises(InvalidTransactionFormat): + watcher.check_breach(uuid, appointment, dispute_txid) def test_filter_valid_breaches(watcher): @@ -323,3 +670,30 @@ def test_filter_valid_breaches(watcher): # We have "triggered" a single breach and it was valid. assert len(invalid_breaches) == 0 and len(valid_breaches) == 1 + + +def test_filter_breaches_random_data(watcher): + appointments = {} + locator_uuid_map = {} + breaches = {} + + for i in range(TEST_SET_SIZE): + dummy_appointment, _ = generate_dummy_appointment() + uuid = uuid4().hex + appointments[uuid] = {"locator": dummy_appointment.locator, "user_id": dummy_appointment.user_id} + watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_dict()) + watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid) + + locator_uuid_map[dummy_appointment.locator] = [uuid] + + if i % 2: + dispute_txid = get_random_value_hex(32) + breaches[dummy_appointment.locator] = dispute_txid + + watcher.locator_uuid_map = locator_uuid_map + watcher.appointments = appointments + + valid_breaches, invalid_breaches = watcher.filter_breaches(breaches) + + # We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid. + assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2 diff --git a/watchtower-plugin/net/http.py b/watchtower-plugin/net/http.py index ea31833..aff661c 100644 --- a/watchtower-plugin/net/http.py +++ b/watchtower-plugin/net/http.py @@ -18,6 +18,7 @@ def add_appointment(plugin, tower_id, tower, appointment_dict, signature): response = send_appointment(tower_id, tower, appointment_dict, signature) plugin.log(f"Appointment accepted and signed by {tower_id}") plugin.log(f"Remaining slots: {response.get('available_slots')}") + plugin.log(f"Start block: {response.get('start_block')}") # # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. # # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.