drain: refactor and cleanup

This commit is contained in:
Michael Schmoock
2019-12-20 10:14:12 +01:00
committed by Christian Decker
parent 6f0380bdfb
commit d66ea9f8e5
3 changed files with 83 additions and 41 deletions

View File

@@ -12,7 +12,7 @@ circular payments to yourself. This can be useful for:
## Installation ## Installation
This plugin relies on the `pylightning` library. As with most plugins you should This plugin relies on the `pyln-client` library. As with most plugins you should
be able to install dependencies with `pip`: be able to install dependencies with `pip`:
```bash ```bash
@@ -20,13 +20,13 @@ pip3 install -r requirements.txt
``` ```
You might need to also specify the `--user` command line flag depending on You might need to also specify the `--user` command line flag depending on
your environment. If you dont want this and your plugin only uses `pylightning` your environment. If you dont want this and your plugin only uses `pyln-client`
as the only dependency, you can also start `lightningd` with the `PYTHONPATH` as the only dependency, you can also start `lightningd` with the `PYTHONPATH`
environment variable to the `pylightning` package of your `lightningd` environment variable to the `pyln-client` package of your `lightningd`
installation. For example: installation, for example:
``` ```
PYTHONPATH=/path/to/lightning.git/contrib/pylightning lightningd --plugin=... PYTHONPATH=/home/user/lightning.git/contrib/pyln-client lightningd --plugin=...
``` ```
## Startup ## Startup
@@ -114,8 +114,7 @@ lightning-cli setbalance scid [percentage] [chunks] [maxfeepercent] [retry_for]
## TODOs ## TODOs
- fix: use hook instead of waitsendpay to prevent race conditions
- fix: occasionally strange route errors. maybe try increasing chunks on route errors. - fix: occasionally strange route errors. maybe try increasing chunks on route errors.
- fix: sometimes, if we ran in error, not all chunk results are returned, i.e. [2/4, error] but not 1/4.
This maybe relate to the waitsendpay timed out race condition. Can be solved by a new plugin hook.
- feat: set HTLC_FEE MIN/MAX/STP by feerate - feat: set HTLC_FEE MIN/MAX/STP by feerate
- chore: reconsider use of listchannels - chore: reconsider use of listchannels

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from pyln.client import Plugin, Millisatoshi, RpcError from pyln.client import Plugin, Millisatoshi, RpcError
from utils import *
import re import re
import time import time
import uuid import uuid
@@ -46,7 +47,7 @@ def setup_routing_fees(plugin, payload, route, amount, substractfees: bool=False
# BOLT #7 requires fee >= fee_base_msat + ( amount_to_forward * fee_proportional_millionths / 1000000 ) # BOLT #7 requires fee >= fee_base_msat + ( amount_to_forward * fee_proportional_millionths / 1000000 )
fee += (amount_iter * ch['fee_per_millionth'] + 10**6 - 1) // 10**6 # integer math trick to round up fee += (amount_iter * ch['fee_per_millionth'] + 10**6 - 1) // 10**6 # integer math trick to round up
if fee > amount_iter: if fee > amount_iter:
raise RpcError(payload['command'], payload, {'message': 'cannot cover fees to %s %s' % (payload['command'], amount)}) raise RpcError(payload['command'], payload, {'message': 'Cannot cover fees to %s %s' % (payload['command'], amount)})
amount_iter -= fee amount_iter -= fee
first = False first = False
r['msatoshi'] = amount_iter.millisatoshis r['msatoshi'] = amount_iter.millisatoshis
@@ -54,7 +55,9 @@ def setup_routing_fees(plugin, payload, route, amount, substractfees: bool=False
# This raises an error when a channel is not normal or peer is not connected # This raises an error when a channel is not normal or peer is not connected
def get_channel(plugin, payload, peer_id, scid): def get_channel(plugin, payload, peer_id, scid=None):
if scid is None:
scid = payload['scid']
peer = plugin.rpc.listpeers(peer_id).get('peers')[0] peer = plugin.rpc.listpeers(peer_id).get('peers')[0]
channel = next(c for c in peer['channels'] if 'short_channel_id' in c and c['short_channel_id'] == scid) channel = next(c for c in peer['channels'] if 'short_channel_id' in c and c['short_channel_id'] == scid)
if channel['state'] != "CHANNELD_NORMAL": if channel['state'] != "CHANNELD_NORMAL":
@@ -68,16 +71,7 @@ def spendable_from_scid(plugin, payload, scid=None):
if scid is None: if scid is None:
scid = payload['scid'] scid = payload['scid']
# only fetch funds once to reduce RPC load peer_id = peer_from_scid(plugin, payload, scid)
if not "funds" in payload:
payload['funds'] = plugin.rpc.listfunds().get('channels')
try:
channel_funds = next(c for c in payload['funds'] if 'short_channel_id' in c and c['short_channel_id'] == scid)
except StopIteration:
return Millisatoshi(0), Millisatoshi(0)
peer_id = channel_funds['peer_id']
funds_our = Millisatoshi(channel_funds['our_amount_msat'])
try: try:
channel_peer = get_channel(plugin, payload, peer_id, scid) channel_peer = get_channel(plugin, payload, peer_id, scid)
except RpcError: except RpcError:
@@ -104,12 +98,15 @@ def spendable_from_scid(plugin, payload, scid=None):
return spendable, receivable return spendable, receivable
def peer_from_scid(plugin, payload, short_channel_id, my_id): def peer_from_scid(plugin, payload, scid=None):
channels = plugin.rpc.listchannels(short_channel_id).get('channels') if scid is None:
scid = payload['scid']
channels = plugin.rpc.listchannels(scid).get('channels')
try: try:
return next(c for c in channels if c['source'] == my_id)['destination'] return next(c for c in channels if c['source'] == payload['my_id'])['destination']
except StopIteration: except StopIteration:
raise RpcError(payload['command'], payload, {'message': 'Cannot find peer for channel: ' + short_channel_id}) raise RpcError(payload['command'], payload, {'message': 'Cannot find peer for channel: ' + scid})
def find_worst_channel(route): def find_worst_channel(route):
@@ -126,7 +123,7 @@ def find_worst_channel(route):
return worst return worst
def test_or_set_chunks(plugin, payload, my_id): def test_or_set_chunks(plugin, payload):
scid = payload['scid'] scid = payload['scid']
cmd = payload['command'] cmd = payload['command']
spendable, receivable = spendable_from_scid(plugin, payload) spendable, receivable = spendable_from_scid(plugin, payload)
@@ -143,7 +140,7 @@ def test_or_set_chunks(plugin, payload, my_id):
# get all spendable/receivables for our channels # get all spendable/receivables for our channels
channels = {} channels = {}
for channel in plugin.rpc.listchannels(source=my_id).get('channels'): for channel in plugin.rpc.listchannels(source = payload['my_id']).get('channels'):
if channel['short_channel_id'] == scid: if channel['short_channel_id'] == scid:
continue continue
spend, recv = spendable_from_scid(plugin, payload, channel['short_channel_id']) spend, recv = spendable_from_scid(plugin, payload, channel['short_channel_id'])
@@ -153,8 +150,8 @@ def test_or_set_chunks(plugin, payload, my_id):
} }
# test if selected chunks fit into other channel capacities # test if selected chunks fit into other channel capacities
if payload['chunks'] >= 1: chunks = payload['chunks']
chunks = payload['chunks'] if chunks > 0:
chunksize = amount / chunks chunksize = amount / chunks
fit = 0 fit = 0
for i in channels: for i in channels:
@@ -208,15 +205,15 @@ def cleanup(plugin, payload, error=None):
if successful_chunks == payload['chunks']: if successful_chunks == payload['chunks']:
return payload['success_msg'] return payload['success_msg']
if successful_chunks > 0: if successful_chunks > 0:
payload['success_msg'] += ['Partially completed %d/%d chunks. Error: %s' % (successful_chunks, payload['chunks'], str(error))] error = RpcError(payload['command'], payload, {'message': 'Partially completed %d/%d chunks. Error: %s' % (successful_chunks, payload['chunks'], str(error))})
return payload['success_msg']
if error is None: if error is None:
error = RpcError(payload['command'], payload, {'message': 'Command failed, no chunk succeeded.'}) error = RpcError(payload['command'], payload, {'message': 'Command failed, no chunk succeeded.'})
raise error raise error
def try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable_before): def try_for_htlc_fee(plugin, payload, peer_id, amount, chunk, spendable_before):
start_ts = int(time.time()) start_ts = int(time.time())
my_id = payload['my_id']
label = payload['command'] + "-" + str(uuid.uuid4()) label = payload['command'] + "-" + str(uuid.uuid4())
payload['labels'] += [label] payload['labels'] += [label]
description = "%s %s %s%s [%d/%d]" % (payload['command'], payload['scid'], payload['percentage'], '%', chunk+1, payload['chunks']) description = "%s %s %s%s [%d/%d]" % (payload['command'], payload['scid'], payload['percentage'], '%', chunk+1, payload['chunks'])
@@ -268,16 +265,14 @@ def try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable_b
plugin.log(" - %s %14s %s" % (r['id'], r['channel'], r['amount_msat'])) plugin.log(" - %s %14s %s" % (r['id'], r['channel'], r['amount_msat']))
try: try:
ours = get_ours(plugin, payload['scid'])
plugin.rpc.sendpay(route, payment_hash, label) plugin.rpc.sendpay(route, payment_hash, label)
result = plugin.rpc.waitsendpay(payment_hash, payload['retry_for'] + start_ts - int(time.time())) result = plugin.rpc.waitsendpay(payment_hash, payload['retry_for'] + start_ts - int(time.time()))
if result.get('status') == 'complete': if result.get('status') == 'complete':
payload['success_msg'] += ["%dmsat sent over %d hops to %s %dmsat [%d/%d]" % (amount + fees, len(route), payload['command'], amount, chunk+1, payload['chunks'])] payload['success_msg'] += ["%dmsat sent over %d hops to %s %dmsat [%d/%d]" % (amount + fees, len(route), payload['command'], amount, chunk+1, payload['chunks'])]
# we need to wait for gossipd to update to new state, # we need to wait for HTLC to resolve, so remaining amounts
# so remaining amounts will be calculated correctly for the next chunk # can be calculated correctly for the next chunk
spendable, _ = spendable_from_scid(plugin, payload) wait_ours(plugin, payload['scid'], ours)
while spendable == spendable_before:
time.sleep(0.5)
spendable, _ = spendable_from_scid(plugin, payload)
return True return True
return False return False
@@ -327,6 +322,9 @@ def read_params(command: str, scid: str, percentage: float,
"success_msg" : [], "success_msg" : [],
} }
# cache some often required data
payload['my_id'] = plugin.rpc.getinfo().get('id')
# translate a 'setbalance' into respective drain or fill # translate a 'setbalance' into respective drain or fill
if command == 'setbalance': if command == 'setbalance':
spendable, receivable = spendable_from_scid(plugin, payload) spendable, receivable = spendable_from_scid(plugin, payload)
@@ -348,10 +346,9 @@ def read_params(command: str, scid: str, percentage: float,
def execute(payload: dict): def execute(payload: dict):
my_id = plugin.rpc.getinfo().get('id') peer_id = peer_from_scid(plugin, payload)
peer_id = peer_from_scid(plugin, payload, payload['scid'], my_id) get_channel(plugin, payload, peer_id) # ensures or raises error
get_channel(plugin, payload, peer_id, payload['scid']) # ensures or raises error test_or_set_chunks(plugin, payload)
test_or_set_chunks(plugin, payload, my_id)
plugin.log("%s %s %d%% %d chunks" % (payload['command'], payload['scid'], payload['percentage'], payload['chunks'])) plugin.log("%s %s %d%% %d chunks" % (payload['command'], payload['scid'], payload['percentage'], payload['chunks']))
# iterate of chunks, default just one # iterate of chunks, default just one
@@ -386,7 +383,7 @@ def execute(payload: dict):
plugin.log("Trying... chunk:%s/%s spendable:%s receivable:%s htlc_fee:%s => amount:%s" % (chunk+1, payload['chunks'], spendable, receivable, htlc_fee, amount)) plugin.log("Trying... chunk:%s/%s spendable:%s receivable:%s htlc_fee:%s => amount:%s" % (chunk+1, payload['chunks'], spendable, receivable, htlc_fee, amount))
try: try:
result = try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable) result = try_for_htlc_fee(plugin, payload, peer_id, amount, chunk, spendable)
except Exception as err: except Exception as err:
if "htlc_fee unknown" in str(err): if "htlc_fee unknown" in str(err):
if htlc_fee == HTLC_FEE_NUL: if htlc_fee == HTLC_FEE_NUL:
@@ -436,6 +433,7 @@ def fill(plugin, scid: str, percentage: float=100, chunks: int=0, maxfeepercent:
payload = read_params('fill', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee) payload = read_params('fill', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee)
return execute(payload) return execute(payload)
@plugin.method("setbalance") @plugin.method("setbalance")
def setbalance(plugin, scid: str, percentage: float=50, chunks: int=0, maxfeepercent: float=0.5, def setbalance(plugin, scid: str, percentage: float=50, chunks: int=0, maxfeepercent: float=0.5,
retry_for: int=60, exemptfee: Millisatoshi=Millisatoshi(5000)): retry_for: int=60, exemptfee: Millisatoshi=Millisatoshi(5000)):
@@ -448,6 +446,7 @@ def setbalance(plugin, scid: str, percentage: float=50, chunks: int=0, maxfeeper
payload = read_params('setbalance', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee) payload = read_params('setbalance', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee)
return execute(payload) return execute(payload)
@plugin.init() @plugin.init()
def init(options, configuration, plugin): def init(options, configuration, plugin):
plugin.options['cltv-final']['value'] = plugin.rpc.listconfigs().get('cltv-final') plugin.options['cltv-final']['value'] = plugin.rpc.listconfigs().get('cltv-final')

44
drain/utils.py Normal file
View File

@@ -0,0 +1,44 @@
import time
TIMEOUT=60
def wait_for(success, timeout=TIMEOUT):
start_time = time.time()
interval = 0.25
while not success() and time.time() < start_time + timeout:
time.sleep(interval)
interval *= 2
if interval > 5:
interval = 5
if time.time() > start_time + timeout:
raise ValueError("Timeout waiting for {}", success)
# waits for a bunch of nodes HTLCs to settle
def wait_for_all_htlcs(nodes):
for n in nodes:
n.wait_for_htlcs()
# returns our_amount_msat for a given node and scid
def get_ours(node, scid):
return [c for c in node.rpc.listfunds()['channels'] if c['short_channel_id'] == scid][0]['our_amount_msat']
# these wait for the HTLC commit settlement
def wait_ours(node, scid, ours_before):
wait_for(lambda: ours_before != get_ours(node, scid))
return get_ours(node, scid)
def wait_ours_above(node, scid, value):
wait_for(lambda: get_ours(node, scid) > value)
return get_ours(node, scid)
def wait_ours_below(node, scid, value):
wait_for(lambda: get_ours(node, scid) < value)
return get_ours(node, scid)