diff --git a/watchtower-plugin/net/http.py b/watchtower-plugin/net/http.py index 0487913..d523c9a 100644 --- a/watchtower-plugin/net/http.py +++ b/watchtower-plugin/net/http.py @@ -12,30 +12,25 @@ from common.cryptographer import Cryptographer from exceptions import TowerConnectionError, TowerResponseError -def add_appointment(plugin, tower_id, tower_info, appointment_dict, signature): +def add_appointment(plugin, tower_id, tower, appointment_dict, signature): try: - plugin.log("Sending appointment to {}".format(tower_id)) - response = send_appointment(tower_id, tower_info, appointment_dict, signature) - plugin.log("Appointment accepted and signed by {}".format(tower_id)) - plugin.log("Remaining slots: {}".format(response.get("available_slots"))) + plugin.log(f"Sending appointment {appointment_dict.get('locator')} to {tower_id}") + response = send_appointment(tower_id, tower, appointment_dict, signature) + plugin.log(f"Appointment accepted and signed by {tower_id}") + plugin.log(f"Remaining slots: {response.get('available_slots')}") - # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. - # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. - - tower_info.appointments[appointment_dict.get("locator")] = response.get("signature") - tower_info.available_slots = response.get("available_slots") - tower_info.status = "reachable" + # # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed. + # # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting. + return response.get("signature"), response.get("available_slots") except SignatureError as e: - plugin.log("{} is misbehaving, not using it any longer".format(tower_id)) - tower_info.status = "misbehaving" - tower_info.invalid_appointments.append((appointment_dict, e.kwargs.get("signature"))) + plugin.log(f"{tower_id} is misbehaving, not using it any longer") + raise e - except TowerConnectionError: - # All TowerConnectionError are transitory. The connection is tried on register, so the URL cannot be malformed. - # Flag appointment for retry - plugin.log("{} cannot be reached. Adding appointment to pending".format(tower_id)) - tower_info.status = "temporarily unreachable" + except TowerConnectionError as e: + plugin.log(f"{tower_id} cannot be reached") + + raise e except TowerResponseError as e: data = e.kwargs.get("data") @@ -43,30 +38,28 @@ def add_appointment(plugin, tower_id, tower_info, appointment_dict, signature): if data and status_code == constants.HTTP_BAD_REQUEST: if data.get("error_code") == errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS: - plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id)) - tower_info.status = "subscription error" + message = f"There is a subscription issue with {tower_id}" + raise TowerResponseError(message, status="subscription error") elif data.get("error_code") >= errors.INVALID_REQUEST_FORMAT: - plugin.log("Appointment sent to {} is invalid".format(tower_id)) - tower_info.status = "reachable" # DISCUSS: It may be worth backing up the data since otherwise the update is dropped + message = f"Appointment sent to {tower_id} is invalid" + raise TowerResponseError(message, status="reachable") elif status_code == constants.HTTP_SERVICE_UNAVAILABLE: # Flag appointment for retry - plugin.log("{} is temporarily unavailable. Adding appointment to pending".format(tower_id)) - tower_info.status = "temporarily unreachable" + message = f"{tower_id} is temporarily unavailable" - else: - # Log unexpected behaviour - plugin.log(str(e), level="warn") + raise TowerResponseError(message, status="temporarily unreachable") - return tower_info.status + # Log unexpected behaviour without raising + plugin.log(str(e), level="warn") -def send_appointment(tower_id, tower_info, appointment_dict, signature): +def send_appointment(tower_id, tower, appointment_dict, signature): data = {"appointment": appointment_dict, "signature": signature} - add_appointment_endpoint = "{}/add_appointment".format(tower_info.netaddr) + add_appointment_endpoint = f"{tower.get('netaddr')}/add_appointment" response = process_post_response(post_request(data, add_appointment_endpoint, tower_id)) signature = response.get("signature") @@ -103,13 +96,13 @@ def post_request(data, endpoint, tower_id): return requests.post(url=endpoint, json=data, timeout=5) except ConnectTimeout: - message = "Cannot connect to {}. Connection timeout".format(tower_id) + message = f"Cannot connect to {tower_id}. Connection timeout" except ConnectionError: - message = "Cannot connect to {}. Tower cannot be reached".format(tower_id) + message = f"Cannot connect to {tower_id}. Tower cannot be reached" except (InvalidSchema, MissingSchema, InvalidURL): - message = "Invalid URL. No schema, or invalid schema, found (url={}, tower_id={}).".format(endpoint, tower_id) + message = f"Invalid URL. No schema, or invalid schema, found (url={endpoint}, tower_id={tower_id})" raise TowerConnectionError(message) diff --git a/watchtower-plugin/retrier.py b/watchtower-plugin/retrier.py index a4cbf3d..e4e1d8d 100644 --- a/watchtower-plugin/retrier.py +++ b/watchtower-plugin/retrier.py @@ -1,8 +1,10 @@ import backoff from threading import Thread -from tower_info import TowerInfo +from common.exceptions import SignatureError + from net.http import add_appointment +from exceptions import TowerConnectionError, TowerResponseError MAX_RETRIES = None @@ -11,19 +13,17 @@ MAX_RETRIES = None def on_backoff(details): plugin = details.get("args")[1] tower_id = details.get("args")[2] - plugin.log("Retry {} failed for tower {}, backing off".format(details.get("tries"), tower_id)) + plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off") def on_giveup(details): plugin = details.get("args")[1] tower_id = details.get("args")[2] - tower_info = details.get("args")[3] - plugin.log("Max retries reached, abandoning tower {}".format(tower_id)) + plugin.log(f"Max retries reached, abandoning tower {tower_id}") - tower_info.status = "unreachable" - plugin.wt_client.towers[tower_id]["status"] = "unreachable" - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + tower_update = {"status": "unreachable"} + plugin.wt_client.update_tower_state(tower_id, tower_update) def set_max_retries(max_retries): @@ -43,9 +43,9 @@ class Retrier: def manage_retry(self, plugin): while True: tower_id = self.temp_unreachable_towers.get() - tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) + tower = plugin.wt_client.towers[tower_id] - Thread(target=self.do_retry, args=[plugin, tower_id, tower_info], daemon=True).start() + Thread(target=self.do_retry, args=[plugin, tower_id, tower], daemon=True).start() @backoff.on_predicate( backoff.expo, @@ -54,15 +54,32 @@ class Retrier: on_backoff=on_backoff, on_giveup=on_giveup, ) - def do_retry(self, plugin, tower_id, tower_info): + def do_retry(self, plugin, tower_id, tower): for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]: - status = add_appointment(plugin, tower_id, tower_info, appointment_dict, signature) + tower_update = {} + try: + tower_signature, available_slots = add_appointment(plugin, tower_id, tower, appointment_dict, signature) + tower_update["status"] = "reachable" + tower_update["appointment"] = (appointment_dict.get("locator"), tower_signature) + tower_update["available_slots"] = available_slots - if status in ["reachable", "misbehaving"]: - tower_info.pending_appointments.remove([appointment_dict, signature]) + except SignatureError as e: + tower_update["status"] = "misbehaving" + tower_update["invalid_appointment"] = (appointment_dict, e.kwargs.get("signature")) + except TowerConnectionError: + tower_update["status"] = "temporarily unreachable" + + except TowerResponseError as e: + tower_update["status"] = e.kwargs.get("status") + + if tower_update["status"] in ["reachable", "misbehaving"]: + tower_update["pending_appointment"] = ([appointment_dict, signature], "remove") + + if tower_update["status"] != "temporarily unreachable": # Update memory and TowersDB - plugin.wt_client.update_tower_state(tower_id, tower_info) + plugin.wt_client.update_tower_state(tower_id, tower_update) - else: - return status + # Continue looping if reachable, return for either retry or stop otherwise + if tower_update["status"] != "reachable": + return tower_update.get("status") diff --git a/watchtower-plugin/watchtower.py b/watchtower-plugin/watchtower.py index 91c8192..dc24663 100755 --- a/watchtower-plugin/watchtower.py +++ b/watchtower-plugin/watchtower.py @@ -20,7 +20,7 @@ from exceptions import TowerConnectionError, TowerResponseError from net.http import post_request, process_post_response, add_appointment -DATA_DIR = os.path.expanduser("~/.watchtower/") +DATA_DIR = os.getenv("TOWERS_DATA_DIR", os.path.expanduser("~/.watchtower/")) CONF_FILE_NAME = "watchtower.conf" DEFAULT_CONF = { @@ -52,8 +52,25 @@ class WTClient: Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start() - def update_tower_state(self, tower_id, tower_info): + def update_tower_state(self, tower_id, tower_update): self.lock.acquire() + tower_info = TowerInfo.from_dict(self.db_manager.load_tower_record(tower_id)) + + if "status" in tower_update: + tower_info.status = tower_update.get("status") + if "appointment" in tower_update: + locator, signature = tower_update.get("appointment") + tower_info.appointments[locator] = signature + tower_info.available_slots = tower_update.get("available_slots") + if "pending_appointment" in tower_update: + data, action = tower_update.get("pending_appointment") + if action == "add": + tower_info.pending_appointments.append(list(data)) + else: + tower_info.pending_appointments.remove(list(data)) + if "invalid_appointment" in tower_update: + tower_info.pending_appointments.append(list(tower_update.get("invalid_appointment"))) + self.towers[tower_id] = tower_info.get_summary() self.db_manager.store_tower_record(tower_id, tower_info) self.lock.release() @@ -63,13 +80,13 @@ class WTClient: def init(options, configuration, plugin): try: user_sk, user_id = generate_keys(DATA_DIR) - plugin.log("Generating a new key pair for the watchtower client. Keys stored at {}".format(DATA_DIR)) + plugin.log(f"Generating a new key pair for the watchtower client. Keys stored at {DATA_DIR}") except FileExistsError: plugin.log("A key file for the watchtower client already exists. Loading it") user_sk, user_id = load_keys(DATA_DIR) - plugin.log("Plugin watchtower client initialized. User id = {}".format(user_id)) + plugin.log(f"Plugin watchtower client initialized. User id = {user_id}") config_loader = ConfigLoader(DATA_DIR, CONF_FILE_NAME, DEFAULT_CONF, {}) try: @@ -109,17 +126,20 @@ def register(plugin, tower_id, host=None, port=None): tower_netaddr = "http://" + tower_netaddr # Send request to the server. - register_endpoint = "{}/register".format(tower_netaddr) + register_endpoint = f"{tower_netaddr}/register" data = {"public_key": plugin.wt_client.user_id} - plugin.log("Registering in the Eye of Satoshi (tower_id={})".format(tower_id)) + plugin.log(f"Registering in the Eye of Satoshi (tower_id={tower_id})") response = process_post_response(post_request(data, register_endpoint, tower_id)) - plugin.log("Registration succeeded. Available slots: {}".format(response.get("available_slots"))) + plugin.log(f"Registration succeeded. Available slots: {response.get('available_slots')}") # Save data tower_info = TowerInfo(tower_netaddr, response.get("available_slots")) - plugin.wt_client.update_tower_state(tower_id, tower_info) + plugin.wt_client.lock.acquire() + plugin.wt_client.towers[tower_id] = tower_info.get_summary() + plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + plugin.wt_client.lock.release() return response @@ -150,13 +170,14 @@ def get_appointment(plugin, tower_id, locator): if tower_id not in plugin.wt_client.towers: raise InvalidParameter("tower_id is not within the registered towers", tower_id=tower_id) - message = "get appointment {}".format(locator) + message = f"get appointment {locator}" signature = Cryptographer.sign(message.encode(), plugin.wt_client.sk) data = {"locator": locator, "signature": signature} # Send request to the server. - get_appointment_endpoint = "{}/get_appointment".format(plugin.wt_client.towers[tower_id].get("netaddr")) - plugin.log("Requesting appointment from {}".format(tower_id)) + tower_netaddr = plugin.wt_client.towers[tower_id].get("netaddr") + get_appointment_endpoint = f"{tower_netaddr}/get_appointment" + plugin.log(f"Requesting appointment from {tower_id}") response = process_post_response(post_request(data, get_appointment_endpoint, tower_id)) return response @@ -198,16 +219,16 @@ def get_tower_info(plugin, tower_id): @plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.") def retry_tower(plugin, tower_id): - tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) + tower = plugin.wt_client.towers.get(tower_id) - if not tower_info: - return {"error": "{} is not a registered tower".format(tower_id)} - if tower_info.status != "unreachable": - return {"error": "{} is not unreachable".format(tower_id)} - if not tower_info.pending_appointments: - return {"error": "{} does not have pending appointments".format(tower_id)} + if not tower: + return {"error": f"{tower_id} is not a registered tower"} + if tower.get("status") != "unreachable": + return {"error": f"{tower_id} is not unreachable"} + if not tower.get("pending_appointments"): + return {"error": f"{tower_id} does not have pending appointments"} - message = "Retrying tower {}".format(tower_id) + message = f"Retrying tower {tower_id}" plugin.log(message) plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) @@ -225,33 +246,60 @@ def on_commitment_revocation(plugin, **kwargs): ) signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk) - # Send appointment to the towers. - # FIXME: sending the appointment to all registered towers atm. Some management would be nice. - for tower_id, tower in plugin.wt_client.towers.items(): - tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) - - if tower_info.status == "reachable": - status = add_appointment(plugin, tower_id, tower_info, appointment.to_dict(), signature) - if status == "temporarily unreachable": - tower_info.pending_appointments.append((appointment.to_dict(), signature)) - plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) - - elif tower_info.status != "misbehaving": - if tower_info.status in ["temporarily unreachable", "unreachable"]: - plugin.log("{} is {}. Adding appointment to pending".format(tower_id, tower_info.status)) - elif tower_info == "subscription error": - plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id)) - - tower_info.pending_appointments.append((appointment.to_dict(), signature)) - - # Update memory and TowersDB - plugin.wt_client.update_tower_state(tower_id, tower_info) - except (InvalidParameter, EncryptionError, SignatureError) as e: plugin.log(str(e), level="warn") + return {"result": "continue"} - except TowerConnectionError: - pass + # Send appointment to the towers. + # FIXME: sending the appointment to all registered towers atm. Some management would be nice. + for tower_id, tower in plugin.wt_client.towers.items(): + tower_update = {} + + if tower.get("status") == "misbehaving": + return {"result": "continue"} + + try: + if tower.get("status") == "reachable": + signature, available_slots = add_appointment(plugin, tower_id, tower, appointment.to_dict(), signature) + tower_update["appointment"] = (appointment.locator, signature) + tower_update["available_slots"] = available_slots + + else: + if tower.get("status") in ["temporarily unreachable", "unreachable"]: + plugin.log(f"{tower_id} is {tower.get('status')}. Adding {appointment.locator} to pending") + elif tower.get("status") == "subscription error": + plugin.log(f"There is a subscription issue with {tower_id}. Adding appointment to pending") + + tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add" + + except SignatureError as e: + tower_update["status"] = "misbehaving" + tower_update["invalid_appointment"] = (appointment.to_dict(), e.kwargs.get("signature")) + + except TowerConnectionError: + # All TowerConnectionError are transitory. Connections are tried on register so URLs cannot be malformed. + # Flag appointment for retry + tower_update["status"] = "temporarily unreachable" + plugin.log(f"Adding {appointment.locator} to pending") + tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add" + tower_update["retry"] = True + + except TowerResponseError as e: + tower_update["status"] = e.kwargs.get("status") + + if tower_update["status"] in ["temporarily unreachable", "subscription_error"]: + plugin.log(f"Adding {appointment.locator} to pending") + tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add" + + if tower_update["status"] == "temporarily unreachable": + tower_update["retry"] = True + + finally: + # Update memory and TowersDB + plugin.wt_client.update_tower_state(tower_id, tower_update) + + if tower_update.get("retry"): + plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id) return {"result": "continue"}