diff --git a/cashu/lightning/spark.py b/cashu/lightning/spark.py index 8b6f812..a045bb6 100644 --- a/cashu/lightning/spark.py +++ b/cashu/lightning/spark.py @@ -1,6 +1,5 @@ import asyncio import inspect -import math from typing import AsyncGenerator, Optional from bolt11 import decode @@ -44,6 +43,7 @@ def _extract_invoice_checking_id(payment) -> Optional[str]: if invoice: try: from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(invoice) if invoice_obj.payment_hash: # logger.debug(f"Spark extract: extracted payment_hash from invoice={invoice_obj.payment_hash}") @@ -60,6 +60,7 @@ def _extract_invoice_checking_id(payment) -> Optional[str]: if bolt11: try: from bolt11 import decode as bolt11_decode + invoice_obj = bolt11_decode(bolt11) if invoice_obj.payment_hash: # logger.debug(f"Spark extract: extracted payment_hash from bolt11={invoice_obj.payment_hash}") @@ -124,36 +125,41 @@ def _get_payment_preimage(payment) -> Optional[str]: set_sdk_event_loop = None _get_sdk_event_loop = None try: - from breez_sdk_spark import breez_sdk_spark as spark_bindings from breez_sdk_spark import ( BreezSdk, - connect, ConnectRequest, - default_config, EventListener, GetInfoRequest, - GetPaymentRequest, - PaymentMethod, ListPaymentsRequest, Network, - PaymentStatus as SparkPaymentStatus, + PaymentMethod, PaymentType, + PrepareSendPaymentRequest, ReceivePaymentMethod, ReceivePaymentRequest, - PrepareSendPaymentRequest, SdkEvent, - SendPaymentRequest, - SendPaymentOptions, Seed, + SendPaymentOptions, + SendPaymentRequest, + connect, + default_config, ) + from breez_sdk_spark import ( + PaymentStatus as SparkPaymentStatus, + ) + from breez_sdk_spark import breez_sdk_spark as spark_bindings + # Event loop fix will be imported but not applied yet set_sdk_event_loop = None _get_sdk_event_loop = None try: from .spark_event_loop_fix import ( - set_sdk_event_loop as _set_sdk_event_loop, ensure_event_loop as _ensure_event_loop, ) + from .spark_event_loop_fix import ( + set_sdk_event_loop as _set_sdk_event_loop, + ) + set_sdk_event_loop = _set_sdk_event_loop _get_sdk_event_loop = _ensure_event_loop if spark_bindings is not None: @@ -176,7 +182,9 @@ except ImportError as e: SparkPaymentStatus = None spark_uniffi_set_event_loop = None common_uniffi_set_event_loop = None - logger.warning(f"Breez SDK Spark not available - SparkBackend will not function: {e}") + logger.warning( + f"Breez SDK Spark not available - SparkBackend will not function: {e}" + ) def _get_payment_amount_sats(payment) -> Optional[int]: @@ -203,6 +211,7 @@ def _is_lightning_payment(payment) -> bool: if EventListener is not None: + class SparkEventListener(EventListener): """Event listener for Spark SDK payment notifications""" @@ -215,7 +224,9 @@ if EventListener is not None: """Handle SDK events in a thread-safe manner with robust error handling""" try: # Debug log ALL events to understand what types of events we get - logger.info(f"Spark SDK event received: {event.__class__.__name__}, hasPayment={hasattr(event, 'payment')}, event={event}") + logger.info( + f"Spark SDK event received: {event.__class__.__name__}, hasPayment={hasattr(event, 'payment')}, event={event}" + ) # Check if this is a payment event we care about and extract the invoice id if hasattr(event, "payment"): @@ -225,12 +236,21 @@ if EventListener is not None: receive_type = getattr(PaymentType, "RECEIVE", None) # Debug log all payment events to understand what we're getting - logger.info(f"Spark payment event: status={status}, type={payment_type}, payment={payment}") + logger.info( + f"Spark payment event: status={status}, type={payment_type}, payment={payment}" + ) # Only consider completed/settled payments - if status and hasattr(SparkPaymentStatus, 'COMPLETED') and status != SparkPaymentStatus.COMPLETED: + if ( + status + and hasattr(SparkPaymentStatus, "COMPLETED") + and status != SparkPaymentStatus.COMPLETED + ): # Check if it's a different completion status - if not (hasattr(SparkPaymentStatus, 'SETTLED') and status == SparkPaymentStatus.SETTLED): + if not ( + hasattr(SparkPaymentStatus, "SETTLED") + and status == SparkPaymentStatus.SETTLED + ): logger.debug( f"Spark event {event.__class__.__name__} ignored (status {status})" ) @@ -263,7 +283,9 @@ if EventListener is not None: f"raw_payment={payment!r} extracted_id={checking_id}" ) if not checking_id: - logger.debug(f"Spark event {event.__class__.__name__} ignored (no checking id)") + logger.debug( + f"Spark event {event.__class__.__name__} ignored (no checking id)" + ) return # More robust thread-safe event handling @@ -272,23 +294,30 @@ if EventListener is not None: except Exception as e: logger.error(f"Error handling Spark event: {e}") import traceback + logger.debug(f"Event handler traceback: {traceback.format_exc()}") def _safe_put_event(self, checking_id: str) -> None: """Safely put an event into the queue from any thread context""" try: target_loop = self.loop - if (target_loop is None or target_loop.is_closed()) and callable(_get_sdk_event_loop): + if (target_loop is None or target_loop.is_closed()) and callable( + _get_sdk_event_loop + ): alt_loop = _get_sdk_event_loop() if alt_loop and not alt_loop.is_closed(): target_loop = alt_loop if target_loop is None: - logger.warning("Spark event listener has no target loop; dropping event") + logger.warning( + "Spark event listener has no target loop; dropping event" + ) return if target_loop.is_closed(): - logger.warning("Spark event listener target loop is closed; dropping event") + logger.warning( + "Spark event listener target loop is closed; dropping event" + ) return # Use call_soon_threadsafe for more reliable thread-safe event handling @@ -297,18 +326,22 @@ if EventListener is not None: self.queue.put_nowait(checking_id) logger.info(f"Spark event successfully queued: {checking_id}") except asyncio.QueueFull: - logger.warning(f"Spark event queue full, dropping event: {checking_id}") + logger.warning( + f"Spark event queue full, dropping event: {checking_id}" + ) except Exception as e: logger.error(f"Failed to put event in queue: {e}") target_loop.call_soon_threadsafe(queue_put) except Exception as exc: - logger.warning(f"Failed to queue Spark event (expected from callback thread): {exc}") + logger.warning( + f"Failed to queue Spark event (expected from callback thread): {exc}" + ) # Fallback: try the original approach try: if self.loop and not self.loop.is_closed(): - future = asyncio.run_coroutine_threadsafe( + asyncio.run_coroutine_threadsafe( self.queue.put(checking_id), self.loop, ) @@ -316,7 +349,8 @@ if EventListener is not None: except Exception as fallback_exc: logger.error(f"Both event queueing methods failed: {fallback_exc}") else: - class SparkEventListener: + + class SparkEventListener: # type: ignore[no-redef] """Dummy event listener when Spark SDK is not available""" def __init__(self, queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): @@ -351,7 +385,7 @@ class SparkBackend(LightningBackend): self._initialized = False self._initialization_lock = asyncio.Lock() self._connection_retry_count = 0 - self._max_retries = getattr(settings, 'mint_spark_retry_attempts', 3) + self._max_retries = getattr(settings, "mint_spark_retry_attempts", 3) self._retry_delay = 5.0 # Validate required settings @@ -397,7 +431,7 @@ class SparkBackend(LightningBackend): if attempt == self._max_retries - 1: raise - delay = self._retry_delay * (2 ** attempt) + delay = self._retry_delay * (2**attempt) logger.warning( f"SDK init attempt {attempt + 1} failed: {e}. " f"Retrying in {delay}s" @@ -409,14 +443,14 @@ class SparkBackend(LightningBackend): mnemonic = settings.mint_spark_mnemonic # Determine network - network_str = getattr(settings, 'mint_spark_network', 'mainnet').lower() - network = Network.MAINNET if network_str == 'mainnet' else Network.TESTNET + network_str = getattr(settings, "mint_spark_network", "mainnet").lower() + network = Network.MAINNET if network_str == "mainnet" else Network.TESTNET config = default_config(network=network) config.api_key = settings.mint_spark_api_key - storage_dir = getattr(settings, 'mint_spark_storage_dir', 'data/spark') - connection_timeout = getattr(settings, 'mint_spark_connection_timeout', 30) + storage_dir = getattr(settings, "mint_spark_storage_dir", "data/spark") + connection_timeout = getattr(settings, "mint_spark_connection_timeout", 30) event_loop = asyncio.get_running_loop() # Store the event loop for SDK callbacks @@ -427,16 +461,22 @@ class SparkBackend(LightningBackend): applied = True logger.debug("Registered Spark SDK event loop via Python fix") except Exception as exc: # pragma: no cover - defensive log - logger.warning(f"Failed to set Spark SDK event loop via python fix: {exc}") + logger.warning( + f"Failed to set Spark SDK event loop via python fix: {exc}" + ) for setter in (spark_uniffi_set_event_loop, common_uniffi_set_event_loop): if callable(setter): try: setter(event_loop) applied = True - logger.debug(f"Registered Spark SDK event loop via {setter.__name__}") + logger.debug( + f"Registered Spark SDK event loop via {setter.__name__}" + ) except Exception as exc: # pragma: no cover - defensive log - logger.warning(f"Failed to register event loop with Spark SDK: {exc}") + logger.warning( + f"Failed to register event loop with Spark SDK: {exc}" + ) if not applied: logger.warning( @@ -449,31 +489,29 @@ class SparkBackend(LightningBackend): self.sdk = await asyncio.wait_for( connect( request=ConnectRequest( - config=config, - seed=seed, - storage_dir=storage_dir + config=config, seed=seed, storage_dir=storage_dir ) ), - timeout=connection_timeout + timeout=connection_timeout, ) # Set up event listener for payment notifications self.event_queue = asyncio.Queue() self._event_loop = event_loop self.listener = SparkEventListener(self.event_queue, self._event_loop) - self.listener_id = await _await_if_needed( - self.sdk.add_event_listener(listener=self.listener) - ) + if self.sdk is not None: + self.listener_id = await _await_if_needed( + self.sdk.add_event_listener(listener=self.listener) + ) logger.debug(f"Spark SDK initialized successfully on {network_str} network") # Clear mnemonic from memory - mnemonic = None del mnemonic async def cleanup(self) -> None: """Proper resource cleanup""" try: - if hasattr(self, 'listener_id') and self.sdk: + if hasattr(self, "listener_id") and self.sdk: if self.listener_id: await _await_if_needed( self.sdk.remove_event_listener(id=self.listener_id) @@ -499,7 +537,7 @@ class SparkBackend(LightningBackend): return False await asyncio.wait_for( self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)), - timeout=5.0 + timeout=5.0, ) return True except Exception: @@ -508,16 +546,16 @@ class SparkBackend(LightningBackend): async def status(self) -> StatusResponse: try: await self._ensure_initialized() + assert self.sdk is not None info = await self.sdk.get_info(request=GetInfoRequest(ensure_synced=None)) return StatusResponse( - balance=Amount(Unit.sat, info.balance_sats), - error_message=None + balance=Amount(Unit.sat, info.balance_sats), error_message=None ) except Exception as e: logger.error(f"Spark status error: {e}") return StatusResponse( error_message=f"Failed to connect to Spark SDK: {e}", - balance=Amount(self.unit, 0) + balance=Amount(self.unit, 0), ) async def create_invoice( @@ -533,14 +571,15 @@ class SparkBackend(LightningBackend): await self._ensure_initialized() payment_method = ReceivePaymentMethod.BOLT11_INVOICE( - description=memo or "", - amount_sats=amount.to(Unit.sat).amount + description=memo or "", amount_sats=amount.to(Unit.sat).amount ) request = ReceivePaymentRequest(payment_method=payment_method) + assert self.sdk is not None response = await self.sdk.receive_payment(request=request) # Extract payment_hash from the invoice for consistent matching from bolt11 import decode as bolt11_decode + try: invoice_obj = bolt11_decode(response.payment_request) payment_hash = invoice_obj.payment_hash @@ -552,17 +591,25 @@ class SparkBackend(LightningBackend): # Fallback to using full invoice as checking_id payment_hash = response.payment_request.lower() - checking_id_to_store = payment_hash.lower() if payment_hash else response.payment_request.lower() - logger.info(f"Spark storing checking_id: {checking_id_to_store[:20]}... (hash: {bool(payment_hash)})") + checking_id_to_store = ( + payment_hash.lower() + if payment_hash + else response.payment_request.lower() + ) + logger.info( + f"Spark storing checking_id: {checking_id_to_store[:20]}... (hash: {bool(payment_hash)})" + ) return InvoiceResponse( ok=True, checking_id=checking_id_to_store, - payment_request=response.payment_request + payment_request=response.payment_request, ) except Exception as e: logger.error(f"Spark create_invoice error for amount {amount}: {e}") - return InvoiceResponse(ok=False, error_message=f"Invoice creation failed: {e}") + return InvoiceResponse( + ok=False, error_message=f"Invoice creation failed: {e}" + ) async def pay_invoice( self, quote: MeltQuote, fee_limit_msat: int @@ -573,18 +620,19 @@ class SparkBackend(LightningBackend): # Prepare the payment prepare_request = PrepareSendPaymentRequest( payment_request=quote.request, - amount=None # Use invoice amount + amount=None, # Use invoice amount + ) + assert self.sdk is not None + prepare_response = await self.sdk.prepare_send_payment( + request=prepare_request ) - prepare_response = await self.sdk.prepare_send_payment(request=prepare_request) # Send the payment options = SendPaymentOptions.BOLT11_INVOICE( - prefer_spark=False, - completion_timeout_secs=30 + prefer_spark=False, completion_timeout_secs=30 ) send_request = SendPaymentRequest( - prepare_response=prepare_response, - options=options + prepare_response=prepare_response, options=options ) send_response = await self.sdk.send_payment(request=send_request) @@ -614,13 +662,12 @@ class SparkBackend(LightningBackend): result=result, checking_id=checking_id, fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + preimage=preimage, ) except Exception as e: logger.error(f"Spark pay_invoice error for quote {quote.quote}: {e}") return PaymentResponse( - result=PaymentResult.FAILED, - error_message=f"Payment failed: {e}" + result=PaymentResult.FAILED, error_message=f"Payment failed: {e}" ) async def get_invoice_status(self, checking_id: str) -> PaymentStatus: @@ -637,6 +684,7 @@ class SparkBackend(LightningBackend): list_request = ListPaymentsRequest(type_filter=type_filter) else: list_request = ListPaymentsRequest() + assert self.sdk is not None list_response = await self.sdk.list_payments(request=list_request) normalized_checking_id = checking_id.lower() @@ -657,7 +705,10 @@ class SparkBackend(LightningBackend): continue payment_checking_id = _extract_invoice_checking_id(payment) - if payment_checking_id and payment_checking_id == normalized_checking_id: + if ( + payment_checking_id + and payment_checking_id == normalized_checking_id + ): # Found our payment - return its status logger.debug( f"Spark payment found: target={normalized_checking_id} status={getattr(payment, 'status', None)}" @@ -667,23 +718,21 @@ class SparkBackend(LightningBackend): preimage = _get_payment_preimage(payment) return PaymentStatus( result=result, - fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + fee=Amount(Unit.sat, fee_sats) + if fee_sats is not None + else None, + preimage=preimage, ) # If not found in payments list, invoice is still pending - logger.debug(f"Spark payment not found for checking_id: {normalized_checking_id[:20]}...") - return PaymentStatus( - result=PaymentResult.PENDING, - error_message=None + logger.debug( + f"Spark payment not found for checking_id: {normalized_checking_id[:20]}..." ) + return PaymentStatus(result=PaymentResult.PENDING, error_message=None) except Exception as e: logger.error(f"Get invoice status error: {e}") - return PaymentStatus( - result=PaymentResult.UNKNOWN, - error_message=str(e) - ) + return PaymentStatus(result=PaymentResult.UNKNOWN, error_message=str(e)) async def get_payment_status(self, checking_id: str) -> PaymentStatus: try: @@ -698,6 +747,7 @@ class SparkBackend(LightningBackend): else: list_request = ListPaymentsRequest() + assert self.sdk is not None response = await self.sdk.list_payments(request=list_request) # Find the payment with matching invoice @@ -724,14 +774,15 @@ class SparkBackend(LightningBackend): invoice_id = _extract_invoice_checking_id(payment) if invoice_id and invoice_id.lower() == checking_id_lower: target_payment = payment - logger.debug(f"Found matching payment for invoice {checking_id[:20]}...") + logger.debug( + f"Found matching payment for invoice {checking_id[:20]}..." + ) break if not target_payment: logger.debug(f"No payment found for checking_id {checking_id[:20]}...") return PaymentStatus( - result=PaymentResult.PENDING, - error_message="Payment not found yet" + result=PaymentResult.PENDING, error_message="Payment not found yet" ) # Map Spark payment status to PaymentResult @@ -742,18 +793,15 @@ class SparkBackend(LightningBackend): return PaymentStatus( result=result, fee=Amount(Unit.sat, fee_sats) if fee_sats is not None else None, - preimage=preimage + preimage=preimage, ) except Exception as e: logger.error(f"Get payment status error: {e}") - return PaymentStatus( - result=PaymentResult.UNKNOWN, - error_message=str(e) - ) + return PaymentStatus(result=PaymentResult.UNKNOWN, error_message=str(e)) def _map_payment_status(self, payment) -> PaymentResult: """Map Spark SDK payment status to PaymentResult enum.""" - if not hasattr(payment, 'status'): + if not hasattr(payment, "status"): return PaymentResult.UNKNOWN # Use official PaymentStatus enum for more reliable mapping @@ -767,22 +815,38 @@ class SparkBackend(LightningBackend): else: # Fallback to string comparison for any new status values status_str = str(payment.status).lower() - if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + if ( + "complete" in status_str + or "settled" in status_str + or "succeeded" in status_str + ): return PaymentResult.SETTLED - elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + elif ( + "failed" in status_str + or "cancelled" in status_str + or "expired" in status_str + ): return PaymentResult.FAILED - elif 'pending' in status_str or 'processing' in status_str: + elif "pending" in status_str or "processing" in status_str: return PaymentResult.PENDING else: return PaymentResult.UNKNOWN except (AttributeError, TypeError): # Fallback to string-based mapping if enum comparison fails status_str = str(payment.status).lower() - if 'complete' in status_str or 'settled' in status_str or 'succeeded' in status_str: + if ( + "complete" in status_str + or "settled" in status_str + or "succeeded" in status_str + ): return PaymentResult.SETTLED - elif 'failed' in status_str or 'cancelled' in status_str or 'expired' in status_str: + elif ( + "failed" in status_str + or "cancelled" in status_str + or "expired" in status_str + ): return PaymentResult.FAILED - elif 'pending' in status_str or 'processing' in status_str: + elif "pending" in status_str or "processing" in status_str: return PaymentResult.PENDING else: return PaymentResult.UNKNOWN @@ -816,11 +880,13 @@ class SparkBackend(LightningBackend): while True: try: # Set timeout to prevent infinite blocking + assert self.event_queue is not None payment_id = await asyncio.wait_for( - self.event_queue.get(), - timeout=30.0 + self.event_queue.get(), timeout=30.0 + ) + logger.debug( + f"Spark paid_invoices_stream emitting checking_id={payment_id}" ) - logger.debug(f"Spark paid_invoices_stream emitting checking_id={payment_id}") yield payment_id # Reset retry delay on success @@ -843,7 +909,7 @@ class SparkBackend(LightningBackend): # Exponential backoff retry_delay = max( settings.mint_retry_exponential_backoff_base_delay, - min(retry_delay * 2, max_retry_delay) + min(retry_delay * 2, max_retry_delay), ) # Attempt recovery @@ -858,6 +924,8 @@ class SparkBackend(LightningBackend): except Exception as e: logger.warning(f"Spark health check failed: {e}") return False + + async def _await_if_needed(value): """Await value if it is awaitable; otherwise return it directly.""" if inspect.isawaitable(value):