From 504d15bfbfade6ba0417fd090404b3880fe6aca9 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Fri, 27 Dec 2019 16:55:24 +0100 Subject: [PATCH] jitrebalance: Refactor code and implement retry logic --- jitrebalance/jitrebalance.py | 164 +++++++++++++++++------------- jitrebalance/test_jitrebalance.py | 6 +- 2 files changed, 94 insertions(+), 76 deletions(-) diff --git a/jitrebalance/jitrebalance.py b/jitrebalance/jitrebalance.py index e2aeafb..b2a8451 100755 --- a/jitrebalance/jitrebalance.py +++ b/jitrebalance/jitrebalance.py @@ -1,17 +1,101 @@ #!/usr/bin/env python3 from math import ceil -from pprint import pprint -from pyln.client import Plugin, Millisatoshi +from pyln.client import Plugin, Millisatoshi, RpcError import binascii import hashlib import secrets +import threading import time plugin = Plugin() -def get_circular_route(route, scid): - pass +def get_circular_route(scid, chan, amt, peer, exclusions, request): + """Compute a circular route with `scid` as last leg. + + """ + # Compute the last leg of the route first, so we know the parameters to + # traverse that last edge. + reverse_chan = plugin.rpc.listchannels(scid)['channels'] + assert(len(reverse_chan) == 2) + reverse_chan = [ + c for c in reverse_chan if c['channel_flags'] != chan['direction'] + ][0] + + if reverse_chan is None: + print("Could not compute parameters for the last hop") + request.set_result({"result": "continue"}) + return + + last_amt = ceil(float(amt) + + float(amt) * reverse_chan['fee_per_millionth'] / 10**6 + + reverse_chan['base_fee_millisatoshi']) + last_cltv = 9 + reverse_chan['delay'] + + route = plugin.rpc.getroute( + node_id=peer['id'], + msatoshi=last_amt, + riskfactor=1, + exclude=exclusions, + cltv=last_cltv, + )['route'] + + # Append the last hop we computed manually above + route += [{ + 'id': plugin.node_id, + 'channel': scid, + 'direction': chan['direction'], + 'msatoshi': amt, + 'amount_msat': '{}msat'.format(amt), + 'delay': 9 + }] + + return route + + +def try_rebalance(scid, chan, amt, peer, request): + # Exclude the channel we are trying to rebalance when searching for a + # path. We will manually append it to the route and bump the other + # parameters so it can be used afterwards + exclusions = [ + "{scid}/{direction}".format(scid=scid, direction=chan['direction']) + ] + + # Try up to 5 times to rebalance that last leg. + for i in range(0, 5): + route = get_circular_route(scid, chan, amt, peer, exclusions, request) + + # We're about to initiate a rebalancing, we'd better remember how we can + # settle it once we see it back here. + payment_key = secrets.token_bytes(32) + payment_hash = hashlib.sha256(payment_key).hexdigest() + plugin.rebalances[payment_hash] = { + "payment_key": binascii.hexlify(payment_key).decode('ASCII'), + "payment_hash": payment_hash, + "request": request, + } + + # After all this work we're finally in a position to judge whether a + # rebalancing is worth it at all. The rebalancing is considered worth it + # if the fees we're about to pay are less than or equal to the fees we get + # out of forwarding the payment. + plugin.log("Sending rebalance request using payment_hash={}, route={}".format( + payment_hash, route + )) + try: + plugin.rpc.sendpay(route, payment_hash) + # If the attempt is successful, we acknowledged it on the + # receiving end (a couple of line above), so we leave it dangling + # here. + plugin.rpc.waitsendpay(payment_hash) + return + except RpcError as e: + error = e.error['data'] + erring_channel = error['erring_channel'] + exclusions.append(erring_channel) + plugin.log("Excluding {} due to a failed attempt".format(erring_channel)) + + request.set_result({"result": "continue"}) @plugin.async_hook("htlc_accepted") @@ -37,7 +121,6 @@ def on_htlc_accepted(htlc, onion, plugin, request, **kwargs): return # Check to see if the next channel has sufficient capacity - scid = onion['short_channel_id'] if 'short_channel_id' in onion else '0x0x0' # Are we the destination? Then there's nothing to do. Continue. @@ -66,11 +149,8 @@ def on_htlc_accepted(htlc, onion, plugin, request, **kwargs): # TODO If we are the funder we need to take the cost of an HTLC into # account as well. #funder = chan['msatoshi_to_us_max'] == chan['msatoshi_total'] - forward_amt = Millisatoshi(onion['forward_amount']) - incoming_amt = Millisatoshi(htlc['amount']) - fee = incoming_amt - forward_amt - pprint(fee) + # Compute the amount we need to rebalance, give us a bit of breathing room # while we're at it (25% more rebalancing than strictly necessary) so we # don't end up with a completely unbalanced channel right away again, and @@ -84,69 +164,9 @@ def on_htlc_accepted(htlc, onion, plugin, request, **kwargs): request.set_result({"result": "continue"}) return - # Compute the last leg of the route first, so we know the parameters to - # traverse that last edge. - reverse_chan = plugin.rpc.listchannels(scid)['channels'] - assert(len(reverse_chan) == 2) - reverse_chan = [ - c for c in reverse_chan if c['channel_flags'] != chan['direction'] - ][0] - - if reverse_chan is None: - print("Could not compute parameters for the last hop") - request.set_result({"result": "continue"}) - return - - last_amt = ceil(float(amt) + - float(amt) * reverse_chan['fee_per_millionth'] / 10**6 + - reverse_chan['base_fee_millisatoshi']) - last_cltv = 9 + reverse_chan['delay'] - - # Exclude the channel we are trying to rebalance when searching for a - # path. We will manually append it to the route and bump the other - # parameters so it can be used afterwards - exclusions = [ - "{scid}/{direction}".format(scid=scid, direction=chan['direction']) - ] - - route = plugin.rpc.getroute( - node_id=peer['id'], - msatoshi=last_amt, - riskfactor=1, - exclude=exclusions, - cltv=last_cltv, - )['route'] - - # Append the last hop we computed manually above - route += [{ - 'id': plugin.node_id, - 'channel': scid, - 'direction': chan['direction'], - 'msatoshi': amt, - 'amount_msat': '{}msat'.format(amt), - 'delay': 9 - }] - - # We're about to initiate a rebalancing, we'd better remember how we can - # settle it once we see it back here. - - payment_key = secrets.token_bytes(32) - payment_hash = hashlib.sha256(payment_key).hexdigest() - plugin.rebalances[payment_hash] = { - "payment_key": binascii.hexlify(payment_key).decode('ASCII'), - "payment_hash": payment_hash, - "request": request, - } - - # After all this work we're finally in a position to judge whether a - # rebalancing is worth it at all. The rebalancing is considered worth it - # if the fees we're about to pay are less than or equal to the fees we get - # out of forwarding the payment. - - plugin.log("Sending rebalance request using payment_hash={}".format( - payment_hash - )) - plugin.rpc.sendpay(route, payment_hash) + t = threading.Thread(target=try_rebalance, args=(scid, chan, amt, peer, request)) + t.daemon = True + t.start() @plugin.init() diff --git a/jitrebalance/test_jitrebalance.py b/jitrebalance/test_jitrebalance.py index da4d43e..69a3d8c 100644 --- a/jitrebalance/test_jitrebalance.py +++ b/jitrebalance/test_jitrebalance.py @@ -1,6 +1,5 @@ from pyln.testing.fixtures import * # noqa: F401, F403 from pyln.testing.utils import wait_for -from pprint import pprint import os import time import unittest @@ -38,7 +37,7 @@ def test_simple_rebalance(node_factory): # Drain (l2, l3) so that a larger payment fails later on chan = l2.rpc.listpeers(l3.info['id'])['peers'][0]['channels'][0] - pprint(chan) + # Send 9 million millisatoshis + reserve + a tiny fee allowance from l3 to # l2 for the actual payment inv = l2.rpc.invoice( @@ -55,7 +54,6 @@ def test_simple_rebalance(node_factory): wait_for(no_pending_htlcs) chan = l2.rpc.listpeers(l3.info['id'])['peers'][0]['channels'][0] - pprint(chan) assert(chan['spendable_msatoshi'] < amt) # Get (l2, l5) so we can exclude it when routing from l1 to l4 @@ -77,4 +75,4 @@ def test_simple_rebalance(node_factory): # This will fail without the plugin doing a rebalancing. l1.rpc.sendpay(route, inv['payment_hash']) - pprint(l1.rpc.waitsendpay(inv['payment_hash'])) + l1.rpc.waitsendpay(inv['payment_hash'])