mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Refactors plugin to have a single point of db data loading
The previous approach was a bit messy with the db access and could have potential race conditions and data inconsistency Also replaces format for f-functions for readability
This commit is contained in:
@@ -12,30 +12,25 @@ from common.cryptographer import Cryptographer
|
||||
from exceptions import TowerConnectionError, TowerResponseError
|
||||
|
||||
|
||||
def add_appointment(plugin, tower_id, tower_info, appointment_dict, signature):
|
||||
def add_appointment(plugin, tower_id, tower, appointment_dict, signature):
|
||||
try:
|
||||
plugin.log("Sending appointment to {}".format(tower_id))
|
||||
response = send_appointment(tower_id, tower_info, appointment_dict, signature)
|
||||
plugin.log("Appointment accepted and signed by {}".format(tower_id))
|
||||
plugin.log("Remaining slots: {}".format(response.get("available_slots")))
|
||||
plugin.log(f"Sending appointment {appointment_dict.get('locator')} to {tower_id}")
|
||||
response = send_appointment(tower_id, tower, appointment_dict, signature)
|
||||
plugin.log(f"Appointment accepted and signed by {tower_id}")
|
||||
plugin.log(f"Remaining slots: {response.get('available_slots')}")
|
||||
|
||||
# TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
|
||||
# DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.
|
||||
|
||||
tower_info.appointments[appointment_dict.get("locator")] = response.get("signature")
|
||||
tower_info.available_slots = response.get("available_slots")
|
||||
tower_info.status = "reachable"
|
||||
# # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
|
||||
# # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.
|
||||
return response.get("signature"), response.get("available_slots")
|
||||
|
||||
except SignatureError as e:
|
||||
plugin.log("{} is misbehaving, not using it any longer".format(tower_id))
|
||||
tower_info.status = "misbehaving"
|
||||
tower_info.invalid_appointments.append((appointment_dict, e.kwargs.get("signature")))
|
||||
plugin.log(f"{tower_id} is misbehaving, not using it any longer")
|
||||
raise e
|
||||
|
||||
except TowerConnectionError:
|
||||
# All TowerConnectionError are transitory. The connection is tried on register, so the URL cannot be malformed.
|
||||
# Flag appointment for retry
|
||||
plugin.log("{} cannot be reached. Adding appointment to pending".format(tower_id))
|
||||
tower_info.status = "temporarily unreachable"
|
||||
except TowerConnectionError as e:
|
||||
plugin.log(f"{tower_id} cannot be reached")
|
||||
|
||||
raise e
|
||||
|
||||
except TowerResponseError as e:
|
||||
data = e.kwargs.get("data")
|
||||
@@ -43,30 +38,28 @@ def add_appointment(plugin, tower_id, tower_info, appointment_dict, signature):
|
||||
|
||||
if data and status_code == constants.HTTP_BAD_REQUEST:
|
||||
if data.get("error_code") == errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS:
|
||||
plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id))
|
||||
tower_info.status = "subscription error"
|
||||
message = f"There is a subscription issue with {tower_id}"
|
||||
raise TowerResponseError(message, status="subscription error")
|
||||
|
||||
elif data.get("error_code") >= errors.INVALID_REQUEST_FORMAT:
|
||||
plugin.log("Appointment sent to {} is invalid".format(tower_id))
|
||||
tower_info.status = "reachable"
|
||||
# DISCUSS: It may be worth backing up the data since otherwise the update is dropped
|
||||
message = f"Appointment sent to {tower_id} is invalid"
|
||||
raise TowerResponseError(message, status="reachable")
|
||||
|
||||
elif status_code == constants.HTTP_SERVICE_UNAVAILABLE:
|
||||
# Flag appointment for retry
|
||||
plugin.log("{} is temporarily unavailable. Adding appointment to pending".format(tower_id))
|
||||
tower_info.status = "temporarily unreachable"
|
||||
message = f"{tower_id} is temporarily unavailable"
|
||||
|
||||
else:
|
||||
# Log unexpected behaviour
|
||||
raise TowerResponseError(message, status="temporarily unreachable")
|
||||
|
||||
# Log unexpected behaviour without raising
|
||||
plugin.log(str(e), level="warn")
|
||||
|
||||
return tower_info.status
|
||||
|
||||
|
||||
def send_appointment(tower_id, tower_info, appointment_dict, signature):
|
||||
def send_appointment(tower_id, tower, appointment_dict, signature):
|
||||
data = {"appointment": appointment_dict, "signature": signature}
|
||||
|
||||
add_appointment_endpoint = "{}/add_appointment".format(tower_info.netaddr)
|
||||
add_appointment_endpoint = f"{tower.get('netaddr')}/add_appointment"
|
||||
response = process_post_response(post_request(data, add_appointment_endpoint, tower_id))
|
||||
|
||||
signature = response.get("signature")
|
||||
@@ -103,13 +96,13 @@ def post_request(data, endpoint, tower_id):
|
||||
return requests.post(url=endpoint, json=data, timeout=5)
|
||||
|
||||
except ConnectTimeout:
|
||||
message = "Cannot connect to {}. Connection timeout".format(tower_id)
|
||||
message = f"Cannot connect to {tower_id}. Connection timeout"
|
||||
|
||||
except ConnectionError:
|
||||
message = "Cannot connect to {}. Tower cannot be reached".format(tower_id)
|
||||
message = f"Cannot connect to {tower_id}. Tower cannot be reached"
|
||||
|
||||
except (InvalidSchema, MissingSchema, InvalidURL):
|
||||
message = "Invalid URL. No schema, or invalid schema, found (url={}, tower_id={}).".format(endpoint, tower_id)
|
||||
message = f"Invalid URL. No schema, or invalid schema, found (url={endpoint}, tower_id={tower_id})"
|
||||
|
||||
raise TowerConnectionError(message)
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import backoff
|
||||
from threading import Thread
|
||||
|
||||
from tower_info import TowerInfo
|
||||
from common.exceptions import SignatureError
|
||||
|
||||
from net.http import add_appointment
|
||||
from exceptions import TowerConnectionError, TowerResponseError
|
||||
|
||||
|
||||
MAX_RETRIES = None
|
||||
@@ -11,19 +13,17 @@ MAX_RETRIES = None
|
||||
def on_backoff(details):
|
||||
plugin = details.get("args")[1]
|
||||
tower_id = details.get("args")[2]
|
||||
plugin.log("Retry {} failed for tower {}, backing off".format(details.get("tries"), tower_id))
|
||||
plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off")
|
||||
|
||||
|
||||
def on_giveup(details):
|
||||
plugin = details.get("args")[1]
|
||||
tower_id = details.get("args")[2]
|
||||
tower_info = details.get("args")[3]
|
||||
|
||||
plugin.log("Max retries reached, abandoning tower {}".format(tower_id))
|
||||
plugin.log(f"Max retries reached, abandoning tower {tower_id}")
|
||||
|
||||
tower_info.status = "unreachable"
|
||||
plugin.wt_client.towers[tower_id]["status"] = "unreachable"
|
||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
||||
tower_update = {"status": "unreachable"}
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_update)
|
||||
|
||||
|
||||
def set_max_retries(max_retries):
|
||||
@@ -43,9 +43,9 @@ class Retrier:
|
||||
def manage_retry(self, plugin):
|
||||
while True:
|
||||
tower_id = self.temp_unreachable_towers.get()
|
||||
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||
tower = plugin.wt_client.towers[tower_id]
|
||||
|
||||
Thread(target=self.do_retry, args=[plugin, tower_id, tower_info], daemon=True).start()
|
||||
Thread(target=self.do_retry, args=[plugin, tower_id, tower], daemon=True).start()
|
||||
|
||||
@backoff.on_predicate(
|
||||
backoff.expo,
|
||||
@@ -54,15 +54,32 @@ class Retrier:
|
||||
on_backoff=on_backoff,
|
||||
on_giveup=on_giveup,
|
||||
)
|
||||
def do_retry(self, plugin, tower_id, tower_info):
|
||||
def do_retry(self, plugin, tower_id, tower):
|
||||
for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]:
|
||||
status = add_appointment(plugin, tower_id, tower_info, appointment_dict, signature)
|
||||
tower_update = {}
|
||||
try:
|
||||
tower_signature, available_slots = add_appointment(plugin, tower_id, tower, appointment_dict, signature)
|
||||
tower_update["status"] = "reachable"
|
||||
tower_update["appointment"] = (appointment_dict.get("locator"), tower_signature)
|
||||
tower_update["available_slots"] = available_slots
|
||||
|
||||
if status in ["reachable", "misbehaving"]:
|
||||
tower_info.pending_appointments.remove([appointment_dict, signature])
|
||||
except SignatureError as e:
|
||||
tower_update["status"] = "misbehaving"
|
||||
tower_update["invalid_appointment"] = (appointment_dict, e.kwargs.get("signature"))
|
||||
|
||||
except TowerConnectionError:
|
||||
tower_update["status"] = "temporarily unreachable"
|
||||
|
||||
except TowerResponseError as e:
|
||||
tower_update["status"] = e.kwargs.get("status")
|
||||
|
||||
if tower_update["status"] in ["reachable", "misbehaving"]:
|
||||
tower_update["pending_appointment"] = ([appointment_dict, signature], "remove")
|
||||
|
||||
if tower_update["status"] != "temporarily unreachable":
|
||||
# Update memory and TowersDB
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_info)
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_update)
|
||||
|
||||
else:
|
||||
return status
|
||||
# Continue looping if reachable, return for either retry or stop otherwise
|
||||
if tower_update["status"] != "reachable":
|
||||
return tower_update.get("status")
|
||||
|
||||
@@ -20,7 +20,7 @@ from exceptions import TowerConnectionError, TowerResponseError
|
||||
from net.http import post_request, process_post_response, add_appointment
|
||||
|
||||
|
||||
DATA_DIR = os.path.expanduser("~/.watchtower/")
|
||||
DATA_DIR = os.getenv("TOWERS_DATA_DIR", os.path.expanduser("~/.watchtower/"))
|
||||
CONF_FILE_NAME = "watchtower.conf"
|
||||
|
||||
DEFAULT_CONF = {
|
||||
@@ -52,8 +52,25 @@ class WTClient:
|
||||
|
||||
Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start()
|
||||
|
||||
def update_tower_state(self, tower_id, tower_info):
|
||||
def update_tower_state(self, tower_id, tower_update):
|
||||
self.lock.acquire()
|
||||
tower_info = TowerInfo.from_dict(self.db_manager.load_tower_record(tower_id))
|
||||
|
||||
if "status" in tower_update:
|
||||
tower_info.status = tower_update.get("status")
|
||||
if "appointment" in tower_update:
|
||||
locator, signature = tower_update.get("appointment")
|
||||
tower_info.appointments[locator] = signature
|
||||
tower_info.available_slots = tower_update.get("available_slots")
|
||||
if "pending_appointment" in tower_update:
|
||||
data, action = tower_update.get("pending_appointment")
|
||||
if action == "add":
|
||||
tower_info.pending_appointments.append(list(data))
|
||||
else:
|
||||
tower_info.pending_appointments.remove(list(data))
|
||||
if "invalid_appointment" in tower_update:
|
||||
tower_info.pending_appointments.append(list(tower_update.get("invalid_appointment")))
|
||||
|
||||
self.towers[tower_id] = tower_info.get_summary()
|
||||
self.db_manager.store_tower_record(tower_id, tower_info)
|
||||
self.lock.release()
|
||||
@@ -63,13 +80,13 @@ class WTClient:
|
||||
def init(options, configuration, plugin):
|
||||
try:
|
||||
user_sk, user_id = generate_keys(DATA_DIR)
|
||||
plugin.log("Generating a new key pair for the watchtower client. Keys stored at {}".format(DATA_DIR))
|
||||
plugin.log(f"Generating a new key pair for the watchtower client. Keys stored at {DATA_DIR}")
|
||||
|
||||
except FileExistsError:
|
||||
plugin.log("A key file for the watchtower client already exists. Loading it")
|
||||
user_sk, user_id = load_keys(DATA_DIR)
|
||||
|
||||
plugin.log("Plugin watchtower client initialized. User id = {}".format(user_id))
|
||||
plugin.log(f"Plugin watchtower client initialized. User id = {user_id}")
|
||||
config_loader = ConfigLoader(DATA_DIR, CONF_FILE_NAME, DEFAULT_CONF, {})
|
||||
|
||||
try:
|
||||
@@ -109,17 +126,20 @@ def register(plugin, tower_id, host=None, port=None):
|
||||
tower_netaddr = "http://" + tower_netaddr
|
||||
|
||||
# Send request to the server.
|
||||
register_endpoint = "{}/register".format(tower_netaddr)
|
||||
register_endpoint = f"{tower_netaddr}/register"
|
||||
data = {"public_key": plugin.wt_client.user_id}
|
||||
|
||||
plugin.log("Registering in the Eye of Satoshi (tower_id={})".format(tower_id))
|
||||
plugin.log(f"Registering in the Eye of Satoshi (tower_id={tower_id})")
|
||||
|
||||
response = process_post_response(post_request(data, register_endpoint, tower_id))
|
||||
plugin.log("Registration succeeded. Available slots: {}".format(response.get("available_slots")))
|
||||
plugin.log(f"Registration succeeded. Available slots: {response.get('available_slots')}")
|
||||
|
||||
# Save data
|
||||
tower_info = TowerInfo(tower_netaddr, response.get("available_slots"))
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_info)
|
||||
plugin.wt_client.lock.acquire()
|
||||
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
||||
plugin.wt_client.lock.release()
|
||||
|
||||
return response
|
||||
|
||||
@@ -150,13 +170,14 @@ def get_appointment(plugin, tower_id, locator):
|
||||
if tower_id not in plugin.wt_client.towers:
|
||||
raise InvalidParameter("tower_id is not within the registered towers", tower_id=tower_id)
|
||||
|
||||
message = "get appointment {}".format(locator)
|
||||
message = f"get appointment {locator}"
|
||||
signature = Cryptographer.sign(message.encode(), plugin.wt_client.sk)
|
||||
data = {"locator": locator, "signature": signature}
|
||||
|
||||
# Send request to the server.
|
||||
get_appointment_endpoint = "{}/get_appointment".format(plugin.wt_client.towers[tower_id].get("netaddr"))
|
||||
plugin.log("Requesting appointment from {}".format(tower_id))
|
||||
tower_netaddr = plugin.wt_client.towers[tower_id].get("netaddr")
|
||||
get_appointment_endpoint = f"{tower_netaddr}/get_appointment"
|
||||
plugin.log(f"Requesting appointment from {tower_id}")
|
||||
|
||||
response = process_post_response(post_request(data, get_appointment_endpoint, tower_id))
|
||||
return response
|
||||
@@ -198,16 +219,16 @@ def get_tower_info(plugin, tower_id):
|
||||
|
||||
@plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.")
|
||||
def retry_tower(plugin, tower_id):
|
||||
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||
tower = plugin.wt_client.towers.get(tower_id)
|
||||
|
||||
if not tower_info:
|
||||
return {"error": "{} is not a registered tower".format(tower_id)}
|
||||
if tower_info.status != "unreachable":
|
||||
return {"error": "{} is not unreachable".format(tower_id)}
|
||||
if not tower_info.pending_appointments:
|
||||
return {"error": "{} does not have pending appointments".format(tower_id)}
|
||||
if not tower:
|
||||
return {"error": f"{tower_id} is not a registered tower"}
|
||||
if tower.get("status") != "unreachable":
|
||||
return {"error": f"{tower_id} is not unreachable"}
|
||||
if not tower.get("pending_appointments"):
|
||||
return {"error": f"{tower_id} does not have pending appointments"}
|
||||
|
||||
message = "Retrying tower {}".format(tower_id)
|
||||
message = f"Retrying tower {tower_id}"
|
||||
plugin.log(message)
|
||||
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
|
||||
|
||||
@@ -225,33 +246,60 @@ def on_commitment_revocation(plugin, **kwargs):
|
||||
)
|
||||
signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk)
|
||||
|
||||
except (InvalidParameter, EncryptionError, SignatureError) as e:
|
||||
plugin.log(str(e), level="warn")
|
||||
return {"result": "continue"}
|
||||
|
||||
# Send appointment to the towers.
|
||||
# FIXME: sending the appointment to all registered towers atm. Some management would be nice.
|
||||
for tower_id, tower in plugin.wt_client.towers.items():
|
||||
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||
tower_update = {}
|
||||
|
||||
if tower_info.status == "reachable":
|
||||
status = add_appointment(plugin, tower_id, tower_info, appointment.to_dict(), signature)
|
||||
if status == "temporarily unreachable":
|
||||
tower_info.pending_appointments.append((appointment.to_dict(), signature))
|
||||
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
|
||||
if tower.get("status") == "misbehaving":
|
||||
return {"result": "continue"}
|
||||
|
||||
elif tower_info.status != "misbehaving":
|
||||
if tower_info.status in ["temporarily unreachable", "unreachable"]:
|
||||
plugin.log("{} is {}. Adding appointment to pending".format(tower_id, tower_info.status))
|
||||
elif tower_info == "subscription error":
|
||||
plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id))
|
||||
try:
|
||||
if tower.get("status") == "reachable":
|
||||
signature, available_slots = add_appointment(plugin, tower_id, tower, appointment.to_dict(), signature)
|
||||
tower_update["appointment"] = (appointment.locator, signature)
|
||||
tower_update["available_slots"] = available_slots
|
||||
|
||||
tower_info.pending_appointments.append((appointment.to_dict(), signature))
|
||||
else:
|
||||
if tower.get("status") in ["temporarily unreachable", "unreachable"]:
|
||||
plugin.log(f"{tower_id} is {tower.get('status')}. Adding {appointment.locator} to pending")
|
||||
elif tower.get("status") == "subscription error":
|
||||
plugin.log(f"There is a subscription issue with {tower_id}. Adding appointment to pending")
|
||||
|
||||
# Update memory and TowersDB
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_info)
|
||||
tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add"
|
||||
|
||||
except (InvalidParameter, EncryptionError, SignatureError) as e:
|
||||
plugin.log(str(e), level="warn")
|
||||
except SignatureError as e:
|
||||
tower_update["status"] = "misbehaving"
|
||||
tower_update["invalid_appointment"] = (appointment.to_dict(), e.kwargs.get("signature"))
|
||||
|
||||
except TowerConnectionError:
|
||||
pass
|
||||
# All TowerConnectionError are transitory. Connections are tried on register so URLs cannot be malformed.
|
||||
# Flag appointment for retry
|
||||
tower_update["status"] = "temporarily unreachable"
|
||||
plugin.log(f"Adding {appointment.locator} to pending")
|
||||
tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add"
|
||||
tower_update["retry"] = True
|
||||
|
||||
except TowerResponseError as e:
|
||||
tower_update["status"] = e.kwargs.get("status")
|
||||
|
||||
if tower_update["status"] in ["temporarily unreachable", "subscription_error"]:
|
||||
plugin.log(f"Adding {appointment.locator} to pending")
|
||||
tower_update["pending_appointment"] = (appointment.to_dict(), signature), "add"
|
||||
|
||||
if tower_update["status"] == "temporarily unreachable":
|
||||
tower_update["retry"] = True
|
||||
|
||||
finally:
|
||||
# Update memory and TowersDB
|
||||
plugin.wt_client.update_tower_state(tower_id, tower_update)
|
||||
|
||||
if tower_update.get("retry"):
|
||||
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
|
||||
|
||||
return {"result": "continue"}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user