diff --git a/cashu/lightning/lnbits.py b/cashu/lightning/lnbits.py index 55cdd2b..d231e04 100644 --- a/cashu/lightning/lnbits.py +++ b/cashu/lightning/lnbits.py @@ -1,4 +1,6 @@ # type: ignore +import asyncio +import json from typing import AsyncGenerator, Optional import httpx @@ -25,6 +27,7 @@ class LNbitsWallet(LightningBackend): supported_units = set([Unit.sat]) unit = Unit.sat + supports_incoming_payment_stream: bool = True def __init__(self, unit: Unit = Unit.sat, **kwargs): self.assert_unit_supported(unit) @@ -184,4 +187,40 @@ class LNbitsWallet(LightningBackend): ) async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: - raise NotImplementedError("paid_invoices_stream not implemented") + url = f"{self.endpoint}/api/v1/payments/sse" + + try: + sse_headers = self.client.headers.copy() + sse_headers.update( + { + "accept": "text/event-stream", + "cache-control": "no-cache", + "connection": "keep-alive", + } + ) + async with self.client.stream( + "GET", + url, + content="text/event-stream", + timeout=None, + headers=sse_headers, + ) as r: + sse_trigger = False + async for line in r.aiter_lines(): + # The data we want to listen to is of this shape: + # event: payment-received + # data: {.., "payment_hash" : "asd"} + if line.startswith("event: payment-received"): + sse_trigger = True + continue + elif sse_trigger and line.startswith("data:"): + data = json.loads(line[len("data:") :]) + sse_trigger = False + yield data["payment_hash"] + else: + sse_trigger = False + + except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): + pass + + await asyncio.sleep(1)