From 67b7ea6b729ffa2ab08ecc3ea0cc6bf0bc65bc0a Mon Sep 17 00:00:00 2001 From: iwarp Date: Sun, 1 Jun 2025 12:20:03 +0200 Subject: [PATCH] Another changes for LNbits API (#739) * pending -> status; sse -> websocket * paid_invoices_stream only for incoming * fix backward compatibility * loop * 'retro-compatible' paid invoice stream --------- Co-authored-by: lollerfirst --- cashu/lightning/lnbits.py | 118 +++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 40 deletions(-) diff --git a/cashu/lightning/lnbits.py b/cashu/lightning/lnbits.py index 277e86d..bc9d4e9 100644 --- a/cashu/lightning/lnbits.py +++ b/cashu/lightning/lnbits.py @@ -7,6 +7,8 @@ import httpx from bolt11 import ( decode, ) +from loguru import logger +from websockets.client import connect from ..core.base import Amount, MeltQuote, Unit from ..core.helpers import fee_reserve @@ -39,6 +41,8 @@ class LNbitsWallet(LightningBackend): verify=not settings.debug, headers={"X-Api-Key": settings.mint_lnbits_key}, ) + self.ws_url = f"{self.endpoint.replace('http', 'ws', 1)}/api/v1/ws/{settings.mint_lnbits_key}" + self.old_api = True async def status(self) -> StatusResponse: try: @@ -154,11 +158,13 @@ class LNbitsWallet(LightningBackend): result=PaymentResult.UNKNOWN, error_message=data["detail"] ) - if data["paid"]: + status = data.get("details", {}).get("status", None) + + if data.get("paid", False): result = PaymentResult.SETTLED - elif not data["paid"] and data["details"]["pending"]: + elif status == "pending" or data.get("details", {}).get("pending", False): result = PaymentResult.PENDING - elif not data["paid"] and not data["details"]["pending"]: + elif status == "failed": result = PaymentResult.FAILED else: result = PaymentResult.UNKNOWN @@ -190,11 +196,13 @@ class LNbitsWallet(LightningBackend): result=PaymentResult.UNKNOWN, error_message="invalid response" ) - if data["paid"]: + status = data.get("details", {}).get("status", None) + + if data.get("paid", False): result = PaymentResult.SETTLED - elif not data["paid"] and data["details"]["pending"]: + elif status == "pending" or data.get("details", {}).get("pending", False): result = PaymentResult.PENDING - elif not data["paid"] and not data["details"]["pending"]: + elif status == "failed": result = PaymentResult.FAILED else: result = PaymentResult.UNKNOWN @@ -221,40 +229,70 @@ class LNbitsWallet(LightningBackend): ) async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: - url = f"{self.endpoint}/api/v1/payments/sse" + # --- LNBITS RETRO-COMPATIBILITY --- + if self.old_api: + url = f"{self.endpoint}/api/v1/payments/sse" + + try: + sse_headers = self.client.headers.copy() + sse_headers.update( + { + "accept": "text/event-stream", + "cache-control": "no-cache", + "connection": "keep-alive", + } + ) + async with self.client.stream( + "GET", + url, + content="text/event-stream", + timeout=None, + headers=sse_headers, + ) as r: + sse_trigger = False + async for line in r.aiter_lines(): + if "Payment does not exist." in line: + logger.debug("New API detected. Setting old_api = False") + self.old_api = False + # The data we want to listen to is of this shape: + # event: payment-received + # data: {.., "payment_hash" : "asd"} + if line.startswith("event: payment-received"): + sse_trigger = True + continue + elif sse_trigger and line.startswith("data:"): + data = json.loads(line[len("data:") :]) + sse_trigger = False + yield data["payment_hash"] + else: + sse_trigger = False + + except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): + pass + + if self.old_api: + await asyncio.sleep(1) + return + # --- END LNBITS RETRO-COMPATIBILITY --- try: - sse_headers = self.client.headers.copy() - sse_headers.update( - { - "accept": "text/event-stream", - "cache-control": "no-cache", - "connection": "keep-alive", - } + async with connect(self.ws_url) as ws: + logger.info("connected to LNbits fundingsource websocket.") + while True: + message = await ws.recv() + message_dict = json.loads(message) + if ( + message_dict + and message_dict.get("payment") + and message_dict["payment"].get("payment_hash") + and message_dict["payment"].get("amount") > 0 + ): + payment_hash = message_dict["payment"]["payment_hash"] + logger.info(f"payment-received: {payment_hash}") + yield payment_hash + except Exception as exc: + logger.error( + f"lost connection to LNbits fundingsource websocket: '{exc}'" + "retrying in 5 seconds" ) - async with self.client.stream( - "GET", - url, - content="text/event-stream", - timeout=None, - headers=sse_headers, - ) as r: - sse_trigger = False - async for line in r.aiter_lines(): - # The data we want to listen to is of this shape: - # event: payment-received - # data: {.., "payment_hash" : "asd"} - if line.startswith("event: payment-received"): - sse_trigger = True - continue - elif sse_trigger and line.startswith("data:"): - data = json.loads(line[len("data:") :]) - sse_trigger = False - yield data["payment_hash"] - else: - sse_trigger = False - - except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): - pass - - await asyncio.sleep(1) + await asyncio.sleep(5)