From ff3fdd5aef3f2139c46bc38b2156bbe249aec01a Mon Sep 17 00:00:00 2001 From: lollerfirst <43107113+lollerfirst@users.noreply.github.com> Date: Mon, 8 Sep 2025 16:14:19 +0200 Subject: [PATCH] feat: implement exponential backoff for `paid_invoices_stream` (#778) * feat: implement exponential backoff for paid_invoices_stream across all Lightning backends - Add exponential backoff retry logic to CLN REST, LND REST, and LND gRPC backends - Start with 1 second delay, exponentially increase up to 5 minutes maximum - Reset delay to 1 second on successful reconnection - Improve error logging to include retry delay information - Replace fixed delays with adaptive backoff to handle network issues gracefully - Prevents system overload during persistent connection problems Resolves issues with rapid reconnection attempts that could overwhelm Lightning nodes during network instability. * remove unused import * feat: extend exponential backoff to all remaining backends and invoice listener - Implement exponential backoff in LNbits paid_invoices_stream for both SSE and WebSocket modes - Add exponential backoff guidance comments to Blink and Strike backends (not implemented) - Apply exponential backoff to invoice_listener in tasks.py that calls paid_invoices_stream - Ensure consistent retry behavior across all Lightning backend integrations - Improve system resilience during network interruptions and backend failures All backends and the invoice listener now use the same exponential backoff strategy: - Start with 1 second delay, exponentially increase up to 5 minutes maximum - Reset delay to 1 second on successful reconnection - Enhanced error logging with retry delay information * blink + strike remove comments * remove hardcoded values in favor of settings * immediate first retry --- cashu/core/settings.py | 3 + cashu/lightning/clnrest.py | 17 +++- cashu/lightning/lnbits.py | 143 +++++++++++++++------------ cashu/lightning/lnd_grpc/lnd_grpc.py | 12 ++- cashu/lightning/lndrest.py | 12 ++- cashu/mint/tasks.py | 13 ++- 6 files changed, 125 insertions(+), 75 deletions(-) diff --git a/cashu/core/settings.py b/cashu/core/settings.py index 9add28a..77ff008 100644 --- a/cashu/core/settings.py +++ b/cashu/core/settings.py @@ -71,6 +71,9 @@ class MintSettings(CashuSettings): description="Interval (in seconds) for running regular tasks like the invoice checker.", ) + mint_retry_exponential_backoff_base_delay: int = Field(default=1) + mint_retry_exponential_backoff_max_delay: int = Field(default=10) + class MintWatchdogSettings(MintSettings): mint_watchdog_enabled: bool = Field( diff --git a/cashu/lightning/clnrest.py b/cashu/lightning/clnrest.py index 7c265e1..8fb85a0 100644 --- a/cashu/lightning/clnrest.py +++ b/cashu/lightning/clnrest.py @@ -298,6 +298,10 @@ class CLNRestWallet(LightningBackend): else 0 ) self.last_pay_index = last_pay_index + + retry_delay = 0 + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay + while True: try: url = "/v1/waitanyinvoice" @@ -309,6 +313,8 @@ class CLNRestWallet(LightningBackend): }, timeout=None, ) as r: + # Reset retry delay on successful connection + retry_delay = 0 async for line in r.aiter_lines(): inv = json.loads(line) if "code" in inv and "message" in inv: @@ -332,11 +338,14 @@ class CLNRestWallet(LightningBackend): yield payment_hash except Exception as exc: - logger.debug( - f"lost connection to clnrest invoices stream: '{exc}', " - "reconnecting..." + logger.error( + f"lost connection to clnrest invoices stream: '{exc}', retrying in {retry_delay}" + " seconds" ) - await asyncio.sleep(0.02) + await asyncio.sleep(retry_delay) + + # Exponential backoff + retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay)) async def get_payment_quote( self, melt_quote: PostMeltQuoteRequest diff --git a/cashu/lightning/lnbits.py b/cashu/lightning/lnbits.py index bc9d4e9..c5e0ab6 100644 --- a/cashu/lightning/lnbits.py +++ b/cashu/lightning/lnbits.py @@ -229,70 +229,83 @@ class LNbitsWallet(LightningBackend): ) async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: - # --- LNBITS RETRO-COMPATIBILITY --- - if self.old_api: - url = f"{self.endpoint}/api/v1/payments/sse" - - try: - sse_headers = self.client.headers.copy() - sse_headers.update( - { - "accept": "text/event-stream", - "cache-control": "no-cache", - "connection": "keep-alive", - } - ) - async with self.client.stream( - "GET", - url, - content="text/event-stream", - timeout=None, - headers=sse_headers, - ) as r: - sse_trigger = False - async for line in r.aiter_lines(): - if "Payment does not exist." in line: - logger.debug("New API detected. Setting old_api = False") - self.old_api = False - # The data we want to listen to is of this shape: - # event: payment-received - # data: {.., "payment_hash" : "asd"} - if line.startswith("event: payment-received"): - sse_trigger = True - continue - elif sse_trigger and line.startswith("data:"): - data = json.loads(line[len("data:") :]) - sse_trigger = False - yield data["payment_hash"] - else: - sse_trigger = False - - except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): - pass + retry_delay = 0 + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay - if self.old_api: - await asyncio.sleep(1) - return - # --- END LNBITS RETRO-COMPATIBILITY --- + while True: + try: + # --- LNBITS RETRO-COMPATIBILITY --- + if self.old_api: + url = f"{self.endpoint}/api/v1/payments/sse" - try: - async with connect(self.ws_url) as ws: - logger.info("connected to LNbits fundingsource websocket.") - while True: - message = await ws.recv() - message_dict = json.loads(message) - if ( - message_dict - and message_dict.get("payment") - and message_dict["payment"].get("payment_hash") - and message_dict["payment"].get("amount") > 0 - ): - payment_hash = message_dict["payment"]["payment_hash"] - logger.info(f"payment-received: {payment_hash}") - yield payment_hash - except Exception as exc: - logger.error( - f"lost connection to LNbits fundingsource websocket: '{exc}'" - "retrying in 5 seconds" - ) - await asyncio.sleep(5) + try: + sse_headers = self.client.headers.copy() + sse_headers.update( + { + "accept": "text/event-stream", + "cache-control": "no-cache", + "connection": "keep-alive", + } + ) + async with self.client.stream( + "GET", + url, + content="text/event-stream", + timeout=None, + headers=sse_headers, + ) as r: + # Reset retry delay on successful connection + retry_delay = 0 + sse_trigger = False + async for line in r.aiter_lines(): + if "Payment does not exist." in line: + logger.debug("New API detected. Setting old_api = False") + self.old_api = False + # The data we want to listen to is of this shape: + # event: payment-received + # data: {.., "payment_hash" : "asd"} + if line.startswith("event: payment-received"): + sse_trigger = True + continue + elif sse_trigger and line.startswith("data:"): + data = json.loads(line[len("data:") :]) + sse_trigger = False + yield data["payment_hash"] + else: + sse_trigger = False + + except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): + pass + + if self.old_api: + await asyncio.sleep(retry_delay) + # Exponential backoff + retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay)) + continue + # --- END LNBITS RETRO-COMPATIBILITY --- + + async with connect(self.ws_url) as ws: + logger.info("connected to LNbits fundingsource websocket.") + # Reset retry delay on successful connection + retry_delay = 0 + while True: + message = await ws.recv() + message_dict = json.loads(message) + if ( + message_dict + and message_dict.get("payment") + and message_dict["payment"].get("payment_hash") + and message_dict["payment"].get("amount") > 0 + ): + payment_hash = message_dict["payment"]["payment_hash"] + logger.info(f"payment-received: {payment_hash}") + yield payment_hash + except Exception as exc: + logger.error( + f"lost connection to LNbits fundingsource websocket: '{exc}', retrying in {retry_delay}" + " seconds" + ) + await asyncio.sleep(retry_delay) + + # Exponential backoff + retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay)) diff --git a/cashu/lightning/lnd_grpc/lnd_grpc.py b/cashu/lightning/lnd_grpc/lnd_grpc.py index b298fcb..7473623 100644 --- a/cashu/lightning/lnd_grpc/lnd_grpc.py +++ b/cashu/lightning/lnd_grpc/lnd_grpc.py @@ -375,12 +375,17 @@ class LndRPCWallet(LightningBackend): return PaymentStatus(result=PaymentResult.UNKNOWN) async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: + retry_delay = 0 + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay + while True: try: async with grpc.aio.secure_channel( self.endpoint, self.combined_creds ) as channel: lnstub = lightningstub.LightningStub(channel) + # Reset retry delay on successful connection + retry_delay = 0 async for invoice in lnstub.SubscribeInvoices( lnrpc.InvoiceSubscription() ): @@ -389,8 +394,11 @@ class LndRPCWallet(LightningBackend): payment_hash = invoice.r_hash.hex() yield payment_hash except AioRpcError as exc: - logger.error(f"SubscribeInvoices failed: {exc}. Retrying in 1 sec...") - await asyncio.sleep(1) + logger.error(f"SubscribeInvoices failed: {exc}. Retrying in {retry_delay} sec...") + await asyncio.sleep(retry_delay) + + # Exponential backoff + retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay)) async def get_payment_quote( self, melt_quote: PostMeltQuoteRequest diff --git a/cashu/lightning/lndrest.py b/cashu/lightning/lndrest.py index 04d7c30..fd5990a 100644 --- a/cashu/lightning/lndrest.py +++ b/cashu/lightning/lndrest.py @@ -415,10 +415,15 @@ class LndRestWallet(LightningBackend): return PaymentStatus(result=PaymentResult.UNKNOWN, error_message="timeout") async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: + retry_delay = 0 + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay + while True: try: url = "/v1/invoices/subscribe" async with self.client.stream("GET", url, timeout=None) as r: + # Reset retry delay on successful connection + retry_delay = 0 async for line in r.aiter_lines(): try: inv = json.loads(line)["result"] @@ -431,10 +436,13 @@ class LndRestWallet(LightningBackend): yield payment_hash except Exception as exc: logger.error( - f"lost connection to lnd invoices stream: '{exc}', retrying in 5" + f"lost connection to lnd invoices stream: '{exc}', retrying in {retry_delay}" " seconds" ) - await asyncio.sleep(5) + await asyncio.sleep(retry_delay) + + # Exponential backoff with jitter + retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay)) async def get_payment_quote( self, melt_quote: PostMeltQuoteRequest diff --git a/cashu/mint/tasks.py b/cashu/mint/tasks.py index 7e0bb5f..c87a65f 100644 --- a/cashu/mint/tasks.py +++ b/cashu/mint/tasks.py @@ -4,6 +4,7 @@ from typing import List from loguru import logger from ..core.base import MintQuoteState +from ..core.settings import settings from ..lightning.base import LightningBackend from .protocols import SupportsBackends, SupportsDb, SupportsEvents @@ -21,14 +22,22 @@ class LedgerTasks(SupportsDb, SupportsBackends, SupportsEvents): async def invoice_listener(self, backend: LightningBackend) -> None: if backend.supports_incoming_payment_stream: + retry_delay = settings.mint_retry_exponential_backoff_base_delay + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay + while True: try: + # Reset retry delay on successful connection to backend stream + retry_delay = settings.mint_retry_exponential_backoff_base_delay async for checking_id in backend.paid_invoices_stream(): await self.invoice_callback_dispatcher(checking_id) except Exception as e: logger.error(f"Error in invoice listener: {e}") - logger.info("Restarting invoice listener...") - await asyncio.sleep(1) + logger.info(f"Restarting invoice listener in {retry_delay} seconds...") + await asyncio.sleep(retry_delay) + + # Exponential backoff + retry_delay = min(retry_delay * 2, max_retry_delay) async def invoice_callback_dispatcher(self, checking_id: str) -> None: logger.debug(f"Invoice callback dispatcher: {checking_id}")