# Mirror of https://github.com/aljazceru/payments-rest-api.git
# Synced 2025-12-18 06:04:20 +01:00 · 1626 lines · 73 KiB · Python
import json
|
|
import os
|
|
import argparse
|
|
from typing import Optional, List, Dict, Any
|
|
from dotenv import load_dotenv
|
|
from breez_sdk_liquid import (
|
|
LiquidNetwork,
|
|
PayAmount,
|
|
ConnectRequest,
|
|
PrepareSendRequest,
|
|
SendPaymentRequest,
|
|
PrepareReceiveRequest,
|
|
ReceivePaymentRequest,
|
|
EventListener,
|
|
SdkEvent,
|
|
connect,
|
|
default_config,
|
|
PaymentMethod,
|
|
ListPaymentsRequest,
|
|
InputType,
|
|
SignMessageRequest,
|
|
CheckMessageRequest,
|
|
BuyBitcoinProvider,
|
|
PrepareBuyBitcoinRequest,
|
|
BuyBitcoinRequest,
|
|
PreparePayOnchainRequest,
|
|
PayOnchainRequest,
|
|
RefundRequest,
|
|
RefundableSwap,
|
|
FetchPaymentProposedFeesRequest,
|
|
AcceptPaymentProposedFeesRequest,
|
|
PaymentState,
|
|
PaymentDetails,
|
|
AssetMetadata,
|
|
ExternalInputParser,
|
|
GetPaymentRequest,
|
|
ListPaymentDetails,
|
|
ReceiveAmount,
|
|
PrepareBuyBitcoinResponse,
|
|
PrepareLnUrlPayResponse,
|
|
PreparePayOnchainResponse,
|
|
LnUrlPayRequestData,
|
|
LnUrlAuthRequestData,
|
|
LnUrlWithdrawRequestData,
|
|
)
|
|
import time
|
|
import logging
|
|
from pprint import pprint
|
|
import threading
|
|
import asyncio
|
|
|
|
# Configure logging at import time: INFO level, timestamped records.
# Everything in this module logs through the module-level `logger`.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SdkListener(EventListener):
    """
    A listener class for handling Breez SDK events.

    This class extends the EventListener from breez_sdk_liquid and implements
    custom event handling logic for tracking payment states through their lifecycle:

    Lightning Payment States:
    - PENDING: The swap service is holding the payment and has broadcast a lockup transaction
    - WAITING_CONFIRMATION: Claim transaction broadcast or direct Liquid transaction seen
    - SUCCEEDED: Claim transaction or direct Liquid transaction confirmed
    - FAILED: Swap failed (expired or lockup transaction failed)
    - WAITING_FEE_ACCEPTANCE: Payment requires fee acceptance
    - REFUND_PENDING / REFUNDED: Recorded only when the installed SDK exposes
      the matching SdkEvent variants (looked up by name at runtime)

    All state is kept in plain in-memory containers keyed by a payment
    identifier (payment hash, destination or swap ID - whichever is found first).
    """

    # Statuses that put an identifier on the legacy ``paid`` list.
    _PAID_STATUSES = ('WAITING_CONFIRMATION', 'SUCCEEDED')

    def __init__(self):
        self.synced = False
        self.paid = []  # Legacy list for backward compatibility
        self.refunded = []  # Identifiers for which a refund event was seen
        self.payment_statuses = {}  # identifier -> latest status string
        self.payment_errors = {}  # identifier -> error message (failed payments)
        self.payment_timestamps = {}  # identifier -> unix time of last state change
        self.payment_details = {}  # identifier -> last event details object

    @staticmethod
    def _is_variant(event: Any, variant_name: str) -> bool:
        """Return True if ``event`` is an instance of ``SdkEvent.<variant_name>``.

        The variant is resolved with ``getattr`` so that a variant missing from
        the installed SDK yields False instead of raising AttributeError.
        """
        variant = getattr(SdkEvent, variant_name, None)
        return isinstance(variant, type) and isinstance(event, variant)

    @staticmethod
    def _extract_identifier(details: Any) -> Optional[str]:
        """Pick the identifier used to key a payment's state.

        Order: ``payment_hash``, ``destination``, ``swap_id`` on ``details``
        itself, then ``payment_hash`` / ``swap_id`` on the nested
        ``details.details`` object.
        """
        for field in ('payment_hash', 'destination', 'swap_id'):
            value = getattr(details, field, None)
            if value:
                return value

        # NOTE(review): presumably ``details`` is an SDK Payment whose own
        # ``details`` attribute carries payment_hash / swap_id (list_payments in
        # this file reads them from there) - confirm against the SDK. Used only
        # as a fallback, so previously tracked identifiers are unchanged.
        nested = getattr(details, 'details', None)
        for field in ('payment_hash', 'swap_id'):
            value = getattr(nested, field, None)
            if value:
                return value
        return None

    def _update_payment_state(self, identifier: str, status: str, details: Any = None,
                              error: Optional[str] = None):
        """Record a new status for ``identifier`` together with its bookkeeping.

        Args:
            identifier: Key under which the payment is tracked; empty values are ignored.
            status: New status string (e.g. 'PENDING', 'SUCCEEDED', 'FAILED').
            details: Optional details object to cache for the payment.
            error: Optional error message; stored when given, otherwise any
                stale error is dropped once the status is no longer 'FAILED'.
        """
        if not identifier:
            logger.warning(f"Attempted to update payment state with empty identifier. Status: {status}")
            return

        # Update status and timestamp
        self.payment_statuses[identifier] = status
        self.payment_timestamps[identifier] = int(time.time())

        # Cache payment details if provided
        if details:
            self.payment_details[identifier] = details

        # Track errors for failed payments
        if error:
            self.payment_errors[identifier] = error
        elif status != 'FAILED' and identifier in self.payment_errors:
            del self.payment_errors[identifier]

        # Update paid list for backward compatibility
        if status in self._PAID_STATUSES and identifier not in self.paid:
            self.paid.append(identifier)
            logger.info(f"Payment {identifier} added to paid list (status: {status})")

        # Log state change
        logger.info(f"Payment {identifier} state updated to {status}" +
                    (f" with error: {error}" if error else ""))

    def on_event(self, event):
        """Handles incoming SDK events.

        SYNCED flips the ``synced`` flag. Payment events are mapped to a status
        string and stored via ``_update_payment_state``. Events without
        ``details`` or without a usable identifier are logged and ignored.
        """
        logger.debug(f"Received SDK event: {event}")

        if isinstance(event, SdkEvent.SYNCED):
            self.synced = True
            logger.info("SDK synced")
            return

        # Extract payment details and identifier
        details = getattr(event, 'details', None)
        if not details:
            logger.debug("Event received without details")
            return

        identifier = self._extract_identifier(details)
        if not identifier:
            logger.warning("Could not determine payment identifier from event")
            return

        # Handle different payment events
        if isinstance(event, SdkEvent.PAYMENT_PENDING):
            self._update_payment_state(identifier, 'PENDING', details)
            logger.info(f"Payment {identifier} is pending (lockup transaction broadcast)")

        elif isinstance(event, SdkEvent.PAYMENT_WAITING_CONFIRMATION):
            self._update_payment_state(identifier, 'WAITING_CONFIRMATION', details)
            logger.info(f"Payment {identifier} is waiting confirmation (claim tx broadcast)")

        elif isinstance(event, SdkEvent.PAYMENT_SUCCEEDED):
            self._update_payment_state(identifier, 'SUCCEEDED', details)
            logger.info(f"Payment {identifier} succeeded (claim tx confirmed)")

        elif isinstance(event, SdkEvent.PAYMENT_FAILED):
            # ``or`` also covers an ``error`` attribute that exists but is
            # None/empty, which would otherwise leave the failure without a message.
            error = getattr(details, 'error', None) or 'Unknown error'
            self._update_payment_state(identifier, 'FAILED', details, error)
            logger.error(f"Payment {identifier} failed. Error: {error}")

        elif isinstance(event, SdkEvent.PAYMENT_WAITING_FEE_ACCEPTANCE):
            self._update_payment_state(identifier, 'WAITING_FEE_ACCEPTANCE', details)
            logger.info(f"Payment {identifier} is waiting for fee acceptance")

        elif self._is_variant(event, 'PAYMENT_REFUND_PENDING'):
            self._update_payment_state(identifier, 'REFUND_PENDING', details)
            logger.info(f"Payment {identifier} has a refund pending")

        elif self._is_variant(event, 'PAYMENT_REFUNDED'):
            # Produces the 'REFUNDED' status and fills the ``refunded`` list,
            # both of which were previously declared but never written.
            self._update_payment_state(identifier, 'REFUNDED', details)
            if identifier not in self.refunded:
                self.refunded.append(identifier)
            logger.info(f"Payment {identifier} was refunded")

        else:
            logger.debug(f"Unhandled SDK event type: {type(event).__name__}")

    def is_paid(self, destination: str) -> bool:
        """
        Checks if a payment to a specific destination has succeeded.
        Now considers both WAITING_CONFIRMATION and SUCCEEDED as successful states.
        """
        status = self.payment_statuses.get(destination)
        return destination in self.paid or status in self._PAID_STATUSES

    def is_synced(self) -> bool:
        """Checks if the SDK is synced."""
        return self.synced

    def get_payment_status(self, identifier: str) -> Optional[str]:
        """
        Get the known status for a payment identified by destination, hash, or swap ID.
        Returns status string ('SUCCEEDED', 'FAILED', 'PENDING', etc.) or None.
        """
        return self.payment_statuses.get(identifier)

    def get_payment_error(self, identifier: str) -> Optional[str]:
        """Get the error message for a failed payment, if any."""
        return self.payment_errors.get(identifier)

    def get_payment_timestamp(self, identifier: str) -> Optional[int]:
        """Get the timestamp of the last state change for a payment."""
        return self.payment_timestamps.get(identifier)

    def get_payment_details(self, identifier: str) -> Optional[Any]:
        """Get cached payment details if available."""
        return self.payment_details.get(identifier)

    def clear_old_data(self, max_age_seconds: int = 86400):
        """
        Clear payment data older than max_age_seconds (default 24 hours).
        This helps prevent memory growth from old payment data.
        """
        current_time = int(time.time())
        # Snapshot the items first: on_event may add entries while we scan
        # (presumably from an SDK-owned thread - TODO confirm), and iterating a
        # dict that changes size raises RuntimeError.
        old_identifiers = [
            identifier
            for identifier, timestamp in list(self.payment_timestamps.items())
            if current_time - timestamp > max_age_seconds
        ]

        for identifier in old_identifiers:
            self.payment_statuses.pop(identifier, None)
            self.payment_errors.pop(identifier, None)
            self.payment_timestamps.pop(identifier, None)
            self.payment_details.pop(identifier, None)
            if identifier in self.paid:
                self.paid.remove(identifier)
            if identifier in self.refunded:
                self.refunded.remove(identifier)

        if old_identifiers:
            logger.info(f"Cleared {len(old_identifiers)} old payment records")
|
|
|
|
|
|
class PaymentHandler:
|
|
"""
|
|
A wrapper class for the Breez SDK Nodeless (Liquid implementation).
|
|
Implements singleton pattern to prevent multiple SDK instances.
|
|
"""
|
|
_instance = None
|
|
_initialized = False
|
|
_lock = threading.Lock()
|
|
|
|
def __new__(cls, *args, **kwargs):
    """Return the shared instance, creating it the first time it is requested.

    The instance is checked once without the lock (fast path) and once more
    while holding ``cls._lock`` so that only one object is ever created.
    Constructor arguments are accepted but not used here.
    """
    existing = cls._instance
    if existing is not None:
        return existing

    with cls._lock:
        # Re-check under the lock: another caller may have created it meanwhile.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
    return cls._instance
|
|
|
|
def __init__(self, network: LiquidNetwork = LiquidNetwork.MAINNET, working_dir: str = '~/.breez-cli',
             asset_metadata: Optional[List[AssetMetadata]] = None,
             external_input_parsers: Optional[List[ExternalInputParser]] = None):
    """
    Initializes the PaymentHandler and connects to the Breez SDK.
    Uses singleton pattern to prevent multiple initializations.

    Args:
        network: Liquid network passed to ``default_config``.
        working_dir: SDK working directory; ``~`` is expanded and the
            directory is created if missing.
        asset_metadata: Optional asset metadata copied onto the SDK config.
        external_input_parsers: Optional parsers copied onto the SDK config.
    Raises:
        Exception: If BREEZ_API_KEY or BREEZ_SEED_PHRASE is not set.
        OSError: If the working directory cannot be created.
        Exception: Any error raised while connecting to the SDK.
    """
    if self._initialized:
        return

    with self._lock:
        # Re-check under the lock in case another caller finished initializing.
        if self._initialized:
            return

        logger.debug("Initializing PaymentHandler")
        load_dotenv()

        self.breez_api_key = os.getenv('BREEZ_API_KEY')
        self.seed_phrase = os.getenv('BREEZ_SEED_PHRASE')

        if not self.breez_api_key:
            logger.error("BREEZ_API_KEY not found in environment variables.")
            raise Exception("Missing Breez API key in .env file or environment")
        if not self.seed_phrase:
            logger.error("BREEZ_SEED_PHRASE not found in environment variables.")
            raise Exception("Missing seed phrase in .env file or environment")

        logger.info("Retrieved credentials from environment successfully")

        config = default_config(network, self.breez_api_key)
        config.working_dir = os.path.expanduser(working_dir)

        try:
            os.makedirs(config.working_dir, exist_ok=True)
        except OSError as e:
            logger.error(f"Failed to create working directory {config.working_dir}: {e}")
            raise

        if asset_metadata:
            config.asset_metadata = asset_metadata
        if external_input_parsers:
            config.external_input_parsers = external_input_parsers

        connect_request = ConnectRequest(config=config, mnemonic=self.seed_phrase)

        sdk = None
        try:
            sdk = connect(connect_request)
            self.instance = sdk
            self.listener = SdkListener()
            self.instance.add_event_listener(self.listener)
            logger.info("Breez SDK connected successfully.")

            # Shorter sync timeout for initial connection; the result is
            # intentionally not enforced, initialization continues either way.
            self._wait_for_sync_blocking(timeout_seconds=10)

        except Exception as e:
            logger.error(f"Failed to connect to Breez SDK: {e}")
            if sdk is not None:
                # connect() succeeded but a later step failed: release the
                # connection so a retry does not leave a stray SDK instance.
                try:
                    sdk.disconnect()
                except Exception as cleanup_error:
                    logger.warning(f"Failed to disconnect partially initialized SDK: {cleanup_error}")
                self.instance = None
            raise

        self._initialized = True
        logger.debug("PaymentHandler initialization complete")
|
|
|
|
def _wait_for_sync_blocking(self, timeout_seconds: int = 10) -> bool:
    """Blocking helper to wait for the SDK to sync before proceeding.

    Polls ``self.listener.is_synced()`` every 0.1 s.

    Args:
        timeout_seconds: Maximum time to wait.
    Returns:
        True as soon as the listener reports synced, False on timeout.
    """
    logger.debug(f"Waiting for sync (timeout={timeout_seconds}s) [blocking]")
    # Monotonic clock: the timeout must not stretch or shrink when the
    # system wall clock is adjusted while we wait.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if self.listener.is_synced():
            logger.debug("SDK synced successfully")
            return True
        time.sleep(0.1)  # Shorter sleep interval
    logger.warning("SDK sync timeout")
    return False
|
|
|
|
async def wait_for_sync(self, timeout_seconds: int = 10) -> bool:
    """Asynchronously wait for the SDK to sync before proceeding.

    Polls ``self.listener.is_synced()`` every 0.1 s, yielding to the event
    loop between polls.

    Args:
        timeout_seconds: Maximum time to wait.
    Returns:
        True as soon as the listener reports synced, False on timeout.
    """
    logger.debug(f"Waiting for sync (timeout={timeout_seconds}s) [async]")
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if self.listener.is_synced():
            logger.debug("SDK synced successfully")
            return True
        await asyncio.sleep(0.1)
    logger.warning("SDK sync timeout")
    return False
|
|
|
|
def wait_for_payment(self, identifier: str, timeout_seconds: int = 60) -> bool:
    """
    Wait for payment to complete or timeout for a specific identifier
    (destination, hash, or swap ID).

    Polls the listener's recorded status once per second.

    Args:
        identifier: Key under which the listener tracks the payment.
        timeout_seconds: Maximum time to wait.
    Returns:
        True once the status is PENDING, WAITING_CONFIRMATION or SUCCEEDED;
        False if it is FAILED or REFUNDED, or if the timeout elapses.
    """
    logger.debug(f"Entering wait_for_payment (identifier={identifier}, timeout={timeout_seconds}s)")
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = self.listener.get_payment_status(identifier)
        # WAITING_CONFIRMATION is included because a payment can move past
        # PENDING between two polls; since PENDING already counts as success,
        # the later state must too (is_paid treats it as paid as well).
        if status in ('PENDING', 'WAITING_CONFIRMATION', 'SUCCEEDED'):
            logger.debug(f"Payment for {identifier} has status: {status}")
            logger.debug("Exiting wait_for_payment (succeeded or pending)")
            return True
        if status == 'FAILED':
            logger.error(f"Payment for {identifier} failed during wait.")
            logger.debug("Exiting wait_for_payment (failed)")
            return False
        # Consider other final states like 'REFUNDED' if applicable
        if status == 'REFUNDED':
            logger.info(f"Swap for {identifier} was refunded during wait.")
            logger.debug("Exiting wait_for_payment (refunded)")
            return False

        time.sleep(1)
    logger.warning(f"Wait for payment for {identifier} timed out.")
    logger.debug("Exiting wait_for_payment (timeout)")
    return False
|
|
|
|
def disconnect(self):
    """Disconnects from the Breez SDK.

    After a successful disconnect the SDK instance is cleared and the handler
    is marked uninitialized, so a repeated call takes the "already
    disconnected" branch and a later ``PaymentHandler()`` can connect again.
    Errors are logged and not re-raised so shutdown can continue.
    """
    logger.debug("Entering disconnect")
    try:
        sdk = getattr(self, 'instance', None)
        if sdk:
            sdk.disconnect()
            # Drop the dead connection; without this the branch below could
            # never detect "already disconnected".
            self.instance = None
            self._initialized = False
            logger.info("Breez SDK disconnected.")
        else:
            logger.warning("Disconnect called but SDK instance was not initialized or already disconnected.")
    except Exception as e:
        # Deliberately not re-raised: re-raising might prevent clean shutdown.
        logger.error(f"Error disconnecting from Breez SDK: {e}")

    logger.debug("Exiting disconnect")
|
|
|
|
|
|
# --- Wallet Operations ---
|
|
def get_info(self) -> Dict[str, Any]:
    """
    Fetch general wallet and blockchain information from the SDK.

    Returns:
        Dictionary with keys 'wallet_info' and 'blockchain_info'; each value
        is the ``__dict__`` of the corresponding SDK object, or None when the
        SDK returned nothing for it.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering get_info")
    try:
        sdk_info = self.instance.get_info()
        wallet = sdk_info.wallet_info
        chain = sdk_info.blockchain_info
        # Expose the SDK objects as plain dictionaries for easier handling.
        result = {
            'wallet_info': wallet.__dict__ if wallet else None,
            'blockchain_info': chain.__dict__ if chain else None,
        }
    except Exception as e:
        logger.error(f"Error getting info: {e}")
        logger.debug("Exiting get_info (error)")
        raise
    else:
        logger.debug("Fetched wallet info successfully.")
        logger.debug("Exiting get_info")
        return result
|
|
|
|
def list_payments(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """
    List payment history, optionally filtered.

    Args:
        params: Optional dictionary of filters. Recognized keys:
            from_timestamp, to_timestamp, offset, limit (converted with
            ``int``), filters (a list, expected to hold
            breez_sdk_liquid.PaymentType members; any other type is ignored
            with a warning) and details (passed through unchanged, expected
            to be a breez_sdk_liquid.ListPaymentDetails object).
    Returns:
        List of payment dictionaries.
    Raises:
        Exception: Any SDK or conversion error is logged and re-raised.
    """
    logger.debug(f"Entering list_payments with params: {params}")
    try:
        def optional_int(key: str) -> Optional[int]:
            # Missing params, a missing key and an explicit None all mean "unset".
            raw = params.get(key) if params else None
            return None if raw is None else int(raw)

        from_ts = optional_int('from_timestamp')
        to_ts = optional_int('to_timestamp')
        offset = optional_int('offset')
        limit = optional_int('limit')

        filters = params.get('filters') if params else None
        details_param = params.get('details') if params else None

        # A non-list 'filters' value is dropped rather than rejected.
        if not (filters is None or isinstance(filters, list)):
            logger.warning(f"Invalid type for 'filters' parameter: {type(filters)}")
            filters = None

        # 'details' is not validated here; the caller is trusted to pass the
        # correct SDK object or None.
        request = ListPaymentsRequest(
            from_timestamp=from_ts,
            to_timestamp=to_ts,
            offset=offset,
            limit=limit,
            filters=filters,
            details=details_param,
        )

        def as_dict(payment) -> Dict[str, Any]:
            # Flatten one SDK payment; enums are stringified, optional
            # attributes default to None.
            return {
                'id': getattr(payment, 'id', None),
                'timestamp': payment.timestamp,
                'amount_sat': payment.amount_sat,
                'fees_sat': payment.fees_sat,
                'payment_type': str(payment.payment_type),
                'status': str(payment.status),
                'details': self.sdk_to_dict(payment.details) if payment.details else None,
                'destination': getattr(payment, 'destination', None),
                'tx_id': getattr(payment, 'tx_id', None),
                'payment_hash': getattr(payment.details, 'payment_hash', None),
                'swap_id': getattr(payment.details, 'swap_id', None),
            }

        payment_list = [as_dict(entry) for entry in self.instance.list_payments(request)]

        logger.debug(f"Listed {len(payment_list)} payments.")
        logger.debug("Exiting list_payments")
        return payment_list

    except Exception as e:
        logger.error(f"Error listing payments: {e}")
        logger.debug("Exiting list_payments (error)")
        raise
|
|
|
|
def get_payment(self, identifier: str, identifier_type: str = 'payment_hash') -> Optional[Dict[str, Any]]:
    """
    Look up a single payment by payment hash or swap ID.

    Args:
        identifier: The payment hash or swap ID string.
        identifier_type: 'payment_hash' or 'swap_id'.
    Returns:
        Payment dictionary, or None when the SDK returns no payment.
    Raises:
        ValueError: If identifier_type is neither 'payment_hash' nor 'swap_id'.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug(f"Entering get_payment with identifier: {identifier}, type: {identifier_type}")
    try:
        # Validation stays inside the try block so a bad identifier_type is
        # logged by the handler below, like any other failure.
        if identifier_type == 'payment_hash':
            request = GetPaymentRequest.PAYMENT_HASH(identifier)
        elif identifier_type == 'swap_id':
            request = GetPaymentRequest.SWAP_ID(identifier)
        else:
            logger.warning(f"Invalid identifier_type for get_payment: {identifier_type}")
            raise ValueError("identifier_type must be 'payment_hash' or 'swap_id'")

        found = self.instance.get_payment(request)
        if not found:
            logger.debug(f"Payment not found: {identifier}")
            logger.debug("Exiting get_payment (not found)")
            return None

        # Flatten the SDK payment; enums are stringified, optional attributes
        # default to None.
        result = {
            'id': getattr(found, 'id', None),
            'timestamp': found.timestamp,
            'amount_sat': found.amount_sat,
            'fees_sat': found.fees_sat,
            'payment_type': str(found.payment_type),
            'status': str(found.status),
            'details': self.sdk_to_dict(found.details) if found.details else None,
            'destination': getattr(found, 'destination', None),
            'tx_id': getattr(found, 'tx_id', None),
            'payment_hash': getattr(found.details, 'payment_hash', None),
            'swap_id': getattr(found.details, 'swap_id', None),
        }
        logger.debug(f"Fetched payment: {identifier}")
        logger.debug("Exiting get_payment (found)")
        return result

    except Exception as e:
        logger.error(f"Error getting payment {identifier}: {e}")
        logger.debug("Exiting get_payment (error)")
        raise
|
|
|
|
# --- Sending Payments ---
|
|
def send_payment(self, destination: str, amount_sat: Optional[int] = None, amount_asset: Optional[float] = None, asset_id: Optional[str] = None, drain: bool = False) -> Dict[str, Any]:
    """
    Prepares and sends a payment to a destination (BOLT11, Liquid BIP21/address)
    for Bitcoin or other Liquid assets.

    Exactly one way of specifying the amount is used, checked in this order:
    ``drain``, then ``amount_sat``, then ``amount_asset`` + ``asset_id``.

    Args:
        destination: The payment destination string.
        amount_sat: Optional amount in satoshis for Bitcoin payments.
        amount_asset: Optional amount for asset payments (as float).
        asset_id: Required if amount_asset is provided. The asset ID string.
        drain: If True, sends all funds (overrides amount arguments).
    Returns:
        Dictionary with initiated payment details: status, destination,
        fees_sat (the prepared fees), payment_hash and swap_id.
    Raises:
        ValueError: If inconsistent or missing amount arguments.
        Exception: For any SDK errors.
    """
    logger.debug(f"Entering send_payment to {destination} (amount_sat={amount_sat}, amount_asset={amount_asset}, asset_id={asset_id}, drain={drain})")
    try:
        if drain:
            amount_obj = PayAmount.DRAIN
            logger.debug("Sending payment using DRAIN amount.")
        elif amount_sat is not None:
            if amount_asset is not None or asset_id is not None:
                logger.warning("Conflicting amount arguments: amount_sat provided with asset arguments.")
                raise ValueError("Provide either amount_sat, or (amount_asset and asset_id), or drain=True.")
            amount_obj = PayAmount.BITCOIN(amount_sat)
            logger.debug(f"Sending Bitcoin payment with amount: {amount_sat} sat.")
        elif amount_asset is not None and asset_id is not None:
            # No conflict check is needed here: drain and amount_sat were
            # already handled by the branches above, so neither can be set.
            # False is 'is_liquid_fee' - typically false for standard asset sends
            amount_obj = PayAmount.ASSET(asset_id, amount_asset, False)
            logger.debug(f"Sending asset payment {asset_id} with amount: {amount_asset}.")
        else:
            # Nothing given, or only one half of (amount_asset, asset_id).
            logger.warning("Missing or inconsistent amount arguments.")
            raise ValueError("Provide either amount_sat, or (amount_asset and asset_id), or drain=True.")

        prepare_req = PrepareSendRequest(destination=destination, amount=amount_obj)
        prepare_res = self.instance.prepare_send_payment(prepare_req)

        # The prepared fees are only logged; the payment is sent without a
        # separate confirmation step.
        logger.info(f"Prepared send payment to {destination}. Fees: {prepare_res.fees_sat} sat.")
        logger.debug(f"PrepareSendRequest response: {prepare_res.__dict__}")

        req = SendPaymentRequest(prepare_response=prepare_res)
        send_res = self.instance.send_payment(req)

        payment = send_res.payment
        initiated_payment_details = {
            'status': str(payment.status),
            'destination': getattr(payment, 'destination', None),
            'fees_sat': prepare_res.fees_sat,  # Prepared fees, final fees might differ slightly
            'payment_hash': getattr(payment.details, 'payment_hash', None),
            'swap_id': getattr(payment.details, 'swap_id', None),
        }
        logger.info(f"Send payment initiated to {destination}.")
        logger.debug(f"Send payment initiated details: {initiated_payment_details}")
        logger.debug("Exiting send_payment (initiated)")

        return initiated_payment_details

    except Exception as e:
        logger.error(f"Error sending payment to {destination}: {e}")
        logger.debug("Exiting send_payment (error)")
        raise
|
|
|
|
# --- Receiving Payments ---
|
|
def receive_payment(self, amount: int, payment_method: str = 'LIGHTNING', description: Optional[str] = None, asset_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Prepares and generates a receive address/invoice.

    Args:
        amount: The amount to receive.
        payment_method: 'LIGHTNING', 'BITCOIN_ADDRESS', or 'LIQUID_ADDRESS'
            (case-insensitive; resolved by name on ``PaymentMethod``).
        description: Optional description for the payment (mainly for Lightning).
        asset_id: Optional asset ID string for receiving specific assets on Liquid.
    Returns:
        Dictionary with destination (address/invoice) and prepared fees.
    Raises:
        ValueError: If invalid payment_method is provided.
        Exception: For any SDK errors.
    """
    logger.debug(f"Entering receive_payment (amount={amount}, method={payment_method}, asset={asset_id})")
    try:
        method = getattr(PaymentMethod, payment_method.upper(), None)
        # Compare against None explicitly: a truthiness test would wrongly
        # reject a valid member that happens to evaluate as falsy.
        if method is None:
            logger.warning(f"Invalid payment_method: {payment_method}")
            raise ValueError(f"Invalid payment_method: {payment_method}. Must be 'LIGHTNING', 'BITCOIN_ADDRESS', or 'LIQUID_ADDRESS'.")

        if asset_id:
            receive_amount_obj = ReceiveAmount.ASSET(asset_id, amount)
            logger.debug(f"Receiving asset {asset_id} with amount {amount}")
        else:
            receive_amount_obj = ReceiveAmount.BITCOIN(amount)
            logger.debug(f"Receiving Bitcoin with amount {amount} sat.")

        prepare_req = PrepareReceiveRequest(payment_method=method, amount=receive_amount_obj)
        prepare_res = self.instance.prepare_receive_payment(prepare_req)

        logger.info(f"Prepared receive payment ({payment_method}). Fees: {prepare_res.fees_sat} sat.")
        logger.debug(f"PrepareReceiveRequest response: {prepare_res.__dict__}")

        req = ReceivePaymentRequest(prepare_response=prepare_res, description=description)
        receive_res = self.instance.receive_payment(req)

        logger.info(f"Receive payment destination generated: {receive_res.destination}")
        logger.debug(f"Receive payment response: {receive_res.__dict__}")
        logger.debug("Exiting receive_payment")

        return {
            'destination': receive_res.destination,
            'fees_sat': prepare_res.fees_sat,  # Prepared fees, final fees might differ
        }
    except Exception as e:
        logger.error(f"Error receiving payment ({payment_method}) for amount {amount}: {e}")
        logger.debug("Exiting receive_payment (error)")
        raise
|
|
|
|
# --- Buy Bitcoin ---
|
|
def fetch_buy_bitcoin_limits(self) -> Dict[str, Any]:
    """
    Fetch the limits for buying Bitcoin, taken from the SDK's onchain limits.

    Returns:
        Dictionary with keys 'receive' and 'send'; each value is the
        ``__dict__`` of the corresponding SDK limits object, or None.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering fetch_buy_bitcoin_limits")
    try:
        # The onchain limits are reused for buy/sell.
        onchain = self.instance.fetch_onchain_limits()
        receive_limits = onchain.receive
        send_limits = onchain.send
        result = {
            'receive': receive_limits.__dict__ if receive_limits else None,
            'send': send_limits.__dict__ if send_limits else None,
        }
    except Exception as e:
        logger.error(f"Error fetching buy bitcoin limits: {e}")
        logger.debug("Exiting fetch_buy_bitcoin_limits (error)")
        raise
    else:
        logger.debug("Fetched buy/sell limits successfully.")
        logger.debug("Exiting fetch_buy_bitcoin_limits")
        return result
|
|
|
|
def prepare_buy_bitcoin(self, provider: str, amount_sat: int) -> Dict[str, Any]:
    """
    Prepares a buy Bitcoin request.

    Args:
        provider: The buy provider string (e.g., 'MOONPAY'); case-insensitive,
            resolved by name on ``BuyBitcoinProvider``.
        amount_sat: The amount in satoshis to buy.
    Returns:
        Dictionary with preparation details, including fees. This is the
        ``__dict__`` of the SDK response, not the SDK response object itself.
    Raises:
        ValueError: If invalid provider is provided.
        Exception: For any SDK errors.
    """
    logger.debug(f"Entering prepare_buy_bitcoin (provider={provider}, amount={amount_sat})")
    try:
        buy_provider = getattr(BuyBitcoinProvider, provider.upper(), None)
        # Compare against None explicitly: a truthiness test would wrongly
        # reject a valid member that happens to evaluate as falsy.
        if buy_provider is None:
            logger.warning(f"Invalid buy bitcoin provider: {provider}")
            raise ValueError(f"Invalid buy bitcoin provider: {provider}.")

        req = PrepareBuyBitcoinRequest(provider=buy_provider, amount_sat=amount_sat)
        prepare_res = self.instance.prepare_buy_bitcoin(req)
        prepare_res_dict = prepare_res.__dict__
        logger.info(f"Prepared buy bitcoin with {provider}. Fees: {prepare_res.fees_sat} sat.")
        logger.debug(f"PrepareBuyBitcoinRequest response: {prepare_res_dict}")
        logger.debug("Exiting prepare_buy_bitcoin")

        return prepare_res_dict
    except Exception as e:
        logger.error(f"Error preparing buy bitcoin for {amount_sat} with {provider}: {e}")
        logger.debug("Exiting prepare_buy_bitcoin (error)")
        raise
|
|
|
|
# Refined signature to expect the SDK object
|
|
def buy_bitcoin(self, prepare_response: PrepareBuyBitcoinResponse) -> str:
    """
    Execute a buy Bitcoin request from a prepared SDK response.

    Args:
        prepare_response: A PrepareBuyBitcoinResponse SDK object.
    Returns:
        The URL string to complete the purchase.
    Raises:
        TypeError: If prepare_response is not a PrepareBuyBitcoinResponse.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering buy_bitcoin")
    try:
        # Only the real SDK object is accepted; the type check stays inside
        # the try block so a rejection is logged by the handler below too.
        if isinstance(prepare_response, PrepareBuyBitcoinResponse):
            request = BuyBitcoinRequest(prepare_response=prepare_response)
            purchase_url = self.instance.buy_bitcoin(request)
            logger.info("Buy bitcoin URL generated.")
            logger.debug("Exiting buy_bitcoin")
            return purchase_url

        logger.error(f"buy_bitcoin expects PrepareBuyBitcoinResponse object, but received {type(prepare_response)}.")
        raise TypeError("buy_bitcoin expects the SDK PrepareBuyBitcoinResponse object")
    except Exception as e:
        logger.error(f"Error executing buy bitcoin: {e}")
        logger.debug("Exiting buy_bitcoin (error)")
        raise
|
|
|
|
# --- Fiat Currencies ---
|
|
def list_fiat_currencies(self) -> List[Dict[str, Any]]:
    """
    List the fiat currencies reported by the SDK.

    Returns:
        List with the ``__dict__`` of every currency object returned.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering list_fiat_currencies")
    try:
        sdk_currencies = self.instance.list_fiat_currencies()
        as_dicts = [currency.__dict__ for currency in sdk_currencies]
        logger.debug(f"Listed {len(as_dicts)} fiat currencies.")
    except Exception as e:
        logger.error(f"Error listing fiat currencies: {e}")
        logger.debug("Exiting list_fiat_currencies (error)")
        raise
    else:
        logger.debug("Exiting list_fiat_currencies")
        return as_dicts
|
|
|
|
def fetch_fiat_rates(self) -> List[Dict[str, Any]]:
    """
    Fetch the fiat exchange rates reported by the SDK.

    Returns:
        List with the ``__dict__`` of every rate object returned.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering fetch_fiat_rates")
    try:
        sdk_rates = self.instance.fetch_fiat_rates()
        as_dicts = [rate.__dict__ for rate in sdk_rates]
        logger.debug(f"Fetched {len(as_dicts)} fiat rates.")
    except Exception as e:
        logger.error(f"Error fetching fiat rates: {e}")
        logger.debug("Exiting fetch_fiat_rates (error)")
        raise
    else:
        logger.debug("Exiting fetch_fiat_rates")
        return as_dicts
|
|
|
|
# --- LNURL Operations ---
|
|
def parse_input(self, input_str: str) -> Dict[str, Any]:
    """
    Parses various input types (LNURL, addresses, invoices, etc.).

    Args:
        input_str: The string input to parse.
    Returns:
        Dictionary representing the parsed input details. Always has a 'type'
        key; unrecognized results are returned as type 'UNKNOWN' together
        with the raw input and the string form of the parsed object.
    Raises:
        Exception: For any SDK errors during parsing.
    """
    logger.debug(f"Entering parse_input with input: {input_str}")
    try:
        parsed_input = self.instance.parse(input_str)

        def is_variant(name: str) -> bool:
            # Resolve the variant by name so that one missing from the
            # installed SDK is skipped instead of raising AttributeError and
            # masking every branch after it.
            variant = getattr(InputType, name, None)
            return isinstance(variant, type) and isinstance(parsed_input, variant)

        # Payload attributes are read from the parsed *instance*, not the type.
        if is_variant('BITCOIN_ADDRESS'):
            result = {'type': 'BITCOIN_ADDRESS', 'address': parsed_input.address.address}
        elif is_variant('BOLT11'):
            result = {'type': 'BOLT11', 'invoice': parsed_input.invoice.__dict__}
        elif is_variant('LN_URL_PAY'):
            result = {'type': 'LN_URL_PAY', 'data': parsed_input.data.__dict__}
        elif is_variant('LN_URL_AUTH'):
            result = {'type': 'LN_URL_AUTH', 'data': parsed_input.data.__dict__}
        elif is_variant('LN_URL_WITHDRAW'):
            result = {'type': 'LN_URL_WITHDRAW', 'data': parsed_input.data.__dict__}
        elif is_variant('LIQUID_ADDRESS'):
            result = {'type': 'LIQUID_ADDRESS', 'address': parsed_input.address.address}
        elif is_variant('BIP21'):
            # NOTE(review): BIP21 may not be an InputType variant in every SDK
            # release - confirm against the installed breez_sdk_liquid.
            result = {'type': 'BIP21', 'data': parsed_input.bip21.__dict__}
        elif is_variant('NODE_ID'):
            result = {'type': 'NODE_ID', 'node_id': parsed_input.node_id}
        else:
            # Log raw data for unhandled types to aid debugging
            logger.warning(f"Parsed unknown input type: {type(parsed_input)}")
            result = {'type': 'UNKNOWN', 'raw_input': input_str, 'raw_parsed_object': str(parsed_input)}

        logger.debug(f"Parsed input successfully. Type: {result.get('type')}")
        logger.debug("Exiting parse_input")

        return result
    except Exception as e:
        logger.error(f"Error parsing input '{input_str}': {e}")
        logger.debug("Exiting parse_input (error)")
        raise
|
|
|
|
# Corrected type hint to LnUrlPayRequestData
|
|
def prepare_lnurl_pay(self, data: LnUrlPayRequestData, amount_sat: int, comment: Optional[str] = None, validate_success_action_url: bool = True) -> Dict[str, Any]:
    """
    Prepares an LNURL-Pay request.

    Args:
        data: The LnUrlPayRequestData object from a parsed LNURL_PAY input's .data attribute.
        amount_sat: Amount in satoshis.
        comment: Optional comment.
        validate_success_action_url: Whether to validate the success action URL.
    Returns:
        Dictionary view (``__dict__``) of the SDK prepare response.
    Raises:
        TypeError: If data is not the correct object type.
        Exception: For any SDK errors.
    """
    # NOTE(review): lnurl_pay() requires the PrepareLnUrlPayResponse object itself,
    # while this method returns only its __dict__ - confirm how callers bridge the two.
    logger.debug(f"Entering prepare_lnurl_pay (amount={amount_sat}, comment={comment})")
    try:
        # PrepareLnUrlPayRequest is not part of the module-level breez_sdk_liquid
        # import list; without this local import, building the request below
        # fails with NameError.
        from breez_sdk_liquid import PrepareLnUrlPayRequest

        # Reject anything that is not the SDK object before touching the SDK.
        if not isinstance(data, LnUrlPayRequestData):
            logger.error(f"prepare_lnurl_pay expects LnUrlPayRequestData object, but received {type(data)}.")
            raise TypeError("prepare_lnurl_pay expects the SDK LnUrlPayRequestData object")

        # Wrap the satoshi amount in the SDK's PayAmount variant.
        pay_amount = PayAmount.BITCOIN(amount_sat)

        req = PrepareLnUrlPayRequest(
            data=data,
            amount=pay_amount,
            comment=comment,
            validate_success_action_url=validate_success_action_url,
            # Forward the BIP353 address when the parsed data carries one.
            bip353_address=getattr(data, 'bip353_address', None)
        )
        prepare_res = self.instance.prepare_lnurl_pay(req)
        prepare_res_dict = prepare_res.__dict__
        logger.info(f"Prepared LNURL-Pay. Fees: {prepare_res.fees_sat} sat.")
        logger.debug(f"PrepareLnUrlPayRequest response: {prepare_res_dict}")
        logger.debug("Exiting prepare_lnurl_pay")

        return prepare_res_dict
    except Exception as e:
        logger.error(f"Error preparing LNURL-Pay: {e}")
        logger.debug("Exiting prepare_lnurl_pay (error)")
        raise
|
|
|
|
# Refined signature to expect the SDK object
|
|
def lnurl_pay(self, prepare_response: PrepareLnUrlPayResponse) -> Optional[Dict[str, Any]]:
    """
    Executes an LNURL-Pay payment using prepared data.

    Args:
        prepare_response: The PrepareLnUrlPayResponse object produced by the SDK's prepare_lnurl_pay call.
    Returns:
        Dictionary view (``__dict__``) of the SDK result, or None if the SDK returned a falsy result.
    Raises:
        TypeError: If prepare_response is not the correct type.
        Exception: For any SDK errors.
    """
    logger.debug("Entering lnurl_pay")
    try:
        # LnUrlPayRequest is not part of the module-level breez_sdk_liquid import
        # list; without this local import, building the request below fails
        # with NameError.
        from breez_sdk_liquid import LnUrlPayRequest

        # Reject anything that is not the SDK object before touching the SDK.
        if not isinstance(prepare_response, PrepareLnUrlPayResponse):
            logger.error(f"lnurl_pay expects PrepareLnUrlPayResponse object, but received {type(prepare_response)}.")
            raise TypeError("lnurl_pay expects the SDK PrepareLnUrlPayResponse object")

        req = LnUrlPayRequest(prepare_response=prepare_response)
        result = self.instance.lnurl_pay(req)
        result_dict = result.__dict__ if result else None
        logger.info("Executed LNURL-Pay.")
        logger.debug(f"LNURL-Pay result: {result_dict}")
        logger.debug("Exiting lnurl_pay")
        return result_dict
    except Exception as e:
        logger.error(f"Error executing LNURL-Pay: {e}")
        logger.debug("Exiting lnurl_pay (error)")
        raise
|
|
|
|
# Corrected type hint to LnUrlAuthRequestData
|
|
def lnurl_auth(self, data: LnUrlAuthRequestData) -> bool:
    """
    Runs the LNURL-Auth flow for a parsed LNURL_AUTH input.

    Args:
        data: The LnUrlAuthRequestData object taken from the parsed input's .data attribute.
    Returns:
        The value of ``is_ok()`` on the SDK result (True on success).
    Raises:
        TypeError: If data is not an LnUrlAuthRequestData instance.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering lnurl_auth")
    try:
        # Guard clause: only the SDK request-data object is accepted.
        if not isinstance(data, LnUrlAuthRequestData):
            logger.error(f"lnurl_auth expects LnUrlAuthRequestData object, but received {type(data)}.")
            raise TypeError("lnurl_auth expects the SDK LnUrlAuthRequestData object")

        result = self.instance.lnurl_auth(data)
        is_ok = result.is_ok()
        if not is_ok:
            # Surface whatever error detail the result exposes, if any.
            error_msg = getattr(result, 'error', 'Unknown error')
            logger.warning(f"LNURL-Auth failed. Error: {error_msg}")
        else:
            logger.info("LNURL-Auth successful.")
        logger.debug(f"LNURL-Auth result: {is_ok}")
        logger.debug("Exiting lnurl_auth")
        return is_ok
    except Exception as e:
        logger.error(f"Error performing LNURL-Auth: {e}")
        logger.debug("Exiting lnurl_auth (error)")
        raise
|
|
|
|
# Type hint uses the SDK's LnUrlWithdrawRequestData class
|
|
def lnurl_withdraw(self, data: LnUrlWithdrawRequestData, amount_msat: int, comment: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Performs LNURL-Withdraw.

    Args:
        data: The LnUrlWithdrawRequestData object from a parsed LNURL_WITHDRAW input's .data attribute.
        amount_msat: Amount in millisatoshis to withdraw.
        comment: Optional comment string; sent as the request's description.
    Returns:
        Dictionary view (``__dict__``) of the SDK result, or None if the SDK returned a falsy result.
    Raises:
        TypeError: If data is not the correct object type.
        ValueError: If amount_msat is not a positive integer or comment is not a string/None.
        Exception: For any SDK errors.
    """
    logger.debug(f"Entering lnurl_withdraw (amount_msat={amount_msat}, comment={comment})")
    try:
        # LnUrlWithdrawRequest is not part of the module-level breez_sdk_liquid
        # import list, so it is imported locally.
        from breez_sdk_liquid import LnUrlWithdrawRequest

        # Reject anything that is not the SDK object before touching the SDK.
        if not isinstance(data, LnUrlWithdrawRequestData):
            logger.error(f"lnurl_withdraw expects LnUrlWithdrawRequestData object, but received {type(data)}.")
            raise TypeError("lnurl_withdraw expects the SDK LnUrlWithdrawRequestData object")

        # Basic validation for amount and comment
        if not isinstance(amount_msat, int) or amount_msat <= 0:
            logger.warning(f"Invalid amount_msat provided: {amount_msat}")
            raise ValueError("amount_msat must be a positive integer.")
        if comment is not None and not isinstance(comment, str):
            logger.warning(f"Invalid comment type provided: {type(comment)}")
            raise ValueError("comment must be a string or None.")

        # The SDK call takes a single request object rather than three
        # positional arguments (see the Breez SDK LNURL-Withdraw guide).
        req = LnUrlWithdrawRequest(data=data, amount_msat=amount_msat, description=comment)
        result = self.instance.lnurl_withdraw(req)
        result_dict = result.__dict__ if result else None
        logger.info("Executed LNURL-Withdraw.")
        logger.debug(f"LNURL-Withdraw result: {result_dict}")
        logger.debug("Exiting lnurl_withdraw")
        return result_dict
    except Exception as e:
        logger.error(f"Error executing LNURL-Withdraw: {e}")
        logger.debug("Exiting lnurl_withdraw (error)")
        raise
|
|
|
|
# --- Onchain Operations ---
|
|
# fetch_pay_onchain_limits is covered by fetch_onchain_limits (public method)
|
|
|
|
def prepare_pay_onchain(self, amount_sat: Optional[int] = None, drain: bool = False, fee_rate_sat_per_vbyte: Optional[int] = None) -> Dict[str, Any]:
    """
    Builds and submits a prepare request for an onchain (Bitcoin address) payment.

    Args:
        amount_sat: Amount in satoshis; may be omitted only when drain is set.
        drain: When truthy, the request uses the DRAIN amount instead of amount_sat.
        fee_rate_sat_per_vbyte: Optional custom fee rate; must be a positive integer when given.
    Returns:
        Dictionary view (``__dict__``) of the SDK prepare response.
    Raises:
        ValueError: If no amount is given for a non-drain payment, or the fee rate is invalid.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug(f"Entering prepare_pay_onchain (amount={amount_sat}, drain={drain}, fee_rate={fee_rate_sat_per_vbyte})")
    try:
        # A non-drain payment without an amount cannot be prepared.
        if not drain and amount_sat is None:
            logger.warning("Amount is missing for non-drain pay onchain.")
            raise ValueError("Amount must be provided for non-drain payments.")

        if drain:
            amount_obj = PayAmount.DRAIN
            logger.debug("Preparing onchain payment using DRAIN amount.")
        else:
            amount_obj = PayAmount.BITCOIN(amount_sat)
            logger.debug(f"Preparing onchain payment with amount: {amount_sat} sat.")

        # The fee rate is optional, but when supplied it has to be a positive int.
        if fee_rate_sat_per_vbyte is not None:
            fee_rate_ok = isinstance(fee_rate_sat_per_vbyte, int) and fee_rate_sat_per_vbyte > 0
            if not fee_rate_ok:
                logger.warning(f"Invalid fee_rate_sat_per_vbyte provided: {fee_rate_sat_per_vbyte}")
                raise ValueError("fee_rate_sat_per_vbyte must be a positive integer or None.")

        prepare_res = self.instance.prepare_pay_onchain(
            PreparePayOnchainRequest(amount=amount_obj, fee_rate_sat_per_vbyte=fee_rate_sat_per_vbyte)
        )
        prepare_res_dict = prepare_res.__dict__
        logger.info(f"Prepared pay onchain. Total fees: {prepare_res.total_fees_sat} sat.")
        logger.debug(f"PreparePayOnchainRequest response: {prepare_res_dict}")
        logger.debug("Exiting prepare_pay_onchain")
        return prepare_res_dict
    except Exception as e:
        logger.error(f"Error preparing pay onchain: {e}")
        logger.debug("Exiting prepare_pay_onchain (error)")
        raise
|
|
|
|
# Refined signature to expect the SDK object
|
|
def pay_onchain(self, address: str, prepare_response: PreparePayOnchainResponse):
    """
    Sends an onchain payment that was previously prepared with the SDK.

    Args:
        address: Destination Bitcoin address; must be a non-empty string.
        prepare_response: The SDK's PreparePayOnchainResponse object for this payment.
    Raises:
        TypeError: If prepare_response is not a PreparePayOnchainResponse.
        ValueError: If address is not a non-empty string.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug(f"Entering pay_onchain to {address}")
    try:
        # Validate the prepared object first, then the destination.
        if not isinstance(prepare_response, PreparePayOnchainResponse):
            logger.error(f"pay_onchain expects PreparePayOnchainResponse object, but received {type(prepare_response)}.")
            raise TypeError("pay_onchain expects the SDK PreparePayOnchainResponse object")

        # Only a minimal sanity check; the address format itself is not verified here.
        if not (isinstance(address, str) and address):
            logger.warning("Invalid or empty destination address provided for pay_onchain.")
            raise ValueError("Destination address must be a non-empty string.")

        self.instance.pay_onchain(
            PayOnchainRequest(address=address, prepare_response=prepare_response)
        )
        logger.info(f"Onchain payment initiated to {address}.")
        logger.debug("Exiting pay_onchain")
        # No result is returned; the final status has to be tracked separately
        # (e.g. by listing payments).
    except Exception as e:
        logger.error(f"Error executing pay onchain to {address}: {e}")
        logger.debug("Exiting pay_onchain (error)")
        raise
|
|
|
|
# list_refundable_payments method (already present, returns list of RefundableSwap objects)
|
|
def list_refundable_payments(self) -> List[RefundableSwap]:
    """
    Returns the refundable swaps reported by the SDK.

    Returns:
        The list returned by the SDK's list_refundables(), unmodified.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering list_refundable_payments")
    try:
        refundable_payments = self.instance.list_refundables()
        logger.debug(f"Found {len(refundable_payments)} refundable payments.")
    except Exception as e:
        logger.error(f"Error listing refundable payments: {e}")
        logger.debug("Exiting list_refundable_payments (error)")
        raise
    else:
        logger.debug("Exiting list_refundable_payments")
        # The SDK objects are handed back as-is (no dict conversion).
        return refundable_payments
|
|
|
|
# Updated signature and type hint to RefundableSwap and explicit refund_address
|
|
def execute_refund(self, refundable_swap: RefundableSwap, refund_address: str, fee_rate_sat_per_vbyte: int):
    """
    Requests a refund of a refundable swap to the given address.

    Args:
        refundable_swap: The RefundableSwap object whose swap_address identifies the swap.
        refund_address: Destination address for the refund; must be a non-empty string.
        fee_rate_sat_per_vbyte: Fee rate for the refund transaction; must be a positive integer.
    Raises:
        TypeError: If refundable_swap is not a RefundableSwap.
        ValueError: If refund_address or fee_rate_sat_per_vbyte is invalid.
        Exception: Any SDK error is logged and re-raised.
    """
    # getattr keeps the log line safe even when a wrong object type is passed in.
    logger.debug(f"Entering execute_refund for swap {getattr(refundable_swap, 'swap_address', 'N/A')} to {refund_address} with fee rate {fee_rate_sat_per_vbyte}")
    try:
        if not isinstance(refundable_swap, RefundableSwap):
            logger.error(f"execute_refund expects RefundableSwap object, but received {type(refundable_swap)}.")
            raise TypeError("execute_refund expects the SDK RefundableSwap object")

        # Only a minimal sanity check; the address format itself is not verified here.
        if not (isinstance(refund_address, str) and refund_address):
            logger.warning("Invalid or empty refund_address provided for execute_refund.")
            raise ValueError("Refund destination address must be a non-empty string.")

        fee_rate_ok = isinstance(fee_rate_sat_per_vbyte, int) and fee_rate_sat_per_vbyte > 0
        if not fee_rate_ok:
            logger.warning(f"Invalid fee_rate_sat_per_vbyte provided: {fee_rate_sat_per_vbyte}")
            raise ValueError("fee_rate_sat_per_vbyte must be a positive integer.")

        self.instance.refund(
            RefundRequest(
                swap_address=refundable_swap.swap_address,
                refund_address=refund_address,
                fee_rate_sat_per_vbyte=fee_rate_sat_per_vbyte
            )
        )
        logger.info(f"Refund initiated for swap {refundable_swap.swap_address} to {refund_address}.")
        logger.debug("Exiting execute_refund")
        # No result is returned; the final status has to be tracked separately.
    except Exception as e:
        logger.error(f"Error executing refund for swap {getattr(refundable_swap, 'swap_address', 'N/A')}: {e}")
        logger.debug("Exiting execute_refund (error)")
        raise
|
|
|
|
# rescan_swaps method (already present)
|
|
def rescan_swaps(self):
    """
    Asks the SDK to rescan onchain swaps.

    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering rescan_swaps")
    try:
        self.instance.rescan_onchain_swaps()
    except Exception as e:
        logger.error(f"Error rescanning swaps: {e}")
        logger.debug("Exiting rescan_swaps (error)")
        raise
    else:
        logger.info("Onchain swaps rescan initiated.")
        logger.debug("Exiting rescan_swaps")
|
|
|
|
def recommended_fees(self) -> Dict[str, int]:
    """
    Retrieves the SDK's recommended transaction fees.

    Returns:
        Dictionary view (``__dict__``) of the SDK fees object, or an empty dict
        when the SDK returns a falsy value.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering recommended_fees")
    try:
        fees = self.instance.recommended_fees()
        # NOTE(review): assumes the SDK fees object exposes its fields via __dict__ - confirm.
        if fees:
            fees_dict = fees.__dict__
        else:
            fees_dict = {}
        logger.debug(f"Fetched recommended fees: {fees_dict}")
        logger.debug("Exiting recommended_fees")
        return fees_dict
    except Exception as e:
        logger.error(f"Error fetching recommended fees: {e}")
        logger.debug("Exiting recommended_fees (error)")
        raise
|
|
|
|
def handle_payments_waiting_fee_acceptance(self):
    """
    Lists payments in WAITING_FEE_ACCEPTANCE state and accepts the proposed fees for each.

    Every qualifying payment is accepted unconditionally; payments without
    Bitcoin details or without a swap_id are skipped with a warning. The first
    SDK error aborts the loop and is re-raised.

    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering handle_payments_waiting_fee_acceptance")
    try:
        logger.info("Checking for payments waiting for fee acceptance...")
        waiting_filter = ListPaymentsRequest(states=[PaymentState.WAITING_FEE_ACCEPTANCE])
        payments_waiting = self.instance.list_payments(waiting_filter)

        handled_count = 0
        for payment in payments_waiting:
            details = payment.details
            # Fee acceptance is keyed by swap_id, which is read from Bitcoin payment details only.
            has_swap = isinstance(details, PaymentDetails.BITCOIN) and details.swap_id
            if not has_swap:
                logger.warning(f"Skipping payment in WAITING_FEE_ACCEPTANCE state without Bitcoin details or swap_id: {getattr(payment, 'destination', 'N/A')}")
                continue

            swap_id = details.swap_id
            logger.info(f"Found payment waiting fee acceptance: {getattr(payment, 'destination', 'N/A')} (Swap ID: {swap_id})")

            fetch_fees_response = self.instance.fetch_payment_proposed_fees(
                FetchPaymentProposedFeesRequest(swap_id=swap_id)
            )
            logger.info(
                f"Payer sent {fetch_fees_response.payer_amount_sat} "
                f"and currently proposed fees are {fetch_fees_response.fees_sat}"
            )

            # Decision point: no acceptance policy is applied here, the proposed
            # fees are always accepted.
            logger.info(f"Automatically accepting proposed fees for swap {swap_id}.")
            self.instance.accept_payment_proposed_fees(
                AcceptPaymentProposedFeesRequest(response=fetch_fees_response)
            )
            logger.info(f"Accepted proposed fees for swap {swap_id}.")
            handled_count += 1

        logger.info(f"Finished checking for payments waiting fee acceptance. Handled {handled_count}.")
        logger.debug("Exiting handle_payments_waiting_fee_acceptance")
    except Exception as e:
        logger.error(f"Error handling payments waiting fee acceptance: {e}")
        logger.debug("Exiting handle_payments_waiting_fee_acceptance (error)")
        raise
|
|
|
|
|
|
# --- Working with Non-Bitcoin Assets ---
|
|
# Asset Metadata configuration is done in __init__
|
|
|
|
# prepare_receive_asset is covered by receive_payment with asset_id parameter
|
|
|
|
# prepare_send_payment_asset is covered by the updated send_payment with asset_id parameter
|
|
|
|
def fetch_asset_balance(self) -> Dict[str, Any]:
    """
    Returns the asset balances found in the wallet info from get_info().

    Returns:
        The 'asset_balances' entry of the 'wallet_info' mapping returned by
        get_info(); an empty dict when either key is absent.
    Raises:
        Exception: Any error raised by get_info() is logged and re-raised.
    """
    logger.debug("Entering fetch_asset_balance")
    try:
        info = self.get_info()
        wallet_info = info.get('wallet_info', {})
        # NOTE(review): presumably a list of SDK AssetBalance objects rather than
        # a dict - confirm against get_info(). Entries are returned unconverted.
        asset_balances = wallet_info.get('asset_balances', {})

        logger.debug(f"Fetched asset balances: {asset_balances}")
        logger.debug("Exiting fetch_asset_balance")
        return asset_balances
    except Exception as e:
        logger.error(f"Error fetching asset balance (via get_info): {e}")
        logger.debug("Exiting fetch_asset_balance (error)")
        raise
|
|
|
|
|
|
# --- Webhook Management ---
|
|
def register_webhook(self, webhook_url: str):
    """
    Registers a webhook URL with the SDK.

    Args:
        webhook_url: URL to register; must be a string starting with 'https://'.
    Raises:
        ValueError: If webhook_url is not a string or does not start with 'https://'.
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug(f"Entering register_webhook with URL: {webhook_url}")
    try:
        # Only the scheme prefix is checked; the rest of the URL is not validated.
        looks_like_https = isinstance(webhook_url, str) and webhook_url.startswith('https://')
        if not looks_like_https:
            logger.warning(f"Invalid webhook_url provided: {webhook_url}")
            raise ValueError("Webhook URL must be a valid HTTPS URL.")

        self.instance.register_webhook(webhook_url)
        logger.info(f"Webhook registered: {webhook_url}")
        logger.debug("Exiting register_webhook")
    except Exception as e:
        logger.error(f"Error registering webhook {webhook_url}: {e}")
        logger.debug("Exiting register_webhook (error)")
        raise
|
|
|
|
def unregister_webhook(self):
    """
    Removes the webhook registration via the SDK.

    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering unregister_webhook")
    try:
        self.instance.unregister_webhook()
    except Exception as e:
        logger.error(f"Error unregistering webhook: {e}")
        logger.debug("Exiting unregister_webhook (error)")
        raise
    else:
        logger.info("Webhook unregistered.")
        logger.debug("Exiting unregister_webhook")
|
|
|
|
# --- Utilities and Message Signing ---
|
|
# parse_input is implemented above
|
|
|
|
def sign_message(self, message: str) -> Dict[str, str]:
    """
    Signs a message with the wallet's key.

    Args:
        message: The message string to sign.
    Returns:
        Dictionary with 'signature' (from the SDK sign response) and 'pubkey'
        (read from the SDK's wallet info; None if it could not be retrieved).
    Raises:
        ValueError: If message is not a non-empty string.
        Exception: For any SDK errors.
    """
    # Build the log preview defensively: slicing a non-string (e.g. None or an
    # int) would raise TypeError before the validation below can raise the
    # documented ValueError. Truncation avoids logging full message contents.
    preview = message[:50] if isinstance(message, str) else repr(message)[:50]
    logger.debug(f"Entering sign_message with message (truncated): {preview}...")
    try:
        if not isinstance(message, str) or not message:
            logger.warning("Invalid or empty message provided for signing.")
            raise ValueError("Message to sign must be a non-empty string.")

        req = SignMessageRequest(message=message)
        sign_res = self.instance.sign_message(req)
        # Wallet info is fetched after signing to report the wallet's pubkey
        # alongside the signature.
        info = self.instance.get_info()

        pubkey = info.wallet_info.pubkey if info and info.wallet_info else None

        if not pubkey:
            # The signature is still returned; 'pubkey' is None in this case.
            logger.warning("Could not retrieve wallet pubkey after signing message.")

        result = {
            'signature': sign_res.signature,
            'pubkey': pubkey,
        }
        logger.info("Message signed.")
        logger.debug("Exiting sign_message")
        return result
    except Exception as e:
        logger.error(f"Error signing message: {e}")
        logger.debug("Exiting sign_message (error)")
        raise
|
|
|
|
def check_message(self, message: str, pubkey: str, signature: str) -> bool:
    """
    Verifies a signature against a message and public key.

    Args:
        message: The original message string.
        pubkey: The public key string used for signing.
        signature: The signature string to verify.
    Returns:
        The ``is_valid`` flag of the SDK check response.
    Raises:
        ValueError: If message, pubkey, or signature is not a non-empty string.
        Exception: For any SDK errors.
    """
    # Build the log preview defensively: slicing a non-string (e.g. None or an
    # int) would raise TypeError before the validation below can raise the
    # documented ValueError.
    preview = message[:50] if isinstance(message, str) else repr(message)[:50]
    logger.debug(f"Entering check_message for message (truncated): {preview}...")
    try:
        if not isinstance(message, str) or not message:
            logger.warning("Invalid or empty message provided for checking.")
            raise ValueError("Message to check must be a non-empty string.")
        if not isinstance(pubkey, str) or not pubkey:
            logger.warning("Invalid or empty pubkey provided for checking.")
            raise ValueError("Pubkey must be a non-empty string.")
        if not isinstance(signature, str) or not signature:
            logger.warning("Invalid or empty signature provided for checking.")
            raise ValueError("Signature must be a non-empty string.")

        req = CheckMessageRequest(message=message, pubkey=pubkey, signature=signature)
        check_res = self.instance.check_message(req)
        is_valid = check_res.is_valid
        logger.info(f"Message signature check result: {is_valid}")
        logger.debug("Exiting check_message")
        return is_valid
    except Exception as e:
        logger.error(f"Error checking message signature: {e}")
        logger.debug("Exiting check_message (error)")
        raise
|
|
|
|
# External Input Parser configuration is done in __init__
|
|
|
|
# Payment Limits
|
|
# Keeping the explicit fetch methods as they are clearer
|
|
|
|
def fetch_lightning_limits(self) -> Dict[str, Any]:
    """
    Retrieves the SDK's current Lightning payment limits.

    Returns:
        Dictionary with 'receive' and 'send' keys, each holding the ``__dict__``
        of the corresponding SDK limits object, or None when that side is falsy.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering fetch_lightning_limits")
    try:
        limits = self.instance.fetch_lightning_limits()
        receive_limits = limits.receive
        send_limits = limits.send
        limits_dict = {
            'receive': receive_limits.__dict__ if receive_limits else None,
            'send': send_limits.__dict__ if send_limits else None,
        }
        logger.debug(f"Fetched lightning limits: {limits_dict}")
        logger.debug("Exiting fetch_lightning_limits")
        return limits_dict
    except Exception as e:
        logger.error(f"Error fetching lightning limits: {e}")
        logger.debug("Exiting fetch_lightning_limits (error)")
        raise
|
|
|
|
def fetch_onchain_limits(self) -> Dict[str, Any]:
    """
    Retrieves the SDK's current onchain payment limits.

    Returns:
        Dictionary with 'receive' and 'send' keys, each holding the ``__dict__``
        of the corresponding SDK limits object, or None when that side is falsy.
    Raises:
        Exception: Any SDK error is logged and re-raised.
    """
    logger.debug("Entering fetch_onchain_limits")
    try:
        limits = self.instance.fetch_onchain_limits()
        receive_limits = limits.receive
        send_limits = limits.send
        limits_dict = {
            'receive': receive_limits.__dict__ if receive_limits else None,
            'send': send_limits.__dict__ if send_limits else None,
        }
        logger.debug(f"Fetched onchain limits: {limits_dict}")
        logger.debug("Exiting fetch_onchain_limits")
        return limits_dict
    except Exception as e:
        logger.error(f"Error fetching onchain limits: {e}")
        logger.debug("Exiting fetch_onchain_limits (error)")
        raise
|
|
|
|
def sdk_to_dict(self, obj):
    """
    Recursively converts an object into plain, JSON-friendly Python values.

    Conversion rules, checked in this order:
      - Enum members become their member name (walking an Enum's __dict__
        would expose enum internals instead of the state name).
      - str/int/float/bool/None are returned unchanged.
      - dicts keep their keys and have their values converted.
      - lists, tuples, sets and frozensets become lists of converted items.
      - any other object with a __dict__ becomes a dict of its converted attributes.
      - everything else falls back to str(obj).

    Args:
        obj: The value to convert.
    Returns:
        The converted value.
    """
    # Local import: enum is not known to be imported at module level.
    from enum import Enum

    # Enum check comes first so IntEnum-style members are also reported by name.
    if isinstance(obj, Enum):
        return obj.name
    if isinstance(obj, (str, int, float, bool, type(None))):
        return obj
    if isinstance(obj, dict):
        return {k: self.sdk_to_dict(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [self.sdk_to_dict(i) for i in obj]
    if hasattr(obj, '__dict__'):
        return {k: self.sdk_to_dict(v) for k, v in obj.__dict__.items()}
    return str(obj)  # fallback
|
|
|
|
def check_payment_status(self, payment_identifier: str) -> Dict[str, Any]:
    """
    Checks the status of a payment by its identifier (payment hash or swap ID).

    WAITING_CONFIRMATION and SUCCEEDED are both treated as paid: identifiers
    seen in either state are appended to the listener's paid list.

    Lookup order:
      1. Fresh SDK lookup by payment hash, then by swap ID.
      2. The listener's paid list (reported as SUCCEEDED).
      3. The listener's cached status for the identifier.
      4. Otherwise status 'UNKNOWN' with error 'Payment not found'.

    Args:
        payment_identifier: Payment hash or swap ID string.

    Returns:
        Dictionary containing:
        - status: Payment state name
        - payment_details: Converted payment object (fresh lookups only), else None
        - error: 'Payment failed' for FAILED, 'Payment not found' for UNKNOWN, else None
        - timestamp: Payment timestamp (fresh lookups only), else None
        - amount_sat: Payment amount in satoshis (fresh lookups only), else None
        - fees_sat: Payment fees in satoshis (fresh lookups only), else None

    Raises:
        ValueError: If payment_identifier is not a non-empty string.
    """
    logger.debug(f"Checking payment status for identifier: {payment_identifier}")
    try:
        if not isinstance(payment_identifier, str) or not payment_identifier:
            raise ValueError("Invalid payment identifier")

        # Always prefer a fresh SDK answer over anything cached.
        fresh = self._fresh_payment_status(payment_identifier)
        if fresh is not None:
            return fresh

        # Payments already marked as paid stay paid even if the SDK lookup
        # failed this time.
        if payment_identifier in self.listener.paid:
            logger.debug(f"Found payment in internal paid list: {payment_identifier}")
            return {
                'status': 'SUCCEEDED',
                'payment_details': None,
                'error': None,
                'timestamp': None,
                'amount_sat': None,
                'fees_sat': None
            }

        # Check cached status as last resort
        cached_status = self.listener.get_payment_status(payment_identifier)
        if cached_status:
            logger.debug(f"Using cached status: {cached_status}")
            return {
                'status': cached_status,
                'payment_details': None,
                'error': None if cached_status not in ['FAILED'] else 'Payment failed',
                'timestamp': None,
                'amount_sat': None,
                'fees_sat': None
            }

        logger.debug(f"No payment found for identifier: {payment_identifier}")
        return {
            'status': 'UNKNOWN',
            'payment_details': None,
            'error': 'Payment not found',
            'timestamp': None,
            'amount_sat': None,
            'fees_sat': None
        }

    except Exception as e:
        logger.error(f"Error checking payment status: {str(e)}")
        raise

def _fresh_payment_status(self, payment_identifier: str) -> Optional[Dict[str, Any]]:
    """Look the payment up in the SDK by payment hash, then swap ID; return a status dict or None."""
    # Lambdas defer the GetPaymentRequest attribute access into the try block,
    # so a missing variant is treated like any other failed lookup.
    lookups = (
        ("Payment hash", lambda ident: GetPaymentRequest.PAYMENT_HASH(ident)),
        ("Swap ID", lambda ident: GetPaymentRequest.SWAP_ID(ident)),
    )
    for label, build_request in lookups:
        try:
            payment = self.instance.get_payment(build_request(payment_identifier))
            if not payment:
                continue

            # str() of an Enum member yields 'ClassName.MEMBER', which never
            # equals the bare names compared below; prefer the member name.
            # NOTE(review): assumes payment.status is an enum.Enum; also confirm
            # that the SDK's state names match the ones checked here.
            raw_status = payment.status
            status = getattr(raw_status, 'name', None) or str(raw_status)

            # Keep the listener's tracking in sync with what the SDK reports.
            self.listener.payment_statuses[payment_identifier] = status
            if status in ['WAITING_CONFIRMATION', 'SUCCEEDED']:
                if payment_identifier not in self.listener.paid:
                    self.listener.paid.append(payment_identifier)
                    logger.info(f"Payment {payment_identifier} marked as paid (status: {status})")

            return {
                'status': status,
                'payment_details': self.sdk_to_dict(payment),
                'error': None if status not in ['FAILED'] else 'Payment failed',
                'timestamp': payment.timestamp,
                'amount_sat': payment.amount_sat,
                'fees_sat': payment.fees_sat
            }
        except Exception as e:
            # A failed lookup is not fatal: fall through to the next strategy.
            logger.debug(f"{label} lookup failed: {str(e)}")
    return None
|
|
|
|
def get_exchange_rate(self, currency: str = None) -> Dict[str, Any]:
    """
    Returns fiat exchange rates from the SDK, for one currency or for all of them.

    Args:
        currency: Optional currency code (e.g. 'EUR'); matched after upper-casing.
    Returns:
        With a currency: {'currency': <CODE>, 'rate': <value>}.
        Without one: a mapping of every rate's coin code to its value.
    Raises:
        ValueError: If the requested currency is not among the fetched rates.
        Exception: Any SDK error is logged (with traceback) and re-raised.
    """
    logger.debug(f"Entering get_exchange_rate for currency: {currency}")
    try:
        # Index the fetched rates by coin code.
        rates_dict = {rate.coin: rate.value for rate in self.instance.fetch_fiat_rates()}

        if not currency:
            logger.info(f"Returning all exchange rates for {len(rates_dict)} currencies")
            return rates_dict

        currency = currency.upper()
        if currency in rates_dict:
            logger.info(f"Found exchange rate for {currency}: {rates_dict[currency]}")
            return {
                'currency': currency,
                'rate': rates_dict[currency]
            }

        logger.warning(f"Requested currency {currency} not found in available rates")
        raise ValueError(f"Exchange rate not available for currency: {currency}")

    except Exception as e:
        logger.error(f"Error fetching exchange rate: {str(e)}")
        logger.exception("Full error details:")
        raise