Files
python-teos/watchtower-plugin/test_watchtower.py
2020-05-07 11:41:41 +02:00

422 lines
16 KiB
Python

import random
import configparser
from time import sleep
from coincurve import PrivateKey
from threading import Thread
from flask import Flask, request, jsonify
from pyln.testing.fixtures import * # noqa: F401,F403
from common import errors
from common import constants
from common.appointment import Appointment
from common.cryptographer import Cryptographer
# Path of the plugin under test: "watchtower.py" in the same directory as this test module.
plugin_path = os.path.join(os.path.dirname(__file__), "watchtower.py")
# Host and port the session-wide mocked tower is served on (see ``run_tower``).
tower_netaddr = "localhost"
tower_port = "1234"
# Key handed to the session-wide ``TowerMock``; ``tower_id`` is its compressed public key, which the tests
# use to register with the tower and to query its info.
tower_sk = PrivateKey()
tower_id = Cryptographer.get_compressed_pk(tower_sk.public_key)
# Selects how the mocked ``/add_appointment`` endpoint replies ("success", "reject_no_slots",
# "reject_invalid" or "misbehaving_tower"). Any other value, including ``None``, produces the
# "service unavailable" reply. Tests rebind it through ``global mocked_return``.
mocked_return = None
class TowerMock:
    """Flask application that mocks the HTTP API of a tower for the plugin tests.

    The mock exposes three ``POST`` endpoints (``/register``, ``/add_appointment`` and ``/get_appointment``)
    and keeps its state in memory. How ``/add_appointment`` replies is selected by the module-level
    ``mocked_return`` variable, so tests can switch the tower behavior on the fly.

    Args:
        tower_sk: the private key the mocked tower signs its appointment receipts with.

    Attributes:
        sk: the tower private key (``tower_sk``).
        users (dict): maps each user id to a dictionary holding ``available_slots``, ``subscription_expiry``
            and, once an appointment has been accepted, ``appointments`` (keyed by locator).
        app: the Flask application serving the endpoints.
    """

    def __init__(self, tower_sk):
        self.sk = tower_sk
        self.users = {}
        self.app = Flask(__name__)

        # Maps every endpoint to the method that serves it and the HTTP methods it accepts.
        routes = {
            "/register": (self.register, ["POST"]),
            "/add_appointment": (self.add_appointment, ["POST"]),
            "/get_appointment": (self.get_appointment, ["POST"]),
        }

        for url, (view_func, methods) in routes.items():
            self.app.add_url_rule(url, view_func=view_func, methods=methods)

        # Setting Flask log to ERROR only so it does not mess with our logging. Also disabling flask initial messages
        logging.getLogger("werkzeug").setLevel(logging.ERROR)
        os.environ["WERKZEUG_RUN_MAIN"] = "true"

    def register(self):
        """Registers the user identified by the ``public_key`` field of the JSON request body.

        A new user starts with 100 slots. A user that is already known gets 100 additional slots. In both
        cases the subscription expiry is set to 4320.

        Returns:
            tuple: the JSON response (``public_key``, ``available_slots``, ``subscription_expiry``) and
            ``constants.HTTP_OK``.
        """

        user_id = request.get_json().get("public_key")

        if user_id not in self.users:
            self.users[user_id] = {"available_slots": 100, "subscription_expiry": 4320}
        else:
            self.users[user_id]["available_slots"] += 100
            self.users[user_id]["subscription_expiry"] = 4320

        rcode = constants.HTTP_OK
        response = {
            "public_key": user_id,
            "available_slots": self.users[user_id].get("available_slots"),
            "subscription_expiry": self.users[user_id].get("subscription_expiry"),
        }

        # Serialize explicitly, like the rest of the endpoints do, instead of relying on Flask converting
        # the dictionary on its own.
        return jsonify(response), rcode

    def add_appointment(self):
        """Replies to an ``add_appointment`` request according to the module-level ``mocked_return``.

        The user id is recovered from the request ``signature`` over the serialized appointment.

        Returns:
            tuple: the JSON response and the HTTP status code built by the ``add_appointment_*`` function
            matching ``mocked_return``. Values other than "success", "reject_no_slots", "reject_invalid" and
            "misbehaving_tower" produce the "service unavailable" reply.
        """

        payload = request.get_json()
        appointment = Appointment.from_dict(payload.get("appointment"))
        user_id = Cryptographer.get_compressed_pk(
            Cryptographer.recover_pk(appointment.serialize(), payload.get("signature"))
        )

        # NOTE: the "success" and "misbehaving_tower" branches index ``self.users`` directly, so they raise
        # a KeyError if the user has not gone through ``/register`` first.
        if mocked_return == "success":
            response, rtype = add_appointment_success(appointment, self.users[user_id], self.sk)
        elif mocked_return == "reject_no_slots":
            response, rtype = add_appointment_reject_no_slots()
        elif mocked_return == "reject_invalid":
            response, rtype = add_appointment_reject_invalid()
        elif mocked_return == "misbehaving_tower":
            response, rtype = add_appointment_misbehaving_tower(appointment, self.users[user_id], self.sk)
        else:
            response, rtype = add_appointment_service_unavailable()

        return jsonify(response), rtype

    def get_appointment(self):
        """Returns the appointment stored for the requesting user under the requested ``locator``.

        The user id is recovered from the request ``signature`` over the message ``get appointment <locator>``.

        Returns:
            tuple: the stored appointment data plus ``"status": "being_watched"`` and ``constants.HTTP_OK``
            if the user holds an appointment with that locator. Otherwise a ``not_found`` response and
            ``constants.HTTP_NOT_FOUND``.
        """

        payload = request.get_json()
        locator = payload.get("locator")
        message = f"get appointment {locator}"
        user_id = Cryptographer.get_compressed_pk(Cryptographer.recover_pk(message.encode(), payload.get("signature")))

        if (
            user_id in self.users
            and "appointments" in self.users[user_id]
            and locator in self.users[user_id]["appointments"]
        ):
            rcode = constants.HTTP_OK
            # Build the reply from a copy so the status is not written into the stored appointment.
            response = dict(self.users[user_id]["appointments"][locator])
            response["status"] = "being_watched"
        else:
            rcode = constants.HTTP_NOT_FOUND
            response = {"locator": locator, "status": "not_found"}

        return jsonify(response), rcode
def add_appointment_success(appointment, user, tower_sk):
    """Builds the reply of a tower that accepts ``appointment`` and records it for ``user``.

    One slot is taken from ``user["available_slots"]`` and the appointment (as a dictionary) is stored in
    ``user["appointments"]`` under its locator.

    Args:
        appointment: the appointment being accepted.
        user (dict): the user entry kept by the mocked tower. It is updated in place.
        tower_sk: the key used to sign the serialized appointment.

    Returns:
        tuple: the response (``locator``, ``signature``, ``available_slots``, ``subscription_expiry``) and
        ``constants.HTTP_OK``.
    """

    response = {
        "locator": appointment.locator,
        "signature": Cryptographer.sign(appointment.serialize(), tower_sk),
        "available_slots": user.get("available_slots") - 1,
        "subscription_expiry": user.get("subscription_expiry"),
    }
    user["available_slots"] = response["available_slots"]

    # Start a fresh collection when the user has none yet (or an empty one), then file the appointment in it.
    record = appointment.to_dict()
    stored = user.get("appointments")
    if not stored:
        stored = user["appointments"] = {}
    stored[appointment.locator] = record

    return response, constants.HTTP_OK
def add_appointment_reject_no_slots():
    """Builds the rejection used for non-registered users and for users with no available slots.

    Returns:
        tuple: the error response and ``constants.HTTP_BAD_REQUEST``.
    """

    rejection = {
        "error": "appointment rejected. Invalid signature or user does not have enough slots available",
        "error_code": errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS,
    }

    return rejection, constants.HTTP_BAD_REQUEST
def add_appointment_reject_invalid():
    """Builds the rejection used for malformed appointments (e.g. no json) and appointments with invalid data.

    Returns:
        tuple: the error response and ``constants.HTTP_BAD_REQUEST``.
    """

    # The concrete error code is an arbitrary pick, it should not matter which one is reported.
    rejection = {"error": "appointment rejected", "error_code": errors.APPOINTMENT_EMPTY_FIELD}

    return rejection, constants.HTTP_BAD_REQUEST
def add_appointment_service_unavailable():
    """Builds the reply for any case where the service is unavailable (e.g. tower run out of free slots).

    Returns:
        tuple: the error response and ``constants.HTTP_SERVICE_UNAVAILABLE``.
    """

    return {"error": "appointment rejected"}, constants.HTTP_SERVICE_UNAVAILABLE
def add_appointment_misbehaving_tower(appointment, user, tower_sk):
    """Builds the reply of a tower that accepts the appointment but signs it with the wrong key.

    The appointment is accepted as in ``add_appointment_success``. The signature of the response is then
    replaced by one made with a key derived from ``get_random_value_hex``, and the same signature is stored
    next to the appointment in the user entry.

    Args:
        appointment: the appointment being accepted.
        user (dict): the user entry kept by the mocked tower. It is updated in place.
        tower_sk: the tower key, only used to build the initial (valid) response.

    Returns:
        tuple: the response carrying the wrong signature and the status code of the successful reply.
    """

    rogue_sk = PrivateKey.from_hex(get_random_value_hex(32))
    forged_signature = Cryptographer.sign(appointment.serialize(), rogue_sk)

    reply, status = add_appointment_success(appointment, user, tower_sk)

    # Both the stored appointment and the reply end up with the forged signature.
    for holder in (user["appointments"][appointment.locator], reply):
        holder["signature"] = forged_signature

    return reply, status
def get_random_value_hex(nbytes):
    """Returns a pseudorandom value of ``nbytes`` bytes as a lowercase hex string.

    The value is drawn from the ``random`` module and left-padded with zeros to ``2 * nbytes`` characters.

    Args:
        nbytes (int): the size of the value, in bytes.

    Returns:
        str: the hex encoded value.
    """

    hex_digits = format(random.getrandbits(8 * nbytes), "x")

    return hex_digits.rjust(2 * nbytes, "0")
@pytest.fixture(scope="session", autouse=True)
def init_tower():
    """Session-wide fixture that sets up the data directory used through ``TOWERS_DATA_DIR``.

    Points ``TOWERS_DATA_DIR`` to ``/tmp/watchtower``, creates the directory if needed and writes a
    ``watchtower.conf`` file with ``max_retries = 5`` in it. The directory is removed once the session ends.
    """

    os.environ["TOWERS_DATA_DIR"] = "/tmp/watchtower"
    data_dir = os.environ["TOWERS_DATA_DIR"]

    config = configparser.ConfigParser()
    config.read_dict({"general": {"max_retries": "5"}})

    os.makedirs(data_dir, exist_ok=True)
    conf_file_path = os.path.join(data_dir, "watchtower.conf")
    with open(conf_file_path, "w") as conf_file:
        config.write(conf_file)

    yield

    # Teardown: wipe whatever directory the environment variable points to at this point.
    shutil.rmtree(os.environ["TOWERS_DATA_DIR"])
@pytest.fixture(scope="session", autouse=True)
def prng_seed():
    """Session-wide fixture that seeds the ``random`` module with a fixed value (0)."""

    fixed_seed = 0
    random.seed(fixed_seed)
@pytest.fixture(scope="session", autouse=True)
def run_tower():
    """Session-wide fixture that serves a ``TowerMock`` on ``tower_netaddr:tower_port`` from a daemon thread."""

    mocked_tower = TowerMock(tower_sk)
    server_args = dict(host=tower_netaddr, port=tower_port)

    server_thread = Thread(target=mocked_tower.app.run, kwargs=server_args, daemon=True)
    server_thread.start()
def test_helpme_starts(node_factory):
    """Tests that the plugin can be loaded both dynamically and statically"""

    node = node_factory.get_node()

    # Dynamic load: start the plugin on a running node, stop it and start it again
    for rpc_call in ("plugin_start", "plugin_stop", "plugin_start"):
        getattr(node.rpc, rpc_call)(plugin_path)

    node.stop()

    # Static load: set the plugin as a daemon option and start the node with it
    node.daemon.opts["plugin"] = plugin_path
    node.start()
def test_watchtower(node_factory):
    """Tests sending data to a single tower with short connection issue"""

    global mocked_return

    # FIXME: node_factory is a function scope fixture, so I cannot reuse it while splitting the tests logically
    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    def tower_info():
        # Fresh view of what the plugin knows about the tower
        return payee.rpc.gettowerinfo(tower_id)

    def pay(label):
        # Every payment forces a new commitment, which in turn makes the plugin send data to the tower
        invoice = payee.rpc.invoice(25000000, label, "desc")
        payer.rpc.pay(invoice["bolt11"])

    # Register a new tower and make sure it shows up in our list of towers
    payee.rpc.registertower(f"{tower_id}@{tower_netaddr}:{tower_port}")
    registered_ids = [tower.get("id") for tower in payee.rpc.listtowers().get("towers")]
    assert tower_id in registered_ids

    # There are no appointments in the tower at the moment
    assert not tower_info().get("appointments")

    # Force a new commitment
    mocked_return = "success"
    pay("lbl1")

    # Check that the tower got it (list is not empty anymore)
    # FIXME: it would be great to check the ids, I haven't found a way to check the list of commitments though.
    #        simply signing the last tx won't work since every payment creates two updates.
    appointments = tower_info().get("appointments")
    assert appointments
    assert not tower_info().get("pending_appointments")

    # Disconnect the tower and see how appointments get backed up
    mocked_return = "service_unavailable"
    pay("lbl2")

    pending_appointments = [
        entry.get("appointment").get("locator") for entry in tower_info().get("pending_appointments")
    ]
    assert pending_appointments
    assert tower_info().get("pending_appointments")

    # The failure has triggered the retry strategy. By "turning it back on" the pending appointments should
    # get through
    mocked_return = "success"

    # Give it some time to switch
    while tower_info().get("pending_appointments"):
        sleep(0.1)

    # The previously pending appointments are now part of the sent appointments
    sent_locators = tower_info().get("appointments").keys()
    assert set(pending_appointments).issubset(sent_locators)
def test_watchtower_retry_offline(node_factory):
    """Tests sending data to a tower that gets offline for a while. Forces retry using ``retrytower``"""

    global mocked_return

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    def tower_info():
        return payee.rpc.gettowerinfo(tower_id)

    def pending_locators():
        # Locators of the appointments the plugin has not been able to deliver yet
        return [entry.get("appointment").get("locator") for entry in tower_info().get("pending_appointments")]

    # Send some appointments with the tower "offline"
    mocked_return = "service_unavailable"

    # There are no pending appointments atm
    assert not tower_info().get("pending_appointments")

    invoice = payee.rpc.invoice(25000000, "lbl3", "desc")
    payer.rpc.pay(invoice["bolt11"])

    pending_appointments = pending_locators()
    assert pending_appointments

    # Wait until the auto-retry gives up and force a retry manually
    while tower_info().get("status") == "temporarily unreachable":
        sleep(0.1)
    payee.rpc.retrytower(tower_id)

    # After retrying with an offline tower the pending appointments are the exact same
    assert pending_appointments == pending_locators()

    # Now we can "turn the tower back on" and force a retry
    mocked_return = "success"
    payee.rpc.retrytower(tower_id)

    # Give it some time to send everything
    while tower_info().get("pending_appointments"):
        sleep(0.1)

    # Check that all went through
    sent_locators = tower_info().get("appointments").keys()
    assert set(pending_appointments).issubset(sent_locators)
def test_watchtower_no_slots(node_factory):
    """Tests sending data to tower for a user that has no available slots"""

    global mocked_return

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    def tower_info():
        return payee.rpc.gettowerinfo(tower_id)

    def pending_locators():
        # Locators of the appointments the plugin has not been able to deliver yet
        return [entry.get("appointment").get("locator") for entry in tower_info().get("pending_appointments")]

    # Simulates there are no available slots when trying to send an appointment to the tower
    mocked_return = "reject_no_slots"

    # There are no pending appointments atm
    assert not tower_info().get("pending_appointments")

    # Make a payment and the appointment should be left as pending
    invoice = payee.rpc.invoice(25000000, "lbl3", "desc")
    payer.rpc.pay(invoice["bolt11"])

    pending_appointments = pending_locators()
    assert pending_appointments

    # Retrying should work but the appointment won't go through
    retry_message = payee.rpc.retrytower(tower_id)
    assert "Retrying tower" in retry_message
    assert pending_appointments == pending_locators()

    # Adding slots + retrying should work
    mocked_return = "success"
    payee.rpc.retrytower(tower_id)

    while tower_info().get("pending_appointments"):
        sleep(0.1)

    sent_locators = tower_info().get("appointments").keys()
    assert set(pending_appointments).issubset(sent_locators)
def test_watchtower_invalid_appointment(node_factory):
    """Tests sending an invalid appointment to a tower"""

    global mocked_return

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    # Simulates sending an appointment with invalid data to the tower
    mocked_return = "reject_invalid"

    # There are no invalid appointments atm
    info_before = payee.rpc.gettowerinfo(tower_id)
    assert not info_before.get("invalid_appointments")

    # Make a payment and the appointment should be flagged as invalid
    invoice = payee.rpc.invoice(25000000, "lbl4", "desc")
    payer.rpc.pay(invoice["bolt11"])

    # The appointments have been saved as invalid
    info_after = payee.rpc.gettowerinfo(tower_id)
    assert info_after.get("invalid_appointments")
def test_watchtower_multiple_towers(node_factory):
    """Tests sending data to multiple towers at the same time"""

    global mocked_return

    # Create the new tower and serve it from its own daemon thread
    second_netaddr = "localhost"
    second_port = "5678"
    second_sk = PrivateKey()
    second_id = Cryptographer.get_compressed_pk(second_sk.public_key)

    second_tower = TowerMock(second_sk)
    server_args = {"host": second_netaddr, "port": second_port}
    server_thread = Thread(target=second_tower.app.run, kwargs=server_args, daemon=True)
    server_thread.start()

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    # Register the new tower and make sure it shows up in our list of towers
    payee.rpc.registertower(f"{second_id}@{second_netaddr}:{second_port}")
    registered_ids = [tower.get("id") for tower in payee.rpc.listtowers().get("towers")]
    assert second_id in registered_ids

    # Force a new commitment
    mocked_return = "success"
    invoice = payee.rpc.invoice(25000000, "lbl6", "desc")
    payer.rpc.pay(invoice["bolt11"])

    # Check that both towers got it
    second_tower_appointments = payee.rpc.gettowerinfo(second_id).get("appointments")
    assert second_tower_appointments
    assert not payee.rpc.gettowerinfo(second_id).get("pending_appointments")

    first_tower_appointments = payee.rpc.gettowerinfo(tower_id).get("appointments")
    assert set(second_tower_appointments).issubset(first_tower_appointments)
def test_watchtower_misbehaving(node_factory):
    """Tests sending an appointment to a misbehaving tower"""

    global mocked_return

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    payer, payee = node_factory.line_graph(2, opts=nodes_opts)

    # Simulates a tower that replies with an invalid signature
    mocked_return = "misbehaving_tower"

    # There is no proof of misbehaviour atm
    info_before = payee.rpc.gettowerinfo(tower_id)
    assert not info_before.get("misbehaving_proof")

    # Make a payment. The appointment makes it to the tower, but the response will contain an invalid signature
    invoice = payee.rpc.invoice(25000000, "lbl5", "desc")
    payer.rpc.pay(invoice["bolt11"])

    # The plugin should have flagged the tower and stored the proof of misbehaviour
    info_after = payee.rpc.gettowerinfo(tower_id)
    assert info_after.get("status") == "misbehaving"
    assert info_after.get("misbehaving_proof")
def test_get_appointment(node_factory):
    """Tests requesting appointments from the tower, both locally known ones and a made up one"""

    nodes_opts = [{"may_fail": True, "allow_broken_log": True}, {"plugin": plugin_path}]
    _, payee = node_factory.line_graph(2, opts=nodes_opts)

    known_locators = payee.rpc.gettowerinfo(tower_id).get("appointments")

    # Get should get a reply for every local appointment
    for known_locator in known_locators:
        reply = payee.rpc.getappointment(tower_id, known_locator)
        assert reply.get("locator") == known_locator
        assert reply.get("status") == "being_watched"

    # Made up appointments should return a 404
    made_up_locator = get_random_value_hex(16)
    reply = payee.rpc.getappointment(tower_id, made_up_locator)
    assert reply.get("locator") == made_up_locator
    assert reply.get("status") == "not_found"