Splits add_appointment so it can also be used by the Retrier

- The appointment constructions is left in Watchtower (via on_commit_revocation method)
- The tower interaction is moved to net/http so it can be reused
- Adds missing logic for invalid resposes by the tower (e.g invalid signatures)
This commit is contained in:
Sergi Delgado Segura
2020-04-29 13:12:21 +02:00
parent 7bbfd925f4
commit 43217874b5
3 changed files with 101 additions and 53 deletions

View File

@@ -2,8 +2,8 @@
import os
import plyvel
from queue import Queue
from threading import Thread
from pyln.client import Plugin
from threading import Thread, Lock
from common.tools import compute_locator
from common.appointment import Appointment
@@ -17,7 +17,7 @@ from tower_info import TowerInfo
from towers_dbm import TowersDBM
from keys import generate_keys, load_keys
from exceptions import TowerConnectionError, TowerResponseError
from net.http import post_request, process_post_response, send_appointment
from net.http import post_request, process_post_response, add_appointment
DATA_DIR = os.path.expanduser("~/.watchtower/")
@@ -25,7 +25,6 @@ CONF_FILE_NAME = "watchtower.conf"
DEFAULT_CONF = {
"DEFAULT_PORT": {"value": 9814, "type": int},
"RETRY_DELTA": {"value": 60, "type": int},
"MAX_RETRIES": {"value": 30, "type": int},
"APPOINTMENTS_FOLDER_NAME": {"value": "appointment_receipts", "type": str, "path": True},
"TOWERS_DB": {"value": "towers", "type": str, "path": True},
@@ -40,16 +39,24 @@ class WTClient:
def __init__(self, sk, user_id, config):
self.sk = sk
self.user_id = user_id
self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin)
self.towers = {}
self.retrier = Retrier(config.get("RETRY_DELTA"), config.get("MAX_RETRIES"), Queue())
self.tmp_unreachable_towers = []
self.db_manager = TowersDBM(config.get("TOWERS_DB"), plugin)
self.retrier = Retrier(config.get("MAX_RETRIES"), Queue())
self.config = config
self.lock = Lock()
# Populate the towers dict with data from the db
for tower_id, tower_info in self.db_manager.load_all_tower_records().items():
self.towers[tower_id] = TowerInfo.from_dict(tower_info).get_summary()
Thread(target=self.retrier.do_retry, args=[plugin], daemon=True).start()
Thread(target=self.retrier.manage_retry, args=[plugin], daemon=True).start()
def update_tower_state(self, tower_id, tower_info):
self.lock.acquire()
self.towers[tower_id] = tower_info.get_summary()
self.db_manager.store_tower_record(tower_id, tower_info)
self.lock.release()
@plugin.init()
@@ -112,8 +119,7 @@ def register(plugin, tower_id, host=None, port=None):
# Save data
tower_info = TowerInfo(tower_netaddr, response.get("available_slots"))
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
plugin.wt_client.update_tower_state(tower_id, tower_info)
return response
@@ -166,7 +172,9 @@ def list_towers(plugin):
for tower_id, info in plugin.wt_client.towers.items():
values = {k: v for k, v in info.items() if k != "pending_appointments"}
pending_appointments = [appointment.get("locator") for appointment, signature in info["pending_appointments"]]
invalid_appointments = [appointment.get("locator") for appointment, signature in info["invalid_appointments"]]
values["pending_appointments"] = pending_appointments
values["invalid_appointments"] = invalid_appointments
towers_info["towers"].append({"id": tower_id, **values})
return towers_info
@@ -179,7 +187,12 @@ def get_tower_info(plugin, tower_id):
{"appointment": appointment, "signature": signature}
for appointment, signature in tower_info.pending_appointments
]
invalid_appointments = [
{"appointment": appointment, "tower_signature": signature}
for appointment, signature in tower_info.invalid_appointments
]
tower_info.pending_appointments = pending_appointments
tower_info.invalid_appointments = invalid_appointments
return {"id": tower_id, **tower_info.to_dict()}
@@ -194,12 +207,15 @@ def retry_tower(plugin, tower_id):
if not tower_info.pending_appointments:
return {"error": "{} does not have pending appointments".format(tower_id)}
message = "Retrying tower {}".format(tower_id)
plugin.log(message)
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
plugin.log("Retrying tower {}".format(tower_id))
return message
@plugin.hook("commitment_revocation")
def add_appointment(plugin, **kwargs):
def on_commitment_revocation(plugin, **kwargs):
try:
commitment_txid, penalty_tx = arg_parser.parse_add_appointment_arguments(kwargs)
appointment = Appointment(
@@ -209,58 +225,34 @@ def add_appointment(plugin, **kwargs):
)
signature = Cryptographer.sign(appointment.serialize(), plugin.wt_client.sk)
# Send appointment to the server.
# Send appointment to the towers.
# FIXME: sending the appointment to all registered towers atm. Some management would be nice.
for tower_id, tower in plugin.wt_client.towers.items():
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
if tower_info.status != "unreachable":
try:
plugin.log("Sending appointment to {}".format(tower_id))
response = send_appointment(tower_id, tower_info, appointment.to_dict(), signature)
plugin.log("Appointment accepted and signed by {}".format(tower_id))
plugin.log("Remaining slots: {}".format(response.get("available_slots")))
# TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
# DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.
tower_info.appointments[appointment.locator] = response.get("signature")
tower_info.available_slots = response.get("available_slots")
tower_info.status = "reachable"
# Update memory and TowersDB
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
except TowerConnectionError as e:
# All TowerConnectionError are transitory, since the connection is tried on register, so the URL
# cannot be malformed
plugin.log(str(e))
# Flag appointment for retry
tower_info.status = "temporarily unreachable"
if tower_info.status == "reachable":
status = add_appointment(plugin, tower_id, tower_info, appointment.to_dict(), signature)
if status == "temporarily unreachable":
tower_info.pending_appointments.append((appointment.to_dict(), signature))
plugin.wt_client.retrier.temp_unreachable_towers.put(tower_id)
# Store data in memory and TowersDB
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
elif tower_info.status != "misbehaving":
if tower_info.status in ["temporarily unreachable", "unreachable"]:
plugin.log("{} is {}. Adding appointment to pending".format(tower_id, tower_info.status))
elif tower_info == "subscription error":
plugin.log("There is a subscription issue with {}. Adding appointment to pending".format(tower_id))
except TowerResponseError as e:
# FIXME: deal with tower errors, such as no available slots
plugin.log(str(e))
else:
# If the tower has been flagged as unreachable (too many retries with no success) data is simply stored
# for future submission via `retry_tower`.
plugin.log("{} is unreachable. Adding appointment to pending".format(tower_id))
tower_info.pending_appointments.append((appointment.to_dict(), signature))
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
except (InvalidParameter, EncryptionError, SignatureError, TowerResponseError) as e:
# Update memory and TowersDB
plugin.wt_client.update_tower_state(tower_id, tower_info)
except (InvalidParameter, EncryptionError, SignatureError) as e:
plugin.log(str(e), level="warn")
except TowerConnectionError:
pass
return {"result": "continue"}