mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
plugin - adds Retrier and improves get_tower_info and listtowers
- Logs data identified by tower_id instead of endpoint - Adds Retrier to deal with retries (for connection errors and rejects) and moves sending logic to net/http - Adds pending appointments to TowerInfo and serves that information via `get_tower_info` and `list_towers` - Deals with connection errors (but not with rejections yet)
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import os
|
import os
|
||||||
import plyvel
|
import plyvel
|
||||||
|
from queue import Queue
|
||||||
|
from threading import Thread
|
||||||
from pyln.client import Plugin
|
from pyln.client import Plugin
|
||||||
|
|
||||||
from common.tools import compute_locator
|
from common.tools import compute_locator
|
||||||
@@ -10,11 +12,12 @@ from common.cryptographer import Cryptographer
|
|||||||
from common.exceptions import InvalidParameter, SignatureError, EncryptionError
|
from common.exceptions import InvalidParameter, SignatureError, EncryptionError
|
||||||
|
|
||||||
import arg_parser
|
import arg_parser
|
||||||
|
from retrier import Retrier
|
||||||
from tower_info import TowerInfo
|
from tower_info import TowerInfo
|
||||||
from towers_dbm import TowersDBM
|
from towers_dbm import TowersDBM
|
||||||
from keys import generate_keys, load_keys
|
from keys import generate_keys, load_keys
|
||||||
from net.http import post_request, process_post_response
|
|
||||||
from exceptions import TowerConnectionError, TowerResponseError
|
from exceptions import TowerConnectionError, TowerResponseError
|
||||||
|
from net.http import post_request, process_post_response, send_appointment
|
||||||
|
|
||||||
|
|
||||||
DATA_DIR = os.path.expanduser("~/.watchtower/")
|
DATA_DIR = os.path.expanduser("~/.watchtower/")
|
||||||
@@ -22,6 +25,8 @@ CONF_FILE_NAME = "watchtower.conf"
|
|||||||
|
|
||||||
DEFAULT_CONF = {
|
DEFAULT_CONF = {
|
||||||
"DEFAULT_PORT": {"value": 9814, "type": int},
|
"DEFAULT_PORT": {"value": 9814, "type": int},
|
||||||
|
"RETRY_DELTA": {"value": 60, "type": int},
|
||||||
|
"MAX_RETRIES": {"value": 30, "type": int},
|
||||||
"APPOINTMENTS_FOLDER_NAME": {"value": "appointment_receipts", "type": str, "path": True},
|
"APPOINTMENTS_FOLDER_NAME": {"value": "appointment_receipts", "type": str, "path": True},
|
||||||
"TOWERS_DB": {"value": "towers", "type": str, "path": True},
|
"TOWERS_DB": {"value": "towers", "type": str, "path": True},
|
||||||
"PRIVATE_KEY": {"value": "sk.der", "type": str, "path": True},
|
"PRIVATE_KEY": {"value": "sk.der", "type": str, "path": True},
|
||||||
@@ -37,12 +42,15 @@ class WTClient:
|
|||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin)
|
self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin)
|
||||||
self.towers = {}
|
self.towers = {}
|
||||||
|
self.retrier = Retrier(config.get("RETRY_DELTA"), config.get("MAX_RETRIES"), Queue())
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
# Populate the towers dict with data from the db
|
# Populate the towers dict with data from the db
|
||||||
for tower_id, tower_info in self.db_manager.load_all_tower_records().items():
|
for tower_id, tower_info in self.db_manager.load_all_tower_records().items():
|
||||||
self.towers[tower_id] = TowerInfo.from_dict(tower_info).get_summary()
|
self.towers[tower_id] = TowerInfo.from_dict(tower_info).get_summary()
|
||||||
|
|
||||||
|
Thread(target=self.retrier.do_retry, args=[plugin], daemon=True).start()
|
||||||
|
|
||||||
|
|
||||||
@plugin.init()
|
@plugin.init()
|
||||||
def init(options, configuration, plugin):
|
def init(options, configuration, plugin):
|
||||||
@@ -97,9 +105,9 @@ def register(plugin, tower_id, host=None, port=None):
|
|||||||
register_endpoint = "{}/register".format(tower_netaddr)
|
register_endpoint = "{}/register".format(tower_netaddr)
|
||||||
data = {"public_key": plugin.wt_client.user_id}
|
data = {"public_key": plugin.wt_client.user_id}
|
||||||
|
|
||||||
plugin.log("Registering in the Eye of Satoshi")
|
plugin.log("Registering in the Eye of Satoshi (tower_id={})".format(tower_id))
|
||||||
|
|
||||||
response = process_post_response(post_request(data, register_endpoint))
|
response = process_post_response(post_request(data, register_endpoint, tower_id))
|
||||||
plugin.log("Registration succeeded. Available slots: {}".format(response.get("available_slots")))
|
plugin.log("Registration succeeded. Available slots: {}".format(response.get("available_slots")))
|
||||||
|
|
||||||
# Save data
|
# Save data
|
||||||
@@ -142,9 +150,9 @@ def get_appointment(plugin, tower_id, locator):
|
|||||||
|
|
||||||
# Send request to the server.
|
# Send request to the server.
|
||||||
get_appointment_endpoint = "{}/get_appointment".format(plugin.wt_client.towers[tower_id].get("netaddr"))
|
get_appointment_endpoint = "{}/get_appointment".format(plugin.wt_client.towers[tower_id].get("netaddr"))
|
||||||
plugin.log("Requesting appointment from the Eye of Satoshi at {}".format(get_appointment_endpoint))
|
plugin.log("Requesting appointment from {}".format(tower_id))
|
||||||
|
|
||||||
response = process_post_response(post_request(data, get_appointment_endpoint))
|
response = process_post_response(post_request(data, get_appointment_endpoint, tower_id))
|
||||||
return response
|
return response
|
||||||
|
|
||||||
except (InvalidParameter, TowerConnectionError, TowerResponseError) as e:
|
except (InvalidParameter, TowerConnectionError, TowerResponseError) as e:
|
||||||
@@ -155,17 +163,39 @@ def get_appointment(plugin, tower_id, locator):
|
|||||||
@plugin.method("listtowers", desc="List all towers registered towers.")
|
@plugin.method("listtowers", desc="List all towers registered towers.")
|
||||||
def list_towers(plugin):
|
def list_towers(plugin):
|
||||||
towers_info = {"towers": []}
|
towers_info = {"towers": []}
|
||||||
for k, v in plugin.wt_client.towers.items():
|
for tower_id, info in plugin.wt_client.towers.items():
|
||||||
towers_info["towers"].append({"id": k, **v})
|
values = {k: v for k, v in info.items() if k != "pending_appointments"}
|
||||||
|
pending_appointments = [appointment.get("locator") for appointment, signature in info["pending_appointments"]]
|
||||||
|
values["pending_appointments"] = pending_appointments
|
||||||
|
towers_info["towers"].append({"id": tower_id, **values})
|
||||||
|
|
||||||
return towers_info
|
return towers_info
|
||||||
|
|
||||||
|
|
||||||
@plugin.method("gettowerinfo", desc="List all towers registered towers.")
|
@plugin.method("gettowerinfo", desc="List all towers registered towers.")
|
||||||
def get_tower_info(plugin, tower_id):
|
def get_tower_info(plugin, tower_id):
|
||||||
tower_info = plugin.wt_client.db_manager.load_tower_record(tower_id)
|
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||||
|
pending_appointments = [
|
||||||
|
{"appointment": appointment, "signature": signature}
|
||||||
|
for appointment, signature in tower_info.pending_appointments
|
||||||
|
]
|
||||||
|
tower_info.pending_appointments = pending_appointments
|
||||||
|
return {"id": tower_id, **tower_info.to_dict()}
|
||||||
|
|
||||||
return {"id": tower_id, **tower_info}
|
|
||||||
|
@plugin.method("retrytower", desc="Retry to send pending appointment to an unreachable tower.")
|
||||||
|
def retry_tower(plugin, tower_id):
|
||||||
|
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||||
|
|
||||||
|
if not tower_info:
|
||||||
|
return {"error": "{} is not a registered tower".format(tower_id)}
|
||||||
|
if tower_info.status != "unreachable":
|
||||||
|
return {"error": "{} is not unreachable".format(tower_id)}
|
||||||
|
if not tower_info.pending_appointments:
|
||||||
|
return {"error": "{} does not have pending appointments".format(tower_id)}
|
||||||
|
|
||||||
|
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
|
||||||
|
plugin.log("Retrying tower {}".format(tower_id))
|
||||||
|
|
||||||
|
|
||||||
@plugin.hook("commitment_revocation")
|
@plugin.hook("commitment_revocation")
|
||||||
@@ -177,35 +207,24 @@ def add_appointment(plugin, **kwargs):
|
|||||||
to_self_delay=20, # does not matter for now, any value 20-2^32-1 would do
|
to_self_delay=20, # does not matter for now, any value 20-2^32-1 would do
|
||||||
encrypted_blob=Cryptographer.encrypt(penalty_tx, commitment_txid),
|
encrypted_blob=Cryptographer.encrypt(penalty_tx, commitment_txid),
|
||||||
)
|
)
|
||||||
|
|
||||||
signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk)
|
signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk)
|
||||||
data = {"appointment": appointment.to_dict(), "signature": signature}
|
|
||||||
|
|
||||||
# Send appointment to the server.
|
# Send appointment to the server.
|
||||||
# FIXME: sending the appointment to all registered towers atm. Some management would be nice.
|
# FIXME: sending the appointment to all registered towers atm. Some management would be nice.
|
||||||
for tower_id, tower in plugin.wt_client.towers.items():
|
for tower_id, tower in plugin.wt_client.towers.items():
|
||||||
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||||
|
|
||||||
|
if tower_info.status != "unreachable":
|
||||||
try:
|
try:
|
||||||
plugin.log("Sending appointment to the Eye of Satoshi at {}".format(tower.get("netaddr")))
|
plugin.log("Sending appointment to {}".format(tower_id))
|
||||||
add_appointment_endpoint = "{}/add_appointment".format(tower.get("netaddr"))
|
response = send_appointment(tower_id, tower_info, appointment.to_dict(), signature)
|
||||||
response = process_post_response(post_request(data, add_appointment_endpoint))
|
plugin.log("Appointment accepted and signed by {}".format(tower_id))
|
||||||
|
|
||||||
signature = response.get("signature")
|
|
||||||
# Check that the server signed the appointment as it should.
|
|
||||||
if not signature:
|
|
||||||
raise TowerResponseError("The response does not contain the signature of the appointment")
|
|
||||||
|
|
||||||
rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
|
|
||||||
if not tower_id != Cryptographer.get_compressed_pk(rpk):
|
|
||||||
raise TowerResponseError("The returned appointment's signature is invalid")
|
|
||||||
|
|
||||||
plugin.log("Appointment accepted and signed by the Eye of Satoshi at {}".format(tower.get("netaddr")))
|
|
||||||
plugin.log("Remaining slots: {}".format(response.get("available_slots")))
|
plugin.log("Remaining slots: {}".format(response.get("available_slots")))
|
||||||
|
|
||||||
# TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
|
# TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
|
||||||
# DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.
|
# DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.
|
||||||
|
|
||||||
tower_info.appointments[appointment.locator] = signature
|
tower_info.appointments[appointment.locator] = response.get("signature")
|
||||||
tower_info.available_slots = response.get("available_slots")
|
tower_info.available_slots = response.get("available_slots")
|
||||||
tower_info.status = "reachable"
|
tower_info.status = "reachable"
|
||||||
|
|
||||||
@@ -214,19 +233,31 @@ def add_appointment(plugin, **kwargs):
|
|||||||
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
||||||
|
|
||||||
except TowerConnectionError as e:
|
except TowerConnectionError as e:
|
||||||
# TODO: Implement retry logic
|
# All TowerConnectionError are transitory, since the connection is tried on register, so the URL
|
||||||
|
# cannot be malformed
|
||||||
plugin.log(str(e))
|
plugin.log(str(e))
|
||||||
if e.kwargs.get("transitory"):
|
|
||||||
tower_info.status = "temporarily unreachable"
|
|
||||||
else:
|
|
||||||
tower_info.status = "unreachable"
|
|
||||||
|
|
||||||
|
# Flag appointment for retry
|
||||||
|
tower_info.status = "temporarily unreachable"
|
||||||
|
tower_info.pending_appointments.append((appointment.to_dict(), signature))
|
||||||
|
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
|
||||||
|
|
||||||
|
# Store data in memory and TowersDB
|
||||||
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
||||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
||||||
|
|
||||||
except TowerResponseError as e:
|
except TowerResponseError as e:
|
||||||
|
# FIXME: deal with tower errors, such as no available slots
|
||||||
plugin.log(str(e))
|
plugin.log(str(e))
|
||||||
|
|
||||||
|
else:
|
||||||
|
# If the tower has been flagged as unreachable (too many retries with no success) data is simply stored
|
||||||
|
# for future submission via `retry_tower`.
|
||||||
|
plugin.log("{} is unreachable. Adding appointment to pending".format(tower_id))
|
||||||
|
tower_info.pending_appointments.append((appointment.to_dict(), signature))
|
||||||
|
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
||||||
|
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
||||||
|
|
||||||
except (InvalidParameter, EncryptionError, SignatureError, TowerResponseError) as e:
|
except (InvalidParameter, EncryptionError, SignatureError, TowerResponseError) as e:
|
||||||
plugin.log(str(e), level="warn")
|
plugin.log(str(e), level="warn")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user