diff --git a/fly/main.py b/fly/main.py index 1ad2299..ae566c8 100644 --- a/fly/main.py +++ b/fly/main.py @@ -1,3 +1,4 @@ +from contextlib import asynccontextmanager from fastapi import FastAPI, Depends, HTTPException, Header, Query, APIRouter from fastapi.security.api_key import APIKeyHeader from pydantic import BaseModel, Field @@ -8,6 +9,9 @@ from enum import Enum from nodeless import PaymentHandler from shopify.router import router as shopify_router import logging +import threading +import asyncio +import time # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -15,10 +19,125 @@ logger = logging.getLogger(__name__) load_dotenv() +_payment_handler = None +_handler_lock = threading.Lock() +_sync_task = None +_last_sync_time = 0 +_consecutive_sync_failures = 0 + +async def periodic_sync_check(): + """Background task to periodically check SDK sync status and attempt resync if needed.""" + global _last_sync_time, _consecutive_sync_failures, _payment_handler + + while True: + try: + current_time = time.time() + + if not _payment_handler: + logger.warning("Payment handler not initialized, waiting...") + await asyncio.sleep(5) + continue + + is_synced = _payment_handler.listener.is_synced() + sync_age = current_time - _last_sync_time if _last_sync_time > 0 else float('inf') + + # Log sync status with detailed metrics + logger.info(f"SDK sync status check - Synced: {is_synced}, Last sync age: {sync_age:.1f}s, Consecutive failures: {_consecutive_sync_failures}") + + if not is_synced or sync_age > 30: # Force resync if not synced or sync is older than 30 seconds + logger.warning(f"SDK sync needed - Status: {'Not synced' if not is_synced else 'Sync too old'}") + + # Attempt resync with progressively longer timeouts based on consecutive failures + timeout = min(5 + (_consecutive_sync_failures * 2), 30) # Increase timeout up to 30 seconds + if _payment_handler.wait_for_sync(timeout_seconds=timeout): + logger.info("SDK resync successful") + _last_sync_time = time.time() + _consecutive_sync_failures = 0 + else: + logger.error(f"SDK resync failed after {timeout}s timeout") + _consecutive_sync_failures += 1 + + # If we have too many consecutive failures, try to reinitialize handler + if _consecutive_sync_failures >= 5: + logger.warning("Too many consecutive sync failures, attempting to reinitialize handler...") + try: + with _handler_lock: + _payment_handler.disconnect() + _payment_handler = PaymentHandler() + _consecutive_sync_failures = 0 + logger.info("Payment handler reinitialized successfully") + except Exception as e: + logger.error(f"Failed to reinitialize payment handler: {e}") + else: + _last_sync_time = current_time + _consecutive_sync_failures = 0 + + # Adjust sleep time based on sync status + sleep_time = 10 if not is_synced or _consecutive_sync_failures > 0 else 30 + await asyncio.sleep(sleep_time) + + except Exception as e: + logger.error(f"Error in periodic sync check: {e}") + _consecutive_sync_failures += 1 + await asyncio.sleep(5) # Short sleep on error before retrying + +def get_payment_handler(): + global _payment_handler + if _payment_handler is None: + with _handler_lock: + if _payment_handler is None: + try: + _payment_handler = PaymentHandler() + except Exception as e: + logger.error(f"Failed to initialize PaymentHandler: {str(e)}") + raise HTTPException( + status_code=500, + detail="Failed to initialize payment system" + ) + return _payment_handler + +@asynccontextmanager +async def lifespan(app: FastAPI): + """ + Lifespan context manager for FastAPI application. + Handles startup and shutdown events. + """ + # Startup + global _payment_handler, _sync_task + try: + _payment_handler = PaymentHandler() + logger.info("Payment system initialized during startup") + + # Start background sync check task + _sync_task = asyncio.create_task(periodic_sync_check()) + logger.info("Background sync check task started") + except Exception as e: + logger.error(f"Failed to initialize payment system during startup: {str(e)}") + # Don't raise here, let the handler initialize on first request if needed + + yield # Server is running + + # Shutdown + if _sync_task: + _sync_task.cancel() + try: + await _sync_task + except asyncio.CancelledError: + pass + logger.info("Background sync check task stopped") + + if _payment_handler: + try: + _payment_handler.disconnect() + logger.info("Payment system disconnected during shutdown") + except Exception as e: + logger.error(f"Error during payment system shutdown: {str(e)}") + app = FastAPI( title="Breez Nodeless Payments API", description="A FastAPI implementation of Breez SDK for Lightning/Liquid payments", - version="1.0.0" + version="1.0.0", + lifespan=lifespan ) API_KEY_NAME = "x-api-key" @@ -132,12 +251,6 @@ async def get_api_key(api_key: str = Header(None, alias=API_KEY_NAME)): ) return api_key -def get_payment_handler(): - try: - return PaymentHandler() - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to initialize PaymentHandler: {str(e)}") - # --- API Endpoints --- @app.get("/list_payments", response_model=PaymentListResponse) async def list_payments( @@ -229,7 +342,10 @@ async def onchain_limits( @app.get("/health") async def health(): - return {"status": "ok"} + global _payment_handler + if _payment_handler and _payment_handler.listener.is_synced(): + return {"status": "ok", "sdk_synced": True} + return {"status": "ok", "sdk_synced": False} @app.get("/check_payment_status/{destination}", response_model=PaymentStatusResponse) async def check_payment_status( diff --git a/fly/nodeless.py b/fly/nodeless.py index fbb6445..d6670ff 100644 --- a/fly/nodeless.py +++ b/fly/nodeless.py @@ -51,6 +51,7 @@ from breez_sdk_liquid import ( import time import logging from pprint import pprint +import threading # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -78,7 +79,6 @@ class SdkListener(EventListener): if isinstance(event, SdkEvent.SYNCED): self.synced = True - logger.info("SDK SYNCED") elif isinstance(event, SdkEvent.PAYMENT_SUCCEEDED): details = event.details # Determine identifier based on payment type @@ -140,83 +140,91 @@ class SdkListener(EventListener): class PaymentHandler: """ A wrapper class for the Breez SDK Nodeless (Liquid implementation). - - This class handles SDK initialization, connection, and provides simplified - methods for common payment and wallet operations. + Implements singleton pattern to prevent multiple SDK instances. """ - def __init__(self, network: LiquidNetwork = LiquidNetwork.MAINNET, working_dir: str = '~/.breez-cli', asset_metadata: Optional[List[AssetMetadata]] = None, external_input_parsers: Optional[List[ExternalInputParser]] = None): + _instance = None + _initialized = False + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, network: LiquidNetwork = LiquidNetwork.MAINNET, working_dir: str = '~/.breez-cli', + asset_metadata: Optional[List[AssetMetadata]] = None, + external_input_parsers: Optional[List[ExternalInputParser]] = None): """ Initializes the PaymentHandler and connects to the Breez SDK. - - Args: - network: The Liquid network to use (MAINNET or TESTNET). - working_dir: The directory for SDK files. - asset_metadata: Optional list of AssetMetadata for non-Bitcoin assets. - external_input_parsers: Optional list of ExternalInputParser for custom input parsing. + Uses singleton pattern to prevent multiple initializations. """ - logger.debug("Entering PaymentHandler.__init__") - load_dotenv() # Load environment variables from .env + if self._initialized: + return - self.breez_api_key = os.getenv('BREEZ_API_KEY') - self.seed_phrase = os.getenv('BREEZ_SEED_PHRASE') + with self._lock: + if self._initialized: + return - if not self.breez_api_key: - logger.error("BREEZ_API_KEY not found in environment variables.") - raise Exception("Missing Breez API key in .env file or environment") - if not self.seed_phrase: - logger.error("BREEZ_SEED_PHRASE not found in environment variables.") - raise Exception("Missing seed phrase in .env file or environment") + logger.debug("Initializing PaymentHandler") + load_dotenv() - logger.info("Retrieved credentials from environment successfully") + self.breez_api_key = os.getenv('BREEZ_API_KEY') + self.seed_phrase = os.getenv('BREEZ_SEED_PHRASE') - config = default_config(network, self.breez_api_key) - # Expand user path for working_dir - config.working_dir = os.path.expanduser(working_dir) - # Ensure working directory exists - try: - os.makedirs(config.working_dir, exist_ok=True) - logger.debug(f"Ensured working directory exists: {config.working_dir}") - except OSError as e: - logger.error(f"Failed to create working directory {config.working_dir}: {e}") - raise # Re-raise if directory creation fails + if not self.breez_api_key: + logger.error("BREEZ_API_KEY not found in environment variables.") + raise Exception("Missing Breez API key in .env file or environment") + if not self.seed_phrase: + logger.error("BREEZ_SEED_PHRASE not found in environment variables.") + raise Exception("Missing seed phrase in .env file or environment") - if asset_metadata: - config.asset_metadata = asset_metadata - logger.info(f"Configured asset metadata: {asset_metadata}") + logger.info("Retrieved credentials from environment successfully") - if external_input_parsers: - config.external_input_parsers = external_input_parsers - logger.info(f"Configured external input parsers: {external_input_parsers}") + config = default_config(network, self.breez_api_key) + config.working_dir = os.path.expanduser(working_dir) + + try: + os.makedirs(config.working_dir, exist_ok=True) + except OSError as e: + logger.error(f"Failed to create working directory {config.working_dir}: {e}") + raise - connect_request = ConnectRequest(config=config, mnemonic=self.seed_phrase) + if asset_metadata: + config.asset_metadata = asset_metadata + if external_input_parsers: + config.external_input_parsers = external_input_parsers - try: - self.instance = connect(connect_request) - self.listener = SdkListener() - # Add listener immediately after connecting - self.instance.add_event_listener(self.listener) - logger.info("Breez SDK connected successfully.") - except Exception as e: - logger.error(f"Failed to connect to Breez SDK: {e}") - # Re-raise the exception after logging - raise + connect_request = ConnectRequest(config=config, mnemonic=self.seed_phrase) - logger.debug("Exiting PaymentHandler.__init__") + try: + self.instance = connect(connect_request) + self.listener = SdkListener() + self.instance.add_event_listener(self.listener) + logger.info("Breez SDK connected successfully.") + + # Shorter sync timeout for initial connection + self.wait_for_sync(timeout_seconds=10) + + except Exception as e: + logger.error(f"Failed to connect to Breez SDK: {e}") + raise + self._initialized = True + logger.debug("PaymentHandler initialization complete") - def wait_for_sync(self, timeout_seconds: int = 30): + def wait_for_sync(self, timeout_seconds: int = 10) -> bool: """Wait for the SDK to sync before proceeding.""" - logger.debug(f"Entering wait_for_sync (timeout={timeout_seconds}s)") + logger.debug(f"Waiting for sync (timeout={timeout_seconds}s)") start_time = time.time() while time.time() - start_time < timeout_seconds: if self.listener.is_synced(): - logger.debug("SDK synced.") - logger.debug("Exiting wait_for_sync (synced)") + logger.debug("SDK synced successfully") return True - time.sleep(0.5) # Shorter sleep for faster sync detection - logger.error("Sync timeout: SDK did not sync within the allocated time.") - logger.debug("Exiting wait_for_sync (timeout)") - raise Exception(f"Sync timeout: SDK did not sync within {timeout_seconds} seconds.") + time.sleep(0.1) # Shorter sleep interval + logger.warning("SDK sync timeout") + return False def wait_for_payment(self, identifier: str, timeout_seconds: int = 60) -> bool: """ @@ -1368,96 +1376,57 @@ class PaymentHandler: def check_payment_status(self, destination: str) -> Dict[str, Any]: """ Checks the status of a specific payment by its destination/invoice. - - Args: - destination: The payment destination (invoice) string to check. - Returns: - Dictionary containing payment status information: - { - 'status': str, # The payment status (e.g., 'PENDING', 'SUCCEEDED', 'FAILED') - 'amount_sat': int, # Amount in satoshis - 'fees_sat': int, # Fees paid in satoshis - 'payment_time': int, # Unix timestamp of the payment - 'payment_hash': str, # Payment hash if available - 'error': str, # Error message if any - } - Raises: - ValueError: If destination is invalid. - Exception: For any SDK errors. + Uses optimized status checking with shorter timeouts. """ - logger.debug(f"Entering check_payment_status for destination: {destination[:30]}...") + logger.debug(f"Checking payment status for {destination[:30]}...") try: if not isinstance(destination, str) or not destination: - logger.warning("Invalid or empty destination provided.") - raise ValueError("Destination must be a non-empty string.") + raise ValueError("Invalid destination") - # First check the payment status in our listener's cache - logger.debug("Checking payment status in listener cache...") + # Check cached status first cached_status = self.listener.get_payment_status(destination) - if cached_status: - logger.debug(f"Found cached payment status: {cached_status}") - # If we have a cached final status, we can return it immediately - if cached_status in ['SUCCEEDED', 'FAILED']: - logger.info(f"Returning cached final status: {cached_status}") - return { - 'status': cached_status, - 'amount_sat': None, # These would be None for cached statuses - 'fees_sat': None, - 'payment_time': None, - 'payment_hash': None, - 'error': None - } - - # If no cached final status, check the actual payments - logger.debug("No cached final status found, querying payment list...") - try: - payments = self.instance.list_payments(ListPaymentsRequest()) - logger.debug(f"Found {len(payments)} total payments") - except Exception as e: - logger.error(f"Error querying payment list: {e}") - raise - - # Find the most recent payment matching the destination - matching_payment = None - for payment in payments: - logger.debug(f"Checking payment: {getattr(payment, 'destination', 'No destination')} == {destination}") - if hasattr(payment, 'destination') and payment.destination == destination: - if matching_payment is None or payment.timestamp > matching_payment.timestamp: - matching_payment = payment - logger.debug("Found matching payment or found more recent matching payment") - - if matching_payment: - # Extract payment details - status = str(matching_payment.status) - details = matching_payment.details - - result = { - 'status': status, - 'amount_sat': matching_payment.amount_sat, - 'fees_sat': matching_payment.fees_sat, - 'payment_time': matching_payment.timestamp, - 'payment_hash': getattr(details, 'payment_hash', None), - 'error': getattr(matching_payment, 'error', None) - } - - logger.info(f"Found payment status for {destination[:30]}...: {status}") - logger.debug(f"Payment details: {result}") - logger.debug("Exiting check_payment_status") - return result - else: - logger.info(f"No payment found for destination: {destination[:30]}...") + if cached_status in ['SUCCEEDED', 'FAILED']: return { - 'status': 'NOT_FOUND', + 'status': cached_status, 'amount_sat': None, 'fees_sat': None, 'payment_time': None, 'payment_hash': None, - 'error': 'Payment not found' + 'error': None } + # Short wait for payment status + payment_succeeded = self.wait_for_payment(destination, timeout_seconds=2) + + # Get final status + final_status = self.listener.get_payment_status(destination) or 'PENDING' + status = 'SUCCEEDED' if payment_succeeded else final_status + + # Try to get payment details + try: + payments = self.instance.list_payments(ListPaymentsRequest()) + payment = next( + (p for p in payments if hasattr(p, 'destination') and p.destination == destination), + None + ) + except Exception as e: + logger.warning(f"Could not fetch payment details: {e}") + payment = None + + result = { + 'status': status, + 'amount_sat': getattr(payment, 'amount_sat', None), + 'fees_sat': getattr(payment, 'fees_sat', None), + 'payment_time': getattr(payment, 'timestamp', None), + 'payment_hash': getattr(payment.details, 'payment_hash', None) if payment and payment.details else None, + 'error': None if status == 'SUCCEEDED' else getattr(payment, 'error', 'Payment details not found') + } + + logger.info(f"Payment status: {status}") + return result + except Exception as e: - logger.error(f"Error checking payment status for {destination[:30]}...: {str(e)}") - logger.exception("Full error details:") + logger.error(f"Error checking payment status: {str(e)}") raise def get_exchange_rate(self, currency: str = None) -> Dict[str, Any]: