rebalance: multithreading

`rebalanceall` can run on multiple threads for faster results
This commit is contained in:
Gálli Zoltán
2022-12-13 13:49:29 +01:00
committed by Michael Schmoock
parent a778da4e10
commit 381707eaaf

View File

@@ -3,12 +3,24 @@ from clnutils import cln_parse_rpcversion
from datetime import timedelta
from functools import reduce
from pyln.client import Plugin, Millisatoshi, RpcError
from threading import Thread, Lock
import threading
import time
import uuid
import concurrent.futures
plugin = Plugin()
plugin.rebalance_stop = False
plugin.rebalance_stop_by_user = False
plugin.rebalance_stop_by_thread = False
plugin.threadids = {}
def rebalance_stopping():
return plugin.rebalance_stop_by_user or plugin.rebalance_stop_by_thread
def get_thread_id_str():
max_digits = len(str(plugin.threads - 1))
return f"{plugin.threadids.get(threading.get_ident(), 0):{'0' + str(max_digits)}}"
# The route msat helpers are needed because older versions of cln
@@ -65,6 +77,7 @@ def peer_from_scid(short_channel_id, my_node_id, payload):
return ch['destination']
raise RpcError("rebalance", payload, {'message': 'Cannot find peer for channel: ' + short_channel_id})
def get_node_alias(node_id):
node = plugin.rpc.listnodes(node_id)['nodes']
s = ""
@@ -74,6 +87,7 @@ def get_node_alias(node_id):
s += node_id[0:7]
return s
def find_worst_channel(route):
if len(route) < 4:
return None
@@ -217,7 +231,7 @@ def waitsendpay(payment_hash, start_ts, retry_for):
return result
except RpcError as e:
if e.method == "waitsendpay" and e.error.get('code') == 200:
if plugin.rebalance_stop:
if rebalance_stopping():
raise e
if int(time.time()) - start_ts >= retry_for:
raise e
@@ -299,7 +313,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non
time_sendpay = 0
try:
while int(time.time()) - start_ts < retry_for and not plugin.rebalance_stop:
while int(time.time()) - start_ts < retry_for and not rebalance_stopping():
count += 1
try:
time_start = time.time()
@@ -333,7 +347,6 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non
excludes.append(worst_channel['channel'] + '/' + str(worst_channel['direction']))
continue
rpc_result = {
"sent": msatoshi + fees,
"received": msatoshi,
@@ -346,7 +359,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non
}
midroute_str = reduce(lambda x,y: x + " -> " + y, map(lambda r: get_node_alias(r['id']), route_mid))
full_route_str = "%s -> %s -> %s -> %s" % (get_node_alias(my_node_id), get_node_alias(outgoing_node_id), midroute_str, get_node_alias(my_node_id))
plugin.log("%d hops and %s fees for %s along route: %s" % (len(route), fees.to_satoshi_str(), msatoshi.to_satoshi_str(), full_route_str))
plugin.log(f"Thread{get_thread_id_str()} {len(route)} hops and {fees.to_satoshi_str()} fees for {msatoshi.to_satoshi_str()} along route: {full_route_str}")
for r in route:
plugin.log(" - %s %14s %s" % (r['id'], r['channel'], route_get_msat(r)), 'debug')
@@ -559,7 +572,7 @@ def wait_for_htlcs(failed_channels: list, scids: list = None):
if 'htlcs' in channel:
if not wait_for(lambda: len(plugin.rpc.listpeers()['peers'][p]['channels'][c]['htlcs']) == 0):
failed_channels.append(channel.get('short_channel_id'))
plugin.log(f"Timeout while waiting for htlc settlement in channel {channel.get('short_channel_id')}")
plugin.log(f"Thread{get_thread_id_str()} timeout while waiting for htlc settlement in channel {channel.get('short_channel_id')}")
result = False
return result
@@ -574,7 +587,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list):
if not wait_for_htlcs(failed_channels, [scid1, scid2]):
return result
i = 0
while not plugin.rebalance_stop:
while not rebalance_stopping():
liquidity1 = liquidity_info(ch1, plugin.enough_liquidity, plugin.ideal_ratio)
liquidity2 = liquidity_info(ch2, plugin.enough_liquidity, plugin.ideal_ratio)
amount1 = min(must_send(liquidity1), could_receive(liquidity2))
@@ -585,7 +598,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list):
return result
amount = min(amount, get_max_amount(i, plugin))
maxfee = get_max_fee(amount)
plugin.log(f"Try to rebalance: {scid1} -> {scid2}; amount={amount}; maxfee={maxfee}")
plugin.log(f"Thread{get_thread_id_str()} tries to rebalance: {scid1} -> {scid2}; amount={amount.to_satoshi_str()}; maxfee={maxfee.to_satoshi_str()}")
start_ts = time.time()
try:
res = rebalance(plugin, outgoing_scid=scid1, incoming_scid=scid2,
@@ -610,7 +623,7 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list):
current_ts = time.time()
res["elapsed_time"] = str(timedelta(seconds=current_ts - start_ts))[:-3]
res["htlc_time"] = str(timedelta(seconds=current_ts - htlc_start_ts))[:-3]
plugin.log(f"Rebalance succeeded: {res}")
plugin.log(f"Thread{get_thread_id_str()} rebalance succeeded: {res}")
if not htlc_success:
return result
ch1 = get_chan(scid1)
@@ -620,16 +633,68 @@ def maybe_rebalance_pairs(ch1, ch2, failed_channels: list):
return result
def rebalance_pair_picker(threadid, channel_pairs: list, failed_channels: list):
plugin.threadids[threading.get_ident()] = threadid
result = {"success": False, "fee_spent": Millisatoshi(0)}
idle_count = 0
while not rebalance_stopping():
idle_count += 1
for pair in channel_pairs:
if rebalance_stopping():
return result
ch1 = pair[0]
ch2 = pair[1]
processed = pair[2]
if processed:
continue
if ch1["lock"].acquire(blocking=False):
if ch2["lock"].acquire(blocking=False):
idle_count = 0
pair[2] = True
result = maybe_rebalance_pairs(ch1, ch2, failed_channels)
ch2["lock"].release()
ch1["lock"].release()
if result["success"]:
plugin.log(f"Thread{get_thread_id_str()} restarts rebalance threads after successful rebalance")
plugin.rebalance_stop_by_thread = True
return result
unprocessed = [p for p in channel_pairs if not p[2]]
if len(unprocessed) == 0:
return result
if idle_count == 1:
plugin.log(f"Thread{get_thread_id_str()} is idle, {len(unprocessed)} possible channel pairs remained")
if idle_count > 0:
time.sleep(10)
return result
def maybe_rebalance_once(failed_channels: list):
channels = get_open_channels(plugin)
for ch in channels:
ch["lock"] = threading.Lock()
channel_pairs = []
for ch1 in channels:
for ch2 in channels:
if ch1 == ch2:
continue
result = maybe_rebalance_pairs(ch1, ch2, failed_channels)
if result["success"] or plugin.rebalance_stop:
return result
return {"success": False, "fee_spent": Millisatoshi(0)}
channel_pairs.append([ch1, ch2, False])
plugin.log(f"Start to rebalance {len(channel_pairs)} possible channel pairs")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=plugin.threads)
futures = set()
for threadid in range(plugin.threads):
futures.add(executor.submit(rebalance_pair_picker, threadid, channel_pairs, failed_channels))
result = {"success": False, "fee_spent": Millisatoshi(0)}
for future in concurrent.futures.as_completed(futures):
r2 = future.result()
if r2["success"]:
if result["success"]:
result["fee_spent"] += r2["fee_spent"]
result["success_count"] = result.get("success_count", 1) + 1
else:
result = r2
plugin.rebalance_stop_by_thread = False
return result
def feeadjuster_toggle(new_value: bool):
@@ -650,23 +715,24 @@ def rebalanceall_thread():
channels = get_open_channels(plugin)
plugin.enough_liquidity = get_enough_liquidity_threshold(channels)
plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity)
plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity}, "
plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, "
f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, "
f"min rebalancable amount: {plugin.min_amount}, "
f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, "
f"feeratio: {plugin.feeratio}")
failed_channels = []
success = 0
fee_spent = Millisatoshi(0)
while not plugin.rebalance_stop:
while not rebalance_stopping():
result = maybe_rebalance_once(failed_channels)
if not result["success"]:
break
success += 1
success += result.get("success_count", 1)
fee_spent += result["fee_spent"]
feeadjust_would_be_nice()
feeadjuster_toggle(feeadjuster_state)
elapsed_time = timedelta(seconds=time.time() - start_ts)
plugin.rebalanceall_msg = f"Automatic rebalance finished: {success} successful rebalance, {fee_spent} fee spent, it took {str(elapsed_time)[:-3]}"
plugin.rebalanceall_msg = (f"Automatic rebalance finished: {success} successful rebalance, "
f"{fee_spent.to_satoshi_str()} fee spent, it took {str(elapsed_time)[:-3]}")
plugin.log(plugin.rebalanceall_msg)
finally:
plugin.mutex.release()
@@ -696,7 +762,7 @@ def rebalanceall(plugin: Plugin, min_amount: Millisatoshi = Millisatoshi("50000s
plugin.min_amount = min_amount
# run the job
t = Thread(target=rebalanceall_thread, args=())
t = threading.Thread(target=rebalanceall_thread, args=())
t.start()
return {"message": f"Rebalance started with min rebalancable amount: {plugin.min_amount}, feeratio: {plugin.feeratio}"}
@@ -710,10 +776,13 @@ def rebalancestop(plugin: Plugin):
return {"message": "No rebalance is running, nothing to stop."}
return {"message": f"No rebalance is running, nothing to stop. "
f"Last 'rebalanceall' gave: {plugin.rebalanceall_msg}"}
plugin.rebalance_stop = True
start_ts = time.time()
plugin.rebalance_stop_by_user = True
plugin.mutex.acquire(blocking=True)
plugin.rebalance_stop = False
plugin.rebalance_stop_by_user = False
plugin.mutex.release()
elapsed_time = timedelta(seconds=time.time() - start_ts)
plugin.log(f"Automatic rebalance stopped in {str(elapsed_time)[:-3]}")
return {"message": plugin.rebalanceall_msg}
@@ -828,10 +897,11 @@ def init(options, configuration, plugin):
plugin.cltv_final = config.get("cltv-final")
plugin.fee_base = Millisatoshi(config.get("fee-base"))
plugin.fee_ppm = config.get("fee-per-satoshi")
plugin.mutex = Lock()
plugin.mutex = threading.Lock()
plugin.maxhops = int(options.get("rebalance-maxhops"))
plugin.msatfactor = float(options.get("rebalance-msatfactor"))
plugin.erringnodes = int(options.get("rebalance-erringnodes"))
plugin.threads = int(options.get("rebalance-threads"))
plugin.getroute = getroute_switch(options.get("rebalance-getroute"))
plugin.rebalanceall_msg = None
@@ -840,12 +910,13 @@ def init(options, configuration, plugin):
if plugin.rpcversion[0] == 0 and plugin.rpcversion[1] < 12:
plugin.msatfield = 'msatoshi'
plugin.log(f"Plugin rebalance initialized with {plugin.fee_base} base / {plugin.fee_ppm} ppm fee "
plugin.log(f"Plugin rebalance initialized with {plugin.fee_base.to_satoshi_str()} base / {plugin.fee_ppm} ppm fee "
f"cltv_final:{plugin.cltv_final} "
f"maxhops:{plugin.maxhops} "
f"msatfactor:{plugin.msatfactor} "
f"erringnodes:{plugin.erringnodes} "
f"getroute:{plugin.getroute.__name__} ")
f"getroute:{plugin.getroute.__name__} "
f"threads:{plugin.threads} ")
plugin.add_option(
@@ -881,4 +952,12 @@ plugin.add_option(
"string"
)
plugin.add_option(
"rebalance-threads",
"8",
"Number of threads used parallelly by `rebalanceall` "
"Higher numbers increase speed and CPU consumption",
"string"
)
plugin.run()