From 9f2c90d12e2a0c2a5033c71afac1794d59628ed8 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 27 Apr 2020 19:10:43 +0200 Subject: [PATCH] plugin - adds Retrier and improves get_tower_info and listtowers - Logs data identified by tower_id instead of endpoint - Adds Retrier to deal with retries (for connection errors and rejects) and moves sending logic to net/http - Adds pending appointments to TowerInfo and serves that information via `get_tower_info` and `list_towers` - Deals with connection errors (but not with rejections yet) --- watchtower-plugin/watchtower.py | 115 ++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 42 deletions(-) diff --git a/watchtower-plugin/watchtower.py b/watchtower-plugin/watchtower.py index c24af5e..72e717e 100755 --- a/watchtower-plugin/watchtower.py +++ b/watchtower-plugin/watchtower.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 import os import plyvel +from queue import Queue +from threading import Thread from pyln.client import Plugin from common.tools import compute_locator @@ -10,11 +12,12 @@ from common.cryptographer import Cryptographer from common.exceptions import InvalidParameter, SignatureError, EncryptionError import arg_parser +from retrier import Retrier from tower_info import TowerInfo from towers_dbm import TowersDBM from keys import generate_keys, load_keys -from net.http import post_request, process_post_response from exceptions import TowerConnectionError, TowerResponseError +from net.http import post_request, process_post_response, send_appointment DATA_DIR = os.path.expanduser("~/.watchtower/") @@ -22,6 +25,8 @@ CONF_FILE_NAME = "watchtower.conf" DEFAULT_CONF = { "DEFAULT_PORT": {"value": 9814, "type": int}, + "RETRY_DELTA": {"value": 60, "type": int}, + "MAX_RETRIES": {"value": 30, "type": int}, "APPOINTMENTS_FOLDER_NAME": {"value": "appointment_receipts", "type": str, "path": True}, "TOWERS_DB": {"value": "towers", "type": str, "path": True}, "PRIVATE_KEY": {"value": "sk.der", "type": str, "path": True}, @@ -37,12 +42,15 @@ class WTClient: self.user_id = user_id self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin) self.towers = {} + self.retrier = Retrier(config.get("RETRY_DELTA"), config.get("MAX_RETRIES"), Queue()) self.config = config # Populate the towers dict with data from the db for tower_id, tower_info in self.db_manager.load_all_tower_records().items(): self.towers[tower_id] = TowerInfo.from_dict(tower_info).get_summary() + Thread(target=self.retrier.do_retry, args=[plugin], daemon=True).start() + @plugin.init() def init(options, configuration, plugin): @@ -97,9 +105,9 @@ def register(plugin, tower_id, host=None, port=None): register_endpoint = "{}/register".format(tower_netaddr) data = {"public_key": plugin.wt_client.user_id} - plugin.log("Registering in the Eye of Satoshi") + plugin.log("Registering in the Eye of Satoshi (tower_id={})".format(tower_id)) - response = process_post_response(post_request(data, register_endpoint)) + response = process_post_response(post_request(data, register_endpoint, tower_id)) plugin.log("Registration succeeded. Available slots: {}".format(response.get("available_slots"))) # Save data @@ -142,9 +150,9 @@ def get_appointment(plugin, tower_id, locator): # Send request to the server. get_appointment_endpoint = "{}/get_appointment".format(plugin.wt_client.towers[tower_id].get("netaddr")) - plugin.log("Requesting appointment from the Eye of Satoshi at {}".format(get_appointment_endpoint)) + plugin.log("Requesting appointment from {}".format(tower_id)) - response = process_post_response(post_request(data, get_appointment_endpoint)) + response = process_post_response(post_request(data, get_appointment_endpoint, tower_id)) return response except (InvalidParameter, TowerConnectionError, TowerResponseError) as e: @@ -155,17 +163,39 @@ def get_appointment(plugin, tower_id, locator): @plugin.method("listtowers", desc="List all towers registered towers.") def list_towers(plugin): towers_info = {"towers": []} - for k, v in plugin.wt_client.towers.items(): - towers_info["towers"].append({"id": k, **v}) + for tower_id, info in plugin.wt_client.towers.items(): + values = {k: v for k, v in info.items() if k != "pending_appointments"} + pending_appointments = [appointment.get("locator") for appointment, signature in info["pending_appointments"]] + values["pending_appointments"] = pending_appointments + towers_info["towers"].append({"id": tower_id, **values}) return towers_info @plugin.method("gettowerinfo", desc="List all towers registered towers.") def get_tower_info(plugin, tower_id): - tower_info = plugin.wt_client.db_manager.load_tower_record(tower_id) + tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) + pending_appointments = [ + {"appointment": appointment, "signature": signature} + for appointment, signature in tower_info.pending_appointments + ] + tower_info.pending_appointments = pending_appointments + return {"id": tower_id, **tower_info.to_dict()} - return {"id": tower_id, **tower_info} + +@plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.") +def retry_tower(plugin, tower_id): + tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) + + if not tower_info: + return {"error": "{} is not a registered tower".format(tower_id)} + if tower_info.status != "unreachable": + return {"error": "{} is not unreachable".format(tower_id)} + if not tower_info.pending_appointments: + return {"error": "{} does not have pending appointments".format(tower_id)} + + plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) + plugin.log("Retrying tower {}".format(tower_id)) @plugin.hook("commitment_revocation") @@ -177,56 +207,57 @@ def add_appointment(plugin, **kwargs): to_self_delay=20, # does not matter for now, any value 20-2^32-1 would do encrypted_blob=Cryptographer.encrypt(penalty_tx, commitment_txid), ) - signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk) - data = {"appointment": appointment.to_dict(), "signature": signature} # Send appointment to the server. # FIXME: sending the appointment to all registered towers atm. Some management would be nice. for tower_id, tower in plugin.wt_client.towers.items(): tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) - try: - plugin.log("Sending appointment to the Eye of Satoshi at {}".format(tower.get("netaddr"))) - add_appointment_endpoint = "{}/add_appointment".format(tower.get("netaddr")) - response = process_post_response(post_request(data, add_appointment_endpoint)) - signature = response.get("signature") - # Check that the server signed the appointment as it should. - if not signature: - raise TowerResponseError("The response does not contain the signature of the appointment") + if tower_info.status != "unreachable": + try: + plugin.log("Sending appointment to {}".format(tower_id)) + response = send_appointment(tower_id, tower_info, appointment.to_dict(), signature) + plugin.log("Appointment accepted and signed by {}".format(tower_id)) + plugin.log("Remaining slots: {}".format(response.get("available_slots"))) - rpk = Cryptographer.recover_pk(appointment.serialize(), signature) - if not tower_id != Cryptographer.get_compressed_pk(rpk): - raise TowerResponseError("The returned appointment's signature is invalid") + # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. + # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. - plugin.log("Appointment accepted and signed by the Eye of Satoshi at {}".format(tower.get("netaddr"))) - plugin.log("Remaining slots: {}".format(response.get("available_slots"))) + tower_info.appointments[appointment.locator] = response.get("signature") + tower_info.available_slots = response.get("available_slots") + tower_info.status = "reachable" - # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. - # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. + # Update memory and TowersDB + plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + plugin.wt_client.towers[tower_id] = tower_info.get_summary() - tower_info.appointments[appointment.locator] = signature - tower_info.available_slots = response.get("available_slots") - tower_info.status = "reachable" + except TowerConnectionError as e: + # All TowerConnectionError are transitory, since the connection is tried on register, so the URL + # cannot be malformed + plugin.log(str(e)) - # Update memory and TowersDB - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - plugin.wt_client.towers[tower_id] = tower_info.get_summary() - - except TowerConnectionError as e: - # TODO: Implement retry logic - plugin.log(str(e)) - if e.kwargs.get("transitory"): + # Flag appointment for retry tower_info.status = "temporarily unreachable" - else: - tower_info.status = "unreachable" + tower_info.pending_appointments.append((appointment.to_dict(), signature)) + plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) + # Store data in memory and TowersDB + plugin.wt_client.towers[tower_id] = tower_info.get_summary() + plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + + except TowerResponseError as e: + # FIXME: deal with tower errors, such as no available slots + plugin.log(str(e)) + + else: + # If the tower has been flagged as unreachable (too many retries with no success) data is simply stored + # for future submission via `retry_tower`. + plugin.log("{} is unreachable. Adding appointment to pending".format(tower_id)) + tower_info.pending_appointments.append((appointment.to_dict(), signature)) plugin.wt_client.towers[tower_id] = tower_info.get_summary() plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - except TowerResponseError as e: - plugin.log(str(e)) - except (InvalidParameter, EncryptionError, SignatureError, TowerResponseError) as e: plugin.log(str(e), level="warn")