diff --git a/rebalance/rebalance.py b/rebalance/rebalance.py index c8c34e9..34d7168 100755 --- a/rebalance/rebalance.py +++ b/rebalance/rebalance.py @@ -10,7 +10,7 @@ plugin = Plugin() plugin.rebalance_stop = False -def setup_routing_fees(plugin, route, msatoshi): +def setup_routing_fees(route, msatoshi): delay = plugin.cltv_final for r in reversed(route): r['msatoshi'] = msatoshi.millisatoshis @@ -25,7 +25,7 @@ def setup_routing_fees(plugin, route, msatoshi): delay += ch['delay'] -def get_channel(plugin, payload, peer_id, scid, check_state: bool = False): +def get_channel(payload, peer_id, scid, check_state: bool = False): peer = plugin.rpc.listpeers(peer_id).get('peers')[0] channel = next(c for c in peer['channels'] if c.get('short_channel_id') == scid) if check_state: @@ -36,7 +36,7 @@ def get_channel(plugin, payload, peer_id, scid, check_state: bool = False): return channel -def amounts_from_scid(plugin, scid): +def amounts_from_scid(scid): channels = plugin.rpc.listfunds().get('channels') channel = next(c for c in channels if c.get('short_channel_id') == scid) our_msat = Millisatoshi(channel['our_amount_msat']) @@ -44,7 +44,7 @@ def amounts_from_scid(plugin, scid): return our_msat, total_msat -def peer_from_scid(plugin, short_channel_id, my_node_id, payload): +def peer_from_scid(short_channel_id, my_node_id, payload): channels = plugin.rpc.listchannels(short_channel_id).get('channels') for ch in channels: if ch['source'] == my_node_id: @@ -74,7 +74,7 @@ def find_worst_channel(route): return worst -def cleanup(plugin, label, payload, rpc_result, error=None): +def cleanup(label, payload, rpc_result, error=None): try: plugin.rpc.delinvoice(label, 'unpaid') except RpcError as e: @@ -141,7 +141,7 @@ class NoRouteException(Exception): pass -def getroute_basic(plugin: Plugin, targetid, fromid, excludes, msatoshi: Millisatoshi): +def getroute_basic(targetid, fromid, excludes, msatoshi: Millisatoshi): try: """ This does not make special assumptions and tries all routes it gets. Uses less CPU and does not filter any routes. @@ -159,7 +159,7 @@ def getroute_basic(plugin: Plugin, targetid, fromid, excludes, msatoshi: Millisa raise e -def getroute_iterative(plugin: Plugin, targetid, fromid, excludes, msatoshi: Millisatoshi): +def getroute_iterative(targetid, fromid, excludes, msatoshi: Millisatoshi): """ This searches for 'shorter and bigger pipes' first in order to increase likelyhood of success on short timeout. Can be useful for manual `rebalance`. @@ -222,12 +222,12 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non "exemptfee": exemptfee } my_node_id = plugin.rpc.getinfo().get('id') - outgoing_node_id = peer_from_scid(plugin, outgoing_scid, my_node_id, payload) - incoming_node_id = peer_from_scid(plugin, incoming_scid, my_node_id, payload) - get_channel(plugin, payload, outgoing_node_id, outgoing_scid, True) - get_channel(plugin, payload, incoming_node_id, incoming_scid, True) - out_ours, out_total = amounts_from_scid(plugin, outgoing_scid) - in_ours, in_total = amounts_from_scid(plugin, incoming_scid) + outgoing_node_id = peer_from_scid(outgoing_scid, my_node_id, payload) + incoming_node_id = peer_from_scid(incoming_scid, my_node_id, payload) + get_channel(payload, outgoing_node_id, outgoing_scid, True) + get_channel(payload, incoming_node_id, incoming_scid, True) + out_ours, out_total = amounts_from_scid(outgoing_scid) + in_ours, in_total = amounts_from_scid(incoming_scid) # If amount was not given, calculate a suitable 50/50 rebalance amount if msatoshi is None: @@ -273,8 +273,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non count += 1 try: time_start = time.time() - r = getroute(plugin, - targetid=incoming_node_id, + r = getroute(targetid=incoming_node_id, fromid=outgoing_node_id, excludes=excludes, msatoshi=msatoshi) @@ -282,7 +281,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non except NoRouteException: # no more chance for a successful getroute rpc_result = {'status': 'error', 'message': 'No suitable routes found'} - return cleanup(plugin, label, payload, rpc_result) + return cleanup(label, payload, rpc_result) except RpcError as e: # getroute can be successful next time with different parameters if e.method == "getroute" and e.error.get('code') == 205: @@ -292,7 +291,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non route_mid = r['route'] route = [route_out] + route_mid + [route_in] - setup_routing_fees(plugin, route, msatoshi) + setup_routing_fees(route, msatoshi) fees = route[0]['amount_msat'] - msatoshi # check fee and exclude worst channel the next time @@ -330,7 +329,7 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non time_sendpay += time.time() - time_start if result.get('status') == "complete": rpc_result["stats"] = f"running_for:{int(time.time()) - start_ts} count_getroute:{count} time_getroute:{time_getroute} time_getroute_avg:{time_getroute / count} count_sendpay:{count_sendpay} time_sendpay:{time_sendpay} time_sendpay_avg:{time_sendpay / count_sendpay}" - return cleanup(plugin, label, payload, rpc_result) + return cleanup(label, payload, rpc_result) except RpcError as e: time_sendpay += time.time() - time_start @@ -359,9 +358,9 @@ def rebalance(plugin, outgoing_scid, incoming_scid, msatoshi: Millisatoshi = Non excludes.append(erring_node) except Exception as e: - return cleanup(plugin, label, payload, rpc_result, e) + return cleanup(label, payload, rpc_result, e) rpc_result = {'status': 'error', 'message': 'Timeout reached'} - return cleanup(plugin, label, payload, rpc_result) + return cleanup(label, payload, rpc_result) def a_minus_b(a: Millisatoshi, b: Millisatoshi): @@ -457,7 +456,7 @@ def get_ideal_ratio(channels: list, enough_liquidity: Millisatoshi): return ratio -def feeadjust_would_be_nice(plugin: Plugin): +def feeadjust_would_be_nice(): commands = [c for c in plugin.rpc.help().get("help") if c["command"].split()[0] == "feeadjust"] if len(commands) == 1: msg = plugin.rpc.feeadjust() @@ -470,12 +469,12 @@ def get_max_amount(i: int, plugin: Plugin): return max(plugin.min_amount, plugin.enough_liquidity / (4**i)) -def get_max_fee(plugin: Plugin, msat: Millisatoshi): +def get_max_fee(msat: Millisatoshi): # TODO: sanity check return (plugin.fee_base + msat * plugin.fee_ppm / 10**6) * plugin.feeratio -def get_chan(plugin: Plugin, scid: str): +def get_chan(scid: str): for peer in plugin.rpc.listpeers()["peers"]: if len(peer["channels"]) == 0: continue @@ -515,7 +514,7 @@ def wait_for(success, timeout: int = 60): return True -def wait_for_htlcs(plugin, failed_channels: list, scids: list = None): +def wait_for_htlcs(failed_channels: list, scids: list = None): # HTLC settlement helper # taken and modified from pyln-testing/pyln/testing/utils.py result = True @@ -536,14 +535,14 @@ def wait_for_htlcs(plugin, failed_channels: list, scids: list = None): return result -def maybe_rebalance_pairs(plugin: Plugin, ch1, ch2, failed_channels: list): +def maybe_rebalance_pairs(ch1, ch2, failed_channels: list): scid1 = ch1["short_channel_id"] scid2 = ch2["short_channel_id"] result = {"success": False, "fee_spent": Millisatoshi(0)} if scid1 + ":" + scid2 in failed_channels: return result # check if HTLCs are settled - if not wait_for_htlcs(plugin, failed_channels, [scid1, scid2]): + if not wait_for_htlcs(failed_channels, [scid1, scid2]): return result i = 0 while not plugin.rebalance_stop: @@ -556,7 +555,7 @@ def maybe_rebalance_pairs(plugin: Plugin, ch1, ch2, failed_channels: list): if amount < plugin.min_amount: return result amount = min(amount, get_max_amount(i, plugin)) - maxfee = get_max_fee(plugin, amount) + maxfee = get_max_fee(amount) plugin.log(f"Try to rebalance: {scid1} -> {scid2}; amount={amount}; maxfee={maxfee}") start_ts = time.time() try: @@ -578,33 +577,33 @@ def maybe_rebalance_pairs(plugin: Plugin, ch1, ch2, failed_channels: list): result["fee_spent"] += res["fee"] htlc_start_ts = time.time() # wait for settlement - htlc_success = wait_for_htlcs(plugin, failed_channels, [scid1, scid2]) + htlc_success = wait_for_htlcs(failed_channels, [scid1, scid2]) current_ts = time.time() res["elapsed_time"] = str(timedelta(seconds=current_ts - start_ts))[:-3] res["htlc_time"] = str(timedelta(seconds=current_ts - htlc_start_ts))[:-3] plugin.log(f"Rebalance succeeded: {res}") if not htlc_success: return result - ch1 = get_chan(plugin, scid1) + ch1 = get_chan(scid1) assert ch1 is not None - ch2 = get_chan(plugin, scid2) + ch2 = get_chan(scid2) assert ch2 is not None return result -def maybe_rebalance_once(plugin: Plugin, failed_channels: list): +def maybe_rebalance_once(failed_channels: list): channels = get_open_channels(plugin) for ch1 in channels: for ch2 in channels: if ch1 == ch2: continue - result = maybe_rebalance_pairs(plugin, ch1, ch2, failed_channels) + result = maybe_rebalance_pairs(ch1, ch2, failed_channels) if result["success"] or plugin.rebalance_stop: return result return {"success": False, "fee_spent": Millisatoshi(0)} -def feeadjuster_toggle(plugin: Plugin, new_value: bool): +def feeadjuster_toggle(new_value: bool): commands = [c for c in plugin.rpc.help().get("help") if c["command"].split()[0] == "feeadjuster-toggle"] if len(commands) == 1: msg = plugin.rpc.feeadjuster_toggle(new_value) @@ -613,12 +612,12 @@ def feeadjuster_toggle(plugin: Plugin, new_value: bool): return True -def rebalanceall_thread(plugin: Plugin): +def rebalanceall_thread(): if not plugin.mutex.acquire(blocking=False): return try: start_ts = time.time() - feeadjuster_state = feeadjuster_toggle(plugin, False) + feeadjuster_state = feeadjuster_toggle(False) channels = get_open_channels(plugin) plugin.enough_liquidity = get_enough_liquidity_threshold(channels) plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity) @@ -630,13 +629,13 @@ def rebalanceall_thread(plugin: Plugin): success = 0 fee_spent = Millisatoshi(0) while not plugin.rebalance_stop: - result = maybe_rebalance_once(plugin, failed_channels) + result = maybe_rebalance_once(failed_channels) if not result["success"]: break success += 1 fee_spent += result["fee_spent"] - feeadjust_would_be_nice(plugin) - feeadjuster_toggle(plugin, feeadjuster_state) + feeadjust_would_be_nice() + feeadjuster_toggle(feeadjuster_state) elapsed_time = timedelta(seconds=time.time() - start_ts) plugin.rebalanceall_msg = f"Automatic rebalance finished: {success} successful rebalance, {fee_spent} fee spent, it took {str(elapsed_time)[:-3]}" plugin.log(plugin.rebalanceall_msg) @@ -668,7 +667,7 @@ def rebalanceall(plugin: Plugin, min_amount: Millisatoshi = Millisatoshi("50000s plugin.min_amount = min_amount # run the job - t = Thread(target=rebalanceall_thread, args=(plugin, )) + t = Thread(target=rebalanceall_thread, args=()) t.start() return {"message": f"Rebalance started with min rebalancable amount: {plugin.min_amount}, feeratio: {plugin.feeratio}"} @@ -703,7 +702,7 @@ def health_score(liquidity): return score * coefficient -def get_avg_forward_fees(plugin: Plugin, intervals): +def get_avg_forward_fees(intervals): now = time.time() max_interval = max(intervals) total = [0] * len(intervals) @@ -776,7 +775,7 @@ def rebalancereport(plugin: Plugin): else: res["average_rebalance_fee_ppm"] = 0 - avg_forward_fees = get_avg_forward_fees(plugin, [1, 7, 30]) + avg_forward_fees = get_avg_forward_fees([1, 7, 30]) res['average_forward_fee_ppm_1d'] = avg_forward_fees[0] res['average_forward_fee_ppm_7d'] = avg_forward_fees[1] res['average_forward_fee_ppm_30d'] = avg_forward_fees[2]