From 381707eaaf380b625e6d2d19d4ea2d3843a53f32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1lli=20Zolt=C3=A1n?= Date: Tue, 13 Dec 2022 13:49:29 +0100 Subject: [PATCH] rebalance: multithreading `rebalanceall` can run on multiple threads for faster results --- rebalance/rebalance.py | 129 +++++++++++++++++++++++++++++++++-------- 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/rebalance/rebalance.py b/rebalance/rebalance.py index ab8c263..ff54f75 100755 --- a/rebalance/rebalance.py +++ b/rebalance/rebalance.py @@ -3,12 +3,24 @@ from clnutils import cln_parse_rpcversion from datetime import timedelta from functools import reduce from pyln.client import Plugin, Millisatoshi, RpcError -from threading import Thread, Lock +import threading import time import uuid +import concurrent.futures plugin = Plugin() -plugin.rebalance_stop = False +plugin.rebalance_stop_by_user = False +plugin.rebalance_stop_by_thread = False +plugin.threadids = {} + + +def rebalance_stopping(): + return plugin.rebalance_stop_by_user or plugin.rebalance_stop_by_thread + + +def get_thread_id_str(): + max_digits = len(str(plugin.threads - 1)) + return f"{plugin.threadids.get(threading.get_ident(), 0):{'0' + str(max_digits)}}" # The route msat helpers are needed because older versions of cln @@ -65,6 +77,7 @@ def peer_from_scid(short_channel_id, my_node_id, payload): return ch['destination'] raise RpcError("rebalance", payload, {'message': 'Cannot find peer for channel: ' + short_channel_id}) + def get_node_alias(node_id): node = plugin.rpc.listnodes(node_id)['nodes'] s = "" @@ -74,6 +87,7 @@ def get_node_alias(node_id): s += node_id[0:7] return s + def find_worst_channel(route): if len(route) < 4: return None @@ -217,7 +231,7 @@ def waitsendpay(payment_hash, start_ts, retry_for): return result except RpcError as e: if e.method == "waitsendpay" and e.error.get('code') == 200: - if plugin.rebalance_stop: + if rebalance_stopping(): raise e if int(time.time()) - start_ts >= retry_for: raise e @@ -299,7 +313,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non time_sendpay = 0 try: - while int(time.time()) - start_ts < retry_for and not plugin.rebalance_stop: + while int(time.time()) - start_ts < retry_for and not rebalance_stopping(): count += 1 try: time_start = time.time() @@ -333,7 +347,6 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non excludes.append(worst_channel['channel'] + '/' + str(worst_channel['direction'])) continue - rpc_result = { "sent": msatoshi + fees, "received": msatoshi, @@ -346,7 +359,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non } midroute_str = reduce(lambda x,y: x + " -> " + y, map(lambda r: get_node_alias(r['id']), route_mid)) full_route_str = "%s -> %s -> %s -> %s" % (get_node_alias(my_node_id), get_node_alias(outgoing_node_id), midroute_str, get_node_alias(my_node_id)) - plugin.log("%d hops and %s fees for %s along route: %s" % (len(route), fees.to_satoshi_str(), msatoshi.to_satoshi_str(), full_route_str)) + plugin.log(f"Thread{get_thread_id_str()} {len(route)} hops and {fees.to_satoshi_str()} fees for {msatoshi.to_satoshi_str()} along route: {full_route_str}") for r in route: plugin.log(" - %s %14s %s" % (r['id'], r['channel'], route_get_msat(r)), 'debug') @@ -559,7 +572,7 @@ def wait_for_htlcs(failed_channels: list, scids: list = None): if 'htlcs' in channel: if not wait_for(lambda: len(plugin.rpc.listpeers()['peers'][p]['channels'][c]['htlcs']) == 0): failed_channels.append(channel.get('short_channel_id')) - plugin.log(f"Timeout while waiting for htlc settlement in channel {channel.get('short_channel_id')}") + plugin.log(f"Thread{get_thread_id_str()} timeout while waiting for htlc settlement in channel {channel.get('short_channel_id')}") result = False return result @@ -574,7 +587,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list): if not wait_for_htlcs(failed_channels, [scid1, scid2]): return result i = 0 - while not plugin.rebalance_stop: + while not rebalance_stopping(): liquidity1 = liquidity_info(ch1, plugin.enough_liquidity, plugin.ideal_ratio) liquidity2 = liquidity_info(ch2, plugin.enough_liquidity, plugin.ideal_ratio) amount1 = min(must_send(liquidity1), could_receive(liquidity2)) @@ -585,7 +598,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list): return result amount = min(amount, get_max_amount(i, plugin)) maxfee = get_max_fee(amount) - plugin.log(f"Try to rebalance: {scid1} -> {scid2}; amount={amount}; maxfee={maxfee}") + plugin.log(f"Thread{get_thread_id_str()} tries to rebalance: {scid1} -> {scid2}; amount={amount.to_satoshi_str()}; maxfee={maxfee.to_satoshi_str()}") start_ts = time.time() try: res = rebalance(plugin, outgoing_scid=scid1, incoming_scid=scid2, @@ -610,7 +623,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list): current_ts = time.time() res["elapsed_time"] = str(timedelta(seconds=current_ts - start_ts))[:-3] res["htlc_time"] = str(timedelta(seconds=current_ts - htlc_start_ts))[:-3] - plugin.log(f"Rebalance succeeded: {res}") + plugin.log(f"Thread{get_thread_id_str()} rebalance succeeded: {res}") if not htlc_success: return result ch1 = get_chan(scid1) @@ -620,16 +633,68 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list): return result +def rebalance_pair_picker(threadid, channel_pairs: list, failed_channels: list): + plugin.threadids[threading.get_ident()] = threadid + result = {"success": False, "fee_spent": Millisatoshi(0)} + idle_count = 0 + while not rebalance_stopping(): + idle_count += 1 + for pair in channel_pairs: + if rebalance_stopping(): + return result + ch1 = pair[0] + ch2 = pair[1] + processed = pair[2] + if processed: + continue + if ch1["lock"].acquire(blocking=False): + if ch2["lock"].acquire(blocking=False): + idle_count = 0 + pair[2] = True + result = maybe_rebalance_pairs(ch1, ch2, failed_channels) + ch2["lock"].release() + ch1["lock"].release() + if result["success"]: + plugin.log(f"Thread{get_thread_id_str()} restarts rebalance threads after successful rebalance") + plugin.rebalance_stop_by_thread = True + return result + unprocessed = [p for p in channel_pairs if not p[2]] + if len(unprocessed) == 0: + return result + if idle_count == 1: + plugin.log(f"Thread{get_thread_id_str()} is idle, {len(unprocessed)} possible channel pairs remained") + if idle_count > 0: + time.sleep(10) + return result + + def maybe_rebalance_once(failed_channels: list): channels = get_open_channels(plugin) + for ch in channels: + ch["lock"] = threading.Lock() + channel_pairs = [] for ch1 in channels: for ch2 in channels: if ch1 == ch2: continue - result = maybe_rebalance_pairs(ch1, ch2, failed_channels) - if result["success"] or plugin.rebalance_stop: - return result - return {"success": False, "fee_spent": Millisatoshi(0)} + channel_pairs.append([ch1, ch2, False]) + + plugin.log(f"Start to rebalance {len(channel_pairs)} possible channel pairs") + executor = concurrent.futures.ThreadPoolExecutor(max_workers=plugin.threads) + futures = set() + for threadid in range(plugin.threads): + futures.add(executor.submit(rebalance_pair_picker, threadid, channel_pairs, failed_channels)) + result = {"success": False, "fee_spent": Millisatoshi(0)} + for future in concurrent.futures.as_completed(futures): + r2 = future.result() + if r2["success"]: + if result["success"]: + result["fee_spent"] += r2["fee_spent"] + result["success_count"] = result.get("success_count", 1) + 1 + else: + result = r2 + plugin.rebalance_stop_by_thread = False + return result def feeadjuster_toggle(new_value: bool): @@ -650,23 +715,24 @@ def rebalanceall_thread(): channels = get_open_channels(plugin) plugin.enough_liquidity = get_enough_liquidity_threshold(channels) plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity) - plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity}, " + plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, " f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, " - f"min rebalancable amount: {plugin.min_amount}, " + f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, " f"feeratio: {plugin.feeratio}") failed_channels = [] success = 0 fee_spent = Millisatoshi(0) - while not plugin.rebalance_stop: + while not rebalance_stopping(): result = maybe_rebalance_once(failed_channels) if not result["success"]: break - success += 1 + success += result.get("success_count", 1) fee_spent += result["fee_spent"] feeadjust_would_be_nice() feeadjuster_toggle(feeadjuster_state) elapsed_time = timedelta(seconds=time.time() - start_ts) - plugin.rebalanceall_msg = f"Automatic rebalance finished: {success} successful rebalance, {fee_spent} fee spent, it took {str(elapsed_time)[:-3]}" + plugin.rebalanceall_msg = (f"Automatic rebalance finished: {success} successful rebalance, " + f"{fee_spent.to_satoshi_str()} fee spent, it took {str(elapsed_time)[:-3]}") plugin.log(plugin.rebalanceall_msg) finally: plugin.mutex.release() @@ -696,7 +762,7 @@ def rebalanceall(plugin: Plugin, min_amount: Millisatoshi = Millisatoshi("50000s plugin.min_amount = min_amount # run the job - t = Thread(target=rebalanceall_thread, args=()) + t = threading.Thread(target=rebalanceall_thread, args=()) t.start() return {"message": f"Rebalance started with min rebalancable amount: {plugin.min_amount}, feeratio: {plugin.feeratio}"} @@ -710,10 +776,13 @@ def rebalancestop(plugin: Plugin): return {"message": "No rebalance is running, nothing to stop."} return {"message": f"No rebalance is running, nothing to stop. " f"Last 'rebalanceall' gave: {plugin.rebalanceall_msg}"} - plugin.rebalance_stop = True + start_ts = time.time() + plugin.rebalance_stop_by_user = True plugin.mutex.acquire(blocking=True) - plugin.rebalance_stop = False + plugin.rebalance_stop_by_user = False plugin.mutex.release() + elapsed_time = timedelta(seconds=time.time() - start_ts) + plugin.log(f"Automatic rebalance stopped in {str(elapsed_time)[:-3]}") return {"message": plugin.rebalanceall_msg} @@ -828,10 +897,11 @@ def init(options, configuration, plugin): plugin.cltv_final = config.get("cltv-final") plugin.fee_base = Millisatoshi(config.get("fee-base")) plugin.fee_ppm = config.get("fee-per-satoshi") - plugin.mutex = Lock() + plugin.mutex = threading.Lock() plugin.maxhops = int(options.get("rebalance-maxhops")) plugin.msatfactor = float(options.get("rebalance-msatfactor")) plugin.erringnodes = int(options.get("rebalance-erringnodes")) + plugin.threads = int(options.get("rebalance-threads")) plugin.getroute = getroute_switch(options.get("rebalance-getroute")) plugin.rebalanceall_msg = None @@ -840,12 +910,13 @@ def init(options, configuration, plugin): if plugin.rpcversion[0] == 0 and plugin.rpcversion[1] < 12: plugin.msatfield = 'msatoshi' - plugin.log(f"Plugin rebalance initialized with {plugin.fee_base} base / {plugin.fee_ppm} ppm fee " + plugin.log(f"Plugin rebalance initialized with {plugin.fee_base.to_satoshi_str()} base / {plugin.fee_ppm} ppm fee " f"cltv_final:{plugin.cltv_final} " f"maxhops:{plugin.maxhops} " f"msatfactor:{plugin.msatfactor} " f"erringnodes:{plugin.erringnodes} " - f"getroute:{plugin.getroute.__name__} ") + f"getroute:{plugin.getroute.__name__} " + f"threads:{plugin.threads} ") plugin.add_option( @@ -881,4 +952,12 @@ plugin.add_option( "string" ) +plugin.add_option( + "rebalance-threads", + "8", + "Number of threads used parallelly by `rebalanceall` " + "Higher numbers increase speed and CPU consumption", + "string" +) + plugin.run()