working flow with wordpress

This commit is contained in:
2025-05-17 11:52:45 +02:00
parent 4201f7ce79
commit ef48d4567c
2 changed files with 228 additions and 143 deletions

View File

@@ -1,3 +1,4 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException, Header, Query, APIRouter
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel, Field
@@ -8,6 +9,9 @@ from enum import Enum
from nodeless import PaymentHandler
from shopify.router import router as shopify_router
import logging
import threading
import asyncio
import time
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -15,10 +19,125 @@ logger = logging.getLogger(__name__)
load_dotenv()
_payment_handler = None
_handler_lock = threading.Lock()
_sync_task = None
_last_sync_time = 0
_consecutive_sync_failures = 0
async def periodic_sync_check():
"""Background task to periodically check SDK sync status and attempt resync if needed."""
global _last_sync_time, _consecutive_sync_failures, _payment_handler
while True:
try:
current_time = time.time()
if not _payment_handler:
logger.warning("Payment handler not initialized, waiting...")
await asyncio.sleep(5)
continue
is_synced = _payment_handler.listener.is_synced()
sync_age = current_time - _last_sync_time if _last_sync_time > 0 else float('inf')
# Log sync status with detailed metrics
logger.info(f"SDK sync status check - Synced: {is_synced}, Last sync age: {sync_age:.1f}s, Consecutive failures: {_consecutive_sync_failures}")
if not is_synced or sync_age > 30: # Force resync if not synced or sync is older than 30 seconds
logger.warning(f"SDK sync needed - Status: {'Not synced' if not is_synced else 'Sync too old'}")
# Attempt resync with progressively longer timeouts based on consecutive failures
timeout = min(5 + (_consecutive_sync_failures * 2), 30) # Increase timeout up to 30 seconds
if _payment_handler.wait_for_sync(timeout_seconds=timeout):
logger.info("SDK resync successful")
_last_sync_time = time.time()
_consecutive_sync_failures = 0
else:
logger.error(f"SDK resync failed after {timeout}s timeout")
_consecutive_sync_failures += 1
# If we have too many consecutive failures, try to reinitialize handler
if _consecutive_sync_failures >= 5:
logger.warning("Too many consecutive sync failures, attempting to reinitialize handler...")
try:
with _handler_lock:
_payment_handler.disconnect()
_payment_handler = PaymentHandler()
_consecutive_sync_failures = 0
logger.info("Payment handler reinitialized successfully")
except Exception as e:
logger.error(f"Failed to reinitialize payment handler: {e}")
else:
_last_sync_time = current_time
_consecutive_sync_failures = 0
# Adjust sleep time based on sync status
sleep_time = 10 if not is_synced or _consecutive_sync_failures > 0 else 30
await asyncio.sleep(sleep_time)
except Exception as e:
logger.error(f"Error in periodic sync check: {e}")
_consecutive_sync_failures += 1
await asyncio.sleep(5) # Short sleep on error before retrying
def get_payment_handler():
global _payment_handler
if _payment_handler is None:
with _handler_lock:
if _payment_handler is None:
try:
_payment_handler = PaymentHandler()
except Exception as e:
logger.error(f"Failed to initialize PaymentHandler: {str(e)}")
raise HTTPException(
status_code=500,
detail="Failed to initialize payment system"
)
return _payment_handler
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Lifespan context manager for FastAPI application.
Handles startup and shutdown events.
"""
# Startup
global _payment_handler, _sync_task
try:
_payment_handler = PaymentHandler()
logger.info("Payment system initialized during startup")
# Start background sync check task
_sync_task = asyncio.create_task(periodic_sync_check())
logger.info("Background sync check task started")
except Exception as e:
logger.error(f"Failed to initialize payment system during startup: {str(e)}")
# Don't raise here, let the handler initialize on first request if needed
yield # Server is running
# Shutdown
if _sync_task:
_sync_task.cancel()
try:
await _sync_task
except asyncio.CancelledError:
pass
logger.info("Background sync check task stopped")
if _payment_handler:
try:
_payment_handler.disconnect()
logger.info("Payment system disconnected during shutdown")
except Exception as e:
logger.error(f"Error during payment system shutdown: {str(e)}")
app = FastAPI(
title="Breez Nodeless Payments API",
description="A FastAPI implementation of Breez SDK for Lightning/Liquid payments",
version="1.0.0"
version="1.0.0",
lifespan=lifespan
)
API_KEY_NAME = "x-api-key"
@@ -132,12 +251,6 @@ async def get_api_key(api_key: str = Header(None, alias=API_KEY_NAME)):
)
return api_key
def get_payment_handler():
try:
return PaymentHandler()
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to initialize PaymentHandler: {str(e)}")
# --- API Endpoints ---
@app.get("/list_payments", response_model=PaymentListResponse)
async def list_payments(
@@ -229,7 +342,10 @@ async def onchain_limits(
@app.get("/health")
async def health():
return {"status": "ok"}
global _payment_handler
if _payment_handler and _payment_handler.listener.is_synced():
return {"status": "ok", "sdk_synced": True}
return {"status": "ok", "sdk_synced": False}
@app.get("/check_payment_status/{destination}", response_model=PaymentStatusResponse)
async def check_payment_status(

View File

@@ -51,6 +51,7 @@ from breez_sdk_liquid import (
import time
import logging
from pprint import pprint
import threading
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -78,7 +79,6 @@ class SdkListener(EventListener):
if isinstance(event, SdkEvent.SYNCED):
self.synced = True
logger.info("SDK SYNCED")
elif isinstance(event, SdkEvent.PAYMENT_SUCCEEDED):
details = event.details
# Determine identifier based on payment type
@@ -140,83 +140,91 @@ class SdkListener(EventListener):
class PaymentHandler:
"""
A wrapper class for the Breez SDK Nodeless (Liquid implementation).
This class handles SDK initialization, connection, and provides simplified
methods for common payment and wallet operations.
Implements singleton pattern to prevent multiple SDK instances.
"""
def __init__(self, network: LiquidNetwork = LiquidNetwork.MAINNET, working_dir: str = '~/.breez-cli', asset_metadata: Optional[List[AssetMetadata]] = None, external_input_parsers: Optional[List[ExternalInputParser]] = None):
_instance = None
_initialized = False
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, network: LiquidNetwork = LiquidNetwork.MAINNET, working_dir: str = '~/.breez-cli',
asset_metadata: Optional[List[AssetMetadata]] = None,
external_input_parsers: Optional[List[ExternalInputParser]] = None):
"""
Initializes the PaymentHandler and connects to the Breez SDK.
Args:
network: The Liquid network to use (MAINNET or TESTNET).
working_dir: The directory for SDK files.
asset_metadata: Optional list of AssetMetadata for non-Bitcoin assets.
external_input_parsers: Optional list of ExternalInputParser for custom input parsing.
Uses singleton pattern to prevent multiple initializations.
"""
logger.debug("Entering PaymentHandler.__init__")
load_dotenv() # Load environment variables from .env
if self._initialized:
return
self.breez_api_key = os.getenv('BREEZ_API_KEY')
self.seed_phrase = os.getenv('BREEZ_SEED_PHRASE')
with self._lock:
if self._initialized:
return
if not self.breez_api_key:
logger.error("BREEZ_API_KEY not found in environment variables.")
raise Exception("Missing Breez API key in .env file or environment")
if not self.seed_phrase:
logger.error("BREEZ_SEED_PHRASE not found in environment variables.")
raise Exception("Missing seed phrase in .env file or environment")
logger.debug("Initializing PaymentHandler")
load_dotenv()
logger.info("Retrieved credentials from environment successfully")
self.breez_api_key = os.getenv('BREEZ_API_KEY')
self.seed_phrase = os.getenv('BREEZ_SEED_PHRASE')
config = default_config(network, self.breez_api_key)
# Expand user path for working_dir
config.working_dir = os.path.expanduser(working_dir)
# Ensure working directory exists
try:
os.makedirs(config.working_dir, exist_ok=True)
logger.debug(f"Ensured working directory exists: {config.working_dir}")
except OSError as e:
logger.error(f"Failed to create working directory {config.working_dir}: {e}")
raise # Re-raise if directory creation fails
if not self.breez_api_key:
logger.error("BREEZ_API_KEY not found in environment variables.")
raise Exception("Missing Breez API key in .env file or environment")
if not self.seed_phrase:
logger.error("BREEZ_SEED_PHRASE not found in environment variables.")
raise Exception("Missing seed phrase in .env file or environment")
if asset_metadata:
config.asset_metadata = asset_metadata
logger.info(f"Configured asset metadata: {asset_metadata}")
logger.info("Retrieved credentials from environment successfully")
if external_input_parsers:
config.external_input_parsers = external_input_parsers
logger.info(f"Configured external input parsers: {external_input_parsers}")
config = default_config(network, self.breez_api_key)
config.working_dir = os.path.expanduser(working_dir)
try:
os.makedirs(config.working_dir, exist_ok=True)
except OSError as e:
logger.error(f"Failed to create working directory {config.working_dir}: {e}")
raise
connect_request = ConnectRequest(config=config, mnemonic=self.seed_phrase)
if asset_metadata:
config.asset_metadata = asset_metadata
if external_input_parsers:
config.external_input_parsers = external_input_parsers
try:
self.instance = connect(connect_request)
self.listener = SdkListener()
# Add listener immediately after connecting
self.instance.add_event_listener(self.listener)
logger.info("Breez SDK connected successfully.")
except Exception as e:
logger.error(f"Failed to connect to Breez SDK: {e}")
# Re-raise the exception after logging
raise
connect_request = ConnectRequest(config=config, mnemonic=self.seed_phrase)
logger.debug("Exiting PaymentHandler.__init__")
try:
self.instance = connect(connect_request)
self.listener = SdkListener()
self.instance.add_event_listener(self.listener)
logger.info("Breez SDK connected successfully.")
# Shorter sync timeout for initial connection
self.wait_for_sync(timeout_seconds=10)
except Exception as e:
logger.error(f"Failed to connect to Breez SDK: {e}")
raise
self._initialized = True
logger.debug("PaymentHandler initialization complete")
def wait_for_sync(self, timeout_seconds: int = 30):
def wait_for_sync(self, timeout_seconds: int = 10) -> bool:
"""Wait for the SDK to sync before proceeding."""
logger.debug(f"Entering wait_for_sync (timeout={timeout_seconds}s)")
logger.debug(f"Waiting for sync (timeout={timeout_seconds}s)")
start_time = time.time()
while time.time() - start_time < timeout_seconds:
if self.listener.is_synced():
logger.debug("SDK synced.")
logger.debug("Exiting wait_for_sync (synced)")
logger.debug("SDK synced successfully")
return True
time.sleep(0.5) # Shorter sleep for faster sync detection
logger.error("Sync timeout: SDK did not sync within the allocated time.")
logger.debug("Exiting wait_for_sync (timeout)")
raise Exception(f"Sync timeout: SDK did not sync within {timeout_seconds} seconds.")
time.sleep(0.1) # Shorter sleep interval
logger.warning("SDK sync timeout")
return False
def wait_for_payment(self, identifier: str, timeout_seconds: int = 60) -> bool:
"""
@@ -1368,96 +1376,57 @@ class PaymentHandler:
def check_payment_status(self, destination: str) -> Dict[str, Any]:
"""
Checks the status of a specific payment by its destination/invoice.
Args:
destination: The payment destination (invoice) string to check.
Returns:
Dictionary containing payment status information:
{
'status': str, # The payment status (e.g., 'PENDING', 'SUCCEEDED', 'FAILED')
'amount_sat': int, # Amount in satoshis
'fees_sat': int, # Fees paid in satoshis
'payment_time': int, # Unix timestamp of the payment
'payment_hash': str, # Payment hash if available
'error': str, # Error message if any
}
Raises:
ValueError: If destination is invalid.
Exception: For any SDK errors.
Uses optimized status checking with shorter timeouts.
"""
logger.debug(f"Entering check_payment_status for destination: {destination[:30]}...")
logger.debug(f"Checking payment status for {destination[:30]}...")
try:
if not isinstance(destination, str) or not destination:
logger.warning("Invalid or empty destination provided.")
raise ValueError("Destination must be a non-empty string.")
raise ValueError("Invalid destination")
# First check the payment status in our listener's cache
logger.debug("Checking payment status in listener cache...")
# Check cached status first
cached_status = self.listener.get_payment_status(destination)
if cached_status:
logger.debug(f"Found cached payment status: {cached_status}")
# If we have a cached final status, we can return it immediately
if cached_status in ['SUCCEEDED', 'FAILED']:
logger.info(f"Returning cached final status: {cached_status}")
return {
'status': cached_status,
'amount_sat': None, # These would be None for cached statuses
'fees_sat': None,
'payment_time': None,
'payment_hash': None,
'error': None
}
# If no cached final status, check the actual payments
logger.debug("No cached final status found, querying payment list...")
try:
payments = self.instance.list_payments(ListPaymentsRequest())
logger.debug(f"Found {len(payments)} total payments")
except Exception as e:
logger.error(f"Error querying payment list: {e}")
raise
# Find the most recent payment matching the destination
matching_payment = None
for payment in payments:
logger.debug(f"Checking payment: {getattr(payment, 'destination', 'No destination')} == {destination}")
if hasattr(payment, 'destination') and payment.destination == destination:
if matching_payment is None or payment.timestamp > matching_payment.timestamp:
matching_payment = payment
logger.debug("Found matching payment or found more recent matching payment")
if matching_payment:
# Extract payment details
status = str(matching_payment.status)
details = matching_payment.details
result = {
'status': status,
'amount_sat': matching_payment.amount_sat,
'fees_sat': matching_payment.fees_sat,
'payment_time': matching_payment.timestamp,
'payment_hash': getattr(details, 'payment_hash', None),
'error': getattr(matching_payment, 'error', None)
}
logger.info(f"Found payment status for {destination[:30]}...: {status}")
logger.debug(f"Payment details: {result}")
logger.debug("Exiting check_payment_status")
return result
else:
logger.info(f"No payment found for destination: {destination[:30]}...")
if cached_status in ['SUCCEEDED', 'FAILED']:
return {
'status': 'NOT_FOUND',
'status': cached_status,
'amount_sat': None,
'fees_sat': None,
'payment_time': None,
'payment_hash': None,
'error': 'Payment not found'
'error': None
}
# Short wait for payment status
payment_succeeded = self.wait_for_payment(destination, timeout_seconds=2)
# Get final status
final_status = self.listener.get_payment_status(destination) or 'PENDING'
status = 'SUCCEEDED' if payment_succeeded else final_status
# Try to get payment details
try:
payments = self.instance.list_payments(ListPaymentsRequest())
payment = next(
(p for p in payments if hasattr(p, 'destination') and p.destination == destination),
None
)
except Exception as e:
logger.warning(f"Could not fetch payment details: {e}")
payment = None
result = {
'status': status,
'amount_sat': getattr(payment, 'amount_sat', None),
'fees_sat': getattr(payment, 'fees_sat', None),
'payment_time': getattr(payment, 'timestamp', None),
'payment_hash': getattr(payment.details, 'payment_hash', None) if payment and payment.details else None,
'error': None if status == 'SUCCEEDED' else getattr(payment, 'error', 'Payment details not found')
}
logger.info(f"Payment status: {status}")
return result
except Exception as e:
logger.error(f"Error checking payment status for {destination[:30]}...: {str(e)}")
logger.exception("Full error details:")
logger.error(f"Error checking payment status: {str(e)}")
raise
def get_exchange_rate(self, currency: str = None) -> Dict[str, Any]: