From 699da54aa01de9265e82a9c1aec5e25a8057e137 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 19 May 2020 18:24:40 +0200 Subject: [PATCH] teos - Adds LocatorCache --- common/errors.py | 1 + teos/api.py | 9 ++- teos/block_processor.py | 15 +++-- teos/watcher.py | 133 ++++++++++++++++++++++++++++++---------- 4 files changed, 121 insertions(+), 37 deletions(-) diff --git a/common/errors.py b/common/errors.py index 786a9d7..c115a31 100644 --- a/common/errors.py +++ b/common/errors.py @@ -7,6 +7,7 @@ APPOINTMENT_FIELD_TOO_SMALL = -5 APPOINTMENT_FIELD_TOO_BIG = -6 APPOINTMENT_WRONG_FIELD = -7 APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8 +APPOINTMENT_ALREADY_TRIGGERED = -9 # Registration errors [-33, -64] REGISTRATION_MISSING_FIELD = -33 diff --git a/teos/api.py b/teos/api.py index 0bcbddb..b1c90ec 100644 --- a/teos/api.py +++ b/teos/api.py @@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify from teos import LOG_PREFIX import common.errors as errors from teos.inspector import InspectionFailed -from teos.watcher import AppointmentLimitReached +from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure from common.logger import Logger @@ -192,6 +192,13 @@ class API: rcode = HTTP_SERVICE_UNAVAILABLE response = {"error": "appointment rejected"} + except AppointmentAlreadyTriggered: + rcode = HTTP_BAD_REQUEST + response = { + "error": "appointment rejected. The provided appointment has already been triggered", + "error_code": errors.APPOINTMENT_ALREADY_TRIGGERED, + } + logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response) return jsonify(response), rcode diff --git a/teos/block_processor.py b/teos/block_processor.py index 136e9f7..73b8370 100644 --- a/teos/block_processor.py +++ b/teos/block_processor.py @@ -1,4 +1,5 @@ from common.logger import Logger +from common.exceptions import BasicException from teos import LOG_PREFIX from teos.tools import bitcoin_cli @@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX) +class InvalidTransactionFormat(BasicException): + """Raised when a transaction is not properly formatted""" + + class BlockProcessor: """ The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication @@ -89,17 +94,19 @@ class BlockProcessor: raw_tx (:obj:`str`): the hex representation of the transaction. Returns: - :obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted. + :obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted. - Returns ``None`` otherwise. + Raises: + :obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has invalid format. """ try: tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx) except JSONRPCException as e: - tx = None - logger.error("Cannot build transaction from decoded data", error=e.error) + msg = "Cannot build transaction from decoded data" + logger.error(msg, error=e.error) + raise InvalidTransactionFormat(msg) return tx diff --git a/teos/watcher.py b/teos/watcher.py index 8d6aeba..f74e405 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -12,6 +12,7 @@ from common.exceptions import InvalidParameter, SignatureError from teos import LOG_PREFIX from teos.cleaner import Cleaner from teos.extended_appointment import ExtendedAppointment +from teos.block_processor import InvalidTransactionFormat logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) @@ -20,6 +21,10 @@ class AppointmentLimitReached(BasicException): """Raised when the tower maximum appointment count has been reached""" +class AppointmentAlreadyTriggered(BasicException): + """Raised an appointment is sent to the Watcher but that same data has already been sent to the Responder""" + + class LocatorCache: def __init__(self, blocks_in_cache): self.cache = dict() @@ -148,25 +153,60 @@ class Watcher: # The user_id needs to be added to the ExtendedAppointment once the former has been authenticated appointment.user_id = user_id - # The uuids are generated as the RIPMED160(locator||user_pubkey). + # The uuids are generated as the RIPEMD160(locator||user_pubkey). # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). uuid = hash_160("{}{}".format(appointment.locator, user_id)) # Add the appointment to the Gatekeeper available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment) - self.appointments[uuid] = appointment.get_summary() + # Appointments that were triggered in blocks hold in the cache + if appointment.locator in self.locator_cache.cache: + # If this is a copy of an appointment we've already reacted to, the new appointment is rejected. + if uuid in self.responder.trackers: + message = "Appointment already in Responder" + logger.info(message) + raise AppointmentAlreadyTriggered(message) - if appointment.locator in self.locator_uuid_map: - # If the uuid is already in the map it means this is an update. - if uuid not in self.locator_uuid_map[appointment.locator]: - self.locator_uuid_map[appointment.locator].append(uuid) + try: + breach = self.filter_breach(uuid, appointment, self.locator_cache.cache[appointment.locator]) + receipt = self.responder.handle_breach( + uuid, + breach["locator"], + breach["dispute_txid"], + breach["penalty_txid"], + breach["penalty_rawtx"], + self.appointments[uuid].get("user_id"), + self.last_known_block, + ) + + # At this point the appointment is accepted but data is only kept if it goes through the Responder + # otherwise it is dropped. + if receipt.delivered: + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) + self.db_manager.create_triggered_appointment_flag(uuid) + + except (EncryptionError, InvalidTransactionFormat): + # If data inside the encrypted blob is invalid, the appointment is accepted but the data is dropped. + # (same as with data that bounces in the Responder). This reduces the appointment slot count so it + # could be used to discourage user misbehaviour. + pass + + # Regular appointments that have not been triggered (or not recently at least) else: - # Otherwise two users have sent an appointment with the same locator, so we need to store both. - self.locator_uuid_map[appointment.locator] = [uuid] + self.appointments[uuid] = appointment.get_summary() - self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) - self.db_manager.create_append_locator_map(appointment.locator, uuid) + if appointment.locator in self.locator_uuid_map: + # If the uuid is already in the map it means this is an update. + if uuid not in self.locator_uuid_map[appointment.locator]: + self.locator_uuid_map[appointment.locator].append(uuid) + else: + # Otherwise two users have sent an appointment with the same locator, so we need to store both. + self.locator_uuid_map[appointment.locator] = [uuid] + + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) try: signature = Cryptographer.sign(appointment.serialize(), self.signing_key) @@ -198,6 +238,17 @@ class Watcher: self.last_known_block = self.block_processor.get_best_block_hash() self.db_manager.store_last_block_hash_watcher(self.last_known_block) + # Initialise the locator cache with the last ``cache_size`` blocks. + target_block_hash = self.last_known_block + for _ in range(self.locator_cache.cache_size): + target_block = self.block_processor.get_block(target_block_hash) + locators = {compute_locator(txid): txid for txid in target_block.get("tx")} + self.locator_cache.cache.update(locators) + self.locator_cache.blocks[target_block_hash] = locators + target_block_hash = target_block.get("previousblockhash") + + self.locator_cache.blocks = OrderedDict(reversed((list(self.locator_cache.blocks.items())))) + while True: block_hash = self.block_queue.get() block = self.block_processor.get_block(block_hash) @@ -210,8 +261,7 @@ class Watcher: self.locator_cache.blocks[block_hash] = locators logger.debug("Block added to cache", block_hash=block_hash) - # FIXME: change txids for locators? - if len(self.appointments) > 0 and txids: + if len(self.appointments) > 0 and locators: expired_appointments = self.gatekeeper.get_expired_appointments(block["height"]) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) @@ -279,7 +329,7 @@ class Watcher: if self.locator_cache.is_full(): self.locator_cache.remove_older_block() - # Register the last processed block for the watcher + # Register the last processed block for the Watcher self.db_manager.store_last_block_hash_watcher(block_hash) self.last_known_block = block.get("hash") self.block_queue.task_done() @@ -308,11 +358,37 @@ class Watcher: return breaches + def filter_breach(self, uuid, appointment, dispute_txid): + try: + penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) + penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) + + except EncryptionError as e: + logger.info("Transaction cannot be decrypted", uuid=uuid) + raise e + + except InvalidTransactionFormat as e: + logger.info("The breach contained an invalid transaction") + raise e + + valid_breach = { + "locator": appointment.locator, + "dispute_txid": dispute_txid, + "penalty_txid": penalty_tx.get("txid"), + "penalty_rawtx": penalty_rawtx, + } + + logger.info( + "Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") + ) + + return valid_breach + def filter_breaches(self, breaches): """ Filters the valid from the invalid channel breaches. - The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen. + The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen. Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder `. Args: @@ -337,30 +413,23 @@ class Watcher: if appointment.encrypted_blob in decrypted_blobs: penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob] - - else: - try: - penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) - - except EncryptionError: - penalty_rawtx = None - - penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx) - decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx) - - if penalty_tx is not None: valid_breaches[uuid] = { - "locator": locator, + "locator": appointment.locator, "dispute_txid": dispute_txid, "penalty_txid": penalty_tx.get("txid"), "penalty_rawtx": penalty_rawtx, } - logger.info( - "Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid") - ) - else: - invalid_breaches.append(uuid) + try: + valid_breach = self.filter_breach(uuid, appointment, dispute_txid) + valid_breaches[uuid] = valid_breach + decrypted_blobs[appointment.encrypted_blob] = ( + valid_breach["penalty_txid"], + valid_breach["penalty_rawtx"], + ) + + except (EncryptionError, InvalidTransactionFormat): + invalid_breaches.append(uuid) return valid_breaches, invalid_breaches