From 1b73ad8877ba0beb1a06dcc7c744a87401ca90a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1lli=20Zolt=C3=A1n?= Date: Fri, 6 Jan 2023 16:28:45 +0100 Subject: [PATCH] rebalance: handle events while rebalancing --- rebalance/rebalance.py | 76 +++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/rebalance/rebalance.py b/rebalance/rebalance.py index 0f3bd96..614e263 100755 --- a/rebalance/rebalance.py +++ b/rebalance/rebalance.py @@ -11,11 +11,14 @@ import concurrent.futures plugin = Plugin() plugin.rebalance_stop_by_user = False plugin.rebalance_stop_by_thread = False +plugin.rebalance_stop_by_event = False plugin.threadids = {} def rebalance_stopping(): - return plugin.rebalance_stop_by_user or plugin.rebalance_stop_by_thread + return (plugin.rebalance_stop_by_user or + plugin.rebalance_stop_by_thread or + plugin.rebalance_stop_by_event) def get_thread_id_str(): @@ -706,28 +709,37 @@ def feeadjuster_toggle(new_value: bool): return True +def refresh_parameters(): + channels = get_open_channels(plugin) + plugin.enough_liquidity = get_enough_liquidity_threshold(channels) + plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity) + plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, " + f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, " + f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, " + f"feeratio: {plugin.feeratio}") + + def rebalanceall_thread(): if not plugin.mutex.acquire(blocking=False): return try: start_ts = time.time() feeadjuster_state = feeadjuster_toggle(False) - channels = get_open_channels(plugin) - plugin.enough_liquidity = get_enough_liquidity_threshold(channels) - plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity) - plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, " - f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, " - f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, " - f"feeratio: {plugin.feeratio}") + plugin.log(f"Automatic rebalance started") failed_channels = [] success = 0 fee_spent = Millisatoshi(0) - while not rebalance_stopping(): + while True: + refresh_parameters() result = maybe_rebalance_once(failed_channels) - if not result["success"]: + if not result["success"] and not plugin.rebalance_stop_by_event: + break + if result["success"]: + success += result.get("success_count", 1) + fee_spent += result["fee_spent"] + plugin.rebalance_stop_by_event = False + if rebalance_stopping(): break - success += result.get("success_count", 1) - fee_spent += result["fee_spent"] feeadjust_would_be_nice() feeadjuster_toggle(feeadjuster_state) elapsed_time = timedelta(seconds=time.time() - start_ts) @@ -738,6 +750,46 @@ def rebalanceall_thread(): plugin.mutex.release() +@plugin.subscribe("forward_event") +def forward_event(plugin: Plugin, forward_event: dict, **kwargs): + if not plugin.mutex.locked(): + return + if forward_event["status"] == "settled": + plugin.log("Forward event restarts rebalance threads") + plugin.rebalance_stop_by_event = True + + +@plugin.subscribe("invoice_payment") +def invoice_payment(plugin: Plugin, invoice_payment: dict, **kwargs): + if not plugin.mutex.locked(): + return + if invoice_payment.get('label').startswith("Rebalance"): + return + plugin.log("Invoice payment restarts rebalance threads") + plugin.rebalance_stop_by_event = True + + +@plugin.subscribe("sendpay_success") +def sendpay_success(plugin: Plugin, sendpay_success: dict, **kwargs): + if not plugin.mutex.locked(): + return + my_node_id = plugin.getinfo.get('id') + if sendpay_success.get('destination') == my_node_id: + return + plugin.log("Sendpay success restarts rebalance threads") + plugin.rebalance_stop_by_event = True + + +@plugin.subscribe("channel_state_changed") +def channel_state_changed(plugin: Plugin, channel_state_changed: dict, **kwargs): + if not plugin.mutex.locked(): + return + if channel_state_changed.get('old_state') != 'CHANNELD_NORMAL' and channel_state_changed.get('new_state') != 'CHANNELD_NORMAL': + return + plugin.log("Channel state changed restarts rebalance threads") + plugin.rebalance_stop_by_event = True + + @plugin.method("rebalanceall") def rebalanceall(plugin: Plugin, min_amount: Millisatoshi = Millisatoshi("50000sat"), feeratio: float = 0.5): """Rebalance all unbalanced channels if possible for a very low fee.