plugin - Adds docstrings

This commit is contained in:
Sergi Delgado Segura
2020-05-05 19:48:28 +02:00
parent bef8df8d36
commit 94e36ebeda
5 changed files with 260 additions and 9 deletions

View File

@@ -5,6 +5,22 @@ from common.exceptions import InvalidParameter
def parse_register_arguments(tower_id, host, port, config): def parse_register_arguments(tower_id, host, port, config):
"""
Parses the arguments of the register command and checks that they are correct.
Args:
tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key).
host (:obj:`str`): the ip or hostname to connect to, optional.
host (:obj:`int`): the port to connect to, optional.
config: (:obj:`dict`): the configuration dictionary.
Returns:
:obj:`tuple`: the tower id and tower network address.
Raises:
:obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing.
"""
if not isinstance(tower_id, str): if not isinstance(tower_id, str):
raise InvalidParameter(f"tower id must be a compressed public key (33-byte hex value) not {str(tower_id)}") raise InvalidParameter(f"tower id must be a compressed public key (33-byte hex value) not {str(tower_id)}")
@@ -41,6 +57,20 @@ def parse_register_arguments(tower_id, host, port, config):
def parse_get_appointment_arguments(tower_id, locator): def parse_get_appointment_arguments(tower_id, locator):
"""
Parses the arguments of the get_appointment command and checks that they are correct.
Args:
tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key).
locator (:obj:`str`): the locator of the appointment to query the tower about.
Returns:
:obj:`tuple`: the tower id and appointment locator.
Raises:
:obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing.
"""
if not is_compressed_pk(tower_id): if not is_compressed_pk(tower_id):
raise InvalidParameter("tower id must be a compressed public key (33-byte hex value)") raise InvalidParameter("tower id must be a compressed public key (33-byte hex value)")
@@ -51,6 +81,21 @@ def parse_get_appointment_arguments(tower_id, locator):
def parse_add_appointment_arguments(kwargs): def parse_add_appointment_arguments(kwargs):
"""
Parses the arguments of the add_appointment command and checks that they are correct.
The expected arguments are a commitment transaction id (32-byte hex string) and the penalty transaction.
Args:
kwargs (:obj:`dict`): a dictionary of arguments.
Returns:
:obj:`tuple`: the commitment transaction id and the penalty transaction.
Raises:
:obj:`common.exceptions.InvalidParameter`: if any of the parameters is wrong or missing.
"""
# Arguments to add_appointment come from c-lightning and they have been sanitised. Checking this just in case. # Arguments to add_appointment come from c-lightning and they have been sanitised. Checking this just in case.
commitment_txid = kwargs.get("commitment_txid") commitment_txid = kwargs.get("commitment_txid")
penalty_tx = kwargs.get("penalty_tx") penalty_tx = kwargs.get("penalty_tx")

View File

@@ -27,8 +27,8 @@ def generate_keys(data_dir):
data_dir (:obj:`str`): path to data directory where the keys will be stored. data_dir (:obj:`str`): path to data directory where the keys will be stored.
Returns: Returns:
:obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and :obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and compressed pk
compressed pk respectively. respectively.
Raises: Raises:
:obj:`FileExistsError`: if the key pair already exists in the given directory. :obj:`FileExistsError`: if the key pair already exists in the given directory.
@@ -56,8 +56,8 @@ def load_keys(data_dir):
data_dir (:obj:`str`): path to data directory where the keys are stored. data_dir (:obj:`str`): path to data directory where the keys are stored.
Returns: Returns:
:obj:`tuple`: a tuple containing a ``EllipticCurvePrivateKey`` and a ``str`` representing the client sk and :obj:`tuple`: a tuple containing a ``PrivateKey`` and a ``str`` representing the client sk and compressed pk
compressed pk respectively. respectively.
Raises: Raises:
:obj:`InvalidKey <cli.exceptions.InvalidKey>`: if any of the keys is invalid or cannot be loaded. :obj:`InvalidKey <cli.exceptions.InvalidKey>`: if any of the keys is invalid or cannot be loaded.

View File

@@ -11,16 +11,38 @@ MAX_RETRIES = None
def check_retry(status): def check_retry(status):
"""
Checks is the job needs to be retried. Jobs are retried if max_retries is not reached and the tower status is
temporarily unreachable.
Args:
status (:obj:`str`): the tower status.
Returns:
:obj:`bool`: True is the status is "temporarily unreachable", False otherwise.
"""
return status == "temporarily unreachable" return status == "temporarily unreachable"
def on_backoff(details): def on_backoff(details):
"""
Function called when backing off after a retry. Logs data regarding the retry.
Args:
details: the retry details (check backoff library for more info).
"""
plugin = details.get("args")[1] plugin = details.get("args")[1]
tower_id = details.get("args")[2] tower_id = details.get("args")[2]
plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off") plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off")
def on_giveup(details): def on_giveup(details):
"""
Function called when giving up after the last retry. Logs data regarding the retry and flags the tower as
unreachable.
Args:
details: the retry details (check backoff library for more info).
"""
plugin = details.get("args")[1] plugin = details.get("args")[1]
tower_id = details.get("args")[2] tower_id = details.get("args")[2]
@@ -31,20 +53,39 @@ def on_giveup(details):
def set_max_retries(max_retries): def set_max_retries(max_retries):
"""Workaround to set max retries from Retrier to the backoff.on_predicate decorator"""
global MAX_RETRIES global MAX_RETRIES
MAX_RETRIES = max_retries MAX_RETRIES = max_retries
def max_retries(): def max_retries():
"""Workaround to set max retries from Retrier to the backoff.on_predicate decorator"""
return MAX_RETRIES return MAX_RETRIES
class Retrier: class Retrier:
"""
The Retrier is in charge of the retry process for appointments that were sent to towers that were temporarily
unreachable.
Args:
max_retries (:obj:`int`): the maximum number of times that a tower will be retried.
temp_unreachable_towers (:obj:`Queue`): a queue of temporarily unreachable towers populated by the plugin on
failing to deliver an appointment.
"""
def __init__(self, max_retries, temp_unreachable_towers): def __init__(self, max_retries, temp_unreachable_towers):
self.temp_unreachable_towers = temp_unreachable_towers self.temp_unreachable_towers = temp_unreachable_towers
set_max_retries(max_retries) set_max_retries(max_retries)
def manage_retry(self, plugin): def manage_retry(self, plugin):
"""
Listens to the temporarily unreachable towers queue and creates a thread to manage each tower it gets.
Args:
plugin (:obj:`Plugin`): the plugin object.
"""
while True: while True:
tower_id = self.temp_unreachable_towers.get() tower_id = self.temp_unreachable_towers.get()
tower = plugin.wt_client.towers[tower_id] tower = plugin.wt_client.towers[tower_id]
@@ -53,6 +94,23 @@ class Retrier:
@backoff.on_predicate(backoff.expo, check_retry, max_tries=max_retries, on_backoff=on_backoff, on_giveup=on_giveup) @backoff.on_predicate(backoff.expo, check_retry, max_tries=max_retries, on_backoff=on_backoff, on_giveup=on_giveup)
def do_retry(self, plugin, tower_id, tower): def do_retry(self, plugin, tower_id, tower):
"""
Retries to send a list of pending appointments to a temporarily unreachable tower. This function is managed by
manage_retries and run in a different thread per tower.
For every pending appointment the worker thread tries to send the data to the tower. If the tower keeps being
unreachable, the job is retries up to MAX_RETRIES. If MAX_RETRIES is reached, the worker thread gives up and the
tower is flagged as unreachable.
Args:
plugin (:obj:`Plugin`): the plugin object.
tower_id (:obj:`str`): the id of the tower managed by the thread.
tower: (:obj:`TowerSummary`): the tower data.
Returns:
:obj:`str`: the tower status if it is not reachable.
"""
for appointment_dict, signature in plugin.wt_client.towers[tower_id].pending_appointments: for appointment_dict, signature in plugin.wt_client.towers[tower_id].pending_appointments:
tower_update = {} tower_update = {}
try: try:

View File

@@ -1,4 +1,26 @@
class TowerInfo: class TowerInfo:
"""
TowerInfo represents all the data the plugin hold about a tower.
Args:
netaddr (:obj:`str`): the tower network address.
available_slots (:obj:`int`): the amount of available appointment slots in the tower.
status (:obj:`str`): the tower status. The tower can be in the following status:
reachable: if the tower can be reached.
temporarily unreachable: if the tower cannot be reached but the issue is transitory.
unreachable: if the tower cannot be reached and the issue has persisted long enough, or it is permanent.
subscription error: if there has been a problem with the subscription (e.g: run out of slots).
misbehaving: if the tower has been caught misbehaving (e.g: an invalid signature has been received).
Attributes:
appointments (:obj:`dict`): a collection of accepted appointments.
pending_appointments (:obj:`list`): a collection of pending appointments. Appointments are pending when the
tower is unreachable or the subscription has expired / run out of slots.
invalid_appointments (:obj:`list`): a collection of invalid appointments. Appointments are invalid if the tower
rejects them for not following the proper format.
misbehaving_proof (:obj:`dict`): a proof of misbehaviour from the tower. The tower is abandoned if so.
"""
def __init__(self, netaddr, available_slots, status="reachable"): def __init__(self, netaddr, available_slots, status="reachable"):
self.netaddr = netaddr self.netaddr = netaddr
self.available_slots = available_slots self.available_slots = available_slots
@@ -11,6 +33,19 @@ class TowerInfo:
@classmethod @classmethod
def from_dict(cls, tower_data): def from_dict(cls, tower_data):
"""
Builds a TowerInfo object from a dictionary.
Args:
tower_data (:obj:`dict`): a dictionary containing all the TowerInfo fields.
Returns:
:obj:`TowerInfo`: A TowerInfo object built with the provided data.
Raises:
:obj:`ValueError`: If any of the expected fields is missing in the dictionary.
"""
netaddr = tower_data.get("netaddr") netaddr = tower_data.get("netaddr")
available_slots = tower_data.get("available_slots") available_slots = tower_data.get("available_slots")
status = tower_data.get("status") status = tower_data.get("status")
@@ -34,13 +69,42 @@ class TowerInfo:
return tower return tower
def to_dict(self): def to_dict(self):
"""
Builds a dictionary from a TowerInfo object.
Returns:
:obj:`dict`: The TowerInfo object as a dictionary.
"""
return self.__dict__ return self.__dict__
def get_summary(self): def get_summary(self):
"""
Gets a summary of the TowerInfo object.
The plugin only stores the minimal information in memory, the rest is dumped into the DB. Data kept in memory
is stored in TowerSummary objects.
Returns:
:obj:`dict`: The summary of the TowerInfo object.
"""
return TowerSummary(self) return TowerSummary(self)
class TowerSummary: class TowerSummary:
"""
A smaller representation of the TowerInfo data to be kept in memory.
Args:
tower_info(:obj:`TowerInfo`): A TowerInfo object.
Attributes:
netaddr (:obj:`str`): the tower network address.
status (:obj:`str`): the status of the tower.
available_slots (:obj:`int`): the amount of available appointment slots in the tower.
pending_appointments (:obj:`list`): the collection of pending appointments.
invalid_appointments (:obj:`list`): the collection of invalid appointments.
"""
def __init__(self, tower_info): def __init__(self, tower_info):
self.netaddr = tower_info.netaddr self.netaddr = tower_info.netaddr
self.status = tower_info.status self.status = tower_info.status
@@ -49,4 +113,11 @@ class TowerSummary:
self.invalid_appointments = tower_info.invalid_appointments self.invalid_appointments = tower_info.invalid_appointments
def to_dict(self): def to_dict(self):
"""
Builds a dictionary from a TowerSummary object.
Returns:
:obj:`dict`: The TowerSummary object as a dictionary.
"""
return self.__dict__ return self.__dict__

View File

@@ -36,11 +36,29 @@ plugin = Plugin()
class WTClient: class WTClient:
"""
Holds all the data regarding the watchtower client.
Fires an additional tread to take care of retries.
Args:
sk (:obj:`PrivateKey): the user private key. Used to sign appointment sent to the towers.
user_id (:obj:`PrivateKey): the identifier of the user (compressed public key).
config (:obj:`dict`): the client configuration loaded on a dictionary.
Attributes:
towers (:obj:`dict`): a collection of registered towers. Indexed by tower_id, populated with :obj:`TowerSummary`
objects.
db_manager (:obj:`towers_dbm.TowersDBM`): a manager to interact with the towers database.
retrier (:obj:`retrier.Retrier`): a ``Retrier`` in charge of retrying sending jobs to temporarily unreachable
towers.
lock (:obj:`Lock`): a thread lock.
"""
def __init__(self, sk, user_id, config): def __init__(self, sk, user_id, config):
self.sk = sk self.sk = sk
self.user_id = user_id self.user_id = user_id
self.towers = {} self.towers = {}
self.tmp_unreachable_towers = []
self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin) self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin)
self.retrier = Retrier(config.get("MAX_RETRIES"), Queue()) self.retrier = Retrier(config.get("MAX_RETRIES"), Queue())
self.config = config self.config = config
@@ -53,6 +71,16 @@ class WTClient:
Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start() Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start()
def update_tower_state(self, tower_id, tower_update): def update_tower_state(self, tower_id, tower_update):
"""
Updates the state of a tower both in memory and disk.
Access if restricted thought a lock to prevent race conditions.
Args:
tower_id (:obj:`str`): the identifier of the tower to be updated.
tower_update (:obj:`dict`): a dictionary containing the data to be added / removed.
"""
self.lock.acquire() self.lock.acquire()
tower_info = TowerInfo.from_dict(self.db_manager.load_tower_record(tower_id)) tower_info = TowerInfo.from_dict(self.db_manager.load_tower_record(tower_id))
@@ -81,6 +109,8 @@ class WTClient:
@plugin.init() @plugin.init()
def init(options, configuration, plugin): def init(options, configuration, plugin):
"""Initializes the plugin"""
try: try:
user_sk, user_id = generate_keys(DATA_DIR) user_sk, user_id = generate_keys(DATA_DIR)
plugin.log(f"Generating a new key pair for the watchtower client. Keys stored at {DATA_DIR}") plugin.log(f"Generating a new key pair for the watchtower client. Keys stored at {DATA_DIR}")
@@ -109,7 +139,7 @@ def register(plugin, tower_id, host=None, port=None):
plugin (:obj:`Plugin`): this plugin. plugin (:obj:`Plugin`): this plugin.
tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key). tower_id (:obj:`str`): the identifier of the tower to connect to (a compressed public key).
host (:obj:`str`): the ip or hostname to connect to, optional. host (:obj:`str`): the ip or hostname to connect to, optional.
host (:obj:`int`): the port to connect to, optional. port (:obj:`int`): the port to connect to, optional.
Accepted tower_id formats: Accepted tower_id formats:
- tower_id@host:port - tower_id@host:port
@@ -189,8 +219,18 @@ def get_appointment(plugin, tower_id, locator):
return e.to_json() return e.to_json()
@plugin.method("listtowers", desc="List all towers registered towers.") @plugin.method("listtowers", desc="List all registered towers.")
def list_towers(plugin): def list_towers(plugin):
"""
Lists all the registered towers. The given information comes from memory, so it is summarized.
Args:
plugin (:obj:`Plugin`): this plugin.
Returns:
:obj:`dict`: a dictionary containing the registered towers data.
"""
towers_info = {"towers": []} towers_info = {"towers": []}
for tower_id, tower in plugin.wt_client.towers.items(): for tower_id, tower in plugin.wt_client.towers.items():
values = {k: v for k, v in tower.to_dict().items() if k not in ["pending_appointments", "invalid_appointments"]} values = {k: v for k, v in tower.to_dict().items() if k not in ["pending_appointments", "invalid_appointments"]}
@@ -205,6 +245,17 @@ def list_towers(plugin):
@plugin.method("gettowerinfo", desc="List all towers registered towers.") @plugin.method("gettowerinfo", desc="List all towers registered towers.")
def get_tower_info(plugin, tower_id): def get_tower_info(plugin, tower_id):
"""
Gets information about a given tower. Data comes from disk (DB), so all stored data is provided.
Args:
plugin (:obj:`Plugin`): this plugin.
tower_id: (:obj:`str`): the identifier of the queried tower.
Returns:
:obj:`dict`: a dictionary containing all data about the queried tower.
"""
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
pending_appointments = [ pending_appointments = [
{"appointment": appointment, "signature": signature} {"appointment": appointment, "signature": signature}
@@ -221,14 +272,31 @@ def get_tower_info(plugin, tower_id):
@plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.") @plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.")
def retry_tower(plugin, tower_id): def retry_tower(plugin, tower_id):
"""
Triggers a manual retry of a tower, tries to send all pending appointments to to it.
Only works if the tower is unreachable or there's been a subscription error.
Args:
plugin (:obj:`Plugin`): this plugin.
tower_id: (:obj:`str`): the identifier of the tower to be retried.
Returns:
"""
response = None response = None
plugin.wt_client.lock.acquire() plugin.wt_client.lock.acquire()
tower = plugin.wt_client.towers.get(tower_id) tower = plugin.wt_client.towers.get(tower_id)
if not tower: if not tower:
response = {"error": f"{tower_id} is not a registered tower"} response = {"error": f"{tower_id} is not a registered tower"}
# FIXME: it may be worth only allowing unreachable and forcing a retry on register_tower if the state is
# subscription error.
if tower.status not in ["unreachable", "subscription error"]: if tower.status not in ["unreachable", "subscription error"]:
response = {"error": f"{tower_id} is not unreachable. {tower.status}"} response = {
"error": f"Cannot retry tower. Expected tower status 'unreachable' or 'subscription error'. Received {tower.status}"
}
if not tower.pending_appointments: if not tower.pending_appointments:
response = {"error": f"{tower_id} does not have pending appointments"} response = {"error": f"{tower_id} does not have pending appointments"}
@@ -244,6 +312,15 @@ def retry_tower(plugin, tower_id):
@plugin.hook("commitment_revocation") @plugin.hook("commitment_revocation")
def on_commitment_revocation(plugin, **kwargs): def on_commitment_revocation(plugin, **kwargs):
"""
Sends an appointment to all registered towers for every net commitment transaction.
kwargs should contain the commitment identifier (commitment_txid) and the penalty transaction (penalty_tx)
Args:
plugin (:obj:`Plugin`): this plugin.
"""
try: try:
commitment_txid, penalty_tx = arg_parser.parse_add_appointment_arguments(kwargs) commitment_txid, penalty_tx = arg_parser.parse_add_appointment_arguments(kwargs)
appointment = Appointment( appointment = Appointment(
@@ -300,7 +377,7 @@ def on_commitment_revocation(plugin, **kwargs):
except TowerResponseError as e: except TowerResponseError as e:
tower_update["status"] = e.kwargs.get("status") tower_update["status"] = e.kwargs.get("status")
if tower_update["status"] in ["temporarily unreachable", "subscription_error"]: if tower_update["status"] in ["temporarily unreachable", "subscription error"]:
plugin.log(f"Adding {appointment.locator} to pending") plugin.log(f"Adding {appointment.locator} to pending")
tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add" tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add"