From 43217874b59b60350584475005c91aa567de94ff Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 29 Apr 2020 13:12:21 +0200 Subject: [PATCH] Splits add_appointment so it can also be used by the Retrier - The appointment constructions is left in Watchtower (via on_commit_revocation method) - The tower interaction is moved to net/http so it can be reused - Adds missing logic for invalid resposes by the tower (e.g invalid signatures) --- watchtower-plugin/net/http.py | 61 ++++++++++++++++++++-- watchtower-plugin/template.conf | 1 + watchtower-plugin/watchtower.py | 92 +++++++++++++++------------------ 3 files changed, 101 insertions(+), 53 deletions(-) diff --git a/watchtower-plugin/net/http.py b/watchtower-plugin/net/http.py index 230054d..0487913 100644 --- a/watchtower-plugin/net/http.py +++ b/watchtower-plugin/net/http.py @@ -3,13 +3,66 @@ import requests from requests import ConnectionError, ConnectTimeout from requests.exceptions import MissingSchema, InvalidSchema, InvalidURL +from common import errors from common import constants from common.appointment import Appointment +from common.exceptions import SignatureError from common.cryptographer import Cryptographer from exceptions import TowerConnectionError, TowerResponseError +def add_appointment(plugin, tower_id, tower_info, appointment_dict, signature): + try: + plugin.log("Sending appointment to {}".format(tower_id)) + response = send_appointment(tower_id, tower_info, appointment_dict, signature) + plugin.log("Appointment accepted and signed by {}".format(tower_id)) + plugin.log("Remaining slots: {}".format(response.get("available_slots"))) + + # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. + # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. + + tower_info.appointments[appointment_dict.get("locator")] = response.get("signature") + tower_info.available_slots = response.get("available_slots") + tower_info.status = "reachable" + + except SignatureError as e: + plugin.log("{} is misbehaving, not using it any longer".format(tower_id)) + tower_info.status = "misbehaving" + tower_info.invalid_appointments.append((appointment_dict, e.kwargs.get("signature"))) + + except TowerConnectionError: + # All TowerConnectionError are transitory. The connection is tried on register, so the URL cannot be malformed. + # Flag appointment for retry + plugin.log("{} cannot be reached. Adding appointment to pending".format(tower_id)) + tower_info.status = "temporarily unreachable" + + except TowerResponseError as e: + data = e.kwargs.get("data") + status_code = e.kwargs.get("status_code") + + if data and status_code == constants.HTTP_BAD_REQUEST: + if data.get("error_code") == errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS: + plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id)) + tower_info.status = "subscription error" + + elif data.get("error_code") >= errors.INVALID_REQUEST_FORMAT: + plugin.log("Appointment sent to {} is invalid".format(tower_id)) + tower_info.status = "reachable" + # DISCUSS: It may be worth backing up the data since otherwise the update is dropped + + elif status_code == constants.HTTP_SERVICE_UNAVAILABLE: + # Flag appointment for retry + plugin.log("{} is temporarily unavailable. Adding appointment to pending".format(tower_id)) + tower_info.status = "temporarily unreachable" + + else: + # Log unexpected behaviour + plugin.log(str(e), level="warn") + + return tower_info.status + + def send_appointment(tower_id, tower_info, appointment_dict, signature): data = {"appointment": appointment_dict, "signature": signature} @@ -19,11 +72,13 @@ def send_appointment(tower_id, tower_info, appointment_dict, signature): signature = response.get("signature") # Check that the server signed the appointment as it should. if not signature: - raise TowerResponseError("The response does not contain the signature of the appointment") + raise SignatureError("The response does not contain the signature of the appointment", signature=None) rpk = Cryptographer.recover_pk(Appointment.from_dict(appointment_dict).serialize(), signature) - if not tower_id != Cryptographer.get_compressed_pk(rpk): - raise TowerResponseError("The returned appointment's signature is invalid") + if tower_id != Cryptographer.get_compressed_pk(rpk): + raise SignatureError( + "The returned appointment's signature is invalid", tower_id=tower_id, rpk=rpk, signature=signature + ) return response diff --git a/watchtower-plugin/template.conf b/watchtower-plugin/template.conf index 18d0403..85c8305 100644 --- a/watchtower-plugin/template.conf +++ b/watchtower-plugin/template.conf @@ -1,3 +1,4 @@ [teos] api_port = 9814 +max_retries = 30 diff --git a/watchtower-plugin/watchtower.py b/watchtower-plugin/watchtower.py index 72e717e..91c8192 100755 --- a/watchtower-plugin/watchtower.py +++ b/watchtower-plugin/watchtower.py @@ -2,8 +2,8 @@ import os import plyvel from queue import Queue -from threading import Thread from pyln.client import Plugin +from threading import Thread, Lock from common.tools import compute_locator from common.appointment import Appointment @@ -17,7 +17,7 @@ from tower_info import TowerInfo from towers_dbm import TowersDBM from keys import generate_keys, load_keys from exceptions import TowerConnectionError, TowerResponseError -from net.http import post_request, process_post_response, send_appointment +from net.http import post_request, process_post_response, add_appointment DATA_DIR = os.path.expanduser("~/.watchtower/") @@ -25,7 +25,6 @@ CONF_FILE_NAME = "watchtower.conf" DEFAULT_CONF = { "DEFAULT_PORT": {"value": 9814, "type": int}, - "RETRY_DELTA": {"value": 60, "type": int}, "MAX_RETRIES": {"value": 30, "type": int}, "APPOINTMENTS_FOLDER_NAME": {"value": "appointment_receipts", "type": str, "path": True}, "TOWERS_DB": {"value": "towers", "type": str, "path": True}, @@ -40,16 +39,24 @@ class WTClient: def __init__(self, sk, user_id, config): self.sk = sk self.user_id = user_id - self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin) self.towers = {} - self.retrier = Retrier(config.get("RETRY_DELTA"), config.get("MAX_RETRIES"), Queue()) + self.tmp_unreachable_towers = [] + self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin) + self.retrier = Retrier(config.get("MAX_RETRIES"), Queue()) self.config = config + self.lock = Lock() # Populate the towers dict with data from the db for tower_id, tower_info in self.db_manager.load_all_tower_records().items(): self.towers[tower_id] = TowerInfo.from_dict(tower_info).get_summary() - Thread(target=self.retrier.do_retry, args=[plugin], daemon=True).start() + Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start() + + def update_tower_state(self, tower_id, tower_info): + self.lock.acquire() + self.towers[tower_id] = tower_info.get_summary() + self.db_manager.store_tower_record(tower_id, tower_info) + self.lock.release() @plugin.init() @@ -112,8 +119,7 @@ def register(plugin, tower_id, host=None, port=None): # Save data tower_info = TowerInfo(tower_netaddr, response.get("available_slots")) - plugin.wt_client.towers[tower_id] = tower_info.get_summary() - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + plugin.wt_client.update_tower_state(tower_id, tower_info) return response @@ -166,7 +172,9 @@ def list_towers(plugin): for tower_id, info in plugin.wt_client.towers.items(): values = {k: v for k, v in info.items() if k != "pending_appointments"} pending_appointments = [appointment.get("locator") for appointment, signature in info["pending_appointments"]] + invalid_appointments = [appointment.get("locator") for appointment, signature in info["invalid_appointments"]] values["pending_appointments"] = pending_appointments + values["invalid_appointments"] = invalid_appointments towers_info["towers"].append({"id": tower_id, **values}) return towers_info @@ -179,7 +187,12 @@ def get_tower_info(plugin, tower_id): {"appointment": appointment, "signature": signature} for appointment, signature in tower_info.pending_appointments ] + invalid_appointments = [ + {"appointment": appointment, "tower_signature": signature} + for appointment, signature in tower_info.invalid_appointments + ] tower_info.pending_appointments = pending_appointments + tower_info.invalid_appointments = invalid_appointments return {"id": tower_id, **tower_info.to_dict()} @@ -194,12 +207,15 @@ def retry_tower(plugin, tower_id): if not tower_info.pending_appointments: return {"error": "{} does not have pending appointments".format(tower_id)} + message = "Retrying tower {}".format(tower_id) + plugin.log(message) plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) - plugin.log("Retrying tower {}".format(tower_id)) + + return message @plugin.hook("commitment_revocation") -def add_appointment(plugin, **kwargs): +def on_commitment_revocation(plugin, **kwargs): try: commitment_txid, penalty_tx = arg_parser.parse_add_appointment_arguments(kwargs) appointment = Appointment( @@ -209,58 +225,34 @@ def add_appointment(plugin, **kwargs): ) signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk) - # Send appointment to the server. + # Send appointment to the towers. # FIXME: sending the appointment to all registered towers atm. Some management would be nice. for tower_id, tower in plugin.wt_client.towers.items(): tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) - if tower_info.status != "unreachable": - try: - plugin.log("Sending appointment to {}".format(tower_id)) - response = send_appointment(tower_id, tower_info, appointment.to_dict(), signature) - plugin.log("Appointment accepted and signed by {}".format(tower_id)) - plugin.log("Remaining slots: {}".format(response.get("available_slots"))) - - # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. - # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. - - tower_info.appointments[appointment.locator] = response.get("signature") - tower_info.available_slots = response.get("available_slots") - tower_info.status = "reachable" - - # Update memory and TowersDB - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - plugin.wt_client.towers[tower_id] = tower_info.get_summary() - - except TowerConnectionError as e: - # All TowerConnectionError are transitory, since the connection is tried on register, so the URL - # cannot be malformed - plugin.log(str(e)) - - # Flag appointment for retry - tower_info.status = "temporarily unreachable" + if tower_info.status == "reachable": + status = add_appointment(plugin, tower_id, tower_info, appointment.to_dict(), signature) + if status == "temporarily unreachable": tower_info.pending_appointments.append((appointment.to_dict(), signature)) plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) - # Store data in memory and TowersDB - plugin.wt_client.towers[tower_id] = tower_info.get_summary() - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + elif tower_info.status != "misbehaving": + if tower_info.status in ["temporarily unreachable", "unreachable"]: + plugin.log("{} is {}. Adding appointment to pending".format(tower_id, tower_info.status)) + elif tower_info == "subscription error": + plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id)) - except TowerResponseError as e: - # FIXME: deal with tower errors, such as no available slots - plugin.log(str(e)) - - else: - # If the tower has been flagged as unreachable (too many retries with no success) data is simply stored - # for future submission via `retry_tower`. - plugin.log("{} is unreachable. Adding appointment to pending".format(tower_id)) tower_info.pending_appointments.append((appointment.to_dict(), signature)) - plugin.wt_client.towers[tower_id] = tower_info.get_summary() - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - except (InvalidParameter, EncryptionError, SignatureError, TowerResponseError) as e: + # Update memory and TowersDB + plugin.wt_client.update_tower_state(tower_id, tower_info) + + except (InvalidParameter, EncryptionError, SignatureError) as e: plugin.log(str(e), level="warn") + except TowerConnectionError: + pass + return {"result": "continue"}