mirror of
https://github.com/aljazceru/nutshell.git
synced 2025-12-24 03:54:21 +01:00
feat: implement exponential backoff for paid_invoices_stream (#778)
* feat: implement exponential backoff for paid_invoices_stream across all Lightning backends - Add exponential backoff retry logic to CLN REST, LND REST, and LND gRPC backends - Start with 1 second delay, exponentially increase up to 5 minutes maximum - Reset delay to 1 second on successful reconnection - Improve error logging to include retry delay information - Replace fixed delays with adaptive backoff to handle network issues gracefully - Prevents system overload during persistent connection problems Resolves issues with rapid reconnection attempts that could overwhelm Lightning nodes during network instability. * remove unused import * feat: extend exponential backoff to all remaining backends and invoice listener - Implement exponential backoff in LNbits paid_invoices_stream for both SSE and WebSocket modes - Add exponential backoff guidance comments to Blink and Strike backends (not implemented) - Apply exponential backoff to invoice_listener in tasks.py that calls paid_invoices_stream - Ensure consistent retry behavior across all Lightning backend integrations - Improve system resilience during network interruptions and backend failures All backends and the invoice listener now use the same exponential backoff strategy: - Start with 1 second delay, exponentially increase up to 5 minutes maximum - Reset delay to 1 second on successful reconnection - Enhanced error logging with retry delay information * blink + strike remove comments * remove hardcoded values in favor of settings * immediate first retry
This commit is contained in:
@@ -71,6 +71,9 @@ class MintSettings(CashuSettings):
|
||||
description="Interval (in seconds) for running regular tasks like the invoice checker.",
|
||||
)
|
||||
|
||||
mint_retry_exponential_backoff_base_delay: int = Field(default=1)
|
||||
mint_retry_exponential_backoff_max_delay: int = Field(default=10)
|
||||
|
||||
|
||||
class MintWatchdogSettings(MintSettings):
|
||||
mint_watchdog_enabled: bool = Field(
|
||||
|
||||
@@ -298,6 +298,10 @@ class CLNRestWallet(LightningBackend):
|
||||
else 0
|
||||
)
|
||||
self.last_pay_index = last_pay_index
|
||||
|
||||
retry_delay = 0
|
||||
max_retry_delay = settings.mint_retry_exponential_backoff_max_delay
|
||||
|
||||
while True:
|
||||
try:
|
||||
url = "/v1/waitanyinvoice"
|
||||
@@ -309,6 +313,8 @@ class CLNRestWallet(LightningBackend):
|
||||
},
|
||||
timeout=None,
|
||||
) as r:
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 0
|
||||
async for line in r.aiter_lines():
|
||||
inv = json.loads(line)
|
||||
if "code" in inv and "message" in inv:
|
||||
@@ -332,11 +338,14 @@ class CLNRestWallet(LightningBackend):
|
||||
yield payment_hash
|
||||
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
f"lost connection to clnrest invoices stream: '{exc}', "
|
||||
"reconnecting..."
|
||||
logger.error(
|
||||
f"lost connection to clnrest invoices stream: '{exc}', retrying in {retry_delay}"
|
||||
" seconds"
|
||||
)
|
||||
await asyncio.sleep(0.02)
|
||||
await asyncio.sleep(retry_delay)
|
||||
|
||||
# Exponential backoff
|
||||
retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay))
|
||||
|
||||
async def get_payment_quote(
|
||||
self, melt_quote: PostMeltQuoteRequest
|
||||
|
||||
@@ -229,70 +229,83 @@ class LNbitsWallet(LightningBackend):
|
||||
)
|
||||
|
||||
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||
# --- LNBITS RETRO-COMPATIBILITY ---
|
||||
if self.old_api:
|
||||
url = f"{self.endpoint}/api/v1/payments/sse"
|
||||
|
||||
try:
|
||||
sse_headers = self.client.headers.copy()
|
||||
sse_headers.update(
|
||||
{
|
||||
"accept": "text/event-stream",
|
||||
"cache-control": "no-cache",
|
||||
"connection": "keep-alive",
|
||||
}
|
||||
)
|
||||
async with self.client.stream(
|
||||
"GET",
|
||||
url,
|
||||
content="text/event-stream",
|
||||
timeout=None,
|
||||
headers=sse_headers,
|
||||
) as r:
|
||||
sse_trigger = False
|
||||
async for line in r.aiter_lines():
|
||||
if "Payment does not exist." in line:
|
||||
logger.debug("New API detected. Setting old_api = False")
|
||||
self.old_api = False
|
||||
# The data we want to listen to is of this shape:
|
||||
# event: payment-received
|
||||
# data: {.., "payment_hash" : "asd"}
|
||||
if line.startswith("event: payment-received"):
|
||||
sse_trigger = True
|
||||
continue
|
||||
elif sse_trigger and line.startswith("data:"):
|
||||
data = json.loads(line[len("data:") :])
|
||||
sse_trigger = False
|
||||
yield data["payment_hash"]
|
||||
else:
|
||||
sse_trigger = False
|
||||
|
||||
except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout):
|
||||
pass
|
||||
retry_delay = 0
|
||||
max_retry_delay = settings.mint_retry_exponential_backoff_max_delay
|
||||
|
||||
if self.old_api:
|
||||
await asyncio.sleep(1)
|
||||
return
|
||||
# --- END LNBITS RETRO-COMPATIBILITY ---
|
||||
while True:
|
||||
try:
|
||||
# --- LNBITS RETRO-COMPATIBILITY ---
|
||||
if self.old_api:
|
||||
url = f"{self.endpoint}/api/v1/payments/sse"
|
||||
|
||||
try:
|
||||
async with connect(self.ws_url) as ws:
|
||||
logger.info("connected to LNbits fundingsource websocket.")
|
||||
while True:
|
||||
message = await ws.recv()
|
||||
message_dict = json.loads(message)
|
||||
if (
|
||||
message_dict
|
||||
and message_dict.get("payment")
|
||||
and message_dict["payment"].get("payment_hash")
|
||||
and message_dict["payment"].get("amount") > 0
|
||||
):
|
||||
payment_hash = message_dict["payment"]["payment_hash"]
|
||||
logger.info(f"payment-received: {payment_hash}")
|
||||
yield payment_hash
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"lost connection to LNbits fundingsource websocket: '{exc}'"
|
||||
"retrying in 5 seconds"
|
||||
)
|
||||
await asyncio.sleep(5)
|
||||
try:
|
||||
sse_headers = self.client.headers.copy()
|
||||
sse_headers.update(
|
||||
{
|
||||
"accept": "text/event-stream",
|
||||
"cache-control": "no-cache",
|
||||
"connection": "keep-alive",
|
||||
}
|
||||
)
|
||||
async with self.client.stream(
|
||||
"GET",
|
||||
url,
|
||||
content="text/event-stream",
|
||||
timeout=None,
|
||||
headers=sse_headers,
|
||||
) as r:
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 0
|
||||
sse_trigger = False
|
||||
async for line in r.aiter_lines():
|
||||
if "Payment does not exist." in line:
|
||||
logger.debug("New API detected. Setting old_api = False")
|
||||
self.old_api = False
|
||||
# The data we want to listen to is of this shape:
|
||||
# event: payment-received
|
||||
# data: {.., "payment_hash" : "asd"}
|
||||
if line.startswith("event: payment-received"):
|
||||
sse_trigger = True
|
||||
continue
|
||||
elif sse_trigger and line.startswith("data:"):
|
||||
data = json.loads(line[len("data:") :])
|
||||
sse_trigger = False
|
||||
yield data["payment_hash"]
|
||||
else:
|
||||
sse_trigger = False
|
||||
|
||||
except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout):
|
||||
pass
|
||||
|
||||
if self.old_api:
|
||||
await asyncio.sleep(retry_delay)
|
||||
# Exponential backoff
|
||||
retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay))
|
||||
continue
|
||||
# --- END LNBITS RETRO-COMPATIBILITY ---
|
||||
|
||||
async with connect(self.ws_url) as ws:
|
||||
logger.info("connected to LNbits fundingsource websocket.")
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 0
|
||||
while True:
|
||||
message = await ws.recv()
|
||||
message_dict = json.loads(message)
|
||||
if (
|
||||
message_dict
|
||||
and message_dict.get("payment")
|
||||
and message_dict["payment"].get("payment_hash")
|
||||
and message_dict["payment"].get("amount") > 0
|
||||
):
|
||||
payment_hash = message_dict["payment"]["payment_hash"]
|
||||
logger.info(f"payment-received: {payment_hash}")
|
||||
yield payment_hash
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"lost connection to LNbits fundingsource websocket: '{exc}', retrying in {retry_delay}"
|
||||
" seconds"
|
||||
)
|
||||
await asyncio.sleep(retry_delay)
|
||||
|
||||
# Exponential backoff
|
||||
retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay))
|
||||
|
||||
@@ -375,12 +375,17 @@ class LndRPCWallet(LightningBackend):
|
||||
return PaymentStatus(result=PaymentResult.UNKNOWN)
|
||||
|
||||
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||
retry_delay = 0
|
||||
max_retry_delay = settings.mint_retry_exponential_backoff_max_delay
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with grpc.aio.secure_channel(
|
||||
self.endpoint, self.combined_creds
|
||||
) as channel:
|
||||
lnstub = lightningstub.LightningStub(channel)
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 0
|
||||
async for invoice in lnstub.SubscribeInvoices(
|
||||
lnrpc.InvoiceSubscription()
|
||||
):
|
||||
@@ -389,8 +394,11 @@ class LndRPCWallet(LightningBackend):
|
||||
payment_hash = invoice.r_hash.hex()
|
||||
yield payment_hash
|
||||
except AioRpcError as exc:
|
||||
logger.error(f"SubscribeInvoices failed: {exc}. Retrying in 1 sec...")
|
||||
await asyncio.sleep(1)
|
||||
logger.error(f"SubscribeInvoices failed: {exc}. Retrying in {retry_delay} sec...")
|
||||
await asyncio.sleep(retry_delay)
|
||||
|
||||
# Exponential backoff
|
||||
retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay))
|
||||
|
||||
async def get_payment_quote(
|
||||
self, melt_quote: PostMeltQuoteRequest
|
||||
|
||||
@@ -415,10 +415,15 @@ class LndRestWallet(LightningBackend):
|
||||
return PaymentStatus(result=PaymentResult.UNKNOWN, error_message="timeout")
|
||||
|
||||
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||
retry_delay = 0
|
||||
max_retry_delay = settings.mint_retry_exponential_backoff_max_delay
|
||||
|
||||
while True:
|
||||
try:
|
||||
url = "/v1/invoices/subscribe"
|
||||
async with self.client.stream("GET", url, timeout=None) as r:
|
||||
# Reset retry delay on successful connection
|
||||
retry_delay = 0
|
||||
async for line in r.aiter_lines():
|
||||
try:
|
||||
inv = json.loads(line)["result"]
|
||||
@@ -431,10 +436,13 @@ class LndRestWallet(LightningBackend):
|
||||
yield payment_hash
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"lost connection to lnd invoices stream: '{exc}', retrying in 5"
|
||||
f"lost connection to lnd invoices stream: '{exc}', retrying in {retry_delay}"
|
||||
" seconds"
|
||||
)
|
||||
await asyncio.sleep(5)
|
||||
await asyncio.sleep(retry_delay)
|
||||
|
||||
# Exponential backoff with jitter
|
||||
retry_delay = max(settings.mint_retry_exponential_backoff_base_delay, min(retry_delay * 2, max_retry_delay))
|
||||
|
||||
async def get_payment_quote(
|
||||
self, melt_quote: PostMeltQuoteRequest
|
||||
|
||||
@@ -4,6 +4,7 @@ from typing import List
|
||||
from loguru import logger
|
||||
|
||||
from ..core.base import MintQuoteState
|
||||
from ..core.settings import settings
|
||||
from ..lightning.base import LightningBackend
|
||||
from .protocols import SupportsBackends, SupportsDb, SupportsEvents
|
||||
|
||||
@@ -21,14 +22,22 @@ class LedgerTasks(SupportsDb, SupportsBackends, SupportsEvents):
|
||||
|
||||
async def invoice_listener(self, backend: LightningBackend) -> None:
|
||||
if backend.supports_incoming_payment_stream:
|
||||
retry_delay = settings.mint_retry_exponential_backoff_base_delay
|
||||
max_retry_delay = settings.mint_retry_exponential_backoff_max_delay
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Reset retry delay on successful connection to backend stream
|
||||
retry_delay = settings.mint_retry_exponential_backoff_base_delay
|
||||
async for checking_id in backend.paid_invoices_stream():
|
||||
await self.invoice_callback_dispatcher(checking_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in invoice listener: {e}")
|
||||
logger.info("Restarting invoice listener...")
|
||||
await asyncio.sleep(1)
|
||||
logger.info(f"Restarting invoice listener in {retry_delay} seconds...")
|
||||
await asyncio.sleep(retry_delay)
|
||||
|
||||
# Exponential backoff
|
||||
retry_delay = min(retry_delay * 2, max_retry_delay)
|
||||
|
||||
async def invoice_callback_dispatcher(self, checking_id: str) -> None:
|
||||
logger.debug(f"Invoice callback dispatcher: {checking_id}")
|
||||
|
||||
Reference in New Issue
Block a user