diff --git a/watchtower-plugin/arg_parser.py b/watchtower-plugin/arg_parser.py index f2d9c8b..6236c92 100644 --- a/watchtower-plugin/arg_parser.py +++ b/watchtower-plugin/arg_parser.py @@ -5,6 +5,22 @@ from common.exceptions import InvalidParameter def parse_register_arguments(tower_id, host, port, config): + """ + Parses the arguments of the register command and checks that they are correct. + + Args: + tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key). + host (:obj:`str`): the ip or hostname to connect to, optional. + host (:obj:`int`): the port to connect to, optional. + config: (:obj:`dict`): the configuration dictionary. + + Returns: + :obj:`tuple`: the tower id and tower network address. + + Raises: + :obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing. + """ + if not isinstance(tower_id, str): raise InvalidParameter(f"tower id must be a compressed public key (33-byte hex value) not {str(tower_id)}") @@ -41,6 +57,20 @@ def parse_register_arguments(tower_id, host, port, config): def parse_get_appointment_arguments(tower_id, locator): + """ + Parses the arguments of the get_appointment command and checks that they are correct. + + Args: + tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key). + locator (:obj:`str`): the locator of the appointment to query the tower about. + + Returns: + :obj:`tuple`: the tower id and appointment locator. + + Raises: + :obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing. + """ + if not is_compressed_pk(tower_id): raise InvalidParameter("tower id must be a compressed public key (33-byte hex value)") @@ -51,6 +81,21 @@ def parse_get_appointment_arguments(tower_id, locator): def parse_add_appointment_arguments(kwargs): + """ + Parses the arguments of the add_appointment command and checks that they are correct. + + The expected arguments are a commitment transaction id (32-byte hex string) and the penalty transaction. + + Args: + kwargs (:obj:`dict`): a dictionary of arguments. + + Returns: + :obj:`tuple`: the commitment transaction id and the penalty transaction. + + Raises: + :obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing. + """ + # Arguments to add_appointment come from c-lightning and they have been sanitised. Checking this just in case. commitment_txid = kwargs.get("commitment_txid") penalty_tx = kwargs.get("penalty_tx") diff --git a/watchtower-plugin/keys.py b/watchtower-plugin/keys.py index e7673f2..587f8e8 100644 --- a/watchtower-plugin/keys.py +++ b/watchtower-plugin/keys.py @@ -27,8 +27,8 @@ def generate_keys(data_dir): data_dir (:obj:`str`): path to data directory where the keys will be stored. Returns: - :obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and - compressed pk respectively. + :obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and compressed pk + respectively. Raises: :obj:`FileExistsError`: if the key pair already exists in the given directory. @@ -56,8 +56,8 @@ def load_keys(data_dir): data_dir (:obj:`str`): path to data directory where the keys are stored. Returns: - :obj:`tuple`: a tuple containing a ``EllipticCurvePrivateKey`` and a ``str`` representing the client sk and - compressed pk respectively. + :obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and compressed pk + respectively. Raises: :obj:`InvalidKey `: if any of the keys is invalid or cannot be loaded. diff --git a/watchtower-plugin/retrier.py b/watchtower-plugin/retrier.py index 7732402..f8c4751 100644 --- a/watchtower-plugin/retrier.py +++ b/watchtower-plugin/retrier.py @@ -11,16 +11,38 @@ MAX_RETRIES = None def check_retry(status): + """ + Checks is the job needs to be retried. Jobs are retried if max_retries is not reached and the tower status is + temporarily unreachable. + + Args: + status (:obj:`str`): the tower status. + + Returns: + :obj:`bool`: True is the status is "temporarily unreachable", False otherwise. + """ return status == "temporarily unreachable" def on_backoff(details): + """ + Function called when backing off after a retry. Logs data regarding the retry. + Args: + details: the retry details (check backoff library for more info). + """ plugin = details.get("args")[1] tower_id = details.get("args")[2] plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off") def on_giveup(details): + """ + Function called when giving up after the last retry. Logs data regarding the retry and flags the tower as + unreachable. + + Args: + details: the retry details (check backoff library for more info). + """ plugin = details.get("args")[1] tower_id = details.get("args")[2] @@ -31,20 +53,39 @@ def on_giveup(details): def set_max_retries(max_retries): + """Workaround to set max retries from Retrier to the backoff.on_predicate decorator""" global MAX_RETRIES MAX_RETRIES = max_retries def max_retries(): + """Workaround to set max retries from Retrier to the backoff.on_predicate decorator""" return MAX_RETRIES class Retrier: + """ + The Retrier is in charge of the retry process for appointments that were sent to towers that were temporarily + unreachable. + + Args: + max_retries (:obj:`int`): the maximum number of times that a tower will be retried. + temp_unreachable_towers (:obj:`Queue`): a queue of temporarily unreachable towers populated by the plugin on + failing to deliver an appointment. + """ + def __init__(self, max_retries, temp_unreachable_towers): self.temp_unreachable_towers = temp_unreachable_towers set_max_retries(max_retries) def manage_retry(self, plugin): + """ + Listens to the temporarily unreachable towers queue and creates a thread to manage each tower it gets. + + Args: + plugin (:obj:`Plugin`): the plugin object. + """ + while True: tower_id = self.temp_unreachable_towers.get() tower = plugin.wt_client.towers[tower_id] @@ -53,6 +94,23 @@ class Retrier: @backoff.on_predicate(backoff.expo, check_retry, max_tries=max_retries, on_backoff=on_backoff, on_giveup=on_giveup) def do_retry(self, plugin, tower_id, tower): + """ + Retries to send a list of pending appointments to a temporarily unreachable tower. This function is managed by + manage_retries and run in a different thread per tower. + + For every pending appointment the worker thread tries to send the data to the tower. If the tower keeps being + unreachable, the job is retries up to MAX_RETRIES. If MAX_RETRIES is reached, the worker thread gives up and the + tower is flagged as unreachable. + + Args: + plugin (:obj:`Plugin`): the plugin object. + tower_id (:obj:`str`): the id of the tower managed by the thread. + tower: (:obj:`TowerSummary`): the tower data. + + Returns: + :obj:`str`: the tower status if it is not reachable. + """ + for appointment_dict, signature in plugin.wt_client.towers[tower_id].pending_appointments: tower_update = {} try: diff --git a/watchtower-plugin/tower_info.py b/watchtower-plugin/tower_info.py index 3cf0642..ef7f321 100644 --- a/watchtower-plugin/tower_info.py +++ b/watchtower-plugin/tower_info.py @@ -1,4 +1,26 @@ class TowerInfo: + """ + TowerInfo represents all the data the plugin hold about a tower. + + Args: + netaddr (:obj:`str`): the tower network address. + available_slots (:obj:`int`): the amount of available appointment slots in the tower. + status (:obj:`str`): the tower status. The tower can be in the following status: + reachable: if the tower can be reached. + temporarily unreachable: if the tower cannot be reached but the issue is transitory. + unreachable: if the tower cannot be reached and the issue has persisted long enough, or it is permanent. + subscription error: if there has been a problem with the subscription (e.g: run out of slots). + misbehaving: if the tower has been caught misbehaving (e.g: an invalid signature has been received). + + Attributes: + appointments (:obj:`dict`): a collection of accepted appointments. + pending_appointments (:obj:`list`): a collection of pending appointments. Appointments are pending when the + tower is unreachable or the subscription has expired / run out of slots. + invalid_appointments (:obj:`list`): a collection of invalid appointments. Appointments are invalid if the tower + rejects them for not following the proper format. + misbehaving_proof (:obj:`dict`): a proof of misbehaviour from the tower. The tower is abandoned if so. + """ + def __init__(self, netaddr, available_slots, status="reachable"): self.netaddr = netaddr self.available_slots = available_slots @@ -11,6 +33,19 @@ class TowerInfo: @classmethod def from_dict(cls, tower_data): + """ + Builds a TowerInfo object from a dictionary. + + Args: + tower_data (:obj:`dict`): a dictionary containing all the TowerInfo fields. + + Returns: + :obj:`TowerInfo`: A TowerInfo object built with the provided data. + + Raises: + :obj:`ValueError`: If any of the expected fields is missing in the dictionary. + """ + netaddr = tower_data.get("netaddr") available_slots = tower_data.get("available_slots") status = tower_data.get("status") @@ -34,13 +69,42 @@ class TowerInfo: return tower def to_dict(self): + """ + Builds a dictionary from a TowerInfo object. + + Returns: + :obj:`dict`: The TowerInfo object as a dictionary. + """ return self.__dict__ def get_summary(self): + """ + Gets a summary of the TowerInfo object. + + The plugin only stores the minimal information in memory, the rest is dumped into the DB. Data kept in memory + is stored in TowerSummary objects. + + Returns: + :obj:`dict`: The summary of the TowerInfo object. + """ return TowerSummary(self) class TowerSummary: + """ + A smaller representation of the TowerInfo data to be kept in memory. + + Args: + tower_info(:obj:`TowerInfo`): A TowerInfo object. + + Attributes: + netaddr (:obj:`str`): the tower network address. + status (:obj:`str`): the status of the tower. + available_slots (:obj:`int`): the amount of available appointment slots in the tower. + pending_appointments (:obj:`list`): the collection of pending appointments. + invalid_appointments (:obj:`list`): the collection of invalid appointments. + """ + def __init__(self, tower_info): self.netaddr = tower_info.netaddr self.status = tower_info.status @@ -49,4 +113,11 @@ class TowerSummary: self.invalid_appointments = tower_info.invalid_appointments def to_dict(self): + """ + Builds a dictionary from a TowerSummary object. + + Returns: + :obj:`dict`: The TowerSummary object as a dictionary. + """ + return self.__dict__ diff --git a/watchtower-plugin/watchtower.py b/watchtower-plugin/watchtower.py index 97a7221..303efa7 100755 --- a/watchtower-plugin/watchtower.py +++ b/watchtower-plugin/watchtower.py @@ -36,11 +36,29 @@ plugin = Plugin() class WTClient: + """ + Holds all the data regarding the watchtower client. + + Fires an additional tread to take care of retries. + + Args: + sk (:obj:`PrivateKey): the user private key. Used to sign appointment sent to the towers. + user_id (:obj:`PrivateKey): the identifier of the user (compressed public key). + config (:obj:`dict`): the client configuration loaded on a dictionary. + + Attributes: + towers (:obj:`dict`): a collection of registered towers. Indexed by tower_id, populated with :obj:`TowerSummary` + objects. + db_manager (:obj:`towers_dbm.TowersDBM`): a manager to interact with the towers database. + retrier (:obj:`retrier.Retrier`): a ``Retrier`` in charge of retrying sending jobs to temporarily unreachable + towers. + lock (:obj:`Lock`): a thread lock. + """ + def __init__(self, sk, user_id, config): self.sk = sk self.user_id = user_id self.towers = {} - self.tmp_unreachable_towers = [] self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin) self.retrier = Retrier(config.get("MAX_RETRIES"), Queue()) self.config = config @@ -53,6 +71,16 @@ class WTClient: Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start() def update_tower_state(self, tower_id, tower_update): + """ + Updates the state of a tower both in memory and disk. + + Access if restricted thought a lock to prevent race conditions. + + Args: + tower_id (:obj:`str`): the identifier of the tower to be updated. + tower_update (:obj:`dict`): a dictionary containing the data to be added / removed. + """ + self.lock.acquire() tower_info = TowerInfo.from_dict(self.db_manager.load_tower_record(tower_id)) @@ -81,6 +109,8 @@ class WTClient: @plugin.init() def init(options, configuration, plugin): + """Initializes the plugin""" + try: user_sk, user_id = generate_keys(DATA_DIR) plugin.log(f"Generating a new key pair for the watchtower client. Keys stored at {DATA_DIR}") @@ -109,7 +139,7 @@ def register(plugin, tower_id, host=None, port=None): plugin (:obj:`Plugin`): this plugin. tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key). host (:obj:`str`): the ip or hostname to connect to, optional. - host (:obj:`int`): the port to connect to, optional. + port (:obj:`int`): the port to connect to, optional. Accepted tower_id formats: - tower_id@host:port @@ -189,8 +219,18 @@ def get_appointment(plugin, tower_id, locator): return e.to_json() -@plugin.method("listtowers", desc="List all towers registered towers.") +@plugin.method("listtowers", desc="List all registered towers.") def list_towers(plugin): + """ + Lists all the registered towers. The given information comes from memory, so it is summarized. + + Args: + plugin (:obj:`Plugin`): this plugin. + + Returns: + :obj:`dict`: a dictionary containing the registered towers data. + """ + towers_info = {"towers": []} for tower_id, tower in plugin.wt_client.towers.items(): values = {k: v for k, v in tower.to_dict().items() if k not in ["pending_appointments", "invalid_appointments"]} @@ -205,6 +245,17 @@ def list_towers(plugin): @plugin.method("gettowerinfo", desc="List all towers registered towers.") def get_tower_info(plugin, tower_id): + """ + Gets information about a given tower. Data comes from disk (DB), so all stored data is provided. + + Args: + plugin (:obj:`Plugin`): this plugin. + tower_id: (:obj:`str`): the identifier of the queried tower. + + Returns: + :obj:`dict`: a dictionary containing all data about the queried tower. + """ + tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) pending_appointments = [ {"appointment": appointment, "signature": signature} @@ -221,14 +272,31 @@ def get_tower_info(plugin, tower_id): @plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.") def retry_tower(plugin, tower_id): + """ + Triggers a manual retry of a tower, tries to send all pending appointments to to it. + + Only works if the tower is unreachable or there's been a subscription error. + + Args: + plugin (:obj:`Plugin`): this plugin. + tower_id: (:obj:`str`): the identifier of the tower to be retried. + + Returns: + + """ response = None plugin.wt_client.lock.acquire() tower = plugin.wt_client.towers.get(tower_id) if not tower: response = {"error": f"{tower_id} is not a registered tower"} + + # FIXME: it may be worth only allowing unreachable and forcing a retry on register_tower if the state is + # subscription error. if tower.status not in ["unreachable", "subscription error"]: - response = {"error": f"{tower_id} is not unreachable. {tower.status}"} + response = { + "error": f"Cannot retry tower. Expected tower status 'unreachable' or 'subscription error'. Received {tower.status}" + } if not tower.pending_appointments: response = {"error": f"{tower_id} does not have pending appointments"} @@ -244,6 +312,15 @@ def retry_tower(plugin, tower_id): @plugin.hook("commitment_revocation") def on_commitment_revocation(plugin, **kwargs): + """ + Sends an appointment to all registered towers for every net commitment transaction. + + kwargs should contain the commitment identifier (commitment_txid) and the penalty transaction (penalty_tx) + + Args: + plugin (:obj:`Plugin`): this plugin. + """ + try: commitment_txid, penalty_tx = arg_parser.parse_add_appointment_arguments(kwargs) appointment = Appointment( @@ -300,7 +377,7 @@ def on_commitment_revocation(plugin, **kwargs): except TowerResponseError as e: tower_update["status"] = e.kwargs.get("status") - if tower_update["status"] in ["temporarily unreachable", "subscription_error"]: + if tower_update["status"] in ["temporarily unreachable", "subscription error"]: plugin.log(f"Adding {appointment.locator} to pending") tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add"