rebalance: handle events while rebalancing

This commit is contained in:
Gálli Zoltán
2023-01-06 16:28:45 +01:00
committed by Michael Schmoock
parent d71c164188
commit 1b73ad8877

View File

@@ -11,11 +11,14 @@ import concurrent.futures
plugin = Plugin()
plugin.rebalance_stop_by_user = False
plugin.rebalance_stop_by_thread = False
plugin.rebalance_stop_by_event = False
plugin.threadids = {}
def rebalance_stopping():
return plugin.rebalance_stop_by_user or plugin.rebalance_stop_by_thread
return (plugin.rebalance_stop_by_user or
plugin.rebalance_stop_by_thread or
plugin.rebalance_stop_by_event)
def get_thread_id_str():
@@ -706,28 +709,37 @@ def feeadjuster_toggle(new_value: bool):
return True
def refresh_parameters():
channels = get_open_channels(plugin)
plugin.enough_liquidity = get_enough_liquidity_threshold(channels)
plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity)
plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, "
f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, "
f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, "
f"feeratio: {plugin.feeratio}")
def rebalanceall_thread():
if not plugin.mutex.acquire(blocking=False):
return
try:
start_ts = time.time()
feeadjuster_state = feeadjuster_toggle(False)
channels = get_open_channels(plugin)
plugin.enough_liquidity = get_enough_liquidity_threshold(channels)
plugin.ideal_ratio = get_ideal_ratio(channels, plugin.enough_liquidity)
plugin.log(f"Automatic rebalance is running with enough liquidity threshold: {plugin.enough_liquidity.to_satoshi_str()}, "
f"ideal liquidity ratio: {plugin.ideal_ratio * 100:.2f}%, "
f"min rebalancable amount: {plugin.min_amount.to_satoshi_str()}, "
f"feeratio: {plugin.feeratio}")
plugin.log(f"Automatic rebalance started")
failed_channels = []
success = 0
fee_spent = Millisatoshi(0)
while not rebalance_stopping():
while True:
refresh_parameters()
result = maybe_rebalance_once(failed_channels)
if not result["success"]:
if not result["success"] and not plugin.rebalance_stop_by_event:
break
if result["success"]:
success += result.get("success_count", 1)
fee_spent += result["fee_spent"]
plugin.rebalance_stop_by_event = False
if rebalance_stopping():
break
success += result.get("success_count", 1)
fee_spent += result["fee_spent"]
feeadjust_would_be_nice()
feeadjuster_toggle(feeadjuster_state)
elapsed_time = timedelta(seconds=time.time() - start_ts)
@@ -738,6 +750,46 @@ def rebalanceall_thread():
plugin.mutex.release()
@plugin.subscribe("forward_event")
def forward_event(plugin: Plugin, forward_event: dict, **kwargs):
if not plugin.mutex.locked():
return
if forward_event["status"] == "settled":
plugin.log("Forward event restarts rebalance threads")
plugin.rebalance_stop_by_event = True
@plugin.subscribe("invoice_payment")
def invoice_payment(plugin: Plugin, invoice_payment: dict, **kwargs):
if not plugin.mutex.locked():
return
if invoice_payment.get('label').startswith("Rebalance"):
return
plugin.log("Invoice payment restarts rebalance threads")
plugin.rebalance_stop_by_event = True
@plugin.subscribe("sendpay_success")
def sendpay_success(plugin: Plugin, sendpay_success: dict, **kwargs):
if not plugin.mutex.locked():
return
my_node_id = plugin.getinfo.get('id')
if sendpay_success.get('destination') == my_node_id:
return
plugin.log("Sendpay success restarts rebalance threads")
plugin.rebalance_stop_by_event = True
@plugin.subscribe("channel_state_changed")
def channel_state_changed(plugin: Plugin, channel_state_changed: dict, **kwargs):
if not plugin.mutex.locked():
return
if channel_state_changed.get('old_state') != 'CHANNELD_NORMAL' and channel_state_changed.get('new_state') != 'CHANNELD_NORMAL':
return
plugin.log("Channel state changed restarts rebalance threads")
plugin.rebalance_stop_by_event = True
@plugin.method("rebalanceall")
def rebalanceall(plugin: Plugin, min_amount: Millisatoshi = Millisatoshi("50000sat"), feeratio: float = 0.5):
"""Rebalance all unbalanced channels if possible for a very low fee.