From 3f15459f2cfc20c3ce8a92c26258f387ba5d284a Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 14 Apr 2020 20:55:44 +0200 Subject: [PATCH] Creates ExtendedAppointment as an appointment with user information --- common/appointment.py | 19 ++--- common/constants.py | 3 + teos/api.py | 88 +++++---------------- teos/builder.py | 14 ++-- teos/cleaner.py | 34 ++++---- teos/extended_appointment.py | 37 +++++++++ teos/gatekeeper.py | 88 +++++++++++---------- teos/inspector.py | 13 +++- teos/responder.py | 145 +++++++++++++++++++---------------- teos/teosd.py | 21 +++-- teos/watcher.py | 130 ++++++++++++++----------------- 11 files changed, 291 insertions(+), 301 deletions(-) create mode 100644 teos/extended_appointment.py diff --git a/common/appointment.py b/common/appointment.py index 232ed14..2534f16 100644 --- a/common/appointment.py +++ b/common/appointment.py @@ -25,14 +25,12 @@ class Appointment: """ Builds an appointment from a dictionary. - This method is useful to load data from a database. - Args: appointment_data (:obj:`dict`): a dictionary containing the following keys: ``{locator, to_self_delay, encrypted_blob}`` Returns: - :obj:`Appointment `: An appointment initialized using the provided data. + :obj:`Appointment `: An appointment initialized using the provided data. Raises: ValueError: If one of the mandatory keys is missing in ``appointment_data``. @@ -40,13 +38,13 @@ class Appointment: locator = appointment_data.get("locator") to_self_delay = appointment_data.get("to_self_delay") - encrypted_blob_data = appointment_data.get("encrypted_blob") + encrypted_blob = appointment_data.get("encrypted_blob") - if any(v is None for v in [locator, to_self_delay, encrypted_blob_data]): + if any(v is None for v in [locator, to_self_delay, encrypted_blob]): raise ValueError("Wrong appointment data, some fields are missing") else: - appointment = cls(locator, to_self_delay, encrypted_blob_data) + appointment = cls(locator, to_self_delay, encrypted_blob) return appointment @@ -58,14 +56,7 @@ class Appointment: :obj:`dict`: A dictionary containing the appointment attributes. """ - # ToDO: #3-improve-appointment-structure - appointment = { - "locator": self.locator, - "to_self_delay": self.to_self_delay, - "encrypted_blob": self.encrypted_blob, - } - - return appointment + return self.__dict__ def serialize(self): """ diff --git a/common/constants.py b/common/constants.py index 904db90..6e8a6ee 100644 --- a/common/constants.py +++ b/common/constants.py @@ -8,5 +8,8 @@ HTTP_BAD_REQUEST = 400 HTTP_NOT_FOUND = 404 HTTP_SERVICE_UNAVAILABLE = 503 +# LN general nomenclature +IRREVOCABLY_RESOLVED = 100 + # Temporary constants, may be changed ENCRYPTED_BLOB_MAX_SIZE_HEX = 2 * 2048 diff --git a/teos/api.py b/teos/api.py index 6bab385..d1b31a4 100644 --- a/teos/api.py +++ b/teos/api.py @@ -1,22 +1,16 @@ import os import logging -from math import ceil from flask import Flask, request, abort, jsonify from teos import LOG_PREFIX import teos.errors as errors from teos.inspector import InspectionFailed -from teos.gatekeeper import NotEnoughSlots, IdentificationFailure +from teos.watcher import AppointmentLimitReached +from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure from common.logger import Logger from common.cryptographer import hash_160 -from common.constants import ( - HTTP_OK, - HTTP_BAD_REQUEST, - HTTP_SERVICE_UNAVAILABLE, - HTTP_NOT_FOUND, - ENCRYPTED_BLOB_MAX_SIZE_HEX, -) +from common.constants import HTTP_OK, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, HTTP_NOT_FOUND # ToDo: #5-add-async-to-api @@ -130,11 +124,11 @@ class API: if client_pk: try: rcode = HTTP_OK - available_slots, subscription_end_time = self.gatekeeper.add_update_user(client_pk) + available_slots, subscription_expiry = self.gatekeeper.add_update_user(client_pk) response = { "public_key": client_pk, "available_slots": available_slots, - "subscription_end_time": subscription_end_time, + "subscription_expiry": subscription_expiry, } except ValueError as e: @@ -176,67 +170,17 @@ class API: except TypeError as e: return abort(HTTP_BAD_REQUEST, e) - # We kind of have the chicken an the egg problem here. Data must be verified and the signature must be checked: - # - If we verify the data first, we may encounter that the signature is wrong and wasted some time. - # - If we check the signature first, we may need to verify some of the information or expose to build - # appointments with potentially wrong data, which may be exploitable. - # - # The first approach seems safer since it only implies a bunch of pretty quick checks. - try: appointment = self.inspector.inspect(request_data.get("appointment")) - user_pk = self.gatekeeper.identify_user(appointment.serialize(), request_data.get("signature")) - - # Check if the appointment is an update. Updates will return a summary. - appointment_uuid = hash_160("{}{}".format(appointment.locator, user_pk)) - appointment_summary = self.watcher.get_appointment_summary(appointment_uuid) - - if appointment_summary: - used_slots = ceil(appointment_summary.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX) - required_slots = ceil(len(appointment.encrypted_blob) / ENCRYPTED_BLOB_MAX_SIZE_HEX) - slot_diff = required_slots - used_slots - - # For updates we only reserve the slot difference provided the new one is bigger. - required_slots = slot_diff if slot_diff > 0 else 0 - - else: - # For regular appointments 1 slot is reserved per ENCRYPTED_BLOB_MAX_SIZE_HEX block. - slot_diff = 0 - required_slots = ceil(len(appointment.encrypted_blob) / ENCRYPTED_BLOB_MAX_SIZE_HEX) - - # Slots are reserved before adding the appointments to prevent race conditions. - # DISCUSS: It may be worth using signals here to avoid race conditions anyway. - self.gatekeeper.fill_slots(user_pk, required_slots) - - appointment_added, signature = self.watcher.add_appointment( - appointment, user_pk, self.gatekeeper.registered_users[user_pk].subscription_end_time - ) - - if appointment_added: - # If the appointment is added and the update is smaller than the original, the difference is given back. - if slot_diff < 0: - self.gatekeeper.free_slots(user_pk, abs(slot_diff)) - - rcode = HTTP_OK - response = { - "locator": appointment.locator, - "signature": signature, - "available_slots": self.gatekeeper.registered_users[user_pk].available_slots, - "subscription_end_time": self.gatekeeper.registered_users[user_pk].subscription_end_time, - } - - else: - # If the appointment is not added the reserved slots are given back - self.gatekeeper.free_slots(user_pk, required_slots) - rcode = HTTP_SERVICE_UNAVAILABLE - response = {"error": "appointment rejected"} + response = self.watcher.add_appointment(appointment, request_data.get("signature")) + rcode = HTTP_OK except InspectionFailed as e: rcode = HTTP_BAD_REQUEST error = "appointment rejected. Error {}: {}".format(e.erno, e.reason) response = {"error": error} - except (IdentificationFailure, NotEnoughSlots): + except (AuthenticationFailure, NotEnoughSlots): rcode = HTTP_BAD_REQUEST error = "appointment rejected. Error {}: {}".format( errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS, @@ -244,6 +188,10 @@ class API: ) response = {"error": error} + except AppointmentLimitReached: + rcode = HTTP_SERVICE_UNAVAILABLE + response = {"error": "appointment rejected"} + logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response) return jsonify(response), rcode @@ -285,7 +233,7 @@ class API: message = "get appointment {}".format(locator).encode() signature = request_data.get("signature") - user_pk = self.gatekeeper.identify_user(message, signature) + user_pk = self.gatekeeper.authenticate_user(message, signature) triggered_appointments = self.watcher.db_manager.load_all_triggered_flags() uuid = hash_160("{}{}".format(locator, user_pk)) @@ -295,8 +243,8 @@ class API: appointment_data = self.watcher.db_manager.load_responder_tracker(uuid) if appointment_data: rcode = HTTP_OK - # Remove expiry field from appointment data since it is an internal field - appointment_data.pop("expiry") + # Remove user_id field from appointment data since it is an internal field + appointment_data.pop("user_id") response = {"locator": locator, "status": "dispute_responded", "appointment": appointment_data} else: rcode = HTTP_NOT_FOUND @@ -307,14 +255,14 @@ class API: appointment_data = self.watcher.db_manager.load_watcher_appointment(uuid) if appointment_data: rcode = HTTP_OK - # Remove expiry field from appointment data since it is an internal field - appointment_data.pop("expiry") + # Remove user_id field from appointment data since it is an internal field + appointment_data.pop("user_id") response = {"locator": locator, "status": "being_watched", "appointment": appointment_data} else: rcode = HTTP_NOT_FOUND response = {"locator": locator, "status": "not_found"} - except (InspectionFailed, IdentificationFailure): + except (InspectionFailed, AuthenticationFailure): rcode = HTTP_NOT_FOUND response = {"locator": locator, "status": "not_found"} diff --git a/teos/builder.py b/teos/builder.py index 831236f..e6226ed 100644 --- a/teos/builder.py +++ b/teos/builder.py @@ -7,19 +7,19 @@ class Builder: @staticmethod def build_appointments(appointments_data): """ - Builds an appointments dictionary (``uuid: Appointment``) and a locator_uuid_map (``locator: uuid``) given a - dictionary of appointments from the database. + Builds an appointments dictionary (``uuid: ExtendedAppointment``) and a locator_uuid_map (``locator: uuid``) + given a dictionary of appointments from the database. Args: appointments_data (:obj:`dict`): a dictionary of dictionaries representing all the :obj:`Watcher ` appointments stored in the database. The structure is as follows: - ``{uuid: {locator: str, start_time: int, ...}, uuid: {locator:...}}`` + ``{uuid: {locator: str, ...}, uuid: {locator:...}}`` Returns: :obj:`tuple`: A tuple with two dictionaries. ``appointments`` containing the appointment information in - :obj:`Appointment ` objects and ``locator_uuid_map`` containing a map of - appointment (``uuid:locator``). + :obj:`ExtendedAppointment ` objects and ``locator_uuid_map`` + containing a map of appointment (``uuid:locator``). """ appointments = {} @@ -28,7 +28,7 @@ class Builder: for uuid, data in appointments_data.items(): appointments[uuid] = { "locator": data.get("locator"), - "end_time": data.get("end_time"), + "user_id": data.get("user_id"), "size": len(data.get("encrypted_blob")), } @@ -67,7 +67,7 @@ class Builder: trackers[uuid] = { "penalty_txid": data.get("penalty_txid"), "locator": data.get("locator"), - "appointment_end": data.get("appointment_end"), + "user_id": data.get("user_id"), } if data.get("penalty_txid") in tx_tracker_map: diff --git a/teos/cleaner.py b/teos/cleaner.py index a9cba0a..5ddc109 100644 --- a/teos/cleaner.py +++ b/teos/cleaner.py @@ -87,7 +87,7 @@ class Cleaner: @staticmethod def delete_expired_appointments(expired_appointments, appointments, locator_uuid_map, db_manager): """ - Deletes appointments which ``end_time`` has been reached (with no trigger) both from memory + Deletes appointments which ``expiry`` has been reached (with no trigger) both from memory (:obj:`Watcher `) and disk. Args: @@ -181,10 +181,10 @@ class Cleaner: db_manager.create_triggered_appointment_flag(uuid) @staticmethod - def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager): + def delete_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager, expired=False): """ - Deletes a completed tracker both from memory (:obj:`Responder `) and disk (from the - Responder's and Watcher's databases). + Deletes completed/expired trackers both from memory (:obj:`Responder `) and disk + (from the Responder's and Watcher's databases). Args: trackers (:obj:`dict`): a dictionary containing all the :obj:`Responder ` @@ -195,17 +195,23 @@ class Cleaner: height (:obj:`int`): the block height at which the trackers were completed. db_manager (:obj:`AppointmentsDBM `): a ``AppointmentsDBM`` instance to interact with the database. + expired (:obj:`bool`): whether the trackers have expired or not. Defaults to False. """ locator_maps_to_update = {} - for uuid, confirmations in completed_trackers.items(): - logger.info( - "Appointment completed. Appointment ended after reaching enough confirmations", - uuid=uuid, - height=height, - confirmations=confirmations, - ) + for uuid in completed_trackers: + + if expired: + logger.info( + "Appointment couldn't be completed. Expiry reached but penalty didn't make it to the chain", + uuid=uuid, + height=height, + ) + else: + logger.info( + "Appointment completed. Penalty transaction was irrevocably confirmed", uuid=uuid, height=height + ) penalty_txid = trackers[uuid].get("penalty_txid") locator = trackers[uuid].get("locator") @@ -229,6 +235,6 @@ class Cleaner: Cleaner.update_delete_db_locator_map(uuids, locator, db_manager) # Delete appointment from the db (from watchers's and responder's db) and remove flag - db_manager.batch_delete_responder_trackers(list(completed_trackers.keys())) - db_manager.batch_delete_watcher_appointments(list(completed_trackers.keys())) - db_manager.batch_delete_triggered_appointment_flag(list(completed_trackers.keys())) + db_manager.batch_delete_responder_trackers(completed_trackers) + db_manager.batch_delete_watcher_appointments(completed_trackers) + db_manager.batch_delete_triggered_appointment_flag(completed_trackers) diff --git a/teos/extended_appointment.py b/teos/extended_appointment.py new file mode 100644 index 0000000..7dd2c65 --- /dev/null +++ b/teos/extended_appointment.py @@ -0,0 +1,37 @@ +from common.appointment import Appointment + + +class ExtendedAppointment(Appointment): + def __init__(self, locator, to_self_delay, encrypted_blob, user_id): + super().__init__(locator, to_self_delay, encrypted_blob) + self.user_id = user_id + + @classmethod + def from_dict(cls, appointment_data): + """ + Builds an appointment from a dictionary. + + This method is useful to load data from a database. + + Args: + appointment_data (:obj:`dict`): a dictionary containing the following keys: + ``{locator, to_self_delay, encrypted_blob, expiry}`` + + Returns: + :obj:`ExtendedAppointment `: An appointment initialized + using the provided data. + + Raises: + ValueError: If one of the mandatory keys is missing in ``appointment_data``. + """ + + appointment = Appointment.from_dict(appointment_data) + user_id = appointment_data.get("user_id") + + if not user_id: + raise ValueError("Wrong appointment data, user_id is missing") + + else: + appointment = cls(appointment.locator, appointment.to_self_delay, appointment.encrypted_blob, user_id) + + return appointment diff --git a/teos/gatekeeper.py b/teos/gatekeeper.py index baa21d8..52811ff 100644 --- a/teos/gatekeeper.py +++ b/teos/gatekeeper.py @@ -1,19 +1,21 @@ +from math import ceil +from threading import Lock + from common.tools import is_compressed_pk from common.cryptographer import Cryptographer +from common.constants import ENCRYPTED_BLOB_MAX_SIZE_HEX from common.exceptions import InvalidParameter, InvalidKey, SignatureError class NotEnoughSlots(ValueError): """Raised when trying to subtract more slots than a user has available""" - def __init__(self, user_pk, requested_slots): - self.user_pk = user_pk - self.requested_slots = requested_slots + pass -class IdentificationFailure(Exception): +class AuthenticationFailure(Exception): """ - Raised when a user can not be identified. Either the user public key cannot be recovered or the user is + Raised when a user can not be authenticated. Either the user public key cannot be recovered or the user is not found within the registered ones. """ @@ -21,12 +23,12 @@ class IdentificationFailure(Exception): class UserInfo: - def __init__(self, available_slots, subscription_end_time, appointments=None): + def __init__(self, available_slots, subscription_expiry, appointments=None): self.available_slots = available_slots - self.subscription_end_time = subscription_end_time + self.subscription_expiry = subscription_expiry if not appointments: - self.appointments = {} + self.appointments = [] else: self.appointments = appointments @@ -34,9 +36,9 @@ class UserInfo: def from_dict(cls, user_data): available_slots = user_data.get("available_slots") appointments = user_data.get("appointments") - subscription_end_time = user_data.get("subscription_end_time") + subscription_expiry = user_data.get("subscription_expiry") - if any(v is None for v in [available_slots, appointments, subscription_end_time]): + if any(v is None for v in [available_slots, appointments, subscription_expiry]): raise ValueError("Wrong appointment data, some fields are missing") return cls(available_slots, subscription_expiry, appointments) @@ -54,14 +56,16 @@ class Gatekeeper: registered_users (:obj:`dict`): a map of user_pk:UserInfo. """ - def __init__(self, user_db, block_processor, default_slots, default_subscription_duration): + def __init__(self, user_db, block_processor, default_slots, default_subscription_duration, expiry_delta): self.default_slots = default_slots - self.block_processor = block_processor self.default_subscription_duration = default_subscription_duration + self.expiry_delta = expiry_delta + self.block_processor = block_processor self.user_db = user_db self.registered_users = { user_id: UserInfo.from_dict(user_data) for user_id, user_data in user_db.load_all_users().items() } + self.lock = Lock() def add_update_user(self, user_pk): """ @@ -91,9 +95,9 @@ class Gatekeeper: self.user_db.store_user(user_pk, self.registered_users[user_pk].to_dict()) - return self.registered_users[user_pk].available_slots, self.registered_users[user_pk].subscription_end_time + return self.registered_users[user_pk].available_slots, self.registered_users[user_pk].subscription_expiry - def identify_user(self, message, signature): + def authenticate_user(self, message, signature): """ Checks if a request comes from a registered user by ec-recovering their public key from a signed message. @@ -105,7 +109,7 @@ class Gatekeeper: :obj:`str`: a compressed key recovered from the signature and matching a registered user. Raises: - :obj:`IdentificationFailure`: if the user cannot be identified. + :obj:`AuthenticationFailure`: if the user cannot be authenticated. """ try: @@ -115,40 +119,34 @@ class Gatekeeper: if compressed_pk in self.registered_users: return compressed_pk else: - raise IdentificationFailure("User not found.") + raise AuthenticationFailure("User not found.") except (InvalidParameter, InvalidKey, SignatureError): - raise IdentificationFailure("Wrong message or signature.") + raise AuthenticationFailure("Wrong message or signature.") - def fill_slots(self, user_pk, n): - """ - Fills a given number os slots of the user subscription. - - Args: - user_pk(:obj:`str`): the public key that identifies the user (33-bytes hex str). - n (:obj:`int`): the number of slots to fill. - - Raises: - :obj:`NotEnoughSlots`: if the user subscription does not have enough slots. - """ - - # DISCUSS: we may want to return a different exception if the user does not exist - if user_pk in self.registered_users and n <= self.registered_users.get(user_pk).available_slots: - self.registered_users[user_pk].available_slots -= n - self.user_db.store_user(user_pk, self.registered_users[user_pk].to_dict()) + def update_available_slots(self, user_id, new_appointment, old_appointment=None): + self.lock.acquire() + if old_appointment: + # For updates the difference between the existing appointment and the update is computed. + used_slots = ceil(new_appointment.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX) + required_slots = ceil(old_appointment.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX) - used_slots else: - raise NotEnoughSlots(user_pk, n) + # For regular appointments 1 slot is reserved per ENCRYPTED_BLOB_MAX_SIZE_HEX block. + required_slots = ceil(new_appointment.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX) - def free_slots(self, user_pk, n): - """ - Frees some slots of a user subscription. + if required_slots <= self.registered_users.get(user_id).available_slots: + # Filling / freeing slots depending on whether this is an update or not, and if it is bigger or smaller than + # the old appointment. + self.registered_users.get(user_id).available_slots -= required_slots + else: + self.lock.release() + raise NotEnoughSlots() - Args: - user_pk(:obj:`str`): the public key that identifies the user (33-bytes hex str). - n (:obj:`int`): the number of slots to free. - """ + self.lock.release() + return self.registered_users.get(user_id).available_slots - # DISCUSS: if the user does not exist we may want to log or return an exception. - if user_pk in self.registered_users: - self.registered_users[user_pk].available_slots += n - self.user_db.store_user(user_pk, self.registered_users[user_pk].to_dict()) + def get_expiring_appointments(self, block_height): + expiring_appointments = [] + for user_id, user_info in self.registered_users.items(): + if block_height > user_info.subscription_expiry + self.expiry_delta: + expiring_appointments.extend(user_info.appointments) diff --git a/teos/inspector.py b/teos/inspector.py index 8effcf4..2185a20 100644 --- a/teos/inspector.py +++ b/teos/inspector.py @@ -3,9 +3,9 @@ import re from common.logger import Logger from common.tools import is_locator from common.constants import LOCATOR_LEN_HEX -from common.appointment import Appointment from teos import errors, LOG_PREFIX +from teos.extended_appointment import ExtendedAppointment logger = Logger(actor="Inspector", log_name_prefix=LOG_PREFIX) @@ -49,7 +49,8 @@ class Inspector: Returns: - :obj:`Appointment `: An appointment initialized with the provided data. + :obj:`Extended `: An appointment initialized with + the provided data. Raises: :obj:`InspectionFailed`: if any of the fields is wrong. @@ -68,7 +69,13 @@ class Inspector: self.check_to_self_delay(appointment_data.get("to_self_delay")) self.check_blob(appointment_data.get("encrypted_blob")) - return Appointment.from_dict(appointment_data) + # Set user_id to None since we still don't know it, it'll be set by the API after querying the gatekeeper + return ExtendedAppointment( + appointment_data.get("locator"), + appointment_data.get("to_self_delay"), + appointment_data.get("encrypted_blob"), + user_id=None, + ) @staticmethod def check_locator(locator): diff --git a/teos/responder.py b/teos/responder.py index 862733f..16d19c8 100644 --- a/teos/responder.py +++ b/teos/responder.py @@ -2,9 +2,11 @@ from queue import Queue from threading import Thread from teos import LOG_PREFIX -from common.logger import Logger from teos.cleaner import Cleaner +from common.logger import Logger +from common.constants import IRREVOCABLY_RESOLVED + CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 @@ -26,16 +28,15 @@ class TransactionTracker: dispute_txid (:obj:`str`): the id of the transaction that created the channel breach and triggered the penalty. penalty_txid (:obj:`str`): the id of the transaction that was encrypted under ``dispute_txid``. penalty_rawtx (:obj:`str`): the raw transaction that was broadcast as a consequence of the channel breach. - appointment_end (:obj:`int`): the block at which the tower will stop monitoring the blockchain for this - appointment. + user_id(:obj:`str`): the public key that identifies the user (33-bytes hex str). """ - def __init__(self, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end): + def __init__(self, locator, dispute_txid, penalty_txid, penalty_rawtx, user_id): self.locator = locator self.dispute_txid = dispute_txid self.penalty_txid = penalty_txid self.penalty_rawtx = penalty_rawtx - self.appointment_end = appointment_end + self.user_id = user_id @classmethod def from_dict(cls, tx_tracker_data): @@ -60,13 +61,13 @@ class TransactionTracker: dispute_txid = tx_tracker_data.get("dispute_txid") penalty_txid = tx_tracker_data.get("penalty_txid") penalty_rawtx = tx_tracker_data.get("penalty_rawtx") - appointment_end = tx_tracker_data.get("appointment_end") + user_id = tx_tracker_data.get("user_id") - if any(v is None for v in [locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end]): + if any(v is None for v in [locator, dispute_txid, penalty_txid, penalty_rawtx, user_id]): raise ValueError("Wrong transaction tracker data, some fields are missing") else: - tx_tracker = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) + tx_tracker = cls(locator, dispute_txid, penalty_txid, penalty_rawtx, user_id) return tx_tracker @@ -83,7 +84,7 @@ class TransactionTracker: "dispute_txid": self.dispute_txid, "penalty_txid": self.penalty_txid, "penalty_rawtx": self.penalty_rawtx, - "appointment_end": self.appointment_end, + "user_id": self.user_id, } return tx_tracker @@ -104,7 +105,7 @@ class Responder: Attributes: trackers (:obj:`dict`): A dictionary containing the minimum information about the :obj:`TransactionTracker` - required by the :obj:`Responder` (``penalty_txid``, ``locator`` and ``end_time``). + required by the :obj:`Responder` (``penalty_txid``, ``locator`` and ``user_id``). Each entry is identified by a ``uuid``. tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with several trackers triggered by the same ``penalty_txid``. @@ -121,13 +122,14 @@ class Responder: last_known_block (:obj:`str`): the last block known by the ``Responder``. """ - def __init__(self, db_manager, carrier, block_processor): + def __init__(self, db_manager, gatekeeper, carrier, block_processor): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.block_queue = Queue() self.db_manager = db_manager + self.gatekeeper = gatekeeper self.carrier = carrier self.block_processor = block_processor self.last_known_block = db_manager.load_last_block_hash_responder() @@ -169,7 +171,7 @@ class Responder: return synchronized - def handle_breach(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, block_hash): + def handle_breach(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, block_hash): """ Requests the :obj:`Responder` to handle a channel breach. This is the entry point of the :obj:`Responder`. @@ -179,8 +181,7 @@ class Responder: dispute_txid (:obj:`str`): the id of the transaction that created the channel breach. penalty_txid (:obj:`str`): the id of the decrypted transaction included in the appointment. penalty_rawtx (:obj:`str`): the raw transaction to be broadcast in response of the breach. - appointment_end (:obj:`int`): the block height at which the :obj:`Responder` will stop monitoring for this - penalty transaction. + user_id(:obj:`str`): the public key that identifies the user (33-bytes hex str). block_hash (:obj:`str`): the block hash at which the breach was seen (used to see if we are on sync). Returns: @@ -191,9 +192,7 @@ class Responder: receipt = self.carrier.send_transaction(penalty_rawtx, penalty_txid) if receipt.delivered: - self.add_tracker( - uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, receipt.confirmations - ) + self.add_tracker(uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, receipt.confirmations) else: # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) @@ -204,7 +203,7 @@ class Responder: return receipt - def add_tracker(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end, confirmations=0): + def add_tracker(self, uuid, locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, confirmations=0): """ Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``. @@ -217,20 +216,15 @@ class Responder: dispute_txid (:obj:`str`): the id of the transaction that created the channel breach. penalty_txid (:obj:`str`): the id of the decrypted transaction included in the appointment. penalty_rawtx (:obj:`str`): the raw transaction to be broadcast. - appointment_end (:obj:`int`): the block height at which the :obj:`Responder` will stop monitoring for the - tracker. + user_id(:obj:`str`): the public key that identifies the user (33-bytes hex str). confirmations (:obj:`int`): the confirmation count of the ``penalty_tx``. In normal conditions it will be zero, but if the transaction is already on the blockchain this won't be the case. """ - tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) + tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, user_id) - # We only store the penalty_txid, locator and appointment_end in memory. The rest is dumped into the db. - self.trackers[uuid] = { - "penalty_txid": tracker.penalty_txid, - "locator": locator, - "appointment_end": appointment_end, - } + # We only store the penalty_txid, locator and user_id in memory. The rest is dumped into the db. + self.trackers[uuid] = {"penalty_txid": tracker.penalty_txid, "locator": locator, "user_id": user_id} if penalty_txid in self.tx_tracker_map: self.tx_tracker_map[penalty_txid].append(uuid) @@ -244,9 +238,7 @@ class Responder: self.db_manager.store_responder_tracker(uuid, tracker.to_dict()) - logger.info( - "New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end - ) + logger.info("New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, user_id=user_id) def do_watch(self): """ @@ -271,15 +263,22 @@ class Responder: if self.last_known_block == block.get("previousblockhash"): self.check_confirmations(txids) - - height = block.get("height") - completed_trackers = self.get_completed_trackers(height) - Cleaner.delete_completed_trackers( - completed_trackers, height, self.trackers, self.tx_tracker_map, self.db_manager + Cleaner.delete_trackers( + self.get_completed_trackers(), + block.get("height"), + self.trackers, + self.tx_tracker_map, + self.db_manager, ) - - txs_to_rebroadcast = self.get_txs_to_rebroadcast() - self.rebroadcast(txs_to_rebroadcast) + Cleaner.delete_trackers( + self.get_expired_trackers(block.get("height")), + block.get("height"), + self.trackers, + self.tx_tracker_map, + self.db_manager, + expired=True, + ) + self.rebroadcast(self.get_txs_to_rebroadcast()) # NOTCOVERED else: @@ -295,7 +294,7 @@ class Responder: # Clear the receipts issued in this block self.carrier.issued_receipts = {} - if len(self.trackers) != 0: + if len(self.trackers) == 0: logger.info("No more pending trackers") # Register the last processed block for the responder @@ -349,40 +348,56 @@ class Responder: return txs_to_rebroadcast - def get_completed_trackers(self, height): + def get_completed_trackers(self): """ - Gets the trackers that has already been fulfilled based on a given height (``end_time`` was reached with a - minimum confirmation count). + Gets the trackers that has already been fulfilled based on a given height (the justice transaction is + irrevocably resolved). + + Returns: + :obj:`list`: a list of completed trackers uuids. + """ + + completed_trackers = [] + # FIXME: This is here for duplicated penalties, we should be able to get rid of it once we prevent duplicates in + # the responder. + checked_txs = {} + + for uuid, tracker_data in self.trackers.items(): + if tracker_data.get("penalty_txid") not in self.unconfirmed_txs: + if tracker_data.get("penalty_txid") not in checked_txs: + tx = self.carrier.get_transaction(tracker_data.get("penalty_txid")) + else: + tx = checked_txs.get(tracker_data.get("penalty_txid")) + + if tx is not None: + confirmations = tx.get("confirmations") + checked_txs[tracker_data.get("penalty_txid")] = tx + + if confirmations is not None and confirmations >= IRREVOCABLY_RESOLVED: + # The end of the appointment has been reached + completed_trackers.append(uuid) + + return completed_trackers + + def get_expired_trackers(self, height): + """ + Gets trackers than are expired due to the user subscription expiring. + + Only gets those trackers which penalty transaction is not going trough (probably because of low fees), the rest + will be eventually completed once they are irrevocably resolved. Args: height (:obj:`int`): the height of the last received block. Returns: - :obj:`dict`: a dict (``uuid:confirmations``) of the completed trackers. + :obj:`list`: a list of the expired trackers uuids. """ - completed_trackers = {} - checked_txs = {} + expired_trackers = [ + uuid for uuid in self.gatekeeper.get_expired_appointment(height) if uuid in self.unconfirmed_txs + ] - for uuid, tracker_data in self.trackers.items(): - appointment_end = tracker_data.get("appointment_end") - penalty_txid = tracker_data.get("penalty_txid") - if appointment_end <= height and penalty_txid not in self.unconfirmed_txs: - - if penalty_txid not in checked_txs: - tx = self.carrier.get_transaction(penalty_txid) - else: - tx = checked_txs.get(penalty_txid) - - if tx is not None: - confirmations = tx.get("confirmations") - checked_txs[penalty_txid] = tx - - if confirmations is not None and confirmations >= MIN_CONFIRMATIONS: - # The end of the appointment has been reached - completed_trackers[uuid] = confirmations - - return completed_trackers + return expired_trackers def rebroadcast(self, txs_to_rebroadcast): """ @@ -465,7 +480,7 @@ class Responder: tracker.dispute_txid, tracker.penalty_txid, tracker.penalty_rawtx, - tracker.appointment_end, + tracker.user_id, block_hash, ) diff --git a/teos/teosd.py b/teos/teosd.py index 03575d1..46a4fe2 100644 --- a/teos/teosd.py +++ b/teos/teosd.py @@ -70,16 +70,19 @@ def main(command_line_conf): block_processor = BlockProcessor(bitcoind_connect_params) carrier = Carrier(bitcoind_connect_params) - responder = Responder(db_manager, carrier, block_processor) - watcher = Watcher( - db_manager, + gatekeeper = Gatekeeper( + UsersDBM(config.get("USERS_DB_PATH")), block_processor, - responder, - secret_key_der, - config.get("MAX_APPOINTMENTS"), + config.get("DEFAULT_SLOTS"), + config.get("DEFAULT_SUBSCRIPTION_DURATION"), config.get("EXPIRY_DELTA"), ) + responder = Responder(db_manager, gatekeeper, carrier, block_processor) + watcher = Watcher( + db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS") + ) + # Create the chain monitor and start monitoring the chain chain_monitor = ChainMonitor( watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params @@ -151,12 +154,6 @@ def main(command_line_conf): # Fire the API and the ChainMonitor # FIXME: 92-block-data-during-bootstrap-db chain_monitor.monitor_chain() - gatekeeper = Gatekeeper( - UsersDBM(config.get("USERS_DB_PATH")), - block_processor, - config.get("DEFAULT_SLOTS"), - config.get("DEFAULT_SUBSCRIPTION_DURATION"), - ) inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")) API(config.get("API_BIND"), config.get("API_PORT"), inspector, watcher, gatekeeper).start() except Exception as e: diff --git a/teos/watcher.py b/teos/watcher.py index 5f60fd3..a060bd4 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -3,17 +3,22 @@ from threading import Thread from common.logger import Logger from common.tools import compute_locator -from common.appointment import Appointment +from common.exceptions import BasicException from common.exceptions import EncryptionError from common.cryptographer import Cryptographer, hash_160 from common.exceptions import InvalidParameter, SignatureError from teos import LOG_PREFIX from teos.cleaner import Cleaner +from teos.extended_appointment import ExtendedAppointment logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX) +class AppointmentLimitReached(BasicException): + """Raised when the tower maximum appointment count has been reached""" + + class Watcher: """ The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower. @@ -36,12 +41,11 @@ class Watcher: responder (:obj:`Responder `): a ``Responder`` instance. sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. - expiry_delta (:obj:`int`): the additional time the ``Watcher`` will keep an expired appointment around. Attributes: - appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`Appointment - ` instances) accepted by the tower (``locator``, ``end_time``, and ``size``). - It's populated trough ``add_appointment``. + appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment + ` instances) accepted by the tower (``locator``, + ``user_id``, and ``size``). It's populated trough ``add_appointment``. locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several appointments with the same ``locator``. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is @@ -61,15 +65,15 @@ class Watcher: """ - def __init__(self, db_manager, block_processor, responder, sk_der, max_appointments, expiry_delta): + def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments): self.appointments = dict() self.locator_uuid_map = dict() self.block_queue = Queue() self.db_manager = db_manager + self.gatekeeper = gatekeeper self.block_processor = block_processor self.responder = responder self.max_appointments = max_appointments - self.expiry_delta = expiry_delta self.signing_key = Cryptographer.load_private_key_der(sk_der) self.last_known_block = db_manager.load_last_block_hash_watcher() @@ -81,21 +85,7 @@ class Watcher: return watcher_thread - def get_appointment_summary(self, uuid): - """ - Returns the summary of an appointment. The summary consists of the data kept in memory: - {locator, end_time, and size} - - Args: - uuid (:obj:`str`): a 16-byte hex string identifying the appointment. - - Returns: - :obj:`dict` or :obj:`None`: a dictionary with the appointment summary, or ``None`` if the appointment is not - found. - """ - return self.appointments.get(uuid) - - def add_appointment(self, appointment, user_pk, end_time): + def add_appointment(self, appointment, signature): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. @@ -111,61 +101,63 @@ class Watcher: identified by ``uuid`` and stored in ``appointments`` and ``locator_uuid_map``. Args: - appointment (:obj:`Appointment `): the appointment to be added to the - :obj:`Watcher`. - user_pk(:obj:`str`): the public key that identifies the user who sent the appointment (33-bytes hex str). - end_time (:obj:`int`): the block height where the tower will stop watching for breaches. + appointment (:obj:`ExtendedAppointment `): the appointment to + be added to the :obj:`Watcher`. + signature (:obj:`str`): the user's appointment signature (hex-encoded). Returns: - :obj:`tuple`: A tuple signaling if the appointment has been added or not (based on ``max_appointments``). - The structure looks as follows: + :obj:`dict`: The tower response as a dict, containing: locator, signature, available_slots and + subscription_expiry. - - ``(True, signature)`` if the appointment has been accepted. - - ``(False, None)`` otherwise. + Raises: + :obj:`AppointmentLimitReached`: If the tower cannot hold more appointments (cap reached). + :obj:`AuthenticationFailure `: If the user cannot be authenticated. + :obj:`NotEnoughSlots `: If the user does not have enough available slots, + so the appointment is rejected """ - if len(self.appointments) < self.max_appointments: + if len(self.appointments) >= self.max_appointments: + message = "Maximum appointments reached, appointment rejected" + logger.info(message, locator=appointment.locator) + raise AppointmentLimitReached(message) - # The uuids are generated as the RIPMED160(locator||user_pubkey), that way the tower does not need to know - # anything about the user from this point on (no need to store user_pk in the database). - # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). - uuid = hash_160("{}{}".format(appointment.locator, user_pk)) - self.appointments[uuid] = { - "locator": appointment.locator, - "end_time": end_time, - "size": len(appointment.encrypted_blob), - } + user_id = self.gatekeeper.authenticate_user(appointment.serialize(), signature) - if appointment.locator in self.locator_uuid_map: - # If the uuid is already in the map it means this is an update. - if uuid not in self.locator_uuid_map[appointment.locator]: - self.locator_uuid_map[appointment.locator].append(uuid) + # The uuids are generated as the RIPMED160(locator||user_pubkey). + # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). + uuid = hash_160("{}{}".format(appointment.locator, user_id)) + appointment_dict = {"locator": appointment.locator, "user_id": user_id, "size": len(appointment.encrypted_blob)} - else: - self.locator_uuid_map[appointment.locator] = [uuid] - - self.db_manager.store_watcher_appointment(uuid, appointment.to_dict(), end_time) - self.db_manager.create_append_locator_map(appointment.locator, uuid) - - appointment_added = True - - try: - signature = Cryptographer.sign(appointment.serialize(), self.signing_key) - - except (InvalidParameter, SignatureError): - # This should never happen since data is sanitized, just in case to avoid a crash - logger.error("Data couldn't be signed", appointment=appointment.to_dict()) - signature = None - - logger.info("New appointment accepted", locator=appointment.locator) + available_slots = self.gatekeeper.update_available_slots(user_id, appointment_dict, self.appointments.get(uuid)) + self.gatekeeper.registered_users.appointments.append(uuid) + self.appointments[uuid] = appointment_dict + if appointment.locator in self.locator_uuid_map: + # If the uuid is already in the map it means this is an update. + if uuid not in self.locator_uuid_map[appointment.locator]: + self.locator_uuid_map[appointment.locator].append(uuid) else: - appointment_added = False + self.locator_uuid_map[appointment.locator] = [uuid] + + self.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) + self.db_manager.create_append_locator_map(appointment.locator, uuid) + + try: + signature = Cryptographer.sign(appointment.serialize(), self.signing_key) + + except (InvalidParameter, SignatureError): + # This should never happen since data is sanitized, just in case to avoid a crash + logger.error("Data couldn't be signed", appointment=appointment.to_dict()) signature = None - logger.info("Maximum appointments reached, appointment rejected", locator=appointment.locator) + logger.info("New appointment accepted", locator=appointment.locator) - return appointment_added, signature + return { + "locator": appointment.locator, + "signature": signature, + "available_slots": available_slots, + "subscription_expiry": self.gatekeeper.registered_users[user_id].subscription_expiry, + } def do_watch(self): """ @@ -188,11 +180,7 @@ class Watcher: if len(self.appointments) > 0 and block is not None: txids = block.get("tx") - expired_appointments = [ - uuid - for uuid, appointment_data in self.appointments.items() - if block["height"] > appointment_data.get("end_time") + self.expiry_delta - ] + expired_appointments = self.gatekeeper.get_expired_appointment(block["height"]) Cleaner.delete_expired_appointments( expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager @@ -217,7 +205,7 @@ class Watcher: breach["dispute_txid"], breach["penalty_txid"], breach["penalty_rawtx"], - self.appointments[uuid].get("end_time"), + self.appointments[uuid].get("user_id"), block_hash, ) @@ -296,7 +284,7 @@ class Watcher: for locator, dispute_txid in breaches.items(): for uuid in self.locator_uuid_map[locator]: - appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) + appointment = ExtendedAppointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) if appointment.encrypted_blob in decrypted_blobs: penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]