Move jitrebalance, feeadjuster, noise to Unmaintained directory

This commit is contained in:
fmhoeger
2024-02-03 21:06:41 -06:00
committed by mergify[bot]
parent 7bf1ab844b
commit 08c2c24e46
19 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
# Fee Adjuster
This plugin dynamically adjusts fees according to channel balances. The default behaviour is to automatically adjust fees at startup
and following forwarding events. There is a threshold for balance deltas that must be crossed before an update is triggered. It can
also set the max htlc for channels according to available liquidity. This may reduce transaction failures but it will also reveal
information about what the current channel balance is.
## Options
- `feeadjuster-deactivate-fuzz` boolean (default `False`) deactivates update threshold randomization and hysteresis
- `feeadjuster-deactivate-fee-update` boolean (default `False`) deactivates automatic fee updates for forward events
- `feeadjuster-threshold` default 0.05 - Relative channel balance delta at which to trigger an update. Default 0.05 means 5%. Note: it's also fuzzed by 1.5%.
- `feeadjuster-threshold-abs` default 0.001btc - Absolute channel balance delta at which to always trigger an update. Note: it's also fuzzed by 1.5%.
- `feeadjuster-enough-liquidity` default 0msat (turned off) - Beyond this liquidity do not adjust fees.
This also modifies the fee curve to achieve having this amount of liquidity.
- `feeadjuster-adjustment-method` Adjustment method to calculate channel fee. Can be 'default', 'soft' for less difference or 'hard' for higher difference.
- `feeadjuster-imbalance` default 0.5 (always acts) - Ratio at which channel imbalance the feeadjuster should start acting. Set higher or lower values to
limit feeadjuster's activity to more imbalanced channels. E.g. 0.3 for '70/30'% or 0.6 for '40/60'%.
- `feeadjuster-feestrategy` Sets the per channel fee selection strategy. Can be 'global' (default) to use global config or default values, or 'median' to use
the median fees from peers of peer.
- `feeadjuster-median-multiplier` Sets the factor with which the median fee is multiplied if using the fee strategy
'median'. This allows over- or underbidding other nodes by a constant factor (default: 1.0).
- `feeadjuster-max-htlc-steps` Default 0 (turned off). Sets the number of max htlc adjustment steps. If our local channel balance drops below a step level
it will reduce the max htlc to that level, which can reduce local routing channel failures. A value of 0 disables the stepping.

View File

@@ -0,0 +1,23 @@
import re
def cln_parse_rpcversion(string):
    """Parse a cln version string into its numeric RPC version parts.

    cln switched from a semver-like `major.minor.sub[rcX][-mod]` scheme to
    ubuntu-style `yy.mm[.patch][-mod]` with release 22.11; both forms (and
    optional leading 'v', rc markers and '-mod' suffixes) are accepted.
    Returns a list of three integers: [major, minor, patch].
    """
    version = string
    # drop a leading 'v' marker if present
    if version[:1] == 'v':
        version = version[1:]
    # drop any '-mod' style suffix
    version = version.split('-', 1)[0]
    # drop a trailing release-candidate marker such as 'rc2'
    if re.search(r'.*(rc[\d]*)$', version):
        version = version[:version.find('rc')]
    # two-component versions imply a patch level of 0
    if version.count('.') == 1:
        version += '.0'
    # numeric parts as actual integers
    return [int(part) for part in version.split('.')]

View File

@@ -0,0 +1,443 @@
#!/usr/bin/env python3
import random
import statistics
import time
import math
from clnutils import cln_parse_rpcversion
from pyln.client import Plugin, Millisatoshi, RpcError
from threading import Lock
plugin = Plugin()
# Our amount and the total amount in each of our channel, indexed by scid
plugin.adj_balances = {}
# Cache to avoid loads of RPC calls
plugin.our_node_id = None
plugin.peerchannels = None  # refreshed at the start of feeadjust/forward_event
plugin.channels = None  # gossip snapshot used by the 'median' fee strategy
# Users can configure this
plugin.update_threshold = 0.05
# forward_event must wait for init
plugin.mutex = Lock()
plugin.mutex.acquire()  # released at the end of init()
def get_adjusted_percentage(plugin: Plugin, scid: str):
    """Return our share of the channel liquidity, adjusted for channels big
    enough to have a wide middle range where liquidity is "just okay".

    Note: if big_enough_liquidity is greater than {total} * 2 this
    degenerates to the plain {our} / {total} ratio, as before.
    """
    balances = plugin.adj_balances[scid]
    ours, total = balances["our"], balances["total"]
    # feature disabled: plain ratio
    if plugin.big_enough_liquidity == Millisatoshi(0):
        return ours / total
    min_liquidity = min(total / 2, int(plugin.big_enough_liquidity))
    theirs = total - ours
    if ours >= min_liquidity and theirs >= min_liquidity:
        # both sides hold enough liquidity
        return 0.5
    if ours < min_liquidity:
        # our side is short on liquidity
        return ours / min_liquidity / 2
    # their side is short on liquidity
    return (min_liquidity - theirs) / min_liquidity / 2 + 0.5
def get_ratio_soft(our_percentage):
    """Fee ratio curve with a lesser difference than the default curve."""
    exponent = 0.5 - our_percentage
    return 10 ** exponent
def get_ratio(our_percentage):
    """Default fee ratio curve: the farther we are from the optimal case,
    the more we bump/lower."""
    exponent = 0.5 - our_percentage
    return 50 ** exponent
def get_ratio_hard(our_percentage):
    """Aggressive fee ratio curve.

    Return value is between 0 and 20: 0 -> 20; 0.5 -> 1; 1 -> 0
    """
    remote_share = 1 - our_percentage
    return 100 ** (0.5 - our_percentage) * remote_share * 2
def get_peerchannels(plugin: Plugin):
    """ Helper to reconstruct `listpeerchannels` for older CLN versions """
    major, minor = plugin.rpcversion[0], plugin.rpcversion[1]
    # CLN >= 23.02 ships a native `listpeerchannels`
    if (major, minor) >= (23, 2):
        return plugin.rpc.listpeerchannels()["channels"]
    # older releases: flatten `listpeers` and tag each channel with its peer
    channels = []
    for peer in plugin.rpc.listpeers()['peers']:
        for channel in peer['channels']:
            channel['peer_id'] = peer['id']  # all we need is the 'peer_id'
            channels.append(channel)
    return channels
def get_peer_id_for_scid(plugin: Plugin, scid: str):
    """Return the peer id owning the channel `scid`, or None if unknown."""
    matches = (ch['peer_id'] for ch in plugin.peerchannels
               if ch.get('short_channel_id') == scid)
    return next(matches, None)
def get_peerchannel(plugin: Plugin, scid: str):
    """Return the cached peer channel entry for `scid`, or None."""
    return next((ch for ch in plugin.peerchannels
                 if ch.get("short_channel_id") == scid), None)
def get_chan_fees(plugin: Plugin, scid: str):
    """Current base/ppm fees of our channel identified by `scid`."""
    channel = get_peerchannel(plugin, scid)
    assert channel is not None
    return {
        "base": channel["fee_base_msat"],
        "ppm": channel["fee_proportional_millionths"],
    }
def get_fees_global(plugin: Plugin, scid: str):
    """Fee strategy 'global': always use the configured default fees."""
    return dict(base=plugin.adj_basefee, ppm=plugin.adj_ppmfee)
def get_fees_median(plugin: Plugin, scid: str):
    """ Median fees from peers or peer.
    The assumption is that our node competes in fees to other peers of a peer.

    Returns a base/ppm dict scaled by `feeadjuster-median-multiplier`, or
    None when the peer has no other channels to take a median over.
    """
    peer_id = get_peer_id_for_scid(plugin, scid)
    assert peer_id is not None
    # If the server supports `listchannels` filtered by destination, fetch
    # just this peer's channels; otherwise plugin.channels holds a full
    # gossip snapshot set up by the caller before invoking this strategy.
    if plugin.listchannels_by_dst:
        plugin.channels = plugin.rpc.call("listchannels",
                                          {"destination": peer_id})['channels']
    # other nodes' half-channels towards the peer, excluding our own
    channels_to_peer = [ch for ch in plugin.channels
                        if ch['destination'] == peer_id
                        and ch['source'] != plugin.our_node_id]
    if len(channels_to_peer) == 0:
        return None
    # fees > ~5000 (base and ppm) are currently about top 2% of network fee extremists
    fees_ppm = [ch['fee_per_millionth'] for ch in channels_to_peer if 0 < ch['fee_per_millionth'] < 5000]
    fees_base = [ch['base_fee_millisatoshi'] for ch in channels_to_peer if 0 < ch['base_fee_millisatoshi'] < 5000]
    # if lists are empty use default values, otherwise statistics.median will fail.
    # dividing by the multiplier here cancels out the multiplication below.
    if len(fees_ppm) == 0:
        fees_ppm = [int(plugin.adj_ppmfee / plugin.median_multiplier)]
    if len(fees_base) == 0:
        fees_base = [int(plugin.adj_basefee / plugin.median_multiplier)]
    return {"base": statistics.median(fees_base) * plugin.median_multiplier,
            "ppm": statistics.median(fees_ppm) * plugin.median_multiplier}
def setchannelfee(plugin: Plugin, scid: str, base: int, ppm: int, min_htlc: int = None, max_htlc: int = None):
    """Apply fees/HTLC limits to a channel.

    Returns True when an update was actually sent, False when the channel
    already has the requested fees or the RPC call failed.
    """
    current = get_chan_fees(plugin, scid)
    # nothing to do when fees are unknown or already at the target values
    if current is None or (base == current['base'] and ppm == current['ppm']):
        return False
    try:
        plugin.rpc.setchannel(scid, base, ppm, min_htlc, max_htlc)
    except RpcError as e:
        plugin.log(f"Could not adjust fees for channel {scid}: '{e}'", level="error")
        return False
    return True
def significant_update(plugin: Plugin, scid: str):
    """Decide whether `scid`'s balance moved enough to warrant a fee update."""
    balances = plugin.adj_balances[scid]
    last_liquidity = balances.get("last_liquidity")
    # never adjusted before: always significant
    if last_liquidity is None:
        return True
    # Only update on substantial balance moves to avoid flooding, and add
    # some pseudo-randomness to avoid too easy channel balance probing
    threshold = plugin.update_threshold
    threshold_abs = int(plugin.update_threshold_abs)
    if not plugin.deactivate_fuzz:
        threshold += random.uniform(-0.015, 0.015)
        threshold_abs += threshold_abs * random.uniform(-0.015, 0.015)
    delta_rel = abs(last_liquidity / balances["total"] - balances["our"] / balances["total"])
    delta_abs = abs(last_liquidity - balances["our"])
    return delta_rel > threshold or delta_abs > threshold_abs
def maybe_adjust_fees(plugin: Plugin, scids: list):
    """Recompute and apply fees for the given scids.

    Returns the number of channels whose fees were actually changed.
    """
    channels_adjusted = 0
    for scid in scids:
        our = plugin.adj_balances[scid]["our"]
        total = plugin.adj_balances[scid]["total"]
        percentage = our / total
        # start from the configured defaults ...
        base = int(plugin.adj_basefee)
        ppm = int(plugin.adj_ppmfee)
        # select ideal values per channel
        fees = plugin.fee_strategy(plugin, scid)
        if fees is not None:
            base = int(fees['base'])
            ppm = int(fees['ppm'])
        # reset to normal fees if imbalance is not high enough
        if (percentage > plugin.imbalance and percentage < 1 - plugin.imbalance):
            if setchannelfee(plugin, scid, base, ppm):
                plugin.log(f"Set default fees as imbalance is too low for {scid}: ppm {ppm} base {base}msat")
                plugin.adj_balances[scid]["last_liquidity"] = our
                channels_adjusted += 1
            continue
        # skip channels whose balance did not move enough since last update
        if not significant_update(plugin, scid):
            continue
        percentage = get_adjusted_percentage(plugin, scid)
        assert 0 <= percentage and percentage <= 1
        ratio = plugin.get_ratio(percentage)
        # optionally step max_htlc down with the remaining local balance
        if plugin.max_htlc_steps >= 1:
            max_htlc = int(total * math.ceil(plugin.max_htlc_steps * percentage) / plugin.max_htlc_steps)
        else:
            max_htlc = None
        if setchannelfee(plugin, scid, base, int(ppm * ratio), None, max_htlc):
            plugin.log(f"Adjusted fees of {scid} with a ratio of {ratio}: ppm {int(ppm * ratio)} base {base}msat max_htlc {max_htlc}")
            plugin.adj_balances[scid]["last_liquidity"] = our
            channels_adjusted += 1
    return channels_adjusted
def maybe_add_new_balances(plugin: Plugin, scids: list):
    """Seed the balance cache for any scid we are not tracking yet."""
    for scid in scids:
        if scid in plugin.adj_balances:
            continue
        chan = get_peerchannel(plugin, scid)
        assert chan is not None
        plugin.adj_balances[scid] = {
            "our": int(chan["to_us_msat"]),
            "total": int(chan["total_msat"]),
        }
@plugin.subscribe("forward_event")
def forward_event(plugin: Plugin, forward_event: dict, **kwargs):
    """Track balance changes from settled forwards and maybe adjust fees.

    Serialized against `feeadjust` and `init` via plugin.mutex. The mutex
    release now lives in a `finally` block: previously an RPC failure in
    get_peerchannels()/listchannels left the mutex locked forever,
    deadlocking every later forward event and `feeadjust` call.
    """
    if not plugin.forward_event_subscription:
        return
    plugin.mutex.acquire(blocking=True)
    try:
        plugin.peerchannels = get_peerchannels(plugin)
        # the 'median' strategy needs a full gossip snapshot when the server
        # cannot filter listchannels by destination
        if plugin.fee_strategy == get_fees_median and not plugin.listchannels_by_dst:
            plugin.channels = plugin.rpc.listchannels()['channels']
        if forward_event["status"] == "settled":
            in_scid = forward_event["in_channel"]
            out_scid = forward_event["out_channel"]
            maybe_add_new_balances(plugin, [in_scid, out_scid])
            # field names changed from *_msatoshi to *_msat with CLN 0.12
            if plugin.rpcversion[0] == 0 and plugin.rpcversion[1] < 12:
                plugin.adj_balances[in_scid]["our"] += int(Millisatoshi(forward_event["in_msatoshi"]))
                plugin.adj_balances[out_scid]["our"] -= int(Millisatoshi(forward_event["out_msatoshi"]))
            else:
                plugin.adj_balances[in_scid]["our"] += int(Millisatoshi(forward_event["in_msat"]))
                plugin.adj_balances[out_scid]["our"] -= int(Millisatoshi(forward_event["out_msat"]))
            try:
                # Pseudo-randomly add some hysterisis to the update
                if not plugin.deactivate_fuzz and random.randint(0, 9) == 9:
                    time.sleep(random.randint(0, 5))
                maybe_adjust_fees(plugin, [in_scid, out_scid])
            except Exception as e:
                plugin.log("Adjusting fees: " + str(e), level="error")
    finally:
        plugin.mutex.release()
@plugin.method("feeadjust")
def feeadjust(plugin: Plugin, scid: str = None):
    """Adjust fees for all channels (default) or just a given `scid`.
    This method is automatically called in plugin init, or can be called manually after a successful payment.
    Otherwise, the plugin keeps the fees up-to-date.
    To stop setting the channels with a list of nodes place a file called `feeadjuster-exclude.list` in the
    lightningd data directory with a simple line-by-line list of pubkeys.
    """
    # The mutex release now lives in a `finally`: previously any exception
    # (failed RPC, bad channel data) left the mutex locked forever and
    # deadlocked all future fee adjustments.
    plugin.mutex.acquire(blocking=True)
    try:
        plugin.peerchannels = get_peerchannels(plugin)
        # the 'median' strategy needs a full gossip snapshot when the server
        # cannot filter listchannels by destination
        if plugin.fee_strategy == get_fees_median and not plugin.listchannels_by_dst:
            plugin.channels = plugin.rpc.listchannels()['channels']
        channels_adjusted = 0
        # optional per-peer exclusion list, read from lightningd's cwd
        try:
            with open('feeadjuster-exclude.list') as file:
                exclude_list = [line.rstrip("\n") for line in file]
            print("Excluding the channels with the nodes:", exclude_list)
        except FileNotFoundError:
            exclude_list = []
            print("There is no feeadjuster-exclude.list given, applying the options to the channels with all peers.")
        for chan in plugin.peerchannels:
            if chan["peer_id"] in exclude_list:
                continue
            if chan["state"] != "CHANNELD_NORMAL":
                continue
            _scid = chan.get("short_channel_id")
            if scid is not None and scid != _scid:
                continue
            # refresh the cached balance before adjusting
            plugin.adj_balances[_scid] = {
                "our": int(chan["to_us_msat"]),
                "total": int(chan["total_msat"])
            }
            channels_adjusted += maybe_adjust_fees(plugin, [_scid])
        msg = f"{channels_adjusted} channel(s) adjusted"
        plugin.log(msg)
    finally:
        plugin.mutex.release()
    return msg
@plugin.method("feeadjuster-toggle")
def feeadjuster_toggle(plugin: Plugin, value: bool = None):
    """Activates/Deactivates automatic fee updates for forward events.
    The status will be set to value.
    """
    previous = plugin.forward_event_subscription
    if value is None:
        # no explicit value given: flip the current state
        plugin.forward_event_subscription = not previous
    else:
        plugin.forward_event_subscription = bool(value)
    return {"forward_event_subscription": {
        "previous": previous,
        "current": plugin.forward_event_subscription,
    }}
@plugin.init()
def init(options: dict, configuration: dict, plugin: Plugin, **kwargs):
    """One-time plugin setup: cache node info, read options, detect server
    RPC capabilities, run an initial fee adjustment and unblock the
    forward_event handler (which waits on plugin.mutex)."""
    # do all the stuff that needs to be done just once ...
    plugin.getinfo = plugin.rpc.getinfo()
    plugin.rpcversion = cln_parse_rpcversion(plugin.getinfo.get('version'))
    plugin.our_node_id = plugin.getinfo["id"]
    plugin.deactivate_fuzz = options.get("feeadjuster-deactivate-fuzz")
    plugin.forward_event_subscription = not options.get("feeadjuster-deactivate-fee-update")
    plugin.update_threshold = float(options.get("feeadjuster-threshold"))
    plugin.update_threshold_abs = Millisatoshi(options.get("feeadjuster-threshold-abs"))
    plugin.big_enough_liquidity = Millisatoshi(options.get("feeadjuster-enough-liquidity"))
    plugin.imbalance = float(options.get("feeadjuster-imbalance"))
    plugin.max_htlc_steps = int(options.get("feeadjuster-max-htlc-steps"))
    # map the option value to the ratio curve used for fee scaling
    adjustment_switch = {
        "soft": get_ratio_soft,
        "hard": get_ratio_hard,
        "default": get_ratio
    }
    plugin.get_ratio = adjustment_switch.get(options.get("feeadjuster-adjustment-method"), get_ratio)
    # map the option value to the per-channel fee selection strategy
    fee_strategy_switch = {
        "global": get_fees_global,
        "median": get_fees_median
    }
    plugin.fee_strategy = fee_strategy_switch.get(options.get("feeadjuster-feestrategy"), get_fees_global)
    plugin.median_multiplier = float(options.get("feeadjuster-median-multiplier"))
    # global fee defaults come from lightningd's own configuration
    config = plugin.rpc.listconfigs()
    plugin.adj_basefee = config["fee-base"]
    plugin.adj_ppmfee = config["fee-per-satoshi"]
    # normalize the imbalance percentage value to 0%-50%
    if plugin.imbalance < 0 or plugin.imbalance > 1:
        raise ValueError("feeadjuster-imbalance must be between 0 and 1.")
    if plugin.imbalance > 0.5:
        plugin.imbalance = 1 - plugin.imbalance
    # detect if server supports the new listchannels by `destination` (#4614)
    plugin.listchannels_by_dst = False
    rpchelp = plugin.rpc.help().get('help')
    if len([c for c in rpchelp if c["command"].startswith("listchannels ")
            and "destination" in c["command"]]) == 1:
        plugin.listchannels_by_dst = True
    # Detect if server supports new 'setchannel' command over setchannelfee.
    # If not, make plugin.rpc.setchannel a 'symlink' to setchannelfee
    if len([c for c in rpchelp if c["command"].startswith("setchannel ")]) == 0:
        plugin.rpc.setchannel = plugin.rpc.setchannelfee
    plugin.log(f"Plugin feeadjuster initialized "
               f"({plugin.adj_basefee} base / {plugin.adj_ppmfee} ppm) with an "
               f"imbalance of {int(100 * plugin.imbalance)}%/{int(100 * ( 1 - plugin.imbalance))}%, "
               f"update_threshold: {int(100 * plugin.update_threshold)}%, "
               f"update_threshold_abs: {plugin.update_threshold_abs}, "
               f"enough_liquidity: {plugin.big_enough_liquidity}, "
               f"deactivate_fuzz: {plugin.deactivate_fuzz}, "
               f"forward_event_subscription: {plugin.forward_event_subscription}, "
               f"adjustment_method: {plugin.get_ratio.__name__}, "
               f"fee_strategy: {plugin.fee_strategy.__name__}, "
               f"listchannels_by_dst: {plugin.listchannels_by_dst},"
               f"max_htlc_steps: {plugin.max_htlc_steps}")
    # init is complete: let forward_event handlers run, then do a first pass
    plugin.mutex.release()
    feeadjust(plugin)
# Register plugin options; descriptions are shown by `lightningd --help`.
# Fix: the 'feeadjuster-adjustment-method' registration was missing commas,
# so its description strings and the "string" type argument concatenated
# into a single description argument and no option type was passed. Two
# other descriptions also concatenated without a separating space/period.
plugin.add_option(
    "feeadjuster-deactivate-fuzz",
    False,
    "Deactivate update threshold randomization and hysterisis.",
    "flag"
)
plugin.add_option(
    "feeadjuster-deactivate-fee-update",
    False,
    "Deactivate automatic fee updates for forward events.",
    "flag"
)
plugin.add_option(
    "feeadjuster-threshold",
    "0.05",
    "Relative channel balance delta at which to trigger an update. Default 0.05 means 5%. "
    "Note: it's also fuzzed by 1.5%",
    "string"
)
plugin.add_option(
    "feeadjuster-threshold-abs",
    "0.001btc",
    "Absolute channel balance delta at which to always trigger an update. "
    "Note: it's also fuzzed by 1.5%",
    "string"
)
plugin.add_option(
    "feeadjuster-enough-liquidity",
    "0msat",
    "Beyond this liquidity do not adjust fees. "
    "This also modifies the fee curve to achieve having this amount of liquidity. "
    "Default: '0msat' (turned off).",
    "string"
)
plugin.add_option(
    "feeadjuster-adjustment-method",
    "default",
    "Adjustment method to calculate channel fee. "
    "Can be 'default', 'soft' for less difference or 'hard' for higher difference.",
    "string"
)
plugin.add_option(
    "feeadjuster-imbalance",
    "0.5",
    "Ratio at which channel imbalance the feeadjuster should start acting. "
    "Default: 0.5 (always). Set higher or lower values to limit feeadjuster's "
    "activity to more imbalanced channels. "
    "E.g. 0.3 for '70/30'% or 0.6 for '40/60'%.",
    "string"
)
plugin.add_option(
    "feeadjuster-feestrategy",
    "global",
    "Sets the per channel fee selection strategy. "
    "Can be 'global' to use global config or default values, "
    "or 'median' to use the median fees from peers of peer "
    "Default: 'global'.",
    "string"
)
plugin.add_option(
    "feeadjuster-median-multiplier",
    "1.0",
    "Sets the factor with which the median fee is multiplied if using the fee strategy 'median'. "
    "This allows over or underbidding other nodes by a constant factor. "
    "Default: '1.0'.",
    "string"
)
plugin.add_option(
    "feeadjuster-max-htlc-steps",
    "0",
    "Sets the number of max htlc adjustment steps. "
    "This will reduce the max htlc according to available "
    "liquidity, which can reduce local routing channel failures. "
    "A value of 0 disables the stepping.",
    "string"
)
plugin.run()

View File

@@ -0,0 +1 @@
pyln-client>=0.12

View File

@@ -0,0 +1,43 @@
from clnutils import cln_parse_rpcversion
def test_rpcversion():
    """Parsing covers semver-like and ubuntu-style version strings."""
    cases = {
        "0.11.2": [0, 11, 2],
        "0.11.2rc2-modded": [0, 11, 2],
        "22.11": [22, 11, 0],
        "22.11rc1": [22, 11, 0],
        "22.11rc1-modded": [22, 11, 0],
        "22.11-modded": [22, 11, 0],
        "22.11.0": [22, 11, 0],
        "22.11.1": [22, 11, 1],
    }
    for version, expected in cases.items():
        assert cln_parse_rpcversion(version) == expected

View File

@@ -0,0 +1,331 @@
import os
import random
import string
import unittest
from pyln.testing.fixtures import * # noqa: F401,F403
from pyln.testing.utils import wait_for
# Path to the plugin under test, resolved relative to this test module.
plugin_path = os.path.join(os.path.dirname(__file__), "feeadjuster.py")
def test_feeadjuster_starts(node_factory):
    """Smoke test: the plugin starts/stops dynamically and statically, and
    performs an initial fee adjustment during init on a small line graph."""
    l1 = node_factory.get_node()
    # Test dynamically
    l1.rpc.plugin_start(plugin_path)
    l1.daemon.wait_for_log("Plugin feeadjuster initialized.*")
    l1.rpc.plugin_stop(plugin_path)
    l1.rpc.plugin_start(plugin_path)
    l1.daemon.wait_for_log("Plugin feeadjuster initialized.*")
    l1.stop()
    # Then statically
    l1.daemon.opts["plugin"] = plugin_path
    l1.start()
    # Start at 0 and 're-await' the two inits above. Otherwise this is flaky.
    l1.daemon.logsearch_start = 0
    l1.daemon.wait_for_logs(["Plugin feeadjuster initialized.*",
                             "Plugin feeadjuster initialized.*",
                             "Plugin feeadjuster initialized.*"])
    l1.rpc.plugin_stop(plugin_path)
    # We adjust fees in init
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True)
    scid_A = l2.rpc.listpeerchannels(l1.info["id"])["channels"][0]["short_channel_id"]
    scid_B = l2.rpc.listpeerchannels(l3.info["id"])["channels"][0]["short_channel_id"]
    l2.rpc.plugin_start(plugin_path)
    l2.daemon.wait_for_logs([f"Adjusted fees of {scid_A}.*",
                             f"Adjusted fees of {scid_B}.*"])
def get_chan_fees(l, scid):
    """Return (base, ppm) fees of node `l`'s side of channel `scid`.

    Returns None when no gossip half-channel originates from `l`.
    """
    our_id = l.info["id"]
    for direction in l.rpc.listchannels(scid)["channels"]:
        if direction["source"] == our_id:
            return (direction["base_fee_millisatoshi"], direction["fee_per_millionth"])
def wait_for_fees(l, scids, fees):
    """Block until every channel in `scids` reports exactly `fees`."""
    for scid in scids:
        wait_for(lambda scid=scid: get_chan_fees(l, scid) == fees)
def wait_for_not_fees(l, scids, fees):
    """Block until no channel in `scids` reports `fees` anymore."""
    for scid in scids:
        wait_for(lambda scid=scid: get_chan_fees(l, scid) != fees)
def pay(l, ll, amount):
    """Send `amount` msat from node `l` to node `ll` via a fresh invoice."""
    label = ''.join(random.choices(string.ascii_letters, k=20))
    inv = ll.rpc.invoice(amount, label, "desc")
    route = l.rpc.getroute(ll.info["id"], amount, riskfactor=0, fuzzpercent=0)["route"]
    l.rpc.sendpay(route, inv["payment_hash"], payment_secret=inv.get('payment_secret'))
    l.rpc.waitsendpay(inv["payment_hash"])
def sync_gossip(nodes, scids):
    """Wait until every node agrees with the first node's gossip for `scids`."""
    reference, others = nodes[0], nodes[1:]
    for scid in scids:
        for other in others:
            wait_for(lambda: reference.rpc.listchannels(scid) == other.rpc.listchannels(scid))
def test_feeadjuster_adjusts(node_factory):
    """
    A rather simple network:
         A             B
    l1 <========> l2 <=========> l3
    l2 will adjust its configuration-set base and proportional fees for
    channels A and B as l1 and l3 exchange payments.
    """
    base_fee = 5000
    ppm_fee = 300
    l2_opts = {
        "fee-base": base_fee,
        "fee-per-satoshi": ppm_fee,
        "plugin": plugin_path,
        "feeadjuster-deactivate-fuzz": None,
    }
    l1, l2, l3 = node_factory.line_graph(3, opts=[{}, l2_opts, {}],
                                         wait_for_announce=True)
    chan_A = l2.rpc.listpeerchannels(l1.info["id"])["channels"][0]
    chan_B = l2.rpc.listpeerchannels(l3.info["id"])["channels"][0]
    scid_A = chan_A["short_channel_id"]
    scid_B = chan_B["short_channel_id"]
    nodes = [l1, l2, l3]
    scids = [scid_A, scid_B]
    # Fees don't get updated until there is a forwarding event!
    assert all([get_chan_fees(l2, scid) == (base_fee, ppm_fee)
                for scid in scids])
    chan_total = int(chan_A["total_msat"])
    assert chan_total == int(chan_B["total_msat"])
    # The first payment will trigger fee adjustment, no matter its value
    amount = int(chan_total * 0.04)
    pay(l1, l3, amount)
    wait_for(lambda: all([get_chan_fees(l2, scid) != (base_fee, ppm_fee)
                          for scid in scids]))
    # Send most of the balance to the other side..
    amount = int(chan_total * 0.8)
    pay(l1, l3, amount)
    # channel A drained towards l2 (low ratio), B filled (high ratio)
    l2.daemon.wait_for_logs([f'Adjusted fees of {scid_A} with a ratio of 0.2',
                             f'Adjusted fees of {scid_B} with a ratio of 3.'])
    # ..And back
    sync_gossip(nodes, scids)
    pay(l3, l1, amount)
    l2.daemon.wait_for_logs([f'Adjusted fees of {scid_A} with a ratio of 6.',
                             f'Adjusted fees of {scid_B} with a ratio of 0.1'])
    # Sending a payment worth 3% of the channel balance should not trigger
    # fee adjustment (below the 5% update threshold)
    sync_gossip(nodes, scids)
    fees_before = [get_chan_fees(l2, scid) for scid in [scid_A, scid_B]]
    amount = int(chan_total * 0.03)
    pay(l1, l3, amount)
    sync_gossip(nodes, scids)
    assert fees_before == [get_chan_fees(l2, scid) for scid in scids]
    # But sending another 3%-worth payment does trigger adjustment (total sent
    # since last adjustment is >5%)
    pay(l1, l3, amount)
    l2.daemon.wait_for_logs([f'Adjusted fees of {scid_A} with a ratio of 4.',
                             f'Adjusted fees of {scid_B} with a ratio of 0.2'])
def test_feeadjuster_imbalance(node_factory):
    """
    A rather simple network:
         A             B
    l1 <========> l2 <=========> l3
    l2 will adjust its configuration-set base and proportional fees for
    channels A and B as l1 and l3 exchange payments.
    Verifies that `feeadjuster-imbalance` limits activity to sufficiently
    imbalanced channels and resets to default fees otherwise.
    """
    base_fee = 5000
    ppm_fee = 300
    l2_opts = {
        "fee-base": base_fee,
        "fee-per-satoshi": ppm_fee,
        "plugin": plugin_path,
        "feeadjuster-deactivate-fuzz": None,
        "feeadjuster-imbalance": 0.7,  # should be normalized to 30/70
    }
    l1, l2, l3 = node_factory.line_graph(3, opts=[{}, l2_opts, {}],
                                         wait_for_announce=True)
    chan_A = l2.rpc.listpeerchannels(l1.info["id"])["channels"][0]
    chan_B = l2.rpc.listpeerchannels(l3.info["id"])["channels"][0]
    scid_A = chan_A["short_channel_id"]
    scid_B = chan_B["short_channel_id"]
    scids = [scid_A, scid_B]
    default_fees = [(base_fee, ppm_fee), (base_fee, ppm_fee)]
    chan_total = int(chan_A["total_msat"])
    assert chan_total == int(chan_B["total_msat"])
    l2.daemon.logsearch_start = 0
    l2.daemon.wait_for_log('imbalance of 30%/70%')
    # we force feeadjust initially to test this method and check if it applies
    # default fees when balancing the channel below
    l2.rpc.feeadjust()
    l2.daemon.wait_for_logs([
        f"Adjusted fees.*{scid_A}",
        f"Adjusted fees.*{scid_B}"
    ])
    log_offset = len(l2.daemon.logs)
    wait_for_not_fees(l2, scids, default_fees[0])
    # First bring channel to somewhat of a balance
    amount = int(chan_total * 0.5)
    pay(l1, l3, amount)
    l2.daemon.wait_for_logs([
        f'Set default fees as imbalance is too low for {scid_A}',
        f'Set default fees as imbalance is too low for {scid_B}'
    ])
    wait_for_fees(l2, scids, default_fees[0])
    # Because of the 70/30 imbalance limiter, a 15% payment must not yet trigger
    # 50% + 15% = 65% .. which is < 70%
    amount = int(chan_total * 0.15)
    pay(l1, l3, amount)
    assert not l2.daemon.is_in_log("Adjusted fees", log_offset)
    # Sending another 20% must now trigger because the imbalance
    pay(l1, l3, amount)
    l2.daemon.wait_for_logs([
        f"Adjusted fees.*{scid_A}",
        f"Adjusted fees.*{scid_B}"
    ])
    wait_for_not_fees(l2, scids, default_fees[0])
    # Bringing it back must cause default fees
    pay(l3, l1, amount)
    l2.daemon.wait_for_logs([
        f'Set default fees as imbalance is too low for {scid_A}',
        f'Set default fees as imbalance is too low for {scid_B}'
    ])
    wait_for_fees(l2, scids, default_fees[0])
def test_feeadjuster_big_enough_liquidity(node_factory):
    """
    A rather simple network:
         A             B
    l1 <========> l2 <=========> l3
    l2 will adjust its configuration-set base and proportional fees for
    channels A and B as l1 and l3 exchange payments.
    Verifies that `feeadjuster-enough-liquidity` keeps fees at the default
    ratio (1.0) while both channel sides hold enough liquidity.
    """
    base_fee = 5000
    ppm_fee = 300
    l2_opts = {
        "fee-base": base_fee,
        "fee-per-satoshi": ppm_fee,
        "plugin": plugin_path,
        "feeadjuster-deactivate-fuzz": None,
        "feeadjuster-imbalance": 0.5,
        "feeadjuster-enough-liquidity": "0.001btc",
        "feeadjuster-threshold-abs": "0.0001btc",
    }
    # channels' size: 0.01btc
    # between 0.001btc and 0.009btc the liquidity is big enough
    l1, l2, l3 = node_factory.line_graph(3, fundamount=10**6, opts=[{}, l2_opts, {}],
                                         wait_for_announce=True)
    chan_A = l2.rpc.listpeerchannels(l1.info["id"])["channels"][0]
    chan_B = l2.rpc.listpeerchannels(l3.info["id"])["channels"][0]
    scid_A = chan_A["short_channel_id"]
    scid_B = chan_B["short_channel_id"]
    scids = [scid_A, scid_B]
    default_fees = [(base_fee, ppm_fee), (base_fee, ppm_fee)]
    chan_total = int(chan_A["total_msat"])
    assert chan_total == int(chan_B["total_msat"])
    l2.daemon.logsearch_start = 0
    l2.daemon.wait_for_log('enough_liquidity: 100000000msat')
    # we force feeadjust initially to test this method and check if it applies
    # default fees when balancing the channel below
    l2.rpc.feeadjust()
    l2.daemon.wait_for_logs([
        f"Adjusted fees.*{scid_A}",
        f"Adjusted fees.*{scid_B}"
    ])
    wait_for_not_fees(l2, scids, default_fees[0])
    # Bring channels to beyond big enough liquidity with 0.003btc
    amount = 300000000
    pay(l1, l3, amount)
    l2.daemon.wait_for_logs([
        f"Adjusted fees of {scid_A} with a ratio of 1.0",
        f"Adjusted fees of {scid_B} with a ratio of 1.0"
    ])
    log_offset = len(l2.daemon.logs)
    wait_for_fees(l2, scids, default_fees[0])
    # Let's move another 0.003btc -> the channels will be at 0.006btc
    amount = 300000000
    pay(l1, l3, amount)
    l2.wait_for_htlcs()
    # still inside the "big enough" band: no adjustment expected
    assert not l2.daemon.is_in_log("Adjusted fees", log_offset)
    # Sending another 0.0033btc will result in a channel balance of 0.0093btc
    # It must trigger because the remaining liquidity is not big enough
    amount = 330000000
    pay(l1, l3, amount)
    l2.daemon.wait_for_logs([
        f"Adjusted fees.*{scid_A}",
        f"Adjusted fees.*{scid_B}"
    ])
    wait_for_not_fees(l2, scids, default_fees[0])
def test_feeadjuster_median(node_factory):
    """
    A rather simple network:
         a            b            c
    l1 <=======> l2 <=======> l3 <=======> l4
    l2 will adjust its configuration-set base and proportional fees for
    channels A and B as l1 and l3 exchange payments.
    l4 is needed so l2 can make a median peers-of-peer calculation on l3.
    """
    opts = {
        "fee-base": 1337,
        "fee-per-satoshi": 42,
    }
    l2_opts = {
        "fee-base": 1000,
        "fee-per-satoshi": 100,
        "plugin": plugin_path,
        "feeadjuster-deactivate-fuzz": None,
        "feeadjuster-imbalance": 0.5,
        "feeadjuster-feestrategy": "median"
    }
    l1, l2, l3, _ = node_factory.line_graph(4, opts=[opts, l2_opts, opts, opts],
                                            wait_for_announce=True)
    scid_a = l2.rpc.listpeerchannels(l1.info["id"])["channels"][0]["short_channel_id"]
    scid_b = l2.rpc.listpeerchannels(l3.info["id"])["channels"][0]["short_channel_id"]
    # we do a manual feeadjust
    l2.rpc.feeadjust()
    l2.daemon.wait_for_logs([
        f"Adjusted fees.*{scid_a}",
        f"Adjusted fees.*{scid_b}"
    ])
    # since there is only l4 with channel c towards l3, l2 should take that value
    chan_b = l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]
    assert chan_b['fee_base_msat'] == 1337
    assert chan_b['fee_proportional_millionths'] < 42  # we could do the actual ratio math, but meh

View File

@@ -0,0 +1,235 @@
#!/usr/bin/env python3
from math import ceil
from pyln.client import Plugin, Millisatoshi, RpcError
import binascii
import hashlib
import secrets
import threading
import time
# Global plugin instance; the handlers below are registered against it.
plugin = Plugin()
def get_reverse_chan(scid, chan):
    """Find the gossip half-channel of `scid` pointing opposite to `chan`.

    Returns None when the reverse direction is not (yet) in gossip.
    """
    for half in plugin.rpc.listchannels(scid)['channels']:
        if half['channel_flags'] != chan['direction']:
            return half
    return None
def get_circular_route(scid, chan, amt, peer, exclusions, request):
    """Compute a circular route with `scid` as last leg.

    Returns a route list ready for `sendpay`, or None when either the
    reverse gossip half of `scid` is unknown or no path to `peer` remains
    under the given `exclusions`.
    """
    # Compute the last leg of the route first, so we know the parameters to
    # traverse that last edge.
    reverse_chan = get_reverse_chan(scid, chan)
    if reverse_chan is None:
        plugin.log("Could not compute parameters for the last hop")
        return None
    # Amount entering the last hop must cover `amt` plus that hop's fees.
    last_amt = ceil(float(amt) +
                    float(amt) * reverse_chan['fee_per_millionth'] / 10**6 +
                    reverse_chan['base_fee_millisatoshi'])
    # NOTE(review): 9 appears to be the assumed final-hop CLTV delta, with the
    # last edge's own delay added on top — confirm against the sendpay caller.
    last_cltv = 9 + reverse_chan['delay']
    try:
        route = plugin.rpc.getroute(
            node_id=peer['id'],
            amount_msat=last_amt,
            riskfactor=1,
            exclude=exclusions,
            cltv=last_cltv,
        )['route']
        # Append the last hop we computed manually above
        route += [{
            'id': plugin.node_id,
            'channel': scid,
            'direction': chan['direction'],
            'amount_msat': '{}msat'.format(amt),
            'delay': 9
        }]
        return route
    except RpcError:
        plugin.log("Could not get a route, no remaining one? Exclusions : {}"
                   .format(exclusions))
        return None
def try_rebalance(scid, chan, amt, peer, request):
    """Try to shift `amt` msat back to our side of `scid` via circular routes.

    Repeatedly computes and attempts circular routes until one succeeds or
    `plugin.rebalance_timeout` seconds elapse. Resolves `request` with
    "continue" when giving up; on success the htlc_accepted hook resolves it
    when the rebalance payment loops back to us.
    """
    # Exclude the channel we are trying to rebalance when searching for a
    # path. We will manually append it to the route and bump the other
    # parameters so it can be used afterwards
    exclusions = [
        "{scid}/{direction}".format(scid=scid, direction=chan['direction'])
    ]
    # Try as many routes as possible before the timeout expires
    stop_time = int(time.time()) + plugin.rebalance_timeout
    while int(time.time()) <= stop_time:
        route = get_circular_route(scid, chan, amt, peer, exclusions, request)
        # We exhausted all the possibilities, Game Over
        if route is None:
            request.set_result({"result": "continue"})
            return

        # We're about to initiate a rebalancing, we'd better remember how we can
        # settle it once we see it back here.
        payment_key = secrets.token_bytes(32)
        payment_hash = hashlib.sha256(payment_key).hexdigest()
        # NOTE(review): entries for failed attempts are never deleted from
        # plugin.rebalances; they accumulate until restart -- confirm whether
        # that is intentional.
        plugin.rebalances[payment_hash] = {
            "payment_key": binascii.hexlify(payment_key).decode('ASCII'),
            "payment_hash": payment_hash,
            "request": request,
        }

        # After all this work we're finally in a position to judge whether a
        # rebalancing is worth it at all. The rebalancing is considered worth it
        # if the fees we're about to pay are less than or equal to the fees we get
        # out of forwarding the payment.
        plugin.log("Sending rebalance request using payment_hash={}, route={}".format(
            payment_hash, route
        ))
        try:
            plugin.rpc.sendpay(route, payment_hash)
            # If the attempt is successful, we acknowledged it on the
            # receiving end (a couple of line above), so we leave it dangling
            # here.
            if (plugin.rpc.waitsendpay(payment_hash).get("status")
                    == "complete"):
                plugin.log("Succesfully re-filled outgoing capacity in {},"
                           "payment_hash={}".format(scid, payment_hash))
                return
        except RpcError as e:
            # Only route failures carry a 'data' payload; anything else is
            # unexpected and re-raised.
            if not "data" in e.error:
                raise e
            data = e.error['data']
            # The erring_channel field can not be present (shouldn't happen) or
            # can be "0x0x0"
            erring_channel = data.get('erring_channel', '0x0x0')
            if erring_channel != '0x0x0':
                # The channel we are trying to rebalance failed itself:
                # retrying other routes will not help.
                if erring_channel == scid:
                    break
                # Exclude the failing channel/direction from the next attempt.
                erring_direction = data['erring_direction']
                exclusions.append("{}/{}".format(erring_channel,
                                                 erring_direction))
                plugin.log("Excluding {} due to a failed attempt"
                           .format(erring_channel))
    plugin.log("Timed out while trying to rebalance")
    request.set_result({"result": "continue"})
def get_peer_and_channel(peers, scid):
    """Look for the channel identified by {scid} in our list of {peers}"""
    for peer in peers:
        if 'channels' in peer:
            candidates = peer["channels"]
        elif 'num_channels' in peer and peer['num_channels'] > 0:
            candidates = plugin.rpc.listpeerchannels(peer["id"])["channels"]
        else:
            candidates = []
        match = next((c for c in candidates
                      if c.get("short_channel_id") == scid), None)
        if match is not None:
            return (peer, match)
    return (None, None)
@plugin.async_hook("htlc_accepted")
def on_htlc_accepted(htlc, onion, plugin, request, **kwargs):
    """Hold an incoming HTLC and top up the outgoing channel if needed.

    If the outgoing channel lacks the capacity to forward, kick off a
    circular rebalance in a background thread; `request` is resolved once it
    succeeds or gives up. Every code path must resolve `request`, otherwise
    the HTLC stays pending forever.
    """
    plugin.log("Got an incoming HTLC htlc={}".format(htlc))

    # The HTLC might be a rebalance we ourselves initiated, better check
    # against the list of pending ones.
    rebalance = plugin.rebalances.get(htlc['payment_hash'], None)
    if rebalance is not None:
        # Settle the rebalance, before settling the request that initiated the
        # rebalance.
        request.set_result({
            "result": "resolve",
            "payment_key": rebalance['payment_key']
        })

        # Now wait for it to settle correctly
        plugin.rpc.waitsendpay(htlc['payment_hash'])

        rebalance['request'].set_result({"result": "continue"})

        # Clean up our stash of active rebalancings.
        del plugin.rebalances[htlc['payment_hash']]
        return

    # Check to see if the next channel has sufficient capacity
    scid = onion['short_channel_id'] if 'short_channel_id' in onion else '0x0x0'

    # Are we the destination? Then there's nothing to do. Continue.
    if scid == '0x0x0':
        request.set_result({"result": "continue"})
        return

    # Locate the channel + direction that would be the next in the path
    peers = plugin.rpc.listpeers()['peers']
    peer, chan = get_peer_and_channel(peers, scid)
    if peer is None or chan is None:
        # BUGFIX: previously we returned here WITHOUT resolving the async
        # hook request, which left the HTLC dangling indefinitely whenever
        # the outgoing scid was unknown to us. Let lightningd handle it.
        request.set_result({"result": "continue"})
        return

    # Check if the channel is active and routable, otherwise there's little
    # point in even trying
    if not peer['connected'] or chan['state'] != "CHANNELD_NORMAL":
        request.set_result({"result": "continue"})
        return

    # Need to consider who the funder is, since they are paying the fees.
    # TODO If we are the funder we need to take the cost of an HTLC into
    # account as well.
    # funder = chan['msatoshi_to_us_max'] == chan['msatoshi_total']

    forward_amt = Millisatoshi(onion['forward_msat'])

    # If we have enough capacity just let it through now. Otherwise the
    # Millisatoshi raises an error for negative amounts in the calculation
    # below.
    if forward_amt < chan['spendable_msat']:
        request.set_result({"result": "continue"})
        return

    # Compute the amount we need to rebalance, give us a bit of breathing room
    # while we're at it (25% more rebalancing than strictly necessary) so we
    # don't end up with a completely unbalanced channel right away again, and
    # to account for a bit of fuzziness when it comes to dipping into the
    # reserve.
    amt = ceil(int(forward_amt - chan['spendable_msat']) * 1.25)

    # If we have a higher balance than is required we don't need to rebalance,
    # just stop here.
    if amt <= 0:
        request.set_result({"result": "continue"})
        return

    # Run the rebalance off-thread so we don't block other hook invocations.
    t = threading.Thread(target=try_rebalance, args=(scid, chan, amt, peer, request))
    t.daemon = True
    t.start()
@plugin.init()
def init(options, configuration, plugin):
    """Initialize plugin state once lightningd hands us our configuration."""
    plugin.log("jitrebalance.py initializing {}".format(configuration))
    # Cache our own node id; it is the final hop of every circular route.
    info = plugin.rpc.getinfo()
    plugin.node_id = info['id']
    # How long we keep retrying routes before giving up on a rebalance.
    timeout = options.get("jitrebalance-try-timeout")
    plugin.rebalance_timeout = int(timeout)
    # Set of currently active rebalancings, keyed by their payment_hash
    plugin.rebalances = {}
# Register configuration options before entering the main loop; the parsed
# values are read back in init() via options.get().
plugin.add_option(
    "jitrebalance-try-timeout",
    60,
    "Number of seconds before we stop trying to rebalance a channel.",
    opt_type="int"
)
plugin.run()

View File

@@ -0,0 +1 @@
pyln-client==0.7.3

View File

@@ -0,0 +1,185 @@
from pyln.client import RpcError
from pyln.testing.fixtures import * # noqa: F401, F403
from pyln.testing.utils import wait_for
import os
import time
import pytest
import unittest
# Paths to the plugin under test and the helper plugins used to provoke
# failures (holding on to HTLCs / refusing them outright).
currdir = os.path.dirname(__file__)
plugin = os.path.join(currdir, 'jitrebalance.py')
hold_plugin = os.path.join(currdir, 'tests/hold_htlcs.py')
reject_plugin = os.path.join(currdir, 'tests/refuse_htlcs.py')
def test_simple_rebalance(node_factory):
    """Simple rebalance that routes along a cycle to enable the original payment

    l1 ---- l2 ---- l3 ----- l4
            |      /
            |     /
            |    /
            l5

    We are going to drain the channel (l2, l3) of most of its funds and then
    ask l1 to route through [l1, l2, l3, l4]. Under normal circumstances
    that'd fail since (l2, l3) doesn't have sufficient funds. l2 however will
    attempt to rebalance (l2,l3) using a circular route (l2, l5, l3, l2) to
    get the required funds back.
    """
    print(plugin)
    # Only l2 runs the jitrebalance plugin.
    opts = [{}, {'plugin': plugin}, {}, {}, {}]
    l1, l2, l3, l4, l5 = node_factory.get_nodes(5, opts=opts)
    amt = 10**7

    # Open the channels
    channels = [(l1, l2), (l3, l2), (l3, l4), (l2, l5), (l5, l3)]
    for src, dst in channels:
        src.openchannel(dst, capacity=10**6)

    # Drain (l2, l3) so that a larger payment fails later on
    chan = l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]

    # Send 9 million millisatoshis + reserve + a tiny fee allowance from l3 to
    # l2 for the actual payment
    inv = l2.rpc.invoice(
        chan['our_reserve_msat'] + 9000000 + 100,
        "imbalance", "imbalance"
    )
    time.sleep(1)
    l3.rpc.pay(inv['bolt11'])

    def no_pending_htlcs():
        return l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]['htlcs'] == []

    wait_for(no_pending_htlcs)
    chan = l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]
    assert(int(chan['spendable_msat']) < amt)

    # Get (l2, l5) so we can exclude it when routing from l1 to l4
    scid = l2.rpc.listpeerchannels(l5.info['id'])['channels'][0]['short_channel_id']

    # The actual invoice that l1 will attempt to pay to l4, and that will be
    # larger than the current capacity of (l2, l3) so it triggers a
    # rebalancing.
    inv = l4.rpc.invoice(amt, "test", "test")

    # Now wait for gossip to settle and l1 to learn the topology so it can
    # then route the payment. We do this now since we already did what we
    # could without this info
    wait_for(lambda: len(l1.rpc.listchannels()['channels']) == 2 * len(channels))

    route = l1.rpc.getroute(node_id=l4.info['id'], msatoshi=amt, riskfactor=1,
                            exclude=[scid + '/0', scid + '/1'])['route']

    # This will succeed with l2 doing a rebalancing just-in-time !
    l1.rpc.sendpay(route, inv['payment_hash'], payment_secret=inv.get('payment_secret'))
    assert l1.rpc.waitsendpay(inv['payment_hash'])['status'] == 'complete'
    # [sic] "Succesfully" matches the plugin's log string verbatim.
    assert l2.daemon.is_in_log('Succesfully re-filled outgoing capacity')
def test_rebalance_failure(node_factory):
    """Same setup as the first test :

    l1 ---- l2 ---- l3 ----- l4
            |      /
            |     /
            |    /
            l5

    We now test failures (l5 rejects HTLCs, l3 takes too long to resolve it).
    """
    # First, the "no route left" case.
    # l5 runs a plugin rejecting every HTLC, so once the route via l5 is
    # excluded no alternative circular route remains.
    opts = [{}, {'plugin': plugin, 'jitrebalance-try-timeout': 3}, {}, {},
            {'plugin': reject_plugin}]
    l1, l2, l3, l4, l5 = node_factory.get_nodes(5, opts=opts)
    amt = 10**7

    # Open the channels
    channels = [(l1, l2), (l3, l2), (l3, l4), (l2, l5), (l5, l3)]
    for src, dst in channels:
        src.openchannel(dst, capacity=10**6)

    # Drain (l2, l3) so that a larger payment fails later on
    chan = l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]

    # Send 9 million millisatoshis + reserve + a tiny fee allowance from l3 to
    # l2 for the actual payment
    inv = l2.rpc.invoice(
        chan['our_reserve_msat'] + 9000000 + 100,
        "imbalance", "imbalance"
    )
    time.sleep(1)
    l3.rpc.pay(inv['bolt11'])

    def no_pending_htlcs():
        return l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]['htlcs'] == []

    wait_for(no_pending_htlcs)
    chan = l2.rpc.listpeerchannels(l3.info['id'])['channels'][0]
    assert(int(chan['spendable_msat']) < amt)

    # Get (l2, l5) so we can exclude it when routing from l1 to l4
    scid = l2.rpc.listpeerchannels(l5.info['id'])['channels'][0]['short_channel_id']

    # The actual invoice that l1 will attempt to pay to l4, and that will be
    # larger than the current capacity of (l2, l3) so it triggers a
    # rebalancing.
    inv = l4.rpc.invoice(amt, "test", "test")

    # Now wait for gossip to settle and l1 to learn the topology so it can
    # then route the payment. We do this now since we already did what we
    # could without this info
    wait_for(lambda: len(l1.rpc.listchannels()['channels']) == 2 * len(channels))

    route = l1.rpc.getroute(node_id=l4.info['id'], msatoshi=amt, riskfactor=1,
                            exclude=[scid + '/0', scid + '/1'])['route']

    # This will exclude [l5, l3] and fail as there is no route left
    l1.rpc.sendpay(route, inv['payment_hash'], payment_secret=inv.get('payment_secret'))
    with pytest.raises(RpcError, match='WIRE_TEMPORARY_CHANNEL_FAILURE'):
        l1.rpc.waitsendpay(inv['payment_hash'])
    assert l2.daemon.is_in_log('Could not get a route, no remaining one?')
    l5.rpc.plugin_stop(reject_plugin)

    # Now test the timeout on number of attempts
    l3.rpc.plugin_start(hold_plugin)
    l1.rpc.sendpay(route, inv['payment_hash'], payment_secret=inv.get('payment_secret'))
    # l3 will hold on the HTLC, and at the time it rejects it, l2 won't try
    # other routes as it exceeded its timeout
    with pytest.raises(RpcError, match='WIRE_TEMPORARY_CHANNEL_FAILURE'):
        l1.rpc.waitsendpay(inv['payment_hash'])
    assert l2.daemon.is_in_log('Timed out while trying to rebalance')
def test_issue_88(node_factory):
    """Reproduce issue #88: crash due to unconfirmed channel.

    l2 has a channel open with l4, that is not confirmed yet, doesn't have a
    stable short_channel_id, and will crash.
    """
    l1, l2, l3 = node_factory.line_graph(3, opts=[{}, {'plugin': plugin}, {}], wait_for_announce=True)
    l4 = node_factory.get_node()
    l2.connect(l4)
    # Fund but do not confirm: the l2 -> l4 channel gets no short_channel_id.
    l2.rpc.fundchannel(l4.info['id'], 10**5)

    peers = l2.rpc.listpeers()['peers']
    channels = l2.rpc.listpeerchannels()['channels']
    # We should have 3 peers...
    assert(len(peers) == 3)
    # ... but only 2 channels with a short_channel_id...
    assert(sum([1 for c in channels if 'short_channel_id' in c]) == 2)
    # ... and one with l4, without a short_channel_id
    assert('short_channel_id' not in l4.rpc.listpeerchannels(l2.info['id'])['channels'][0])

    # Now if we send a payment l1 -> l2 -> l3, then l2 will stumble while
    # attempting to access the short_channel_id on the l2 -> l4 channel:
    inv = l3.rpc.invoice(1000, 'lbl', 'desc')['bolt11']
    l1.rpc.pay(inv)

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""Plugin that holds on to HTLCs for 5 seconds, then reject them."""
from pyln.client import Plugin
import time
plugin = Plugin()
@plugin.hook("htlc_accepted")
def on_htlc_accepted(htlc, onion, plugin, **kwargs):
    # Hold the HTLC long enough for the peer's rebalance attempt to exceed
    # its timeout, then fail it. NOTE(review): '2002' presumably encodes
    # wire failure 0x2002 (temporary_node_failure) -- confirm.
    time.sleep(5)
    return {'result': 'fail', 'failure_message': '2002'}
@plugin.init()
def init(options, configuration, plugin):
    # No state to set up; this plugin only delays-then-fails HTLCs.
    plugin.log("hold_htlcs.py initializing")


plugin.run()

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""Plugin that refuses all HTLCs."""
from pyln.client import Plugin
plugin = Plugin()
@plugin.hook("htlc_accepted")
def on_htlc_accepted(htlc, onion, plugin, **kwargs):
    """Immediately fail every incoming HTLC with a temporary failure code."""
    failure = {'result': 'fail', 'failure_message': '2002'}
    return failure
@plugin.init()
def init(options, configuration, plugin):
    # No state to set up; this plugin only refuses HTLCs.
    plugin.log("refuse_htlcs.py initializing")


plugin.run()

View File

@@ -0,0 +1,67 @@
The Noise plugin allows sending and receiving private messages through the
Lightning Network. It is implemented on top of Core-Lightning's ~createonion~ and
~sendonion~ RPC methods that allow delivering custom payloads to a specific
node, as well as the ~htlc_accepted~ hook which can be used to extract the
message from the onion payload.
You can send a message using the following RPC method:
#+BEGIN_SRC bash
lightning-cli sendmsg 02a5deaa47804c518bb4a1c6f04a85b92b796516bd32c4114a51b00d73e251f999 "Hello world 👋"
#+END_SRC
In addition a message can also be accompanied by a payment (using the
~keysend~ protocol draft) by specifying an amount of millisatoshis as the last
argument:
#+BEGIN_SRC bash
lightning-cli sendmsg 02a5deaa47804c518bb4a1c6f04a85b92b796516bd32c4114a51b00d73e251f999 "Here's my rent" 31337
#+END_SRC
You can read the last message received using the following command:
#+BEGIN_SRC bash
lightning-cli recvmsg msg_id
#+END_SRC
The ~msg_id~ indicates the id number of each message received in chronological
order, so we can retrieve each message individually. If you'd just like to wait
for the next message then do not specify any ~msg_id~.
You can output all messages received using the following command:
#+BEGIN_SRC bash
lightning-cli allmsgs
#+END_SRC
* Todo
- [ ] Persist messages across restarts
- [X] Use ~rpc_command~ to intercept any payment listing and add the keysend
payments to it. (No longer needed, since keysends are handled correctly by
~listpays~.)
* Protocol
The protocol was heavily inspired by the [[https://github.com/joostjager/whatsat#protocol][WhatSat protocol]]:
| record type | length (bytes) | value |
|-------------+----------------+-----------------------------------------------------------------|
| 5482373484 | 32 | key send preimage |
| 34349334 | variable | chat message |
| 34349335 | 65 | compressed signature + recovery id |
| 34349339 | 33 | sender pubkey |
| 34349343 | 8 | timestamp in nano seconds since unix epoch (big endian encoded) |
The key differences are that we don't explicitly pass the sender pubkey, since
we can recover that from the signature itself, and we use the compact 65 byte
signature (a 64 byte raw signature plus a 1 byte recovery ID) instead of the
DER encoded signature. This saves us 39 bytes
for the pubkey (5 byte type, 1 byte length, 33 byte value) and about 6 bytes
for the signature, but requires that we change the TLV type for the signature
(from ~34349337~ to ~34349335~). More could be achieved by giving ~keysend~ a
smaller type which currently is 9 bytes and could get down to 1 byte. We'll
need to wait for the spec to catch up :wink:
The signature is computed by serializing all other TLV fields, hex-encoding
the resulting TLV payload, and signing it using ~lightning-cli signmessage~
returning the ~zbase32~ encoded signature. The signature consists of a 1 byte
recovery ID and the 64 byte raw signature.

256
Unmaintained/noise/noise.py Executable file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
from binascii import hexlify
from onion import OnionPayload
from onion import TlvPayload
from pyln.client import Plugin, RpcError
import hashlib
import os
import struct
import time
import zbase32
plugin = Plugin()

# TLV type numbers used by the chat protocol (see the protocol table in the
# README; the keysend preimage type matches the keysend protocol draft).
TLV_KEYSEND_PREIMAGE = 5482373484
TLV_NOISE_MESSAGE = 34349334
TLV_NOISE_SIGNATURE = 34349335
TLV_NOISE_TIMESTAMP = 34349343
class Message(object):
    """An in-memory chat message, as received or reconstructed by the plugin."""

    def __init__(self, sender, body, signature, payment=None, id=None):
        self.id = id
        self.sender = sender
        self.body = body
        self.signature = signature
        self.payment = payment
        # Set after signature verification; None until then.
        self.verified = None

    def to_dict(self):
        """Serialize to a JSON-friendly dict for RPC responses."""
        payment = self.payment.to_dict() if self.payment is not None else None
        return {
            "id": self.id,
            "sender": self.sender,
            "body": self.body,
            "signature": hexlify(self.signature).decode('ASCII'),
            "payment": payment,
            "verified": self.verified,
        }
class Payment(object):
    """A keysend payment that arrived attached to a chat message."""

    def __init__(self, payment_key, amount):
        self.payment_key = payment_key
        self.amount = amount

    def to_dict(self):
        """Serialize to a JSON-friendly dict for RPC responses."""
        key = self.payment_key
        return {
            "payment_key": hexlify(key).decode('ASCII'),
            "payment_hash": hashlib.sha256(key).hexdigest(),
            "amount": self.amount,
        }
def serialize_payload(n, blockheight):
    """Serialize the hop dict `n` into a hex-encoded TLV hop payload."""
    block, tx, out = n['channel'].split('x')
    scid = int(block) << 40 | int(tx) << 16 | int(out)

    def minint(i):
        """Pack `i` into the smallest big-endian unsigned struct format."""
        for bound, fmt in ((2**8, "!B"), (2**16, "!H"), (2**32, "!I")):
            if i < bound:
                return struct.pack(fmt, i)
        return struct.pack("!Q", i)

    # Standard onion hop fields: amount (2), outgoing cltv (4), scid (6).
    payload = TlvPayload()
    payload.add_field(2, minint(int(n['amount_msat'])))
    payload.add_field(4, minint(blockheight + n['delay']))
    payload.add_field(6, minint(scid))
    return payload.to_bytes().hex()
def buildpath(plugin, node_id, payload, amt, exclusions):
    """Build (first_hop, hops, route) for an onion destined for `node_id`."""
    blockheight = plugin.rpc.getinfo()['blockheight']
    route = plugin.rpc.getroute(node_id, amt, 10, exclude=exclusions)['route']
    first_hop = route[0]
    # Shift the parameters by one hop: node h is told the parameters to use
    # for node h + 1.
    hops = [{
        "type": "tlv",
        "pubkey": hop['id'],
        "payload": serialize_payload(nxt, blockheight),
    } for hop, nxt in zip(route[:-1], route[1:])]
    # The last hop carries our custom message payload instead.
    hops.append({
        "type": "tlv",
        "pubkey": route[-1]['id'],
        "payload": hexlify(payload).decode('ASCII'),
    })
    return first_hop, hops, route
def deliver(node_id, payload, amt, payment_hash, max_attempts=5):
    """Do your best to deliver `payload` to `node_id`.

    Rebuilds the route and retries up to `max_attempts` times. Returns a
    dict with route, payment_hash and attempt count; raises ValueError if
    the destination could not be reached.
    """
    exclusions = []
    # The RPC interface expects the payment hash hex-encoded from here on.
    payment_hash = hexlify(payment_hash).decode('ASCII')

    for attempt in range(max_attempts):
        plugin.log("Starting attempt {} to deliver message to {}".format(attempt, node_id))

        first_hop, hops, route = buildpath(plugin, node_id, payload, amt, exclusions)
        onion = plugin.rpc.createonion(hops=hops, assocdata=payment_hash)

        plugin.rpc.sendonion(onion=onion['onion'],
                             first_hop=first_hop,
                             payment_hash=payment_hash,
                             shared_secrets=onion['shared_secrets']
                             )
        try:
            plugin.rpc.waitsendpay(payment_hash=payment_hash)
            return {'route': route, 'payment_hash': payment_hash, 'attempt': attempt}
        except RpcError as e:
            failcode = e.error['data']['failcode']
            failingidx = e.error['data']['erring_index']
            # 16399 == 0x400F; a failure raised at the final hop means the
            # onion (and thus the message) arrived, so treat it as success.
            # NOTE(review): presumably incorrect_or_unknown_payment_details
            # from the recipient -- confirm against lightningd failcodes.
            if failcode == 16399 or failingidx == len(hops):
                return {'route': route, 'payment_hash': payment_hash, 'attempt': attempt + 1}

            plugin.log("Retrying delivery.")

            # TODO Store the failing channel in the exclusions
    raise ValueError('Could not reach destination {node_id}'.format(node_id=node_id))
@plugin.async_method('sendmsg')
def sendmsg(node_id, msg, plugin, request, pay=None, **kwargs):
    """Sends a chat message.

    First paramter is the message payload as a string. Optional `pay`
    attaches that many millisatoshis as a keysend payment.
    """
    # Big-endian timestamp in milliseconds since epoch.
    # NOTE(review): the README protocol table says nanoseconds -- one of the
    # two is wrong; confirm before changing either.
    timestamp = struct.pack("!Q", int(time.time() * 1000))
    payload = TlvPayload()
    payload.add_field(TLV_NOISE_MESSAGE, msg.encode('UTF-8'))
    payload.add_field(TLV_NOISE_TIMESTAMP, timestamp)

    payment_key = os.urandom(32)
    payment_hash = hashlib.sha256(payment_key).digest()

    # If we don't want to tell the recipient how to claim the funds unset the
    # payment_key
    if pay is not None:
        payload.add_field(TLV_KEYSEND_PREIMAGE, payment_key)

    # Signature generation always has to be last, otherwise we won't agree on
    # the TLV payload and verification ends up with a bogus sender node_id.
    sigmsg = hexlify(payload.to_bytes()).decode('ASCII')
    sig = plugin.rpc.signmessage(sigmsg)
    # Sanity-check our own signature before shipping it.
    plugin.rpc.checkmessage(sigmsg, sig['zbase'])
    sig = zbase32.decode(sig['zbase'])
    payload.add_field(TLV_NOISE_SIGNATURE, sig)

    # Without an attached payment we still send a token 10 msat.
    res = deliver(
        node_id,
        payload.to_bytes(),
        amt=pay if pay is not None else 10,
        payment_hash=payment_hash
    )
    request.set_result(res)
@plugin.async_method('allmsgs')
def allmsgs(plugin, request, **kwargs):
    """Return every message received so far, keyed message0..messageN."""
    total = len(plugin.messages)
    result = {}
    for idx, message in enumerate(plugin.messages):
        entry = message.to_dict()
        entry['total_messages'] = total
        result["message" + str(idx)] = entry
    request.set_result(result)
@plugin.async_method('recvmsg')
def recvmsg(plugin, request, msg_id=None, **kwargs):
    """Receives a chat message.

    Returns a `concurrent.futures.Future`.
    Optional parameter `msg_id` can be supplied to return an older messages.
    """
    total = len(plugin.messages)
    idx = total if msg_id is None else int(msg_id)
    if idx >= total:
        # Nothing there yet: park the request until the next message lands.
        plugin.receive_waiters.append(request)
        return
    reply = plugin.messages[idx].to_dict()
    reply['total_messages'] = total
    request.set_result(reply)
@plugin.hook('htlc_accepted')
def on_htlc_accepted(onion, htlc, plugin, **kwargs):
    """Intercept incoming HTLCs and extract chat messages from the onion.

    Non-message HTLCs pass through ('continue'); message HTLCs carrying a
    keysend preimage are claimed ('resolve').
    """
    payload = OnionPayload.from_hex(onion['payload'])
    if not isinstance(payload, TlvPayload):
        plugin.log("Payload is not a TLV payload")
        return {'result': 'continue'}

    body_field = payload.get(34349334)       # TLV_NOISE_MESSAGE
    signature_field = payload.get(34349335)  # TLV_NOISE_SIGNATURE

    if body_field is None or signature_field is None:
        plugin.log("Missing message body or signature, ignoring HTLC")
        return {'result': 'continue'}

    msg = Message(
        id=len(plugin.messages),
        sender=None,
        body=body_field.value,
        signature=signature_field.value,
        payment=None)

    # Filter out the signature so we can check it against the rest of the payload
    # NOTE(review): this assigns a lazy filter object, not a list; it works
    # because to_bytes() materializes (sorts) it exactly once.
    sigpayload = TlvPayload()
    sigpayload.fields = filter(lambda x: x.typenum != TLV_NOISE_SIGNATURE, payload.fields)
    sigmsg = hexlify(sigpayload.to_bytes()).decode('ASCII')

    # Recover the sender's node id from the compact signature.
    zsig = zbase32.encode(msg.signature).decode('ASCII')
    sigcheck = plugin.rpc.checkmessage(sigmsg, zsig)
    msg.sender = sigcheck['pubkey']
    msg.verified = sigcheck['verified']

    preimage = payload.get(TLV_KEYSEND_PREIMAGE)
    if preimage is not None:
        # A keysend payment rode along: claim it with the supplied preimage.
        msg.payment = Payment(preimage.value, htlc['amount_msat'])
        res = {
            'result': 'resolve',
            'payment_key': hexlify(preimage.value).decode('ASCII')
        }
    else:
        res = {'result': 'continue'}

    plugin.messages.append(msg)

    # Wake up any recvmsg calls parked waiting for the next message.
    for r in plugin.receive_waiters:
        m = msg.to_dict()
        m['total_messages'] = len(plugin.messages)
        r.set_result(m)
    plugin.receive_waiters = []

    return res
@plugin.init()
def init(configuration, options, plugin, **kwargs):
    print("Starting noise chat plugin")
    # In-memory only: messages are lost across restarts (see README TODO).
    plugin.messages = []
    # recvmsg requests parked until the next message arrives.
    plugin.receive_waiters = []


plugin.run()

237
Unmaintained/noise/onion.py Normal file
View File

@@ -0,0 +1,237 @@
from primitives import varint_decode, varint_encode
from io import BytesIO, SEEK_CUR
from binascii import hexlify, unhexlify
import struct
class OnionPayload(object):
    """Abstract base for the per-hop payload of an onion packet."""

    @classmethod
    def from_bytes(cls, b):
        """Dispatch to the legacy (realm 0x00) or TLV payload parser."""
        if isinstance(b, bytes):
            b = BytesIO(b)

        # Peek at the realm byte without consuming it.
        realm = b.read(1)
        b.seek(-1, SEEK_CUR)

        if realm == b'\x00':
            return LegacyOnionPayload.from_bytes(b)
        if realm != b'\x01':
            return TlvPayload.from_bytes(b, skip_length=False)
        raise ValueError("Onion payloads with realm 0x01 are unsupported")

    @classmethod
    def from_hex(cls, s):
        if isinstance(s, str):
            s = s.encode('ASCII')
        return cls.from_bytes(bytes(unhexlify(s)))

    def to_bytes(self):
        raise ValueError("OnionPayload is an abstract class, use "
                         "LegacyOnionPayload or TlvPayload instead")

    def to_hex(self):
        return hexlify(self.to_bytes()).decode('ASCII')
class LegacyOnionPayload(OnionPayload):
    """Fixed-size realm-0x00 hop payload: scid, amount, cltv and padding."""

    def __init__(self, amt_to_forward, outgoing_cltv_value,
                 short_channel_id=None, padding=None):
        assert(padding is None or len(padding) == 12)
        self.padding = padding if padding is not None else b'\x00' * 12

        self.amt_to_forward = (int(amt_to_forward)
                               if isinstance(amt_to_forward, str)
                               else amt_to_forward)
        self.outgoing_cltv_value = outgoing_cltv_value

        if isinstance(short_channel_id, str) and 'x' in short_channel_id:
            # Convert "BLOCKxTXxOUT" into the packed numeric representation.
            block, tx, out = short_channel_id.split('x')
            self.short_channel_id = (int(block) << 40 | int(tx) << 16
                                     | int(out))
        elif isinstance(short_channel_id, int):
            self.short_channel_id = short_channel_id
        else:
            raise ValueError("short_channel_id format cannot be recognized: "
                             "{}".format(short_channel_id))

    @classmethod
    def from_bytes(cls, b):
        if isinstance(b, bytes):
            b = BytesIO(b)
        assert(b.read(1) == b'\x00')
        scid, amt, cltv = struct.unpack("!QQL", b.read(20))
        padding = b.read(12)
        return LegacyOnionPayload(amt, cltv, scid, padding)

    def to_bytes(self, include_realm=True):
        parts = []
        if include_realm:
            parts.append(b'\x00')
        parts.append(struct.pack("!Q", self.short_channel_id))
        parts.append(struct.pack("!Q", self.amt_to_forward))
        parts.append(struct.pack("!L", self.outgoing_cltv_value))
        parts.append(self.padding)
        serialized = b''.join(parts)
        # 32 payload bytes, plus one realm byte when included.
        assert(len(serialized) == 32 + include_realm)
        return serialized

    def to_hex(self, include_realm=True):
        return hexlify(self.to_bytes(include_realm)).decode('ASCII')

    def __str__(self):
        return ("LegacyOnionPayload[scid={self.short_channel_id}, "
                "amt_to_forward={self.amt_to_forward}, "
                "outgoing_cltv={self.outgoing_cltv_value}]").format(self=self)
class TlvPayload(OnionPayload):
    """A TLV (type-length-value) onion payload: an ordered list of fields."""

    def __init__(self, fields=None):
        # List of TlvField instances; kept unsorted until serialization.
        self.fields = [] if fields is None else fields

    @classmethod
    def from_bytes(cls, b, skip_length=False):
        """Parse a TLV payload from bytes/str/stream `b`.

        If `skip_length` is True the buffer holds bare TLV records and the
        whole remainder is consumed; otherwise a varint total-length prefix
        is read first.
        """
        if isinstance(b, str):
            b = b.encode('ASCII')
        if isinstance(b, bytes):
            b = BytesIO(b)

        if skip_length:
            # Consume the entire remainder of the buffer.
            payload_length = len(b.getvalue()) - b.tell()
        else:
            payload_length = varint_decode(b)

        instance = TlvPayload()
        start = b.tell()
        while b.tell() < start + payload_length:
            typenum = varint_decode(b)
            if typenum is None:
                break
            length = varint_decode(b)
            if length is None:
                raise ValueError(
                    "Unable to read length at position {}".format(b.tell())
                )
            val = b.read(length)

            # Get the subclass that is the correct interpretation of this
            # field. Default to the binary field type.
            # BUGFIX: this used to rebind `cls`, shadowing the classmethod's
            # own argument.
            field_cls, description = tlv_types.get(typenum,
                                                   (TlvField, "unknown"))
            field = field_cls.from_bytes(typenum=typenum, b=val,
                                         description=description)
            instance.fields.append(field)
        return instance

    @classmethod
    def from_hex(cls, h):
        return cls.from_bytes(unhexlify(h))

    def add_field(self, typenum, value):
        self.fields.append(TlvField(typenum=typenum, value=value))

    def get(self, key, default=None):
        """Return the first field with type number `key`, or `default`."""
        for f in self.fields:
            if f.typenum == key:
                return f
        return default

    def to_bytes(self):
        # TLV streams must be serialized sorted by type number.
        self.fields = sorted(self.fields, key=lambda f: f.typenum)
        serialized_fields = [f.to_bytes() for f in self.fields]
        out = BytesIO()
        # Varint total-length prefix, then the concatenated records.
        varint_encode(sum(len(s) for s in serialized_fields), out)
        for s in serialized_fields:
            out.write(s)
        return out.getvalue()

    def __str__(self):
        return "TlvPayload[" + ', '.join([str(f) for f in self.fields]) + "]"
class TlvField(object):
    """A single TLV record: varint type, varint length, raw value bytes."""

    def __init__(self, typenum, value=None, description=None):
        self.typenum = typenum
        self.value = value
        self.description = description

    @classmethod
    def from_bytes(cls, typenum, b, description=None):
        """Wrap raw value bytes `b` in a field of type `typenum`."""
        return TlvField(typenum=typenum, value=b, description=description)

    def __str__(self):
        return "TlvField[{description},{num}={hex}]".format(
            description=self.description,
            num=self.typenum,
            hex=hexlify(self.value).decode('ASCII')
        )

    def to_bytes(self):
        out = BytesIO()
        varint_encode(self.typenum, out)
        varint_encode(len(self.value), out)
        out.write(self.value)
        return out.getvalue()
class Tu32Field(TlvField):
    # Truncated u32 field; no specialized parsing yet, raw bytes are kept.
    pass


class Tu64Field(TlvField):
    # Truncated u64 field; no specialized parsing yet, raw bytes are kept.
    pass


class ShortChannelIdField(TlvField):
    # Short channel id field; no specialized parsing yet, raw bytes are kept.
    pass
class TextField(TlvField):
    """A TLV field whose value is a UTF-8 string rather than raw bytes."""

    @classmethod
    def from_bytes(cls, typenum, b, description=None):
        return TextField(typenum, value=b.decode('UTF-8'),
                         description=description)

    def to_bytes(self):
        encoded = self.value.encode('UTF-8')
        out = BytesIO()
        varint_encode(self.typenum, out)
        varint_encode(len(encoded), out)
        out.write(encoded)
        return out.getvalue()

    def __str__(self):
        return "TextField[{description},{num}=\"{val}\"]".format(
            description=self.description,
            num=self.typenum,
            val=self.value,
        )
class HashField(TlvField):
    # 32-byte hash field; no specialized parsing yet, raw bytes are kept.
    pass


class SignatureField(TlvField):
    # Compact signature field; no specialized parsing yet, raw bytes are kept.
    pass
# A mapping of known TLV types: type number -> (field class, description).
# Used by TlvPayload.from_bytes to pick a specialized parser for known fields.
tlv_types = {
    2: (Tu64Field, 'amt_to_forward'),
    4: (Tu32Field, 'outgoing_cltv_value'),
    6: (ShortChannelIdField, 'short_channel_id'),
    34349334: (TextField, 'noise_message_body'),
    # BUGFIX: the signature type is 34349335 (TLV_NOISE_SIGNATURE in noise.py
    # and the README protocol table); the previous key 34349336 never matched
    # anything, so signature fields were parsed as generic "unknown" fields.
    34349335: (SignatureField, 'noise_message_signature'),
}

View File

@@ -0,0 +1,74 @@
import struct
def varint_encode(i, w):
    """Encode an integer `i` into the writer `w`

    Uses the Bitcoin-style variable-length integer encoding: one byte for
    values below 0xFD, otherwise a 0xFD/0xFE/0xFF marker followed by a
    big-endian 16/32/64 bit value.
    """
    if i < 0xFD:
        fmt, args = "!B", (i,)
    elif i <= 0xFFFF:
        fmt, args = "!BH", (0xFD, i)
    elif i <= 0xFFFFFFFF:
        fmt, args = "!BL", (0xFE, i)
    else:
        fmt, args = "!BQ", (0xFF, i)
    w.write(struct.pack(fmt, *args))
def varint_decode(r):
    """Decode an integer from reader `r`

    Returns None when the reader is exhausted.
    """
    first = r.read(1)
    if len(first) != 1:
        return None
    marker, = struct.unpack("!B", first)
    if marker < 0xFD:
        return marker
    # Marker byte selects the width of the big-endian value that follows.
    fmt, size = {0xFD: ("!H", 2), 0xFE: ("!L", 4), 0xFF: ("!Q", 8)}[marker]
    return struct.unpack(fmt, r.read(size))[0]
class ShortChannelId(object):
    """A short channel id: (block, txnum, outnum) packed as 24/24/16 bits."""

    def __init__(self, block, txnum, outnum):
        self.block = block
        self.txnum = txnum
        self.outnum = outnum

    @classmethod
    def from_bytes(cls, b):
        """Parse from the 8-byte big-endian wire encoding."""
        assert(len(b) == 8)
        i, = struct.unpack("!Q", b)
        return cls.from_int(i)

    @classmethod
    def from_int(cls, i):
        """Unpack a 64-bit integer into its three components."""
        block = (i >> 40) & 0xFFFFFF
        txnum = (i >> 16) & 0xFFFFFF
        outnum = (i >> 0) & 0xFFFF
        return cls(block=block, txnum=txnum, outnum=outnum)

    @classmethod
    def from_str(cls, s):
        """Parse the human-readable "BLOCKxTXxOUT" representation.

        BUGFIX: the first parameter was previously named `self` even though
        this is a classmethod, and the class name was hardcoded; it is now
        `cls` and used to construct the result.
        """
        block, txnum, outnum = s.split('x')
        return cls(block=int(block), txnum=int(txnum), outnum=int(outnum))

    def to_int(self):
        return self.block << 40 | self.txnum << 16 | self.outnum

    def to_bytes(self):
        return struct.pack("!Q", self.to_int())

    def __str__(self):
        return "{self.block}x{self.txnum}x{self.outnum}".format(self=self)

    def __eq__(self, other):
        # BUGFIX: comparing against a non-ShortChannelId used to raise
        # AttributeError; defer to the other operand instead.
        if not isinstance(other, ShortChannelId):
            return NotImplemented
        return (
            self.block == other.block and
            self.txnum == other.txnum and
            self.outnum == other.outnum
        )

    def __hash__(self):
        # Defining __eq__ would otherwise make instances unhashable.
        return hash(self.to_int())

View File

@@ -0,0 +1,2 @@
flaky>=3.7.0
pyln-testing>=23.02

View File

@@ -0,0 +1 @@
pyln-client>=23.02

View File

@@ -0,0 +1,218 @@
from onion import TlvPayload
from flaky import flaky
from pprint import pprint
from pyln.client import RpcError
from pyln.testing.fixtures import * # noqa: F401,F403
from pyln.testing.utils import wait_for
import hashlib
import os
import pytest
import unittest
import zbase32
plugin = os.path.join(os.path.dirname(__file__), 'noise.py')
def test_sendmsg_success(node_factory, executor):
    """A message from l1 reaches l3 and verifies against l1's node id."""
    opts = [{'plugin': plugin}, {}, {'plugin': plugin}]
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, opts=opts)

    # This one is tailing the incoming messages
    recv = executor.submit(l3.rpc.recvmsg)
    l1.rpc.sendmsg(l3.info['id'], "Hello world!")
    m1 = recv.result(10)

    # This one should get the same result:
    m2 = l3.rpc.recvmsg(msg_id=-1)

    # They should be the same :-)
    assert(m1 == m2)
    assert(m2['sender'] == l1.info['id'])
    assert(m2['verified'] is True)
@flaky  # since we cannot force a payment to take a specific route
@unittest.skipIf(True, "Just not stable")
def test_sendmsg_retry(node_factory, executor):
    """Fail a sendmsg using a cheap route, and check that it retries.

    ```dot
    digraph {
      l1 -> l2;
      l2 -> l3;
      l3 -> l4 [label = "fee-base=100'000"];
      l2 -> l5;
      l5 -> l4 [label = "fee-base=normal"];
    }
    ```

    By having a huge fee on the l3 -> l4 edge we force the initial attempt to
    go through l1 -> l2 -> l5 -> l4, which should fail since l5 is offline (l1
    should still be unaware about this).
    """
    opts = [{'plugin': plugin}, {}, {'fee-base': 10000}, {'plugin': plugin}]
    l1, l2, l3, l4 = node_factory.line_graph(4, opts=opts)
    l5 = node_factory.get_node()
    l2.openchannel(l5, 10**6)
    l5.openchannel(l4, 10**6)

    # NOTE(review): defined but never called below; wait_for uses a direct
    # lambda instead. Kept for parity with the original.
    def gossip_synced(nodes):
        for a, b in zip(nodes[:-1], nodes[1:]):
            if a.rpc.listchannels() != b.rpc.listchannels():
                return False
        return True

    # Wait until all 10 channel directions (5 channels) show as active.
    wait_for(lambda: [c['active'] for c in l1.rpc.listchannels()['channels']] == [True] * 10)

    # Now stop l5 so the first attempt will fail.
    l5.stop()

    executor.submit(l4.rpc.recvmsg)
    send = executor.submit(l1.rpc.sendmsg, l4.info['id'], "Hello world!")

    # Just making sure our view didn't change since we initiated the attempt
    assert([c['active'] for c in l1.rpc.listchannels()['channels']] == [True] * 10)
    pprint(l1.rpc.listchannels())

    l1.daemon.wait_for_log(r'Retrying delivery')
    sres = send.result(10)
    assert(sres['attempt'] == 2)
    pprint(sres)

    l4.rpc.recvmsg(msg_id=-1)
def test_zbase32():
    """Decoding a known zbase32 string and re-encoding it is lossless."""
    encoded = b'd75qtmgijm79rpooshmgzjwji9gj7dsdat8remuskyjp9oq1ugkaoj6orbxzhuo4njtyh96e3aq84p1tiuz77nchgxa1s4ka4carnbiy'
    raw = zbase32.decode(encoded)
    assert(zbase32.encode(raw) == encoded)
def test_msg_and_keysend(node_factory, executor):
    """Sending a message with an amount delivers both the message and the funds.

    l1 and l3 run the noise plugin; l2 is a plain forwarding node.
    """
    opts = [{'plugin': plugin}, {}, {'plugin': plugin}]
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, opts=opts)
    amt = 10000  # amount (msat) attached to the message

    # Check that l3 does not have funds initially
    assert(l3.rpc.listpeerchannels()['channels'][0]['to_us_msat'] == 0)

    l1.rpc.sendmsg(l3.info['id'], "Hello world!", amt)
    m = l3.rpc.recvmsg(msg_id=-1)
    assert(m['sender'] == l1.info['id'])
    assert(m['verified'] is True)

    # The delivered message carries the payment details alongside the body.
    p = m['payment']
    assert(p is not None)
    assert(p['payment_key'] is not None)
    assert(p['amount'] == 10000)

    # Check that l3 actually got the funds I sent it
    wait_for(lambda: l3.rpc.listpeerchannels()['channels'][0]['to_us_msat'] == amt)
def test_forward_ok(node_factory, executor):
    """All nodes run the plugin; the forwarding node must not crash.

    Reproduces the crash mentioned by @darosior in this comment:
    https://github.com/lightningd/plugins/pull/68#issuecomment-577251902
    """
    node_opts = [{'plugin': plugin}] * 3
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, opts=node_opts)

    # Wait for the next incoming message in the background.
    pending = executor.submit(l3.rpc.recvmsg)
    l1.rpc.sendmsg(l3.info['id'], "Hello world!")

    tailed = pending.result(10)
    # Explicitly re-reading the newest message must return the same payload.
    reread = l3.rpc.recvmsg(msg_id=-1)

    assert(tailed == reread)
    assert(reread['sender'] == l1.info['id'])
    assert(reread['verified'] is True)
def test_read_tip(node_factory, executor):
    """Testcase for issue #331 https://github.com/lightningd/plugins/issues/331

    We try to read the topmost message by its ID.
    """
    node_opts = [{'plugin': plugin}] * 3
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, opts=node_opts)

    l1.rpc.sendmsg(l3.info['id'], "test 1")
    fetched = executor.submit(l3.rpc.recvmsg, 0).result(10)
    assert fetched.get('body') == "test 1"
def test_read_order(node_factory, executor):
    """ A testcase that sends and reads several times and checks correct order.

    Message IDs are assigned sequentially starting at 0, and reads by
    `msg_id` are repeatable (non-destructive).
    """
    opts = [{'plugin': plugin}] * 3
    l1, l2, l3 = node_factory.line_graph(3, wait_for_announce=True, opts=opts)

    # send a bunch at once
    l1.rpc.sendmsg(l3.info['id'], "test 0")
    l1.rpc.sendmsg(l3.info['id'], "test 1")
    l1.rpc.sendmsg(l3.info['id'], "test 2")

    # check them all by using `msg_id`
    assert executor.submit(l3.rpc.recvmsg, 0).result(10).get('id') == 0
    assert executor.submit(l3.rpc.recvmsg, 0).result(10).get('body') == "test 0"
    assert executor.submit(l3.rpc.recvmsg, 1).result(10).get('id') == 1
    assert executor.submit(l3.rpc.recvmsg, 1).result(10).get('body') == "test 1"
    assert executor.submit(l3.rpc.recvmsg, 2).result(10).get('id') == 2
    assert executor.submit(l3.rpc.recvmsg, 2).result(10).get('body') == "test 2"

    # now async by waiting on a future to get a message with most recent 'id'
    recv = executor.submit(l3.rpc.recvmsg)
    l1.rpc.sendmsg(l3.info['id'], "test 3")
    result = recv.result(10)
    assert result.get('id') == 3
    assert result.get('body') == "test 3"

    # peek at the same `msg_id` := 3 again; the read must be repeatable
    assert executor.submit(l3.rpc.recvmsg, 3).result(10).get('id') == 3
    assert executor.submit(l3.rpc.recvmsg, 3).result(10).get('body') == "test 3"
def test_missing_tlv_fields(node_factory):
    """If we're missing a field we should not crash

    Hand-crafts an onion whose final payload contains only one of the two
    TLV fields the plugin needs (body or signature) and checks that the
    receiving node fails the HTLC cleanly instead of crashing.
    """
    opts = [{'plugin': plugin}] * 2
    l1, l2 = node_factory.line_graph(2, wait_for_announce=True, opts=opts)

    payment_key = os.urandom(32)
    payment_hash = hashlib.sha256(payment_key).hexdigest()
    route = l1.rpc.getroute(l2.info['id'], 10, 10)['route']

    def send(key, value):
        # Build a single-hop onion whose payload carries exactly one TLV field.
        hops = [{"type": "tlv", "pubkey": l2.info['id'], "payload": None}]
        payload = TlvPayload()
        payload.add_field(key, value)
        hops[0]['payload'] = payload.to_hex()
        onion = l1.rpc.createonion(hops=hops, assocdata=payment_hash)
        l1.rpc.sendonion(
            onion=onion['onion'],
            first_hop=route[0],
            payment_hash=payment_hash,
            shared_secrets=onion['shared_secrets'],
        )
        # The receiver must fail the HTLC with an invalid-payload error.
        with pytest.raises(RpcError, match=r'WIRE_INVALID_ONION_PAYLOAD'):
            l1.rpc.waitsendpay(payment_hash)

    # Only TLV type 34349334 (presumably the message body) -- no signature.
    send(34349334, b'Message body')
    assert(l2.daemon.wait_for_log(r'Missing message body or signature'))

    # Only TLV type 34349335 (presumably the signature) -- no body.
    send(34349335, b'00' * 32)
    assert(l2.daemon.wait_for_log(r'Missing message body or signature'))

View File

@@ -0,0 +1,55 @@
import bitstring
# The zbase32 alphabet: the character used for each 5-bit value 0..31.
zbase32_chars = b'ybndrfg8ejkmcpqxot1uwisza345h769'

# Reverse lookup table indexed by ASCII byte value, mapping a zbase32
# character back to its 5-bit value.  255 marks bytes outside the
# alphabet; NOTE(review): decode() below does not check for 255, so
# invalid input bytes are silently mapped to 255 rather than rejected.
zbase32_revchars = [
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 18, 255, 25, 26, 27, 30, 29, 7, 31, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 24, 1, 12, 3, 8, 5, 6, 28, 21, 9, 10, 255, 11, 2,
    16, 13, 14, 4, 22, 17, 19, 255, 20, 15, 0, 23, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
    255, 255, 255, 255, 255, 255, 255
]
def bitarray_to_u5(barr):
    """Split a bit array into a list of 5-bit unsigned integers.

    The input length must be a multiple of 5 bits.
    """
    assert len(barr) % 5 == 0
    stream = bitstring.ConstBitStream(barr)
    values = []
    while stream.pos < stream.len:
        values.append(stream.read(5).uint)
    return values
def u5_to_bitarray(arr):
    """Concatenate a sequence of 5-bit unsigned integers into one bit array."""
    out = bitstring.BitArray()
    for value in arr:
        out.append(bitstring.pack("uint:5", value))
    return out
def encode(b):
    """zbase32-encode the bytes *b*, returning the encoded bytes."""
    # Each 5-bit group selects one character from the zbase32 alphabet.
    return bytes(zbase32_chars[v] for v in bitarray_to_u5(b))
def decode(b):
    """Decode the zbase32 string or bytes *b* back into raw bytes."""
    if isinstance(b, str):
        b = b.encode('ASCII')
    # Map each character back to its 5-bit value via the reverse table.
    values = [zbase32_revchars[c] for c in b]
    return u5_to_bitarray(values).bytes