Files
plugins/archived/probe/probe.py
fmhoeger 92186296e2 Rename directory 'Unmaintained' back to 'archived'
Update text and links for archived plugins in README
2024-02-06 20:29:25 +00:00

297 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""Plugin that probes the network for failed channels.
This plugin regularly performs a random probe of the network by sending a
payment to a random node in the network, with a random `payment_hash`, and
observing how the network reacts. The random `payment_hash` results in the
payments being rejected at the destination, so no funds are actually
transferred. The error messages however allow us to gather some information
about the success probability of a payment, and the stability of the channels.
The random selection of destination nodes is a worst case scenario, since it's
likely that most of the nodes in the network are leaf nodes that are not
well-connected and often offline at any point in time. Expect to see a lot of
errors about being unable to route these payments as a result of this.
The probe data is stored in a sqlite3 database for later inspection and to be
able to eventually draw pretty plots about how the network stability changes
over time. For now you can inspect the results using the `sqlite3` command
line utility:
```bash
sqlite3 ~/.lightning/probes.db "select destination, erring_channel, failcode from probes"
```
Failcode -1 and 16399 are special:
- -1 indicates that we were unable to find a route to the destination. This
usually indicates that this is a leaf node that is currently offline.
- 16399 is the code for `incorrect_or_unknown_payment_details` and indicates
a successful probe. The destination received the incoming payment but could
not find an invoice (and hence a preimage) matching the `payment_hash`, which
is expected since we generated the `payment_hash` at random :-)
"""
from datetime import datetime
from pyln.client import Plugin, RpcError
from random import choice
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from time import sleep, time
import heapq
import json
import os
import random
import string
import threading
# Declarative base for the ORM model(s) defined in this file.
Base = declarative_base()
plugin = Plugin()
# Channels ("<erring_channel>/<erring_direction>") excluded from every route
# search. Entries are only ever appended in this file, never pruned.
exclusions = list()
# Channel ("<erring_channel>/<erring_direction>") -> expiry timestamp (as
# returned by time()). Expired entries are dropped by a periodic job.
temporary_exclusions = dict()
class Probe(Base):
    """A single probe attempt, stored as one row of the ``probes`` table."""
    __tablename__ = "probes"

    id = Column(Integer, primary_key=True)
    destination = Column(String)      # node id the probe was aimed at
    route = Column(String)            # comma-separated channel ids of the route
    error = Column(String)            # JSON-encoded error data, when available
    erring_channel = Column(String)
    failcode = Column(Integer)        # -1 is used when no route was found
    payment_hash = Column(String)
    started_at = Column(DateTime)
    finished_at = Column(DateTime)
    amount = Column(Integer)          # amount handed to getroute (msat)

    def jsdict(self):
        """Return a JSON-friendly dict summarising this probe.

        Timestamps are converted with ``str()``. The ``error`` and
        ``payment_hash`` columns are not part of the summary.
        """
        plain_fields = ('id', 'destination', 'amount', 'route',
                        'erring_channel', 'failcode')
        summary = {name: getattr(self, name) for name in plain_fields}
        summary.update(
            started_at=str(self.started_at),
            finished_at=str(self.finished_at),
        )
        return summary
def start_probe(plugin):
    """Run one scheduler-initiated probe on a background daemon thread.

    `probe` is invoked with ``request=None``, so no RPC result is produced.
    """
    worker = threading.Thread(target=probe, args=(plugin, None), daemon=True)
    worker.start()
@plugin.async_method('probe')
def probe(plugin, request, node_id=None, amount=10000, **kwargs):
    """Send one probe payment towards `node_id` and record it in the DB.

    If `node_id` is None a random node from ``listnodes`` is used. A route is
    computed with ``getroute``, skipping both permanent and temporary
    exclusions. If no route is found, the probe is stored with failcode -1,
    finished immediately and its summary returned. Otherwise the payment is
    sent with a random payment hash and queued in ``plugin.pending_probes``
    for `poll_payments` / `complete_probe` to finish.

    :param request: the pyln request when called over RPC, or None when
        started by the scheduler (then nothing is reported back).
    :param node_id: destination node id, or None for a random node.
    :param amount: amount passed to ``getroute`` (millisatoshi).
    :raises ValueError: when called over RPC and the graph has no nodes.
    :raises RpcError: if ``sendpay`` fails (the row is closed out first).
    """
    if node_id is None:
        nodes = plugin.rpc.listnodes()['nodes']
        if not nodes:
            # choice() would raise IndexError on an empty network view.
            msg = "No nodes known in the network graph, nothing to probe"
            if request is None:
                print(msg)
                return None
            raise ValueError(msg)
        node_id = choice(nodes)['nodeid']

    s = plugin.Session()
    try:
        p = Probe(
            destination=node_id,
            started_at=datetime.now(),
            amount=amount,
        )
        s.add(p)
        try:
            # Amount and riskfactor are passed positionally: the amount's
            # keyword name differs between pyln-client releases (msatoshi vs
            # amount_msat — NOTE(review): confirm for the deployed version),
            # while its position is stable.
            route = plugin.rpc.getroute(
                node_id,
                amount,
                1,
                exclude=exclusions + list(temporary_exclusions.keys()),
            )['route']
        except RpcError:
            # No route to the destination (often an offline leaf node).
            p.failcode = -1
            p.finished_at = datetime.now()
            # Commit before serialising so the summary carries the
            # database-assigned id instead of None.
            s.commit()
            res = p.jsdict()
            return request.set_result(res) if request else None

        p.route = ','.join(r['channel'] for r in route)
        # 32 random bytes as lowercase hex: nobody holds a matching preimage,
        # so the destination has to reject the payment.
        payment_hash = os.urandom(32).hex()
        p.payment_hash = payment_hash
        s.commit()
        probe_id = p.id

        try:
            plugin.rpc.sendpay(route, payment_hash)
        except RpcError as e:
            # Close the row out so it does not look in-flight forever, then
            # surface the original error to the caller.
            p.error = json.dumps(e.error)
            p.finished_at = datetime.now()
            s.commit()
            raise
    finally:
        s.close()

    plugin.pending_probes.append({
        'request': request,
        'probe_id': probe_id,
        'payment_hash': payment_hash,
        'callback': complete_probe,
        'plugin': plugin,
    })
@plugin.method('traceroute')
def traceroute(plugin, node_id, amount=10000, **kwargs):
    """Probe successively longer prefixes of the route to `node_id`.

    A route to `node_id` is computed once. Then, for i = 1..len(route), a
    payment with a fresh random payment hash is sent along the first i hops
    and ``waitsendpay`` is awaited for up to 30 seconds. Every attempt,
    including one that timed out, is listed in ``probes`` with its
    destination, timestamps and error/failcode. The walk stops after the
    first timeout (waitsendpay error code 200).

    :param node_id: final destination node id.
    :param amount: amount used for route computation (millisatoshi).
    :returns: the traceroute dict; ``failcode`` is -1 if no route was found.
    :raises ValueError: if a probe payment unexpectedly succeeds.
    """
    result = {
        'destination': node_id,
        'started_at': str(datetime.now()),
        'probes': [],
    }
    try:
        # Positional amount/riskfactor: the amount's keyword name differs
        # between pyln-client releases, its position does not.
        result['route'] = plugin.rpc.getroute(node_id, amount, 1)['route']
        result['payment_hash'] = os.urandom(32).hex()
    except RpcError:
        result['failcode'] = -1
        return result

    route = result['route']
    # For each prefix length, shorten the route and attempt the payment.
    for i in range(1, len(route) + 1):
        hop = {
            'route': route[:i],
            'payment_hash': os.urandom(32).hex(),
            'started_at': str(datetime.now()),
        }
        hop['destination'] = hop['route'][-1]['id']
        plugin.rpc.sendpay(hop['route'], hop['payment_hash'])
        try:
            plugin.rpc.waitsendpay(hop['payment_hash'], timeout=30)
        except RpcError as e:
            hop['finished_at'] = str(datetime.now())
            if e.error['code'] == 200:
                hop['error'] = "Timeout"
                # Report the timed-out hop too; it is the most telling one.
                result['probes'].append(hop)
                break
            # Not every error carries structured 'data'; fall back to the
            # message instead of raising KeyError.
            data = e.error.get('data') or {}
            hop['error'] = data or e.error.get('message')
            hop['failcode'] = data.get('failcode')
        else:
            raise ValueError("The recipient guessed the preimage? Cryptography is broken!!!")
        result['probes'].append(hop)
    return result
@plugin.method('probe-stats')
def stats(plugin):
    """Report the sizes of the pending-probe queue and the exclusion sets."""
    collections_by_key = {
        'pending_probes': plugin.pending_probes,
        'exclusions': exclusions,
        'temporary_exclusions': temporary_exclusions,
    }
    return {key: len(items) for key, items in collections_by_key.items()}
def complete_probe(plugin, request, probe_id, payment_hash):
    """Record the outcome of a finished probe payment.

    Invoked by `poll_payments` (with the pending-probe entry's fields as
    keyword arguments) once the payment is no longer pending. It loads the
    `Probe` row `probe_id` and collects the result with ``waitsendpay``. It
    then stores failcode, erring channel and error data, and updates the
    exclusion lists:

    - failcodes 16392 and 16394 append "<erring_channel>/<erring_direction>"
      to `exclusions` (NOTE(review): presumably BOLT #4
      permanent_channel_failure / unknown_next_peer — verify);
    - failcodes 21 and 4103 put it in `temporary_exclusions` until
      ``plugin.probe_exclusion_duration`` seconds from now.

    The probe summary is sent via ``request.set_result`` only when `request`
    is set; probes started by the scheduler pass ``request=None``.
    `payment_hash` is accepted because the pending entry carries it; the
    stored row's hash is what gets waited on.
    """
    s = plugin.Session()
    try:
        p = s.query(Probe).get(probe_id)
        try:
            plugin.rpc.waitsendpay(p.payment_hash)
        except RpcError as e:
            # Not every error carries 'data' or an erring channel; read it
            # defensively instead of raising KeyError.
            data = e.error.get('data') or {}
            p.erring_channel = data.get('erring_channel')
            p.failcode = data.get('failcode')
            # Keep the whole error when there is no structured data.
            p.error = json.dumps(data or e.error)

            has_channel = 'erring_channel' in data and 'erring_direction' in data
            if has_channel and p.failcode in (16392, 16394):
                exclusion = "{erring_channel}/{erring_direction}".format(**data)
                exclusions.append(exclusion)
                print('Adding exclusion for channel {} ({} total)'.format(
                    exclusion, len(exclusions))
                )
            elif has_channel and p.failcode in (21, 4103):
                exclusion = "{erring_channel}/{erring_direction}".format(**data)
                expiry = time() + plugin.probe_exclusion_duration
                temporary_exclusions[exclusion] = expiry
                print('Adding temporary exclusion for channel {} ({} total)'.format(
                    exclusion, len(temporary_exclusions))
                )
        p.finished_at = datetime.now()
        res = p.jsdict()
        s.commit()
    finally:
        s.close()

    # Scheduler-initiated probes have no RPC request to answer. Calling
    # set_result on None would raise inside the scheduler thread.
    if request is not None:
        request.set_result(res)
def poll_payments(plugin):
    """Iterate through all probes and complete the finalized ones.

    Each entry of ``plugin.pending_probes`` is checked with ``listsendpays``.
    An entry counts as finished when none of its payment attempts is
    ``'pending'``; this includes the case where no attempt is listed at all.
    A finished entry is removed from the list. Its ``callback`` is then
    popped off and called with the remaining entry fields as keyword
    arguments.
    """
    # Iterate over a snapshot: removing items from the list being looped over
    # would silently skip the entry following each removed one.
    for entry in list(plugin.pending_probes):
        payments = plugin.rpc.listsendpays(
            None, payment_hash=entry['payment_hash'])['payments']
        if any(pay['status'] == 'pending' for pay in payments):
            continue
        plugin.pending_probes.remove(entry)
        callback = entry.pop('callback')
        callback(**entry)
def clear_temporary_exclusion(plugin):
    """Remove temporary channel exclusions whose expiry time has passed.

    `plugin` is unused. It is accepted so the function fits the scheduler's
    ``func(plugin)`` calling convention.
    """
    now = time()
    # Count before removing so the log reads "removed / total".
    total = len(temporary_exclusions)
    timed_out = [k for k, expiry in temporary_exclusions.items() if expiry < now]
    for k in timed_out:
        del temporary_exclusions[k]
    print("Removed {}/{} temporary exclusions.".format(len(timed_out), total))
def schedule(plugin):
    """Run the plugin's periodic jobs forever; never returns.

    Jobs and their intervals (seconds):
    - clear_temporary_exclusion: every 300
    - start_probe: every ``plugin.probe_interval``
    - poll_payments: every 1
    Each job's next run is scheduled ``interval`` seconds after it finishes.
    An exception raised by a job is logged, and the job is still rescheduled.
    """
    import traceback

    now = time()
    jobs = [
        (clear_temporary_exclusion, 300),
        (start_probe, plugin.probe_interval),
        (poll_payments, 1),
    ]
    # Heap entries are (next_run, job_index, func, interval). job_index is
    # unique per job, so equal next_run times never fall through to comparing
    # the functions themselves (which raises TypeError).
    next_runs = [
        (now + interval, idx, func, interval)
        for idx, (func, interval) in enumerate(jobs)
    ]
    heapq.heapify(next_runs)
    while True:
        when, idx, func, interval = heapq.heappop(next_runs)
        delay = when - time()
        if delay > 0:
            sleep(delay)
        try:
            func(plugin)
        except Exception:
            # Keep the scheduler alive. An uncaught exception would end this
            # thread and silently stop all future probes and polling.
            print("Scheduled job {} failed:\n{}".format(
                func.__name__, traceback.format_exc()))
        heapq.heappush(next_runs, (time() + interval, idx, func, interval))
@plugin.init()
def init(configuration, options, plugin):
    """Read the plugin options, open the probe database and start the scheduler.

    Stores ``probe-interval`` and ``probe-exclusion-duration`` as integers on
    `plugin`. The sqlite database ``probes.db`` is created in the lightning
    directory, and `plugin.Session` is bound to it. The `schedule` loop is
    then started on a daemon thread.
    """
    plugin.probe_interval = int(options['probe-interval'])
    plugin.probe_exclusion_duration = int(options['probe-exclusion-duration'])

    db_filename = 'sqlite:///' + os.path.join(
        configuration['lightning-dir'],
        'probes.db',
    )
    engine = create_engine(db_filename, echo=True)
    Base.metadata.create_all(engine)
    plugin.Session = sessionmaker(bind=engine)

    # Probes that are still pending and need to be checked against. This
    # must exist before the scheduler starts, because its poll_payments job
    # reads it.
    plugin.pending_probes = []

    t = threading.Thread(target=schedule, args=[plugin], daemon=True)
    t.start()
# Plugin options (values arrive as strings and are converted in `init`).
plugin.add_option(
    name='probe-interval',
    default='3600',
    description='How many seconds should we wait between probes?',
)
plugin.add_option(
    name='probe-exclusion-duration',
    default='1800',
    description='How many seconds should temporarily failed channels be excluded?',
)

# Hand control to pyln's main loop.
plugin.run()