diff --git a/drain/README.md b/drain/README.md index 7e30e89..db3abf1 100644 --- a/drain/README.md +++ b/drain/README.md @@ -12,7 +12,7 @@ circular payments to yourself. This can be useful for: ## Installation -This plugin relies on the `pylightning` library. As with most plugins you should +This plugin relies on the `pyln-client` library. As with most plugins you should be able to install dependencies with `pip`: ```bash @@ -20,13 +20,13 @@ pip3 install -r requirements.txt ``` You might need to also specify the `--user` command line flag depending on -your environment. If you dont want this and your plugin only uses `pylightning` +your environment. If you dont want this and your plugin only uses `pyln-client` as the only dependency, you can also start `lightningd` with the `PYTHONPATH` -environment variable to the `pylightning` package of your `lightningd` -installation. For example: +environment variable to the `pyln-client` package of your `lightningd` +installation, for example: ``` -PYTHONPATH=/path/to/lightning.git/contrib/pylightning lightningd --plugin=... +PYTHONPATH=/home/user/lightning.git/contrib/pyln-client lightningd --plugin=... ``` ## Startup @@ -114,8 +114,7 @@ lightning-cli setbalance scid [percentage] [chunks] [maxfeepercent] [retry_for] ## TODOs + - fix: use hook instead of waitsendpay to prevent race conditions - fix: occasionally strange route errors. maybe try increasing chunks on route errors. - - fix: sometimes, if we ran in error, not all chunk results are returned, i.e. [2/4, error] but not 1/4. - This maybe relate to the waitsendpay timed out race condition. Can be solved by a new plugin hook. - feat: set HTLC_FEE MIN/MAX/STP by feerate - chore: reconsider use of listchannels diff --git a/drain/drain.py b/drain/drain.py index ecffc71..f33e440 100755 --- a/drain/drain.py +++ b/drain/drain.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 from pyln.client import Plugin, Millisatoshi, RpcError +from utils import * import re import time import uuid @@ -46,7 +47,7 @@ def setup_routing_fees(plugin, payload, route, amount, substractfees: bool=False # BOLT #7 requires fee >= fee_base_msat + ( amount_to_forward * fee_proportional_millionths / 1000000 ) fee += (amount_iter * ch['fee_per_millionth'] + 10**6 - 1) // 10**6 # integer math trick to round up if fee > amount_iter: - raise RpcError(payload['command'], payload, {'message': 'cannot cover fees to %s %s' % (payload['command'], amount)}) + raise RpcError(payload['command'], payload, {'message': 'Cannot cover fees to %s %s' % (payload['command'], amount)}) amount_iter -= fee first = False r['msatoshi'] = amount_iter.millisatoshis @@ -54,7 +55,9 @@ def setup_routing_fees(plugin, payload, route, amount, substractfees: bool=False # This raises an error when a channel is not normal or peer is not connected -def get_channel(plugin, payload, peer_id, scid): +def get_channel(plugin, payload, peer_id, scid=None): + if scid is None: + scid = payload['scid'] peer = plugin.rpc.listpeers(peer_id).get('peers')[0] channel = next(c for c in peer['channels'] if 'short_channel_id' in c and c['short_channel_id'] == scid) if channel['state'] != "CHANNELD_NORMAL": @@ -68,16 +71,7 @@ def spendable_from_scid(plugin, payload, scid=None): if scid is None: scid = payload['scid'] - # only fetch funds once to reduce RPC load - if not "funds" in payload: - payload['funds'] = plugin.rpc.listfunds().get('channels') - - try: - channel_funds = next(c for c in payload['funds'] if 'short_channel_id' in c and c['short_channel_id'] == scid) - except StopIteration: - return Millisatoshi(0), Millisatoshi(0) - peer_id = channel_funds['peer_id'] - funds_our = Millisatoshi(channel_funds['our_amount_msat']) + peer_id = peer_from_scid(plugin, payload, scid) try: channel_peer = get_channel(plugin, payload, peer_id, scid) except RpcError: @@ -104,12 +98,15 @@ def spendable_from_scid(plugin, payload, scid=None): return spendable, receivable -def peer_from_scid(plugin, payload, short_channel_id, my_id): - channels = plugin.rpc.listchannels(short_channel_id).get('channels') +def peer_from_scid(plugin, payload, scid=None): + if scid is None: + scid = payload['scid'] + + channels = plugin.rpc.listchannels(scid).get('channels') try: - return next(c for c in channels if c['source'] == my_id)['destination'] + return next(c for c in channels if c['source'] == payload['my_id'])['destination'] except StopIteration: - raise RpcError(payload['command'], payload, {'message': 'Cannot find peer for channel: ' + short_channel_id}) + raise RpcError(payload['command'], payload, {'message': 'Cannot find peer for channel: ' + scid}) def find_worst_channel(route): @@ -126,7 +123,7 @@ def find_worst_channel(route): return worst -def test_or_set_chunks(plugin, payload, my_id): +def test_or_set_chunks(plugin, payload): scid = payload['scid'] cmd = payload['command'] spendable, receivable = spendable_from_scid(plugin, payload) @@ -143,7 +140,7 @@ def test_or_set_chunks(plugin, payload, my_id): # get all spendable/receivables for our channels channels = {} - for channel in plugin.rpc.listchannels(source=my_id).get('channels'): + for channel in plugin.rpc.listchannels(source = payload['my_id']).get('channels'): if channel['short_channel_id'] == scid: continue spend, recv = spendable_from_scid(plugin, payload, channel['short_channel_id']) @@ -153,8 +150,8 @@ def test_or_set_chunks(plugin, payload, my_id): } # test if selected chunks fit into other channel capacities - if payload['chunks'] >= 1: - chunks = payload['chunks'] + chunks = payload['chunks'] + if chunks > 0: chunksize = amount / chunks fit = 0 for i in channels: @@ -208,15 +205,15 @@ def cleanup(plugin, payload, error=None): if successful_chunks == payload['chunks']: return payload['success_msg'] if successful_chunks > 0: - payload['success_msg'] += ['Partially completed %d/%d chunks. Error: %s' % (successful_chunks, payload['chunks'], str(error))] - return payload['success_msg'] + error = RpcError(payload['command'], payload, {'message': 'Partially completed %d/%d chunks. Error: %s' % (successful_chunks, payload['chunks'], str(error))}) if error is None: error = RpcError(payload['command'], payload, {'message': 'Command failed, no chunk succeeded.'}) raise error -def try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable_before): +def try_for_htlc_fee(plugin, payload, peer_id, amount, chunk, spendable_before): start_ts = int(time.time()) + my_id = payload['my_id'] label = payload['command'] + "-" + str(uuid.uuid4()) payload['labels'] += [label] description = "%s %s %s%s [%d/%d]" % (payload['command'], payload['scid'], payload['percentage'], '%', chunk+1, payload['chunks']) @@ -268,16 +265,14 @@ def try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable_b plugin.log(" - %s %14s %s" % (r['id'], r['channel'], r['amount_msat'])) try: + ours = get_ours(plugin, payload['scid']) plugin.rpc.sendpay(route, payment_hash, label) result = plugin.rpc.waitsendpay(payment_hash, payload['retry_for'] + start_ts - int(time.time())) if result.get('status') == 'complete': payload['success_msg'] += ["%dmsat sent over %d hops to %s %dmsat [%d/%d]" % (amount + fees, len(route), payload['command'], amount, chunk+1, payload['chunks'])] - # we need to wait for gossipd to update to new state, - # so remaining amounts will be calculated correctly for the next chunk - spendable, _ = spendable_from_scid(plugin, payload) - while spendable == spendable_before: - time.sleep(0.5) - spendable, _ = spendable_from_scid(plugin, payload) + # we need to wait for HTLC to resolve, so remaining amounts + # can be calculated correctly for the next chunk + wait_ours(plugin, payload['scid'], ours) return True return False @@ -327,6 +322,9 @@ def read_params(command: str, scid: str, percentage: float, "success_msg" : [], } + # cache some often required data + payload['my_id'] = plugin.rpc.getinfo().get('id') + # translate a 'setbalance' into respective drain or fill if command == 'setbalance': spendable, receivable = spendable_from_scid(plugin, payload) @@ -348,10 +346,9 @@ def read_params(command: str, scid: str, percentage: float, def execute(payload: dict): - my_id = plugin.rpc.getinfo().get('id') - peer_id = peer_from_scid(plugin, payload, payload['scid'], my_id) - get_channel(plugin, payload, peer_id, payload['scid']) # ensures or raises error - test_or_set_chunks(plugin, payload, my_id) + peer_id = peer_from_scid(plugin, payload) + get_channel(plugin, payload, peer_id) # ensures or raises error + test_or_set_chunks(plugin, payload) plugin.log("%s %s %d%% %d chunks" % (payload['command'], payload['scid'], payload['percentage'], payload['chunks'])) # iterate of chunks, default just one @@ -386,7 +383,7 @@ def execute(payload: dict): plugin.log("Trying... chunk:%s/%s spendable:%s receivable:%s htlc_fee:%s => amount:%s" % (chunk+1, payload['chunks'], spendable, receivable, htlc_fee, amount)) try: - result = try_for_htlc_fee(plugin, payload, my_id, peer_id, amount, chunk, spendable) + result = try_for_htlc_fee(plugin, payload, peer_id, amount, chunk, spendable) except Exception as err: if "htlc_fee unknown" in str(err): if htlc_fee == HTLC_FEE_NUL: @@ -436,6 +433,7 @@ def fill(plugin, scid: str, percentage: float=100, chunks: int=0, maxfeepercent: payload = read_params('fill', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee) return execute(payload) + @plugin.method("setbalance") def setbalance(plugin, scid: str, percentage: float=50, chunks: int=0, maxfeepercent: float=0.5, retry_for: int=60, exemptfee: Millisatoshi=Millisatoshi(5000)): @@ -448,6 +446,7 @@ def setbalance(plugin, scid: str, percentage: float=50, chunks: int=0, maxfeeper payload = read_params('setbalance', scid, percentage, chunks, maxfeepercent, retry_for, exemptfee) return execute(payload) + @plugin.init() def init(options, configuration, plugin): plugin.options['cltv-final']['value'] = plugin.rpc.listconfigs().get('cltv-final') diff --git a/drain/utils.py b/drain/utils.py new file mode 100644 index 0000000..27e217c --- /dev/null +++ b/drain/utils.py @@ -0,0 +1,44 @@ +import time + +TIMEOUT=60 + + +def wait_for(success, timeout=TIMEOUT): + start_time = time.time() + interval = 0.25 + while not success() and time.time() < start_time + timeout: + time.sleep(interval) + interval *= 2 + if interval > 5: + interval = 5 + if time.time() > start_time + timeout: + raise ValueError("Timeout waiting for {}", success) + + +# waits for a bunch of nodes HTLCs to settle +def wait_for_all_htlcs(nodes): + for n in nodes: + n.wait_for_htlcs() + + +# returns our_amount_msat for a given node and scid +def get_ours(node, scid): + return [c for c in node.rpc.listfunds()['channels'] if c['short_channel_id'] == scid][0]['our_amount_msat'] + + +# these wait for the HTLC commit settlement +def wait_ours(node, scid, ours_before): + wait_for(lambda: ours_before != get_ours(node, scid)) + return get_ours(node, scid) + + +def wait_ours_above(node, scid, value): + wait_for(lambda: get_ours(node, scid) > value) + return get_ours(node, scid) + + +def wait_ours_below(node, scid, value): + wait_for(lambda: get_ours(node, scid) < value) + return get_ours(node, scid) + +