From 5e1ca389a2475da0173ea923290a9ff242a2361e Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Wed, 5 Nov 2025 17:42:57 +0100 Subject: [PATCH 1/5] spark test --- .env.example | 4 + Dockerfile | 3 +- cashu/core/settings.py | 8 + cashu/lightning/__init__.py | 1 + cashu/lightning/spark.py | 561 ++++++++++++++++++++++++++++++++++++ cashu/mint/startup.py | 2 + docker-compose.yaml | 1 - poetry.lock | 43 ++- pyproject.toml | 1 + 9 files changed, 620 insertions(+), 4 deletions(-) create mode 100644 cashu/lightning/spark.py diff --git a/.env.example b/.env.example index 63b1343..5250c9d 100644 --- a/.env.example +++ b/.env.example @@ -112,6 +112,10 @@ MINT_BLINK_KEY=blink_abcdefgh # Use with StrikeWallet for BTC, USD, and EUR MINT_STRIKE_KEY=ABC123 +# Use with SparkBackend +# MINT_SPARK_API_KEY=your_spark_api_key +# MINT_SPARK_MNEMONIC=your twelve word mnemonic phrase here + # fee to reserve in percent of the amount LIGHTNING_FEE_PERCENT=1.0 # minimum fee to reserve diff --git a/Dockerfile b/Dockerfile index 052503b..38e5dcc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,4 +11,5 @@ ENV PATH="/root/.local/bin:$PATH" WORKDIR /app COPY . . RUN poetry config virtualenvs.create false -RUN poetry install --no-dev --no-root +RUN poetry lock --no-update +RUN poetry install --no-root diff --git a/cashu/core/settings.py b/cashu/core/settings.py index cc5f2e5..42d9f40 100644 --- a/cashu/core/settings.py +++ b/cashu/core/settings.py @@ -104,6 +104,14 @@ class MintBackends(MintSettings): mint_strike_key: str = Field(default=None) mint_blink_key: str = Field(default=None) + # Spark SDK settings + mint_spark_api_key: str = Field(default=None) + mint_spark_mnemonic: str = Field(default=None) + mint_spark_network: str = Field(default="mainnet") + mint_spark_storage_dir: str = Field(default="data/spark") + mint_spark_connection_timeout: int = Field(default=30) + mint_spark_retry_attempts: int = Field(default=3) + class MintLimits(MintSettings): mint_rate_limit: bool = Field( diff --git a/cashu/lightning/__init__.py b/cashu/lightning/__init__.py index dfa66b9..aad0c19 100644 --- a/cashu/lightning/__init__.py +++ b/cashu/lightning/__init__.py @@ -7,6 +7,7 @@ from .fake import FakeWallet # noqa: F401 from .lnbits import LNbitsWallet # noqa: F401 from .lnd_grpc.lnd_grpc import LndRPCWallet # noqa: F401 from .lndrest import LndRestWallet # noqa: F401 +from .spark import SparkBackend # noqa: F401 from .strike import StrikeWallet # noqa: F401 backend_settings = [ diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py new file mode 100644 index 0000000..5f5969a --- /dev/null +++ b/cashu/lightning/spark.py @@ -0,0 +1,561 @@ +import asyncio +import math +from typing import AsyncGenerator, Optional + +from bolt11 import decode +from loguru import logger + +from ..core.base import Amount, MeltQuote, Unit +from ..core.helpers import fee_reserve +from ..core.models import PostMeltQuoteRequest +from ..core.settings import settings +from .base import ( + InvoiceResponse, + LightningBackend, + PaymentQuoteResponse, + PaymentResponse, + PaymentResult, + PaymentStatus, + StatusResponse, +) + + +def _extract_invoice_checking_id(payment) -> Optional[str]: + """Return a normalized identifier that matches the stored mint quote checking_id.""" + try: + details = getattr(payment, "details", None) + if details: + logger.debug( + "Spark extract: payment.id=%s type=%s details_type=%s has_invoice=%s has_bolt11=%s has_hash=%s", + getattr(payment, "id", None), + type(payment), + type(details), + hasattr(details, "invoice"), + hasattr(details, "bolt11_invoice"), + hasattr(details, "payment_hash"), + ) + invoice = getattr(details, "invoice", None) + if invoice: + logger.debug("Spark extract: using details.invoice=%s", invoice) + return invoice.lower() + + bolt11_details = getattr(details, "bolt11_invoice", None) + if bolt11_details: + bolt11 = getattr(bolt11_details, "bolt11", None) + if bolt11: + logger.debug("Spark extract: using bolt11_details.bolt11=%s", bolt11) + return bolt11.lower() + + payment_hash = getattr(details, "payment_hash", None) + if payment_hash: + logger.debug("Spark extract: using details.payment_hash=%s", payment_hash) + return payment_hash.lower() + + payment_hash = getattr(payment, "payment_hash", None) + if payment_hash: + logger.debug("Spark extract: using payment.payment_hash=%s", payment_hash) + return payment_hash.lower() + except Exception as exc: # pragma: no cover - defensive logging + logger.error(f"Failed to extract Spark invoice identifier: {exc}") + + return None + + +# Import Spark SDK components +try: + from breez_sdk_spark import ( + BreezSdk, + connect, + ConnectRequest, + default_config, + EventListener, + GetInfoRequest, + GetPaymentRequest, + ListPaymentsRequest, + Network, + PaymentStatus as SparkPaymentStatus, + PaymentType, + ReceivePaymentMethod, + ReceivePaymentRequest, + PrepareSendPaymentRequest, + SdkEvent, + SendPaymentRequest, + SendPaymentOptions, + ) +except ImportError: + # Create dummy classes for when SDK is not available + BreezSdk = None + EventListener = None + SparkPaymentStatus = None + logger.warning("Breez SDK Spark not available - SparkBackend will not function") + + +if EventListener is not None: + class SparkEventListener(EventListener): + """Event listener for Spark SDK payment notifications""" + + def __init__(self, queue: asyncio.Queue): + super().__init__() + self.queue = queue + + def on_event(self, event: SdkEvent) -> None: + """Handle SDK events in a thread-safe manner""" + try: + # Check if this is a payment event we care about and extract the invoice id + if hasattr(event, "payment"): + payment = event.payment + checking_id = _extract_invoice_checking_id(payment) + logger.debug( + "Spark event %s payment_type=%s status=%s payment_id=%s raw_payment=%r extracted_id=%s", + event.__class__.__name__, + getattr(payment, "payment_type", None), + getattr(payment, "status", None), + getattr(payment, "id", None), + payment, + checking_id, + ) + if not checking_id: + logger.debug("Spark event %s ignored (no checking id)", event.__class__.__name__) + return + + try: + loop = asyncio.get_running_loop() + # Thread-safe queue put with payment hash (checking_id) + asyncio.run_coroutine_threadsafe( + self.queue.put(checking_id), + loop, + ) + except RuntimeError: + logger.warning("No running event loop found for Spark event") + except Exception as e: + logger.error(f"Error handling Spark event: {e}") +else: + class SparkEventListener: + """Dummy event listener when Spark SDK is not available""" + + def __init__(self, queue: asyncio.Queue): + self.queue = queue + + def on_event(self, event) -> None: + """Dummy event handler""" + logger.warning("SparkEventListener called but Spark SDK not available") + + +class SparkBackend(LightningBackend): + """Breez Spark SDK Lightning backend implementation""" + + supported_units = {Unit.sat, Unit.msat} + supports_mpp = False + supports_incoming_payment_stream = True + supports_description = True + unit = Unit.sat + + def __init__(self, unit: Unit = Unit.sat, **kwargs): + if BreezSdk is None: + raise Exception("Breez SDK not available - install breez-sdk") + + self.assert_unit_supported(unit) + self.unit = unit + self.sdk: Optional[BreezSdk] = None + self.event_queue: Optional[asyncio.Queue] = None + self.listener: Optional[SparkEventListener] = None + self.listener_id: Optional[str] = None + self._initialized = False + self._initialization_lock = asyncio.Lock() + self._connection_retry_count = 0 + self._max_retries = getattr(settings, 'mint_spark_retry_attempts', 3) + self._retry_delay = 5.0 + + # Validate required settings + if not settings.mint_spark_api_key: + raise Exception("MINT_SPARK_API_KEY not set") + if not settings.mint_spark_mnemonic: + raise Exception("MINT_SPARK_MNEMONIC not set") + + async def __aenter__(self): + """Async context manager entry""" + await self._ensure_initialized() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit""" + await self.cleanup() + + async def _ensure_initialized(self) -> None: + """Lazy initialization with proper error handling""" + if self._initialized and self.sdk: + return + + async with self._initialization_lock: + if self._initialized and self.sdk: + return + + try: + await self._initialize_sdk_with_retry() + self._initialized = True + except Exception as e: + logger.error(f"SDK initialization failed: {e}") + raise + + async def _initialize_sdk_with_retry(self) -> None: + """Initialize SDK with exponential backoff retry""" + for attempt in range(self._max_retries): + try: + await self._initialize_sdk() + self._connection_retry_count = 0 + return + except Exception as e: + self._connection_retry_count += 1 + if attempt == self._max_retries - 1: + raise + + delay = self._retry_delay * (2 ** attempt) + logger.warning( + f"SDK init attempt {attempt + 1} failed: {e}. " + f"Retrying in {delay}s" + ) + await asyncio.sleep(delay) + + async def _initialize_sdk(self) -> None: + """Initialize the Spark SDK connection""" + mnemonic = settings.mint_spark_mnemonic + + # Determine network + network_str = getattr(settings, 'mint_spark_network', 'mainnet').lower() + network = Network.MAINNET if network_str == 'mainnet' else Network.TESTNET + + config = default_config(network=network) + config.api_key = settings.mint_spark_api_key + + storage_dir = getattr(settings, 'mint_spark_storage_dir', 'data/spark') + connection_timeout = getattr(settings, 'mint_spark_connection_timeout', 30) + + # ConnectRequest takes mnemonic directly, not a Seed object + self.sdk = await asyncio.wait_for( + connect( + request=ConnectRequest( + config=config, + mnemonic=mnemonic, + storage_dir=storage_dir + ) + ), + timeout=connection_timeout + ) + + # Set up event listener for payment notifications + self.event_queue = asyncio.Queue() + self.listener = SparkEventListener(self.event_queue) + # add_event_listener is not async, it returns a string ID directly + self.listener_id = self.sdk.add_event_listener(listener=self.listener) + logger.debug(f"Spark SDK initialized successfully on {network_str} network") + + # Clear mnemonic from memory + mnemonic = None + del mnemonic + + async def cleanup(self) -> None: + """Proper resource cleanup""" + try: + if hasattr(self, 'listener_id') and self.sdk: + # remove_event_listener is not async + self.sdk.remove_event_listener(id=self.listener_id) + + if self.sdk: + # disconnect is not async + self.sdk.disconnect() + + except Exception as e: + logger.error(f"Cleanup error: {e}") + finally: + self.sdk = None + self.listener = None + self.event_queue = None + self._initialized = False + + async def _check_connectivity(self) -> bool: + """Quick connectivity check""" + try: + if not self.sdk: + return False + await asyncio.wait_for( + self.sdk.get_info(request=GetInfoRequest()), + timeout=5.0 + ) + return True + except Exception: + return False + + async def status(self) -> StatusResponse: + try: + await self._ensure_initialized() + info = await self.sdk.get_info(request=GetInfoRequest()) + return StatusResponse( + balance=Amount(Unit.sat, info.balance_sats), + error_message=None + ) + except Exception as e: + logger.error(f"Spark status error: {e}") + return StatusResponse( + error_message=f"Failed to connect to Spark SDK: {e}", + balance=Amount(self.unit, 0) + ) + + async def create_invoice( + self, + amount: Amount, + memo: Optional[str] = None, + description_hash: Optional[bytes] = None, + unhashed_description: Optional[bytes] = None, + ) -> InvoiceResponse: + self.assert_unit_supported(amount.unit) + + try: + await self._ensure_initialized() + + payment_method = ReceivePaymentMethod.BOLT11_INVOICE( + description=memo or "", + amount_sats=amount.to(Unit.sat).amount + ) + request = ReceivePaymentRequest(payment_method=payment_method) + response = await self.sdk.receive_payment(request=request) + logger.debug( + "Spark create_invoice amount=%s response.payment_request=%s", + amount, + response.payment_request, + ) + + return InvoiceResponse( + ok=True, + checking_id=response.payment_request.lower(), + payment_request=response.payment_request + ) + except Exception as e: + logger.error(f"Spark create_invoice error for amount {amount}: {e}") + return InvoiceResponse(ok=False, error_message=f"Invoice creation failed: {e}") + + async def pay_invoice( + self, quote: MeltQuote, fee_limit_msat: int + ) -> PaymentResponse: + try: + await self._ensure_initialized() + + # Prepare the payment + prepare_request = PrepareSendPaymentRequest( + payment_request=quote.request, + amount=None # Use invoice amount + ) + prepare_response = await self.sdk.prepare_send_payment(request=prepare_request) + + # Send the payment + options = SendPaymentOptions.BOLT11_INVOICE( + prefer_spark=False, + completion_timeout_secs=30 + ) + send_request = SendPaymentRequest( + prepare_response=prepare_response, + options=options + ) + send_response = await self.sdk.send_payment(request=send_request) + + payment = send_response.payment + logger.debug( + "Spark pay_invoice quote=%s result_payment_id=%s status=%s type=%s raw_payment=%r", + quote.quote, + getattr(payment, "id", None), + getattr(payment, "status", None), + getattr(payment, "payment_type", None), + payment, + ) + + # Map Spark payment status to PaymentResult + result = self._map_payment_status(payment) + + return PaymentResponse( + result=result, + checking_id=payment.id, + fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, + preimage=payment.preimage if hasattr(payment, 'preimage') else None + ) + except Exception as e: + logger.error(f"Spark pay_invoice error for quote {quote.quote}: {e}") + return PaymentResponse( + result=PaymentResult.FAILED, + error_message=f"Payment failed: {e}" + ) + + async def get_invoice_status(self, checking_id: str) -> PaymentStatus: + try: + await self._ensure_initialized() + + # For Spark SDK, checking_id is the Lightning invoice/payment_request + # We need to get all payments and find the one with this payment_request + from .base import PaymentResult + + # List all recent payments to find our invoice + list_request = ListPaymentsRequest() + list_response = await self.sdk.list_payments(request=list_request) + normalized_checking_id = checking_id.lower() + + for payment in list_response.payments: + payment_checking_id = _extract_invoice_checking_id(payment) + logger.debug( + "Spark get_invoice_status candidate id=%s target=%s status=%s payment_id=%s raw_payment=%r", + payment_checking_id, + normalized_checking_id, + getattr(payment, "status", None), + getattr(payment, "id", None), + payment, + ) + if payment_checking_id and payment_checking_id == normalized_checking_id: + # Found our payment - return its status + result = self._map_payment_status(payment) + return PaymentStatus( + result=result, + fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, + preimage=payment.preimage if hasattr(payment, 'preimage') else None + ) + + # If not found in payments list, invoice is still pending + return PaymentStatus( + result=PaymentResult.PENDING, + error_message=None + ) + + except Exception as e: + logger.error(f"Get invoice status error: {e}") + return PaymentStatus( + result=PaymentResult.UNKNOWN, + error_message=str(e) + ) + + async def get_payment_status(self, checking_id: str) -> PaymentStatus: + try: + await self._ensure_initialized() + + # Get payment by payment ID + get_request = GetPaymentRequest(payment_id=checking_id) + response = await self.sdk.get_payment(request=get_request) + payment = response.payment + + # Map Spark payment status to PaymentResult + result = self._map_payment_status(payment) + + return PaymentStatus( + result=result, + fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, + preimage=payment.preimage if hasattr(payment, 'preimage') else None + ) + except Exception as e: + logger.error(f"Get payment status error: {e}") + return PaymentStatus( + result=PaymentResult.UNKNOWN, + error_message=str(e) + ) + + def _map_payment_status(self, payment) -> PaymentResult: + """Map Spark SDK payment status to PaymentResult enum.""" + if not hasattr(payment, 'status'): + return PaymentResult.UNKNOWN + + # Use official PaymentStatus enum for more reliable mapping + try: + if payment.status == SparkPaymentStatus.COMPLETED: + return PaymentResult.SETTLED + elif payment.status == SparkPaymentStatus.FAILED: + return PaymentResult.FAILED + elif payment.status == SparkPaymentStatus.PENDING: + return PaymentResult.PENDING + else: + # Fallback to string comparison for any new status values + status_str = str(payment.status).lower() + if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + return PaymentResult.SETTLED + elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + return PaymentResult.FAILED + elif 'pending' in status_str or 'processing' in status_str: + return PaymentResult.PENDING + else: + return PaymentResult.UNKNOWN + except (AttributeError, TypeError): + # Fallback to string-based mapping if enum comparison fails + status_str = str(payment.status).lower() + if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + return PaymentResult.SETTLED + elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + return PaymentResult.FAILED + elif 'pending' in status_str or 'processing' in status_str: + return PaymentResult.PENDING + else: + return PaymentResult.UNKNOWN + + async def get_payment_quote( + self, melt_quote: PostMeltQuoteRequest + ) -> PaymentQuoteResponse: + invoice_obj = decode(melt_quote.request) + assert invoice_obj.amount_msat, "invoice has no amount." + amount_msat = int(invoice_obj.amount_msat) + + # Use standard fee calculation for now + # TODO: Use Spark SDK's fee estimation when available + fees_msat = fee_reserve(amount_msat) + fees = Amount(unit=Unit.msat, amount=fees_msat) + amount = Amount(unit=Unit.msat, amount=amount_msat) + + return PaymentQuoteResponse( + checking_id=invoice_obj.payment_hash, + fee=fees.to(self.unit, round="up"), + amount=amount.to(self.unit, round="up"), + ) + + async def paid_invoices_stream(self) -> AsyncGenerator[str, None]: + """Stream of paid invoice notifications with resilience""" + await self._ensure_initialized() + + retry_delay = settings.mint_retry_exponential_backoff_base_delay + max_retry_delay = settings.mint_retry_exponential_backoff_max_delay + + while True: + try: + # Set timeout to prevent infinite blocking + payment_id = await asyncio.wait_for( + self.event_queue.get(), + timeout=30.0 + ) + logger.debug("Spark paid_invoices_stream emitting checking_id=%s", payment_id) + yield payment_id + + # Reset retry delay on success + retry_delay = settings.mint_retry_exponential_backoff_base_delay + + except asyncio.TimeoutError: + # Periodic connectivity check + if not await self._check_connectivity(): + logger.warning("Spark connectivity lost, attempting reconnection") + self._initialized = False + await self._ensure_initialized() + else: + logger.debug("Spark paid_invoices_stream heartbeat (no events)") + continue + + except Exception as e: + logger.error(f"Spark payment stream error: {e}") + await asyncio.sleep(retry_delay) + + # Exponential backoff + retry_delay = max( + settings.mint_retry_exponential_backoff_base_delay, + min(retry_delay * 2, max_retry_delay) + ) + + # Attempt recovery + if not self._initialized: + await self._ensure_initialized() + + async def health_check(self) -> bool: + """Perform comprehensive health check""" + try: + await self._ensure_initialized() + return await self._check_connectivity() + except Exception as e: + logger.warning(f"Spark health check failed: {e}") + return False diff --git a/cashu/mint/startup.py b/cashu/mint/startup.py index bc14a67..4af5a70 100644 --- a/cashu/mint/startup.py +++ b/cashu/mint/startup.py @@ -35,6 +35,8 @@ for key, value in settings.dict().items(): "mint_lnbits_key", "mint_blink_key", "mint_strike_key", + "mint_spark_api_key", + "mint_spark_mnemonic", "mint_lnd_rest_macaroon", "mint_lnd_rest_admin_macaroon", "mint_lnd_rest_invoice_macaroon", diff --git a/docker-compose.yaml b/docker-compose.yaml index f2695db..687bd3b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,3 @@ -version: "3" services: mint: build: diff --git a/poetry.lock b/poetry.lock index 2ea7dee..d9dd089 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "aiosqlite" @@ -197,6 +197,45 @@ bitstring = "*" click = "*" coincurve = "*" +[[package]] +name = "breez-sdk-spark" +version = "0.1.9" +description = "Python language bindings for the Breez Spark SDK" +optional = false +python-versions = "*" +files = [ + {file = "breez_sdk_spark-0.1.9-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:7fb6c702f77f54306ebeafc5974461290e96c143df6f1a5afb974b62576b7972"}, + {file = "breez_sdk_spark-0.1.9-cp310-cp310-manylinux_2_31_aarch64.whl", hash = "sha256:041b5ac3d6b2ec9c5ffec1a46ed9874376ad16bfd50bc28c2fc2a9a51a473bab"}, + {file = "breez_sdk_spark-0.1.9-cp310-cp310-manylinux_2_31_x86_64.whl", hash = "sha256:86a78f09cdc4dc8fbcc995073e11bdc0c06982e00b214e0f0210b15fb663e1a3"}, + {file = "breez_sdk_spark-0.1.9-cp310-cp310-win32.whl", hash = "sha256:01c64566d2146b5deabb9e7291c3ebcc400bb054ef8391cc2ca50480c03015c2"}, + {file = "breez_sdk_spark-0.1.9-cp310-cp310-win_amd64.whl", hash = "sha256:4c6906390454f6952bf2b923949d13eefcdccdf8da073496f0023bba41431933"}, + {file = "breez_sdk_spark-0.1.9-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:a5467d289d0245d31343f6d10744071cd092cd2b108fdf5e76e1b2f9e63c4a28"}, + {file = "breez_sdk_spark-0.1.9-cp311-cp311-manylinux_2_31_aarch64.whl", hash = "sha256:afadd71fb816c24ebebbc709a13c1c06091941de31c9add321316c982f06652a"}, + {file = "breez_sdk_spark-0.1.9-cp311-cp311-manylinux_2_31_x86_64.whl", hash = "sha256:5fd1e2e0c381bc4c142c748f4f29b43fb089b8bf909dd55107f63ff8b5fd733d"}, + {file = "breez_sdk_spark-0.1.9-cp311-cp311-win32.whl", hash = "sha256:e53cb1890643f697316f4aad7c831794a798f3d6e8f288aa8906b841cc582678"}, + {file = "breez_sdk_spark-0.1.9-cp311-cp311-win_amd64.whl", hash = "sha256:4d7e73759ccb1da1fca587ffe6f72d166459b34d7f6614de23352860306538ec"}, + {file = "breez_sdk_spark-0.1.9-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:5e944b035fecd6de90b9d18eb8f32b990f662880bd91fb847011f10d12e88367"}, + {file = "breez_sdk_spark-0.1.9-cp312-cp312-manylinux_2_31_aarch64.whl", hash = "sha256:6991e3cd3025ee153eb2a844cf06b1892c196a6079ee3db24e449e55aee06544"}, + {file = "breez_sdk_spark-0.1.9-cp312-cp312-manylinux_2_31_x86_64.whl", hash = "sha256:2ceb597a3bff6755525234ed07892dc82034c745fd9578629b3db85433907165"}, + {file = "breez_sdk_spark-0.1.9-cp312-cp312-win32.whl", hash = "sha256:67d4e9dbc5779d8cd67e879be06006f714ec46b3d8c55ca9f08efae1e21e8889"}, + {file = "breez_sdk_spark-0.1.9-cp312-cp312-win_amd64.whl", hash = "sha256:e418d47aad183cd80a5bd4629b90511e7624676db8996c0a83be42f2d528f650"}, + {file = "breez_sdk_spark-0.1.9-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:d966f858bd889b3a509d8f61d5f1e3f72fd1fab7e1b9830b523363fe357d60fa"}, + {file = "breez_sdk_spark-0.1.9-cp313-cp313-manylinux_2_31_aarch64.whl", hash = "sha256:66763c74afdf2a5c4c4f5248666f87725c76c5853685a377467c3935516576b7"}, + {file = "breez_sdk_spark-0.1.9-cp313-cp313-manylinux_2_31_x86_64.whl", hash = "sha256:ca125db55d0a04b08956c2340b2f8091b55848e5e77c4569128badbf1eca6991"}, + {file = "breez_sdk_spark-0.1.9-cp313-cp313-win32.whl", hash = "sha256:3ea9576e9fcde542ff0dda8df4e3dc20147bb7a93eecf8312b393d825ea170c9"}, + {file = "breez_sdk_spark-0.1.9-cp313-cp313-win_amd64.whl", hash = "sha256:540baacc12287f7e8828e6bbbdf3e6aa9b1aaaf1cb85a0980c0a6e70933bfb3d"}, + {file = "breez_sdk_spark-0.1.9-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:1159b0e92581a30e05936e7997e19ef682b1d93057d5136847185a01145eb25c"}, + {file = "breez_sdk_spark-0.1.9-cp38-cp38-manylinux_2_31_aarch64.whl", hash = "sha256:84f91b21a7698938dc7707fd31be10d7449347870ba3d07e7f3ff6260c8de754"}, + {file = "breez_sdk_spark-0.1.9-cp38-cp38-manylinux_2_31_x86_64.whl", hash = "sha256:374e9f486d4e576aa3e55e1aab912c474e4d96115a5b678c1f95a08610e79968"}, + {file = "breez_sdk_spark-0.1.9-cp38-cp38-win32.whl", hash = "sha256:326251dff34f0c19614746eb345bc9906785a071a3eaa7da7c54ea4076557b4c"}, + {file = "breez_sdk_spark-0.1.9-cp38-cp38-win_amd64.whl", hash = "sha256:a205f11710fd1720634632a20e502d27fbdd1d400d94b5545ca74a672f509b1b"}, + {file = "breez_sdk_spark-0.1.9-cp39-cp39-macosx_11_0_universal2.whl", hash = "sha256:aa3ffaf682818608a38582b76db92f4fa2ffdf44634e2163014e594722513106"}, + {file = "breez_sdk_spark-0.1.9-cp39-cp39-manylinux_2_31_aarch64.whl", hash = "sha256:7216030f2cea3d3e5fa3a4e98fda7245334de9e96ce2186bdfd11f49bb1872f7"}, + {file = "breez_sdk_spark-0.1.9-cp39-cp39-manylinux_2_31_x86_64.whl", hash = "sha256:2f4512dd96bf2c7eda940db717fc961e20716dda6a38095e63322c38437a43c2"}, + {file = "breez_sdk_spark-0.1.9-cp39-cp39-win32.whl", hash = "sha256:582855db91d81fe9f0f9b340e54e54d55b16d76b890467f77178b777561da9d6"}, + {file = "breez_sdk_spark-0.1.9-cp39-cp39-win_amd64.whl", hash = "sha256:9f5c6110e9f378bdf446303b0f486dc221c96691710cc453a06ea5f5c12ee226"}, +] + [[package]] name = "brotli" version = "1.1.0" @@ -2657,4 +2696,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "5007f3202dedffb266c3bb0ba3101141a6d865e6979185a0ab6ea7d08c13213c" +content-hash = "af82f494ec3ac5f839968129579cd5986e3ff6994f0d93fcb0ddcec4374fc01f" diff --git a/pyproject.toml b/pyproject.toml index 20bec5c..70dd10c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ redis = "^5.1.1" brotli = "^1.1.0" zstandard = "^0.23.0" jinja2 = "^3.1.5" +breez-sdk-spark = "^0.1.0" [tool.poetry.group.dev.dependencies] pytest-asyncio = "^0.24.0" From 3d2643cce39c332cdcffa27792ff674191027667 Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Thu, 6 Nov 2025 13:21:45 +0100 Subject: [PATCH 2/5] payment detection --- cashu/lightning/spark.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py index 5f5969a..3453052 100644 --- a/cashu/lightning/spark.py +++ b/cashu/lightning/spark.py @@ -104,6 +104,24 @@ if EventListener is not None: # Check if this is a payment event we care about and extract the invoice id if hasattr(event, "payment"): payment = event.payment + status = getattr(payment, "status", None) + if status != SparkPaymentStatus.COMPLETED: + logger.debug( + "Spark event %s ignored (status %s)", + event.__class__.__name__, + status, + ) + return + + payment_type = getattr(payment, "payment_type", None) + if payment_type != PaymentType.RECEIVE: + logger.debug( + "Spark event %s ignored (payment type %s)", + event.__class__.__name__, + payment_type, + ) + return + checking_id = _extract_invoice_checking_id(payment) logger.debug( "Spark event %s payment_type=%s status=%s payment_id=%s raw_payment=%r extracted_id=%s", From 5c892f6083cf73fb5850301749d22935d89a3821 Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Tue, 11 Nov 2025 20:36:42 +0100 Subject: [PATCH 3/5] not working --- Dockerfile | 3 +- cashu/core/settings.py | 30 +++- cashu/lightning/spark.py | 347 +++++++++++++++++++++++++++++---------- poetry.lock | 64 ++++---- pyproject.toml | 2 +- 5 files changed, 312 insertions(+), 134 deletions(-) diff --git a/Dockerfile b/Dockerfile index 38e5dcc..b7fc88a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,4 +12,5 @@ WORKDIR /app COPY . . RUN poetry config virtualenvs.create false RUN poetry lock --no-update -RUN poetry install --no-root +# Install all dependencies including breez-sdk-spark (now installs 0.3.4 automatically) +RUN poetry install --no-root --all-extras diff --git a/cashu/core/settings.py b/cashu/core/settings.py index 97753c8..3c8851e 100644 --- a/cashu/core/settings.py +++ b/cashu/core/settings.py @@ -4,7 +4,12 @@ from pathlib import Path from typing import List, Optional from environs import Env # type: ignore -from pydantic import BaseSettings, Extra, Field +from pydantic import Field +try: + from pydantic_settings import BaseSettings + from pydantic import ConfigDict +except ImportError: + from pydantic import BaseSettings, Extra env = Env() @@ -29,14 +34,21 @@ class CashuSettings(BaseSettings): lightning_reserve_fee_min: int = Field(default=2000) max_order: int = Field(default=64) - class Config(BaseSettings.Config): - env_file = find_env_file() - env_file_encoding = "utf-8" - case_sensitive = False - extra = Extra.ignore - - # def __init__(self, env_file=None): - # self.env_file = env_file or self.env_file + try: + # Pydantic v2 style + model_config = ConfigDict( + env_file=find_env_file(), + env_file_encoding="utf-8", + case_sensitive=False, + extra="ignore" + ) + except NameError: + # Pydantic v1 style fallback + class Config(BaseSettings.Config): + env_file = find_env_file() + env_file_encoding = "utf-8" + case_sensitive = False + extra = Extra.ignore class EnvSettings(CashuSettings): diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py index 3453052..c3ec065 100644 --- a/cashu/lightning/spark.py +++ b/cashu/lightning/spark.py @@ -1,4 +1,5 @@ import asyncio +import inspect import math from typing import AsyncGenerator, Optional @@ -21,39 +22,57 @@ from .base import ( def _extract_invoice_checking_id(payment) -> Optional[str]: - """Return a normalized identifier that matches the stored mint quote checking_id.""" + """Return a normalized identifier (payment_hash) that matches the stored mint quote checking_id.""" try: details = getattr(payment, "details", None) if details: - logger.debug( - "Spark extract: payment.id=%s type=%s details_type=%s has_invoice=%s has_bolt11=%s has_hash=%s", - getattr(payment, "id", None), - type(payment), - type(details), - hasattr(details, "invoice"), - hasattr(details, "bolt11_invoice"), - hasattr(details, "payment_hash"), - ) + # Only log details for debugging when needed + # logger.debug( + # f"Spark extract: payment.id={getattr(payment, 'id', None)} type={type(payment)} " + # f"details_type={type(details)} has_invoice={hasattr(details, 'invoice')} " + # f"has_bolt11={hasattr(details, 'bolt11_invoice')} has_hash={hasattr(details, 'payment_hash')}" + # ) + + # First priority: payment_hash (most reliable for matching) + payment_hash = getattr(details, "payment_hash", None) + if payment_hash: + # logger.debug(f"Spark extract: using details.payment_hash={payment_hash}") + return payment_hash.lower() + + # Second priority: extract hash from invoice if available invoice = getattr(details, "invoice", None) if invoice: - logger.debug("Spark extract: using details.invoice=%s", invoice) + try: + from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(invoice) + if invoice_obj.payment_hash: + # logger.debug(f"Spark extract: extracted payment_hash from invoice={invoice_obj.payment_hash}") + return invoice_obj.payment_hash.lower() + except Exception: + pass + # Fallback to full invoice if can't extract hash + # logger.debug(f"Spark extract: using details.invoice={invoice[:50]}...") return invoice.lower() bolt11_details = getattr(details, "bolt11_invoice", None) if bolt11_details: bolt11 = getattr(bolt11_details, "bolt11", None) if bolt11: - logger.debug("Spark extract: using bolt11_details.bolt11=%s", bolt11) + try: + from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(bolt11) + if invoice_obj.payment_hash: + # logger.debug(f"Spark extract: extracted payment_hash from bolt11={invoice_obj.payment_hash}") + return invoice_obj.payment_hash.lower() + except Exception: + pass + # logger.debug(f"Spark extract: using bolt11_details.bolt11={bolt11[:50]}...") return bolt11.lower() - payment_hash = getattr(details, "payment_hash", None) - if payment_hash: - logger.debug("Spark extract: using details.payment_hash=%s", payment_hash) - return payment_hash.lower() - + # Fallback: check payment-level payment_hash payment_hash = getattr(payment, "payment_hash", None) if payment_hash: - logger.debug("Spark extract: using payment.payment_hash=%s", payment_hash) + # logger.debug(f"Spark extract: using payment.payment_hash={payment_hash}") return payment_hash.lower() except Exception as exc: # pragma: no cover - defensive logging logger.error(f"Failed to extract Spark invoice identifier: {exc}") @@ -61,6 +80,46 @@ def _extract_invoice_checking_id(payment) -> Optional[str]: return None +def _get_payment_fee_sats(payment) -> Optional[int]: + """Return the payment fee in satoshis if available.""" + fee = None + + for attr in ("fee_sats", "fees", "fee"): + if hasattr(payment, attr): + fee = getattr(payment, attr) + if fee is not None: + break + + if fee is None: + details = getattr(payment, "details", None) + if details is not None and hasattr(details, "fees"): + fee = getattr(details, "fees") + + if fee is None: + return None + + try: + return int(fee) + except (TypeError, ValueError): + try: + return int(str(fee)) + except (TypeError, ValueError): + return None + + +def _get_payment_preimage(payment) -> Optional[str]: + """Return the payment preimage if exposed by the SDK.""" + preimage = getattr(payment, "preimage", None) + if preimage: + return preimage + + details = getattr(payment, "details", None) + if details and hasattr(details, "preimage"): + return getattr(details, "preimage") or None + + return None + + # Import Spark SDK components try: from breez_sdk_spark import ( @@ -81,78 +140,128 @@ try: SdkEvent, SendPaymentRequest, SendPaymentOptions, + Seed, ) -except ImportError: + # Event loop fix will be imported but not applied yet + set_sdk_event_loop = None + try: + from .spark_event_loop_fix import set_sdk_event_loop as _set_sdk_event_loop + set_sdk_event_loop = _set_sdk_event_loop + except ImportError: + pass + # uniffi_set_event_loop is not available in newer versions + spark_uniffi_set_event_loop = None + common_uniffi_set_event_loop = None +except ImportError as e: # Create dummy classes for when SDK is not available BreezSdk = None EventListener = None SparkPaymentStatus = None - logger.warning("Breez SDK Spark not available - SparkBackend will not function") + spark_uniffi_set_event_loop = None + common_uniffi_set_event_loop = None + logger.warning(f"Breez SDK Spark not available - SparkBackend will not function: {e}") if EventListener is not None: class SparkEventListener(EventListener): """Event listener for Spark SDK payment notifications""" - def __init__(self, queue: asyncio.Queue): + def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): super().__init__() self.queue = queue + self.loop = loop def on_event(self, event: SdkEvent) -> None: - """Handle SDK events in a thread-safe manner""" + """Handle SDK events in a thread-safe manner with robust error handling""" try: + # Debug log ALL events to understand what types of events we get + logger.info(f"Spark SDK event received: {event.__class__.__name__}, hasPayment={hasattr(event, 'payment')}, event={event}") + # Check if this is a payment event we care about and extract the invoice id if hasattr(event, "payment"): payment = event.payment status = getattr(payment, "status", None) - if status != SparkPaymentStatus.COMPLETED: - logger.debug( - "Spark event %s ignored (status %s)", - event.__class__.__name__, - status, - ) - return - payment_type = getattr(payment, "payment_type", None) - if payment_type != PaymentType.RECEIVE: - logger.debug( - "Spark event %s ignored (payment type %s)", - event.__class__.__name__, - payment_type, + + # Debug log all payment events to understand what we're getting + logger.info(f"Spark payment event: status={status}, type={payment_type}, payment={payment}") + + # Less restrictive filtering - allow various statuses that might indicate completed payments + if status and hasattr(SparkPaymentStatus, 'COMPLETED') and status != SparkPaymentStatus.COMPLETED: + # Check if it's a different completion status + if not (hasattr(SparkPaymentStatus, 'SETTLED') and status == SparkPaymentStatus.SETTLED): + logger.debug( + f"Spark event {event.__class__.__name__} ignored (status {status})" + ) + return + + # Less restrictive payment type filtering - log but don't reject non-RECEIVE types yet + if payment_type and hasattr(PaymentType, 'RECEIVE') and payment_type != PaymentType.RECEIVE: + logger.info( + f"Spark event {event.__class__.__name__} has non-RECEIVE type ({payment_type}) - processing anyway" ) - return checking_id = _extract_invoice_checking_id(payment) logger.debug( - "Spark event %s payment_type=%s status=%s payment_id=%s raw_payment=%r extracted_id=%s", - event.__class__.__name__, - getattr(payment, "payment_type", None), - getattr(payment, "status", None), - getattr(payment, "id", None), - payment, - checking_id, + f"Spark event {event.__class__.__name__} payment_type={getattr(payment, 'payment_type', None)} " + f"status={getattr(payment, 'status', None)} payment_id={getattr(payment, 'id', None)} " + f"raw_payment={payment!r} extracted_id={checking_id}" ) if not checking_id: - logger.debug("Spark event %s ignored (no checking id)", event.__class__.__name__) + logger.debug(f"Spark event {event.__class__.__name__} ignored (no checking id)") return - try: - loop = asyncio.get_running_loop() - # Thread-safe queue put with payment hash (checking_id) - asyncio.run_coroutine_threadsafe( - self.queue.put(checking_id), - loop, - ) - except RuntimeError: - logger.warning("No running event loop found for Spark event") + # More robust thread-safe event handling + self._safe_put_event(checking_id) + except Exception as e: logger.error(f"Error handling Spark event: {e}") + import traceback + logger.debug(f"Event handler traceback: {traceback.format_exc()}") + + def _safe_put_event(self, checking_id: str) -> None: + """Safely put an event into the queue from any thread context""" + try: + target_loop = self.loop + if target_loop is None: + logger.warning("Spark event listener has no target loop; dropping event") + return + + if target_loop.is_closed(): + logger.warning("Spark event listener target loop is closed; dropping event") + return + + # Use call_soon_threadsafe for more reliable thread-safe event handling + def queue_put(): + try: + self.queue.put_nowait(checking_id) + logger.info(f"Spark event successfully queued: {checking_id}") + except asyncio.QueueFull: + logger.warning(f"Spark event queue full, dropping event: {checking_id}") + except Exception as e: + logger.error(f"Failed to put event in queue: {e}") + + target_loop.call_soon_threadsafe(queue_put) + + except Exception as exc: + logger.warning(f"Failed to queue Spark event (expected from callback thread): {exc}") + # Fallback: try the original approach + try: + if self.loop and not self.loop.is_closed(): + future = asyncio.run_coroutine_threadsafe( + self.queue.put(checking_id), + self.loop, + ) + logger.info(f"Spark event fallback queued: {checking_id}") + except Exception as fallback_exc: + logger.error(f"Both event queueing methods failed: {fallback_exc}") else: class SparkEventListener: """Dummy event listener when Spark SDK is not available""" - def __init__(self, queue: asyncio.Queue): + def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): self.queue = queue + self.loop = loop def on_event(self, event) -> None: """Dummy event handler""" @@ -178,6 +287,7 @@ class SparkBackend(LightningBackend): self.event_queue: Optional[asyncio.Queue] = None self.listener: Optional[SparkEventListener] = None self.listener_id: Optional[str] = None + self._event_loop: Optional[asyncio.AbstractEventLoop] = None self._initialized = False self._initialization_lock = asyncio.Lock() self._connection_retry_count = 0 @@ -248,12 +358,24 @@ class SparkBackend(LightningBackend): storage_dir = getattr(settings, 'mint_spark_storage_dir', 'data/spark') connection_timeout = getattr(settings, 'mint_spark_connection_timeout', 30) - # ConnectRequest takes mnemonic directly, not a Seed object + event_loop = asyncio.get_running_loop() + # Store the event loop for SDK callbacks + if 'set_sdk_event_loop' in globals(): + set_sdk_event_loop(event_loop) + for setter in (spark_uniffi_set_event_loop, common_uniffi_set_event_loop): + if setter: + try: + setter(event_loop) + except Exception as exc: # pragma: no cover - defensive log + logger.warning(f"Failed to register event loop with Spark SDK: {exc}") + + # ConnectRequest requires a Seed object (mnemonic or entropy based) + seed = Seed.MNEMONIC(mnemonic=mnemonic, passphrase=None) self.sdk = await asyncio.wait_for( connect( request=ConnectRequest( config=config, - mnemonic=mnemonic, + seed=seed, storage_dir=storage_dir ) ), @@ -262,9 +384,11 @@ class SparkBackend(LightningBackend): # Set up event listener for payment notifications self.event_queue = asyncio.Queue() - self.listener = SparkEventListener(self.event_queue) - # add_event_listener is not async, it returns a string ID directly - self.listener_id = self.sdk.add_event_listener(listener=self.listener) + self._event_loop = event_loop + self.listener = SparkEventListener(self.event_queue, self._event_loop) + self.listener_id = await _await_if_needed( + self.sdk.add_event_listener(listener=self.listener) + ) logger.debug(f"Spark SDK initialized successfully on {network_str} network") # Clear mnemonic from memory @@ -275,12 +399,13 @@ class SparkBackend(LightningBackend): """Proper resource cleanup""" try: if hasattr(self, 'listener_id') and self.sdk: - # remove_event_listener is not async - self.sdk.remove_event_listener(id=self.listener_id) + if self.listener_id: + await _await_if_needed( + self.sdk.remove_event_listener(id=self.listener_id) + ) if self.sdk: - # disconnect is not async - self.sdk.disconnect() + await _await_if_needed(self.sdk.disconnect()) except Exception as e: logger.error(f"Cleanup error: {e}") @@ -288,6 +413,8 @@ class SparkBackend(LightningBackend): self.sdk = None self.listener = None self.event_queue = None + self.listener_id = None + self._event_loop = None self._initialized = False async def _check_connectivity(self) -> bool: @@ -296,7 +423,7 @@ class SparkBackend(LightningBackend): if not self.sdk: return False await asyncio.wait_for( - self.sdk.get_info(request=GetInfoRequest()), + self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)), timeout=5.0 ) return True @@ -306,7 +433,7 @@ class SparkBackend(LightningBackend): async def status(self) -> StatusResponse: try: await self._ensure_initialized() - info = await self.sdk.get_info(request=GetInfoRequest()) + info = await self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)) return StatusResponse( balance=Amount(Unit.sat, info.balance_sats), error_message=None @@ -336,15 +463,26 @@ class SparkBackend(LightningBackend): ) request = ReceivePaymentRequest(payment_method=payment_method) response = await self.sdk.receive_payment(request=request) - logger.debug( - "Spark create_invoice amount=%s response.payment_request=%s", - amount, - response.payment_request, - ) + + # Extract payment_hash from the invoice for consistent matching + from bolt11 import decode as bolt11_decode + try: + invoice_obj = bolt11_decode(response.payment_request) + payment_hash = invoice_obj.payment_hash + logger.debug( + f"Spark create_invoice amount={amount} payment_hash={payment_hash} invoice={response.payment_request[:50]}..." + ) + except Exception as e: + logger.error(f"Failed to extract payment_hash from invoice: {e}") + # Fallback to using full invoice as checking_id + payment_hash = response.payment_request.lower() + + checking_id_to_store = payment_hash.lower() if payment_hash else response.payment_request.lower() + logger.info(f"Spark storing checking_id: {checking_id_to_store[:20]}... (hash: {bool(payment_hash)})") return InvoiceResponse( ok=True, - checking_id=response.payment_request.lower(), + checking_id=checking_id_to_store, payment_request=response.payment_request ) except Exception as e: @@ -388,11 +526,14 @@ class SparkBackend(LightningBackend): # Map Spark payment status to PaymentResult result = self._map_payment_status(payment) + fee_sats = _get_payment_fee_sats(payment) + preimage = _get_payment_preimage(payment) + return PaymentResponse( result=result, checking_id=payment.id, - fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, - preimage=payment.preimage if hasattr(payment, 'preimage') else None + fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, + preimage=preimage ) except Exception as e: logger.error(f"Spark pay_invoice error for quote {quote.quote}: {e}") @@ -416,24 +557,22 @@ class SparkBackend(LightningBackend): for payment in list_response.payments: payment_checking_id = _extract_invoice_checking_id(payment) - logger.debug( - "Spark get_invoice_status candidate id=%s target=%s status=%s payment_id=%s raw_payment=%r", - payment_checking_id, - normalized_checking_id, - getattr(payment, "status", None), - getattr(payment, "id", None), - payment, - ) if payment_checking_id and payment_checking_id == normalized_checking_id: # Found our payment - return its status + logger.debug( + f"Spark payment found: target={normalized_checking_id} status={getattr(payment, 'status', None)}" + ) result = self._map_payment_status(payment) + fee_sats = _get_payment_fee_sats(payment) + preimage = _get_payment_preimage(payment) return PaymentStatus( result=result, - fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, - preimage=payment.preimage if hasattr(payment, 'preimage') else None + fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, + preimage=preimage ) # If not found in payments list, invoice is still pending + logger.debug(f"Spark payment not found for checking_id: {normalized_checking_id[:20]}...") return PaymentStatus( result=PaymentResult.PENDING, error_message=None @@ -450,18 +589,39 @@ class SparkBackend(LightningBackend): try: await self._ensure_initialized() - # Get payment by payment ID - get_request = GetPaymentRequest(payment_id=checking_id) - response = await self.sdk.get_payment(request=get_request) - payment = response.payment + # The checking_id is the invoice/bolt11 string for received payments + # We need to list payments and find the one with matching invoice + list_request = ListPaymentsRequest(payment_type=PaymentType.RECEIVE) + response = await self.sdk.list_payments(request=list_request) + + # Find the payment with matching invoice + target_payment = None + checking_id_lower = checking_id.lower() + + for payment in response.payments: + # Check if this payment's invoice matches our checking_id + invoice_id = _extract_invoice_checking_id(payment) + if invoice_id and invoice_id.lower() == checking_id_lower: + target_payment = payment + logger.debug(f"Found matching payment for invoice {checking_id[:20]}...") + break + + if not target_payment: + logger.debug(f"No payment found for checking_id {checking_id[:20]}...") + return PaymentStatus( + result=PaymentResult.PENDING, + error_message="Payment not found yet" + ) # Map Spark payment status to PaymentResult - result = self._map_payment_status(payment) + result = self._map_payment_status(target_payment) + fee_sats = _get_payment_fee_sats(target_payment) + preimage = _get_payment_preimage(target_payment) return PaymentStatus( result=result, - fee=Amount(Unit.sat, payment.fee_sats) if hasattr(payment, 'fee_sats') and payment.fee_sats else None, - preimage=payment.preimage if hasattr(payment, 'preimage') else None + fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, + preimage=preimage ) except Exception as e: logger.error(f"Get payment status error: {e}") @@ -539,7 +699,7 @@ class SparkBackend(LightningBackend): self.event_queue.get(), timeout=30.0 ) - logger.debug("Spark paid_invoices_stream emitting checking_id=%s", payment_id) + logger.debug(f"Spark paid_invoices_stream emitting checking_id={payment_id}") yield payment_id # Reset retry delay on success @@ -577,3 +737,8 @@ class SparkBackend(LightningBackend): except Exception as e: logger.warning(f"Spark health check failed: {e}") return False +async def _await_if_needed(value): + """Await value if it is awaitable; otherwise return it directly.""" + if inspect.isawaitable(value): + return await value + return value diff --git a/poetry.lock b/poetry.lock index d9dd089..c7c1ada 100644 --- a/poetry.lock +++ b/poetry.lock @@ -199,41 +199,41 @@ coincurve = "*" [[package]] name = "breez-sdk-spark" -version = "0.1.9" +version = "0.3.4" description = "Python language bindings for the Breez Spark SDK" optional = false python-versions = "*" files = [ - {file = "breez_sdk_spark-0.1.9-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:7fb6c702f77f54306ebeafc5974461290e96c143df6f1a5afb974b62576b7972"}, - {file = "breez_sdk_spark-0.1.9-cp310-cp310-manylinux_2_31_aarch64.whl", hash = "sha256:041b5ac3d6b2ec9c5ffec1a46ed9874376ad16bfd50bc28c2fc2a9a51a473bab"}, - {file = "breez_sdk_spark-0.1.9-cp310-cp310-manylinux_2_31_x86_64.whl", hash = "sha256:86a78f09cdc4dc8fbcc995073e11bdc0c06982e00b214e0f0210b15fb663e1a3"}, - {file = "breez_sdk_spark-0.1.9-cp310-cp310-win32.whl", hash = "sha256:01c64566d2146b5deabb9e7291c3ebcc400bb054ef8391cc2ca50480c03015c2"}, - {file = "breez_sdk_spark-0.1.9-cp310-cp310-win_amd64.whl", hash = "sha256:4c6906390454f6952bf2b923949d13eefcdccdf8da073496f0023bba41431933"}, - {file = "breez_sdk_spark-0.1.9-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:a5467d289d0245d31343f6d10744071cd092cd2b108fdf5e76e1b2f9e63c4a28"}, - {file = "breez_sdk_spark-0.1.9-cp311-cp311-manylinux_2_31_aarch64.whl", hash = "sha256:afadd71fb816c24ebebbc709a13c1c06091941de31c9add321316c982f06652a"}, - {file = "breez_sdk_spark-0.1.9-cp311-cp311-manylinux_2_31_x86_64.whl", hash = "sha256:5fd1e2e0c381bc4c142c748f4f29b43fb089b8bf909dd55107f63ff8b5fd733d"}, - {file = "breez_sdk_spark-0.1.9-cp311-cp311-win32.whl", hash = "sha256:e53cb1890643f697316f4aad7c831794a798f3d6e8f288aa8906b841cc582678"}, - {file = "breez_sdk_spark-0.1.9-cp311-cp311-win_amd64.whl", hash = "sha256:4d7e73759ccb1da1fca587ffe6f72d166459b34d7f6614de23352860306538ec"}, - {file = "breez_sdk_spark-0.1.9-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:5e944b035fecd6de90b9d18eb8f32b990f662880bd91fb847011f10d12e88367"}, - {file = "breez_sdk_spark-0.1.9-cp312-cp312-manylinux_2_31_aarch64.whl", hash = "sha256:6991e3cd3025ee153eb2a844cf06b1892c196a6079ee3db24e449e55aee06544"}, - {file = "breez_sdk_spark-0.1.9-cp312-cp312-manylinux_2_31_x86_64.whl", hash = "sha256:2ceb597a3bff6755525234ed07892dc82034c745fd9578629b3db85433907165"}, - {file = "breez_sdk_spark-0.1.9-cp312-cp312-win32.whl", hash = "sha256:67d4e9dbc5779d8cd67e879be06006f714ec46b3d8c55ca9f08efae1e21e8889"}, - {file = "breez_sdk_spark-0.1.9-cp312-cp312-win_amd64.whl", hash = "sha256:e418d47aad183cd80a5bd4629b90511e7624676db8996c0a83be42f2d528f650"}, - {file = "breez_sdk_spark-0.1.9-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:d966f858bd889b3a509d8f61d5f1e3f72fd1fab7e1b9830b523363fe357d60fa"}, - {file = "breez_sdk_spark-0.1.9-cp313-cp313-manylinux_2_31_aarch64.whl", hash = "sha256:66763c74afdf2a5c4c4f5248666f87725c76c5853685a377467c3935516576b7"}, - {file = "breez_sdk_spark-0.1.9-cp313-cp313-manylinux_2_31_x86_64.whl", hash = "sha256:ca125db55d0a04b08956c2340b2f8091b55848e5e77c4569128badbf1eca6991"}, - {file = "breez_sdk_spark-0.1.9-cp313-cp313-win32.whl", hash = "sha256:3ea9576e9fcde542ff0dda8df4e3dc20147bb7a93eecf8312b393d825ea170c9"}, - {file = "breez_sdk_spark-0.1.9-cp313-cp313-win_amd64.whl", hash = "sha256:540baacc12287f7e8828e6bbbdf3e6aa9b1aaaf1cb85a0980c0a6e70933bfb3d"}, - {file = "breez_sdk_spark-0.1.9-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:1159b0e92581a30e05936e7997e19ef682b1d93057d5136847185a01145eb25c"}, - {file = "breez_sdk_spark-0.1.9-cp38-cp38-manylinux_2_31_aarch64.whl", hash = "sha256:84f91b21a7698938dc7707fd31be10d7449347870ba3d07e7f3ff6260c8de754"}, - {file = "breez_sdk_spark-0.1.9-cp38-cp38-manylinux_2_31_x86_64.whl", hash = "sha256:374e9f486d4e576aa3e55e1aab912c474e4d96115a5b678c1f95a08610e79968"}, - {file = "breez_sdk_spark-0.1.9-cp38-cp38-win32.whl", hash = "sha256:326251dff34f0c19614746eb345bc9906785a071a3eaa7da7c54ea4076557b4c"}, - {file = "breez_sdk_spark-0.1.9-cp38-cp38-win_amd64.whl", hash = "sha256:a205f11710fd1720634632a20e502d27fbdd1d400d94b5545ca74a672f509b1b"}, - {file = "breez_sdk_spark-0.1.9-cp39-cp39-macosx_11_0_universal2.whl", hash = "sha256:aa3ffaf682818608a38582b76db92f4fa2ffdf44634e2163014e594722513106"}, - {file = "breez_sdk_spark-0.1.9-cp39-cp39-manylinux_2_31_aarch64.whl", hash = "sha256:7216030f2cea3d3e5fa3a4e98fda7245334de9e96ce2186bdfd11f49bb1872f7"}, - {file = "breez_sdk_spark-0.1.9-cp39-cp39-manylinux_2_31_x86_64.whl", hash = "sha256:2f4512dd96bf2c7eda940db717fc961e20716dda6a38095e63322c38437a43c2"}, - {file = "breez_sdk_spark-0.1.9-cp39-cp39-win32.whl", hash = "sha256:582855db91d81fe9f0f9b340e54e54d55b16d76b890467f77178b777561da9d6"}, - {file = "breez_sdk_spark-0.1.9-cp39-cp39-win_amd64.whl", hash = "sha256:9f5c6110e9f378bdf446303b0f486dc221c96691710cc453a06ea5f5c12ee226"}, + {file = "breez_sdk_spark-0.3.4-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:9cbaa5324af163b763e3dfccc1f6a449ffbd74666f56a5d8a6d648ead3d997d5"}, + {file = "breez_sdk_spark-0.3.4-cp310-cp310-manylinux_2_31_aarch64.whl", hash = "sha256:6b2d7cf0b721bf2ba24e2b1e665cdffc8714ab85815f1d2f75cb1a839c23a03d"}, + {file = "breez_sdk_spark-0.3.4-cp310-cp310-manylinux_2_31_x86_64.whl", hash = "sha256:3c302a7fcd6db5c01de52c1e26600e86b5ddda59a233f64d2fe31f0033b7155e"}, + {file = "breez_sdk_spark-0.3.4-cp310-cp310-win32.whl", hash = "sha256:4ea8fbe1b1f16c5e4d1ece199efc6bdf8c10827782994b8a1e51684023aea128"}, + {file = "breez_sdk_spark-0.3.4-cp310-cp310-win_amd64.whl", hash = "sha256:f641c0f033fd6c5b61c2af7440812404b0f4a7734aa831a170a4cbacd678e18a"}, + {file = "breez_sdk_spark-0.3.4-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:ee7b10b76a83bcc387e79470962e0f3a7ff81ab5db9f21953db056657fec90f7"}, + {file = "breez_sdk_spark-0.3.4-cp311-cp311-manylinux_2_31_aarch64.whl", hash = "sha256:c9f2d77dde456373af739ac0aadae1c38fa506cc53ede857fff33eae56d709e1"}, + {file = "breez_sdk_spark-0.3.4-cp311-cp311-manylinux_2_31_x86_64.whl", hash = "sha256:4d3e941e5996112f8116b2c1f3727f009b56a9d582472141cd97dee876a79a41"}, + {file = "breez_sdk_spark-0.3.4-cp311-cp311-win32.whl", hash = "sha256:de2d2bfc6c7fee6086d75b1e37744da2467c0e62bc8e119b9c76664728cd4ead"}, + {file = "breez_sdk_spark-0.3.4-cp311-cp311-win_amd64.whl", hash = "sha256:0789d0c7d0852afa5ae1b08bbfc44ad841c92faf149356dda343577b4ca0c949"}, + {file = "breez_sdk_spark-0.3.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:cb503060516c4ad96e3fbda5bf693dfede04eb0d1a8ab685453c8161fd808103"}, + {file = "breez_sdk_spark-0.3.4-cp312-cp312-manylinux_2_31_aarch64.whl", hash = "sha256:ae57372c1f559ffb5f12eaa9627538d408b25c695473900855e7290ef391ada4"}, + {file = "breez_sdk_spark-0.3.4-cp312-cp312-manylinux_2_31_x86_64.whl", hash = "sha256:5a4d13a9ebb402f1cb2ec10556e4150db6cf2a73c3c91d036d6f38860cca6895"}, + {file = "breez_sdk_spark-0.3.4-cp312-cp312-win32.whl", hash = "sha256:4a3e4eb1404fc915d5bd6d515bd863001e791aed9415710778148fed31a6b43d"}, + {file = "breez_sdk_spark-0.3.4-cp312-cp312-win_amd64.whl", hash = "sha256:58cc6cd7551c70ac062c29e02a0f3b557b6b82cef4abe71f87094b1dafb72495"}, + {file = "breez_sdk_spark-0.3.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:d1f8f8a8ffe4392d0833487b089280f846b47c7320a8497a0fbd589b1af0dae9"}, + {file = "breez_sdk_spark-0.3.4-cp313-cp313-manylinux_2_31_aarch64.whl", hash = "sha256:c35d365a90da2f70aac7831987431e49a1f8f4ac0a5f92ab16bf868ec45c7792"}, + {file = "breez_sdk_spark-0.3.4-cp313-cp313-manylinux_2_31_x86_64.whl", hash = "sha256:2fe652e45ed761a4faee9e6a789ceb6fde09dc62a8f2662dfb72b387b936260c"}, + {file = "breez_sdk_spark-0.3.4-cp313-cp313-win32.whl", hash = "sha256:0fe6fa8f156bf8f052e7c40f7cc6a063244793a6bbe24b560f84e0191e2b5914"}, + {file = "breez_sdk_spark-0.3.4-cp313-cp313-win_amd64.whl", hash = "sha256:cec60322d21d4e4300871020d0b4e8d3ba69a98846c588422696310ab48e5727"}, + {file = "breez_sdk_spark-0.3.4-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:c93add503f09c0ca99c1b17598e90bb6b9181e98e77485b926851b75dc16423a"}, + {file = "breez_sdk_spark-0.3.4-cp38-cp38-manylinux_2_31_aarch64.whl", hash = "sha256:9de93152ca2be527f73df215ab278884b0957578cb3670f15111a358cd55d0be"}, + {file = "breez_sdk_spark-0.3.4-cp38-cp38-manylinux_2_31_x86_64.whl", hash = "sha256:064cdaee5f92ee8a12c3b7d90633666e34b5eee129e722147354b45409f18506"}, + {file = "breez_sdk_spark-0.3.4-cp38-cp38-win32.whl", hash = "sha256:0283815011edddfd18c82ff85e6b1e76092f9fd3c346cacbc700a67709d35a6f"}, + {file = "breez_sdk_spark-0.3.4-cp38-cp38-win_amd64.whl", hash = "sha256:e85fccbd6844ed3efa585d137a5b8166d12271f2e6e95530622a12a526776a4f"}, + {file = "breez_sdk_spark-0.3.4-cp39-cp39-macosx_11_0_universal2.whl", hash = "sha256:97fcf7372dc10051abd0be40548472fb7332c19263d215c17c95fb7ec6c715bb"}, + {file = "breez_sdk_spark-0.3.4-cp39-cp39-manylinux_2_31_aarch64.whl", hash = "sha256:08a133b43ff123b7e231ff849930f884254ff608dc5407c12276d09ebe645e8b"}, + {file = "breez_sdk_spark-0.3.4-cp39-cp39-manylinux_2_31_x86_64.whl", hash = "sha256:69753fe329cc57e84883d33e2bddcb0bf816f849b724c0ddb86904f4c8d49b55"}, + {file = "breez_sdk_spark-0.3.4-cp39-cp39-win32.whl", hash = "sha256:d68c8c22a837a4ca81375ee4f1d7f72e0808a120ca933c8818308fd185f21665"}, + {file = "breez_sdk_spark-0.3.4-cp39-cp39-win_amd64.whl", hash = "sha256:82fb1c8c147649775259e12e03a63e3ce89bbd000dd7690a0957977807a739e3"}, ] [[package]] @@ -2696,4 +2696,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "af82f494ec3ac5f839968129579cd5986e3ff6994f0d93fcb0ddcec4374fc01f" +content-hash = "69b57bd10ce6b62ec91378f866d77f641a4e99aa5cfc79ac2a4910e2bc1880c8" diff --git a/pyproject.toml b/pyproject.toml index 0c645c7..e5a7441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ redis = "^5.1.1" brotli = "^1.1.0" zstandard = "^0.23.0" jinja2 = "^3.1.5" -breez-sdk-spark = "^0.1.0" +breez-sdk-spark = "^0.3.0" [tool.poetry.group.dev.dependencies] pytest-asyncio = "^0.24.0" From bbf820c584d8037069a2f3af6ff59f445dc51201 Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Wed, 12 Nov 2025 17:11:39 +0100 Subject: [PATCH 4/5] fixing payment status --- cashu/lightning/spark.py | 161 ++++++++++++++++++++++++++++++++++----- cashu/mint/tasks.py | 54 ++++++++++++- docker-compose.yaml | 10 ++- 3 files changed, 200 insertions(+), 25 deletions(-) diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py index c3ec065..8b6f812 100644 --- a/cashu/lightning/spark.py +++ b/cashu/lightning/spark.py @@ -121,7 +121,10 @@ def _get_payment_preimage(payment) -> Optional[str]: # Import Spark SDK components +set_sdk_event_loop = None +_get_sdk_event_loop = None try: + from breez_sdk_spark import breez_sdk_spark as spark_bindings from breez_sdk_spark import ( BreezSdk, connect, @@ -130,6 +133,7 @@ try: EventListener, GetInfoRequest, GetPaymentRequest, + PaymentMethod, ListPaymentsRequest, Network, PaymentStatus as SparkPaymentStatus, @@ -144,24 +148,60 @@ try: ) # Event loop fix will be imported but not applied yet set_sdk_event_loop = None + _get_sdk_event_loop = None try: - from .spark_event_loop_fix import set_sdk_event_loop as _set_sdk_event_loop + from .spark_event_loop_fix import ( + set_sdk_event_loop as _set_sdk_event_loop, + ensure_event_loop as _ensure_event_loop, + ) set_sdk_event_loop = _set_sdk_event_loop + _get_sdk_event_loop = _ensure_event_loop + if spark_bindings is not None: + try: + spark_bindings._uniffi_get_event_loop = _ensure_event_loop + logger.debug("Patched breez_sdk_spark._uniffi_get_event_loop") + except Exception as exc: # pragma: no cover + logger.warning(f"Failed to patch Spark SDK event loop getter: {exc}") except ImportError: - pass + _get_sdk_event_loop = None # uniffi_set_event_loop is not available in newer versions spark_uniffi_set_event_loop = None common_uniffi_set_event_loop = None except ImportError as e: # Create dummy classes for when SDK is not available + spark_bindings = None BreezSdk = None EventListener = None + PaymentMethod = None SparkPaymentStatus = None spark_uniffi_set_event_loop = None common_uniffi_set_event_loop = None logger.warning(f"Breez SDK Spark not available - SparkBackend will not function: {e}") +def _get_payment_amount_sats(payment) -> Optional[int]: + """Return the payment amount in satoshis if available.""" + amount = getattr(payment, "amount", None) + if amount is None: + return None + try: + return int(amount) + except (TypeError, ValueError): + try: + return int(str(amount)) + except (TypeError, ValueError): + return None + + +def _is_lightning_payment(payment) -> bool: + """Check whether the payment method represents a Lightning payment.""" + method = getattr(payment, "method", None) + lightning_method = getattr(PaymentMethod, "LIGHTNING", None) + if lightning_method is None or method is None: + return True # fall back to permissive behavior if enum missing + return method == lightning_method + + if EventListener is not None: class SparkEventListener(EventListener): """Event listener for Spark SDK payment notifications""" @@ -171,7 +211,7 @@ if EventListener is not None: self.queue = queue self.loop = loop - def on_event(self, event: SdkEvent) -> None: + async def on_event(self, event: SdkEvent) -> None: """Handle SDK events in a thread-safe manner with robust error handling""" try: # Debug log ALL events to understand what types of events we get @@ -182,11 +222,12 @@ if EventListener is not None: payment = event.payment status = getattr(payment, "status", None) payment_type = getattr(payment, "payment_type", None) + receive_type = getattr(PaymentType, "RECEIVE", None) # Debug log all payment events to understand what we're getting logger.info(f"Spark payment event: status={status}, type={payment_type}, payment={payment}") - # Less restrictive filtering - allow various statuses that might indicate completed payments + # Only consider completed/settled payments if status and hasattr(SparkPaymentStatus, 'COMPLETED') and status != SparkPaymentStatus.COMPLETED: # Check if it's a different completion status if not (hasattr(SparkPaymentStatus, 'SETTLED') and status == SparkPaymentStatus.SETTLED): @@ -195,11 +236,25 @@ if EventListener is not None: ) return - # Less restrictive payment type filtering - log but don't reject non-RECEIVE types yet - if payment_type and hasattr(PaymentType, 'RECEIVE') and payment_type != PaymentType.RECEIVE: - logger.info( - f"Spark event {event.__class__.__name__} has non-RECEIVE type ({payment_type}) - processing anyway" + # Require RECEIVE payment type if enum exists + if receive_type and payment_type and payment_type != receive_type: + logger.debug( + f"Spark event {event.__class__.__name__} ignored (type {payment_type})" ) + return + + if not _is_lightning_payment(payment): + logger.debug( + f"Spark event {event.__class__.__name__} ignored (non-lightning method {getattr(payment, 'method', None)})" + ) + return + + amount_sats = _get_payment_amount_sats(payment) + if amount_sats is not None and amount_sats <= 0: + logger.debug( + f"Spark event {event.__class__.__name__} ignored (non-positive amount {amount_sats})" + ) + return checking_id = _extract_invoice_checking_id(payment) logger.debug( @@ -223,6 +278,11 @@ if EventListener is not None: """Safely put an event into the queue from any thread context""" try: target_loop = self.loop + if (target_loop is None or target_loop.is_closed()) and callable(_get_sdk_event_loop): + alt_loop = _get_sdk_event_loop() + if alt_loop and not alt_loop.is_closed(): + target_loop = alt_loop + if target_loop is None: logger.warning("Spark event listener has no target loop; dropping event") return @@ -263,7 +323,7 @@ else: self.queue = queue self.loop = loop - def on_event(self, event) -> None: + async def on_event(self, event) -> None: """Dummy event handler""" logger.warning("SparkEventListener called but Spark SDK not available") @@ -360,15 +420,30 @@ class SparkBackend(LightningBackend): event_loop = asyncio.get_running_loop() # Store the event loop for SDK callbacks - if 'set_sdk_event_loop' in globals(): - set_sdk_event_loop(event_loop) + applied = False + if callable(set_sdk_event_loop): + try: + set_sdk_event_loop(event_loop) + applied = True + logger.debug("Registered Spark SDK event loop via Python fix") + except Exception as exc: # pragma: no cover - defensive log + logger.warning(f"Failed to set Spark SDK event loop via python fix: {exc}") + for setter in (spark_uniffi_set_event_loop, common_uniffi_set_event_loop): - if setter: + if callable(setter): try: setter(event_loop) + applied = True + logger.debug(f"Registered Spark SDK event loop via {setter.__name__}") except Exception as exc: # pragma: no cover - defensive log logger.warning(f"Failed to register event loop with Spark SDK: {exc}") + if not applied: + logger.warning( + "Spark SDK event loop could not be registered; callbacks may fail. " + "Ensure the shim in cashu/lightning/spark_event_loop_fix.py is available." + ) + # ConnectRequest requires a Seed object (mnemonic or entropy based) seed = Seed.MNEMONIC(mnemonic=mnemonic, passphrase=None) self.sdk = await asyncio.wait_for( @@ -528,10 +603,16 @@ class SparkBackend(LightningBackend): fee_sats = _get_payment_fee_sats(payment) preimage = _get_payment_preimage(payment) + checking_id = ( + quote.checking_id + or _extract_invoice_checking_id(payment) + or getattr(payment, "payment_hash", None) + or getattr(payment, "id", None) + ) return PaymentResponse( result=result, - checking_id=payment.id, + checking_id=checking_id, fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, preimage=preimage ) @@ -547,15 +628,34 @@ class SparkBackend(LightningBackend): await self._ensure_initialized() # For Spark SDK, checking_id is the Lightning invoice/payment_request - # We need to get all payments and find the one with this payment_request + # We need to get all RECEIVE payments and find the one with this payment_request from .base import PaymentResult - # List all recent payments to find our invoice - list_request = ListPaymentsRequest() + receive_type = getattr(PaymentType, "RECEIVE", None) + type_filter = [receive_type] if receive_type else None + if type_filter: + list_request = ListPaymentsRequest(type_filter=type_filter) + else: + list_request = ListPaymentsRequest() list_response = await self.sdk.list_payments(request=list_request) normalized_checking_id = checking_id.lower() for payment in list_response.payments: + payment_type = getattr(payment, "payment_type", None) + if receive_type and payment_type and payment_type != receive_type: + continue + + if not _is_lightning_payment(payment): + continue + + amount_sats = _get_payment_amount_sats(payment) + if amount_sats is not None and amount_sats <= 0: + logger.debug( + "Spark get_invoice_status skipping zero-amount receive payment id=%s", + getattr(payment, "id", None), + ) + continue + payment_checking_id = _extract_invoice_checking_id(payment) if payment_checking_id and payment_checking_id == normalized_checking_id: # Found our payment - return its status @@ -589,9 +689,15 @@ class SparkBackend(LightningBackend): try: await self._ensure_initialized() - # The checking_id is the invoice/bolt11 string for received payments - # We need to list payments and find the one with matching invoice - list_request = ListPaymentsRequest(payment_type=PaymentType.RECEIVE) + # Melt checking_id represents the outgoing invoice/payment hash. + # Query SEND payments (or all if enum missing) so we can match outgoing attempts. + send_type = getattr(PaymentType, "SEND", None) + type_filter = [send_type] if send_type else None + if type_filter: + list_request = ListPaymentsRequest(type_filter=type_filter) + else: + list_request = ListPaymentsRequest() + response = await self.sdk.list_payments(request=list_request) # Find the payment with matching invoice @@ -599,7 +705,22 @@ class SparkBackend(LightningBackend): checking_id_lower = checking_id.lower() for payment in response.payments: - # Check if this payment's invoice matches our checking_id + payment_type = getattr(payment, "payment_type", None) + if send_type and payment_type and payment_type != send_type: + continue + + if not _is_lightning_payment(payment): + continue + + amount_sats = _get_payment_amount_sats(payment) + if amount_sats is not None and amount_sats <= 0: + logger.debug( + "Spark get_payment_status skipping zero-amount send payment id=%s", + getattr(payment, "id", None), + ) + continue + + # Check if this payment's invoice hash matches our checking_id invoice_id = _extract_invoice_checking_id(payment) if invoice_id and invoice_id.lower() == checking_id_lower: target_payment = payment diff --git a/cashu/mint/tasks.py b/cashu/mint/tasks.py index c87a65f..68a8cf7 100644 --- a/cashu/mint/tasks.py +++ b/cashu/mint/tasks.py @@ -3,7 +3,7 @@ from typing import List from loguru import logger -from ..core.base import MintQuoteState +from ..core.base import MintQuoteState, Method, Unit from ..core.settings import settings from ..lightning.base import LightningBackend from .protocols import SupportsBackends, SupportsDb, SupportsEvents @@ -58,6 +58,14 @@ class LedgerTasks(SupportsDb, SupportsBackends, SupportsEvents): ) # set the quote as paid if quote.unpaid: + confirmed = await self._confirm_invoice_paid_with_backend(quote) + if not confirmed: + logger.debug( + "Invoice callback ignored for %s; backend still reports %s", + quote.quote, + "pending" if quote.unpaid else quote.state.value, + ) + return quote.state = MintQuoteState.paid await self.crud.update_mint_quote(quote=quote, db=self.db, conn=conn) logger.trace( @@ -65,3 +73,47 @@ class LedgerTasks(SupportsDb, SupportsBackends, SupportsEvents): ) await self.events.submit(quote) + + async def _confirm_invoice_paid_with_backend(self, quote) -> bool: + """Ensure backend agrees invoice is settled before updating DB.""" + try: + method = Method[quote.method] + except KeyError: + logger.error(f"Unknown payment method on quote {quote.quote}: {quote.method}") + return False + + try: + unit = Unit[quote.unit] + except KeyError: + logger.error(f"Unknown unit on quote {quote.quote}: {quote.unit}") + return False + + if not quote.checking_id: + logger.error(f"Quote {quote.quote} missing checking_id; cannot verify payment") + return False + + method_backends = self.backends.get(method) + if not method_backends: + logger.error(f"No backend registered for method {method}") + return False + + backend = method_backends.get(unit) + if not backend: + logger.error(f"No backend registered for method {method} unit {unit}") + return False + + try: + status = await backend.get_invoice_status(quote.checking_id) + except Exception as exc: + logger.error(f"Backend verification failed for quote {quote.quote}: {exc}") + return False + + if not status.settled: + logger.debug( + "Backend reported %s for quote %s; deferring state change", + status.result, + quote.quote, + ) + return False + + return True diff --git a/docker-compose.yaml b/docker-compose.yaml index 687bd3b..8eeab51 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,11 +6,13 @@ services: container_name: mint ports: - "3338:3338" + env_file: + - .env environment: - - MINT_BACKEND_BOLT11_SAT=FakeWallet - - MINT_LISTEN_HOST=0.0.0.0 - - MINT_LISTEN_PORT=3338 - - MINT_PRIVATE_KEY=TEST_PRIVATE_KEY + - MINT_BACKEND_BOLT11_SAT=${MINT_BACKEND_BOLT11_SAT:-FakeWallet} + - MINT_LISTEN_HOST=${MINT_LISTEN_HOST:-0.0.0.0} + - MINT_LISTEN_PORT=${MINT_LISTEN_PORT:-3338} + - MINT_PRIVATE_KEY=${MINT_PRIVATE_KEY:-TEST_PRIVATE_KEY} command: ["poetry", "run", "mint"] wallet: build: From 5332adae40b568e0d937a72b96567b617448dbca Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Thu, 20 Nov 2025 16:47:57 +0100 Subject: [PATCH 5/5] pre-commit pass --- cashu/lightning/spark.py | 250 +++++++++++++++++++++++++-------------- 1 file changed, 159 insertions(+), 91 deletions(-) diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py index 8b6f812..a045bb6 100644 --- a/cashu/lightning/spark.py +++ b/cashu/lightning/spark.py @@ -1,6 +1,5 @@ import asyncio import inspect -import math from typing import AsyncGenerator, Optional from bolt11 import decode @@ -44,6 +43,7 @@ def _extract_invoice_checking_id(payment) -> Optional[str]: if invoice: try: from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(invoice) if invoice_obj.payment_hash: # logger.debug(f"Spark extract: extracted payment_hash from invoice={invoice_obj.payment_hash}") @@ -60,6 +60,7 @@ def _extract_invoice_checking_id(payment) -> Optional[str]: if bolt11: try: from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(bolt11) if invoice_obj.payment_hash: # logger.debug(f"Spark extract: extracted payment_hash from bolt11={invoice_obj.payment_hash}") @@ -124,36 +125,41 @@ def _get_payment_preimage(payment) -> Optional[str]: set_sdk_event_loop = None _get_sdk_event_loop = None try: - from breez_sdk_spark import breez_sdk_spark as spark_bindings from breez_sdk_spark import ( BreezSdk, - connect, ConnectRequest, - default_config, EventListener, GetInfoRequest, - GetPaymentRequest, - PaymentMethod, ListPaymentsRequest, Network, - PaymentStatus as SparkPaymentStatus, + PaymentMethod, PaymentType, + PrepareSendPaymentRequest, ReceivePaymentMethod, ReceivePaymentRequest, - PrepareSendPaymentRequest, SdkEvent, - SendPaymentRequest, - SendPaymentOptions, Seed, + SendPaymentOptions, + SendPaymentRequest, + connect, + default_config, ) + from breez_sdk_spark import ( + PaymentStatus as SparkPaymentStatus, + ) + from breez_sdk_spark import breez_sdk_spark as spark_bindings + # Event loop fix will be imported but not applied yet set_sdk_event_loop = None _get_sdk_event_loop = None try: from .spark_event_loop_fix import ( - set_sdk_event_loop as _set_sdk_event_loop, ensure_event_loop as _ensure_event_loop, ) + from .spark_event_loop_fix import ( + set_sdk_event_loop as _set_sdk_event_loop, + ) + set_sdk_event_loop = _set_sdk_event_loop _get_sdk_event_loop = _ensure_event_loop if spark_bindings is not None: @@ -176,7 +182,9 @@ except ImportError as e: SparkPaymentStatus = None spark_uniffi_set_event_loop = None common_uniffi_set_event_loop = None - logger.warning(f"Breez SDK Spark not available - SparkBackend will not function: {e}") + logger.warning( + f"Breez SDK Spark not available - SparkBackend will not function: {e}" + ) def _get_payment_amount_sats(payment) -> Optional[int]: @@ -203,6 +211,7 @@ def _is_lightning_payment(payment) -> bool: if EventListener is not None: + class SparkEventListener(EventListener): """Event listener for Spark SDK payment notifications""" @@ -215,7 +224,9 @@ if EventListener is not None: """Handle SDK events in a thread-safe manner with robust error handling""" try: # Debug log ALL events to understand what types of events we get - logger.info(f"Spark SDK event received: {event.__class__.__name__}, hasPayment={hasattr(event, 'payment')}, event={event}") + logger.info( + f"Spark SDK event received: {event.__class__.__name__}, hasPayment={hasattr(event, 'payment')}, event={event}" + ) # Check if this is a payment event we care about and extract the invoice id if hasattr(event, "payment"): @@ -225,12 +236,21 @@ if EventListener is not None: receive_type = getattr(PaymentType, "RECEIVE", None) # Debug log all payment events to understand what we're getting - logger.info(f"Spark payment event: status={status}, type={payment_type}, payment={payment}") + logger.info( + f"Spark payment event: status={status}, type={payment_type}, payment={payment}" + ) # Only consider completed/settled payments - if status and hasattr(SparkPaymentStatus, 'COMPLETED') and status != SparkPaymentStatus.COMPLETED: + if ( + status + and hasattr(SparkPaymentStatus, "COMPLETED") + and status != SparkPaymentStatus.COMPLETED + ): # Check if it's a different completion status - if not (hasattr(SparkPaymentStatus, 'SETTLED') and status == SparkPaymentStatus.SETTLED): + if not ( + hasattr(SparkPaymentStatus, "SETTLED") + and status == SparkPaymentStatus.SETTLED + ): logger.debug( f"Spark event {event.__class__.__name__} ignored (status {status})" ) @@ -263,7 +283,9 @@ if EventListener is not None: f"raw_payment={payment!r} extracted_id={checking_id}" ) if not checking_id: - logger.debug(f"Spark event {event.__class__.__name__} ignored (no checking id)") + logger.debug( + f"Spark event {event.__class__.__name__} ignored (no checking id)" + ) return # More robust thread-safe event handling @@ -272,23 +294,30 @@ if EventListener is not None: except Exception as e: logger.error(f"Error handling Spark event: {e}") import traceback + logger.debug(f"Event handler traceback: {traceback.format_exc()}") def _safe_put_event(self, checking_id: str) -> None: """Safely put an event into the queue from any thread context""" try: target_loop = self.loop - if (target_loop is None or target_loop.is_closed()) and callable(_get_sdk_event_loop): + if (target_loop is None or target_loop.is_closed()) and callable( + _get_sdk_event_loop + ): alt_loop = _get_sdk_event_loop() if alt_loop and not alt_loop.is_closed(): target_loop = alt_loop if target_loop is None: - logger.warning("Spark event listener has no target loop; dropping event") + logger.warning( + "Spark event listener has no target loop; dropping event" + ) return if target_loop.is_closed(): - logger.warning("Spark event listener target loop is closed; dropping event") + logger.warning( + "Spark event listener target loop is closed; dropping event" + ) return # Use call_soon_threadsafe for more reliable thread-safe event handling @@ -297,18 +326,22 @@ if EventListener is not None: self.queue.put_nowait(checking_id) logger.info(f"Spark event successfully queued: {checking_id}") except asyncio.QueueFull: - logger.warning(f"Spark event queue full, dropping event: {checking_id}") + logger.warning( + f"Spark event queue full, dropping event: {checking_id}" + ) except Exception as e: logger.error(f"Failed to put event in queue: {e}") target_loop.call_soon_threadsafe(queue_put) except Exception as exc: - logger.warning(f"Failed to queue Spark event (expected from callback thread): {exc}") + logger.warning( + f"Failed to queue Spark event (expected from callback thread): {exc}" + ) # Fallback: try the original approach try: if self.loop and not self.loop.is_closed(): - future = asyncio.run_coroutine_threadsafe( + asyncio.run_coroutine_threadsafe( self.queue.put(checking_id), self.loop, ) @@ -316,7 +349,8 @@ if EventListener is not None: except Exception as fallback_exc: logger.error(f"Both event queueing methods failed: {fallback_exc}") else: - class SparkEventListener: + + class SparkEventListener: # type: ignore[no-redef] """Dummy event listener when Spark SDK is not available""" def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): @@ -351,7 +385,7 @@ class SparkBackend(LightningBackend): self._initialized = False self._initialization_lock = asyncio.Lock() self._connection_retry_count = 0 - self._max_retries = getattr(settings, 'mint_spark_retry_attempts', 3) + self._max_retries = getattr(settings, "mint_spark_retry_attempts", 3) self._retry_delay = 5.0 # Validate required settings @@ -397,7 +431,7 @@ class SparkBackend(LightningBackend): if attempt == self._max_retries - 1: raise - delay = self._retry_delay * (2 ** attempt) + delay = self._retry_delay * (2**attempt) logger.warning( f"SDK init attempt {attempt + 1} failed: {e}. " f"Retrying in {delay}s" @@ -409,14 +443,14 @@ class SparkBackend(LightningBackend): mnemonic = settings.mint_spark_mnemonic # Determine network - network_str = getattr(settings, 'mint_spark_network', 'mainnet').lower() - network = Network.MAINNET if network_str == 'mainnet' else Network.TESTNET + network_str = getattr(settings, "mint_spark_network", "mainnet").lower() + network = Network.MAINNET if network_str == "mainnet" else Network.TESTNET config = default_config(network=network) config.api_key = settings.mint_spark_api_key - storage_dir = getattr(settings, 'mint_spark_storage_dir', 'data/spark') - connection_timeout = getattr(settings, 'mint_spark_connection_timeout', 30) + storage_dir = getattr(settings, "mint_spark_storage_dir", "data/spark") + connection_timeout = getattr(settings, "mint_spark_connection_timeout", 30) event_loop = asyncio.get_running_loop() # Store the event loop for SDK callbacks @@ -427,16 +461,22 @@ class SparkBackend(LightningBackend): applied = True logger.debug("Registered Spark SDK event loop via Python fix") except Exception as exc: # pragma: no cover - defensive log - logger.warning(f"Failed to set Spark SDK event loop via python fix: {exc}") + logger.warning( + f"Failed to set Spark SDK event loop via python fix: {exc}" + ) for setter in (spark_uniffi_set_event_loop, common_uniffi_set_event_loop): if callable(setter): try: setter(event_loop) applied = True - logger.debug(f"Registered Spark SDK event loop via {setter.__name__}") + logger.debug( + f"Registered Spark SDK event loop via {setter.__name__}" + ) except Exception as exc: # pragma: no cover - defensive log - logger.warning(f"Failed to register event loop with Spark SDK: {exc}") + logger.warning( + f"Failed to register event loop with Spark SDK: {exc}" + ) if not applied: logger.warning( @@ -449,31 +489,29 @@ class SparkBackend(LightningBackend): self.sdk = await asyncio.wait_for( connect( request=ConnectRequest( - config=config, - seed=seed, - storage_dir=storage_dir + config=config, seed=seed, storage_dir=storage_dir ) ), - timeout=connection_timeout + timeout=connection_timeout, ) # Set up event listener for payment notifications self.event_queue = asyncio.Queue() self._event_loop = event_loop self.listener = SparkEventListener(self.event_queue, self._event_loop) - self.listener_id = await _await_if_needed( - self.sdk.add_event_listener(listener=self.listener) - ) + if self.sdk is not None: + self.listener_id = await _await_if_needed( + self.sdk.add_event_listener(listener=self.listener) + ) logger.debug(f"Spark SDK initialized successfully on {network_str} network") # Clear mnemonic from memory - mnemonic = None del mnemonic async def cleanup(self) -> None: """Proper resource cleanup""" try: - if hasattr(self, 'listener_id') and self.sdk: + if hasattr(self, "listener_id") and self.sdk: if self.listener_id: await _await_if_needed( self.sdk.remove_event_listener(id=self.listener_id) @@ -499,7 +537,7 @@ class SparkBackend(LightningBackend): return False await asyncio.wait_for( self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)), - timeout=5.0 + timeout=5.0, ) return True except Exception: @@ -508,16 +546,16 @@ class SparkBackend(LightningBackend): async def status(self) -> StatusResponse: try: await self._ensure_initialized() + assert self.sdk is not None info = await self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)) return StatusResponse( - balance=Amount(Unit.sat, info.balance_sats), - error_message=None + balance=Amount(Unit.sat, info.balance_sats), error_message=None ) except Exception as e: logger.error(f"Spark status error: {e}") return StatusResponse( error_message=f"Failed to connect to Spark SDK: {e}", - balance=Amount(self.unit, 0) + balance=Amount(self.unit, 0), ) async def create_invoice( @@ -533,14 +571,15 @@ class SparkBackend(LightningBackend): await self._ensure_initialized() payment_method = ReceivePaymentMethod.BOLT11_INVOICE( - description=memo or "", - amount_sats=amount.to(Unit.sat).amount + description=memo or "", amount_sats=amount.to(Unit.sat).amount ) request = ReceivePaymentRequest(payment_method=payment_method) + assert self.sdk is not None response = await self.sdk.receive_payment(request=request) # Extract payment_hash from the invoice for consistent matching from bolt11 import decode as bolt11_decode + try: invoice_obj = bolt11_decode(response.payment_request) payment_hash = invoice_obj.payment_hash @@ -552,17 +591,25 @@ class SparkBackend(LightningBackend): # Fallback to using full invoice as checking_id payment_hash = response.payment_request.lower() - checking_id_to_store = payment_hash.lower() if payment_hash else response.payment_request.lower() - logger.info(f"Spark storing checking_id: {checking_id_to_store[:20]}... (hash: {bool(payment_hash)})") + checking_id_to_store = ( + payment_hash.lower() + if payment_hash + else response.payment_request.lower() + ) + logger.info( + f"Spark storing checking_id: {checking_id_to_store[:20]}... (hash: {bool(payment_hash)})" + ) return InvoiceResponse( ok=True, checking_id=checking_id_to_store, - payment_request=response.payment_request + payment_request=response.payment_request, ) except Exception as e: logger.error(f"Spark create_invoice error for amount {amount}: {e}") - return InvoiceResponse(ok=False, error_message=f"Invoice creation failed: {e}") + return InvoiceResponse( + ok=False, error_message=f"Invoice creation failed: {e}" + ) async def pay_invoice( self, quote: MeltQuote, fee_limit_msat: int @@ -573,18 +620,19 @@ class SparkBackend(LightningBackend): # Prepare the payment prepare_request = PrepareSendPaymentRequest( payment_request=quote.request, - amount=None # Use invoice amount + amount=None, # Use invoice amount + ) + assert self.sdk is not None + prepare_response = await self.sdk.prepare_send_payment( + request=prepare_request ) - prepare_response = await self.sdk.prepare_send_payment(request=prepare_request) # Send the payment options = SendPaymentOptions.BOLT11_INVOICE( - prefer_spark=False, - completion_timeout_secs=30 + prefer_spark=False, completion_timeout_secs=30 ) send_request = SendPaymentRequest( - prepare_response=prepare_response, - options=options + prepare_response=prepare_response, options=options ) send_response = await self.sdk.send_payment(request=send_request) @@ -614,13 +662,12 @@ class SparkBackend(LightningBackend): result=result, checking_id=checking_id, fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + preimage=preimage, ) except Exception as e: logger.error(f"Spark pay_invoice error for quote {quote.quote}: {e}") return PaymentResponse( - result=PaymentResult.FAILED, - error_message=f"Payment failed: {e}" + result=PaymentResult.FAILED, error_message=f"Payment failed: {e}" ) async def get_invoice_status(self, checking_id: str) -> PaymentStatus: @@ -637,6 +684,7 @@ class SparkBackend(LightningBackend): list_request = ListPaymentsRequest(type_filter=type_filter) else: list_request = ListPaymentsRequest() + assert self.sdk is not None list_response = await self.sdk.list_payments(request=list_request) normalized_checking_id = checking_id.lower() @@ -657,7 +705,10 @@ class SparkBackend(LightningBackend): continue payment_checking_id = _extract_invoice_checking_id(payment) - if payment_checking_id and payment_checking_id == normalized_checking_id: + if ( + payment_checking_id + and payment_checking_id == normalized_checking_id + ): # Found our payment - return its status logger.debug( f"Spark payment found: target={normalized_checking_id} status={getattr(payment, 'status', None)}" @@ -667,23 +718,21 @@ class SparkBackend(LightningBackend): preimage = _get_payment_preimage(payment) return PaymentStatus( result=result, - fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + fee=Amount(Unit.sat, fee_sats) + if fee_sats is not None + else None, + preimage=preimage, ) # If not found in payments list, invoice is still pending - logger.debug(f"Spark payment not found for checking_id: {normalized_checking_id[:20]}...") - return PaymentStatus( - result=PaymentResult.PENDING, - error_message=None + logger.debug( + f"Spark payment not found for checking_id: {normalized_checking_id[:20]}..." ) + return PaymentStatus(result=PaymentResult.PENDING, error_message=None) except Exception as e: logger.error(f"Get invoice status error: {e}") - return PaymentStatus( - result=PaymentResult.UNKNOWN, - error_message=str(e) - ) + return PaymentStatus(result=PaymentResult.UNKNOWN, error_message=str(e)) async def get_payment_status(self, checking_id: str) -> PaymentStatus: try: @@ -698,6 +747,7 @@ class SparkBackend(LightningBackend): else: list_request = ListPaymentsRequest() + assert self.sdk is not None response = await self.sdk.list_payments(request=list_request) # Find the payment with matching invoice @@ -724,14 +774,15 @@ class SparkBackend(LightningBackend): invoice_id = _extract_invoice_checking_id(payment) if invoice_id and invoice_id.lower() == checking_id_lower: target_payment = payment - logger.debug(f"Found matching payment for invoice {checking_id[:20]}...") + logger.debug( + f"Found matching payment for invoice {checking_id[:20]}..." + ) break if not target_payment: logger.debug(f"No payment found for checking_id {checking_id[:20]}...") return PaymentStatus( - result=PaymentResult.PENDING, - error_message="Payment not found yet" + result=PaymentResult.PENDING, error_message="Payment not found yet" ) # Map Spark payment status to PaymentResult @@ -742,18 +793,15 @@ class SparkBackend(LightningBackend): return PaymentStatus( result=result, fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + preimage=preimage, ) except Exception as e: logger.error(f"Get payment status error: {e}") - return PaymentStatus( - result=PaymentResult.UNKNOWN, - error_message=str(e) - ) + return PaymentStatus(result=PaymentResult.UNKNOWN, error_message=str(e)) def _map_payment_status(self, payment) -> PaymentResult: """Map Spark SDK payment status to PaymentResult enum.""" - if not hasattr(payment, 'status'): + if not hasattr(payment, "status"): return PaymentResult.UNKNOWN # Use official PaymentStatus enum for more reliable mapping @@ -767,22 +815,38 @@ class SparkBackend(LightningBackend): else: # Fallback to string comparison for any new status values status_str = str(payment.status).lower() - if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + if ( + "complete" in status_str + or "settled" in status_str + or "succeeded" in status_str + ): return PaymentResult.SETTLED - elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + elif ( + "failed" in status_str + or "cancelled" in status_str + or "expired" in status_str + ): return PaymentResult.FAILED - elif 'pending' in status_str or 'processing' in status_str: + elif "pending" in status_str or "processing" in status_str: return PaymentResult.PENDING else: return PaymentResult.UNKNOWN except (AttributeError, TypeError): # Fallback to string-based mapping if enum comparison fails status_str = str(payment.status).lower() - if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + if ( + "complete" in status_str + or "settled" in status_str + or "succeeded" in status_str + ): return PaymentResult.SETTLED - elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + elif ( + "failed" in status_str + or "cancelled" in status_str + or "expired" in status_str + ): return PaymentResult.FAILED - elif 'pending' in status_str or 'processing' in status_str: + elif "pending" in status_str or "processing" in status_str: return PaymentResult.PENDING else: return PaymentResult.UNKNOWN @@ -816,11 +880,13 @@ class SparkBackend(LightningBackend): while True: try: # Set timeout to prevent infinite blocking + assert self.event_queue is not None payment_id = await asyncio.wait_for( - self.event_queue.get(), - timeout=30.0 + self.event_queue.get(), timeout=30.0 + ) + logger.debug( + f"Spark paid_invoices_stream emitting checking_id={payment_id}" ) - logger.debug(f"Spark paid_invoices_stream emitting checking_id={payment_id}") yield payment_id # Reset retry delay on success @@ -843,7 +909,7 @@ class SparkBackend(LightningBackend): # Exponential backoff retry_delay = max( settings.mint_retry_exponential_backoff_base_delay, - min(retry_delay * 2, max_retry_delay) + min(retry_delay * 2, max_retry_delay), ) # Attempt recovery @@ -858,6 +924,8 @@ class SparkBackend(LightningBackend): except Exception as e: logger.warning(f"Spark health check failed: {e}") return False + + async def _await_if_needed(value): """Await value if it is awaitable; otherwise return it directly.""" if inspect.isawaitable(value):