diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc77a77..4d2b663d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ ### Added - cashu: `KeySetInfos` type alias and `KeySetInfosMethods` trait for filtering keysets ([thesimplekid]). +- cdk: Mint lifecycle management with `start()` and `stop()` methods for graceful background service control ([thesimplekid]). +- cdk: Background task management for invoice payment monitoring with proper shutdown handling ([thesimplekid]). ### Changed - cdk: Refactored wallet keyset management methods for better clarity and separation of concerns ([thesimplekid]). @@ -17,6 +19,9 @@ - cdk: Improved `load_mint_keysets` method to be the primary method for getting keysets for token operations ([thesimplekid]). - cdk: Enhanced keyset management with better offline/online operation separation ([thesimplekid]). - cdk: Updated method documentation to clarify storage vs network operations ([thesimplekid]). +- cdk: Refactored invoice payment monitoring to use centralized lifecycle management instead of manual task spawning ([thesimplekid]). +- cdk-mintd: Updated to use new mint lifecycle methods for improved service management ([thesimplekid]). +- cdk-integration-tests: Updated test utilities to use new mint lifecycle management ([thesimplekid]). diff --git a/crates/cdk-integration-tests/src/init_pure_tests.rs b/crates/cdk-integration-tests/src/init_pure_tests.rs index 9d64035f..f55e1d1d 100644 --- a/crates/cdk-integration-tests/src/init_pure_tests.rs +++ b/crates/cdk-integration-tests/src/init_pure_tests.rs @@ -24,7 +24,7 @@ use cdk::util::unix_time; use cdk::wallet::{AuthWallet, MintConnector, Wallet, WalletBuilder}; use cdk::{Amount, Error, Mint}; use cdk_fake_wallet::FakeWallet; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::RwLock; use tracing_subscriber::EnvFilter; use uuid::Uuid; @@ -282,12 +282,7 @@ pub async fn create_and_start_test_mint() -> Result { .build_with_seed(localstore.clone(), &mnemonic.to_seed_normalized("")) .await?; - let mint_clone = mint.clone(); - let shutdown = Arc::new(Notify::new()); - tokio::spawn({ - let shutdown = Arc::clone(&shutdown); - async move { mint_clone.wait_for_paid_invoices(shutdown).await } - }); + mint.start().await?; Ok(mint) } diff --git a/crates/cdk-mintd/src/main.rs b/crates/cdk-mintd/src/main.rs index 6296325c..59cbe46b 100644 --- a/crates/cdk-mintd/src/main.rs +++ b/crates/cdk-mintd/src/main.rs @@ -49,7 +49,6 @@ use cdk_mintd::setup::LnBackendSetup; use cdk_sqlite::mint::MintSqliteAuthDatabase; use cdk_sqlite::MintSqliteDatabase; use clap::Parser; -use tokio::sync::Notify; use tower::ServiceBuilder; use tower_http::compression::CompressionLayer; use tower_http::decompression::RequestDecompressionLayer; @@ -757,12 +756,7 @@ async fn start_services( mint_service = mint_service.merge(router); } - let shutdown = Arc::new(Notify::new()); - let mint_clone = Arc::clone(&mint); - tokio::spawn({ - let shutdown = Arc::clone(&shutdown); - async move { mint_clone.wait_for_paid_invoices(shutdown).await } - }); + mint.start().await?; let socket_addr = SocketAddr::from_str(&format!("{listen_addr}:{listen_port}"))?; @@ -784,8 +778,7 @@ async fn start_services( } } - // Notify all waiting tasks to shutdown - shutdown.notify_waiters(); + mint.stop().await?; #[cfg(feature = "management-rpc")] { diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 522076d4..0221ba82 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use arc_swap::ArcSwap; use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; @@ -9,14 +10,15 @@ use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; use cdk_common::database::MintAuthDatabase; use cdk_common::database::{self, MintDatabase, MintTransaction}; use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind}; +use cdk_common::payment::WaitPaymentResponse; use cdk_common::secret; use cdk_signatory::signatory::{Signatory, SignatoryKeySet}; use futures::StreamExt; #[cfg(feature = "auth")] use nut21::ProtectedEndpoint; use subscription::PubSubManager; -use tokio::sync::Notify; -use tokio::task::JoinSet; +use tokio::sync::{Mutex, Notify}; +use tokio::task::{JoinHandle, JoinSet}; use tracing::instrument; use uuid::Uuid; @@ -68,6 +70,17 @@ pub struct Mint { oidc_client: Option, /// In-memory keyset keysets: Arc>>, + /// Background task management + task_state: Arc>, +} + +/// State for managing background tasks +#[derive(Default)] +struct TaskState { + /// Shutdown signal for all background tasks + shutdown_notify: Option>, + /// Handle to the main supervisor task + supervisor_handle: Option>>, } impl Mint { @@ -168,9 +181,115 @@ impl Mint { #[cfg(feature = "auth")] auth_localstore, keysets: Arc::new(ArcSwap::new(keysets.keysets.into())), + task_state: Arc::new(Mutex::new(TaskState::default())), }) } + /// Start the mint's background services and operations + /// + /// This function immediately starts background services and returns. The background + /// tasks will continue running until `stop()` is called. + /// + /// # Returns + /// + /// Returns `Ok(())` if background services started successfully, or an `Error` + /// if startup failed. + /// + /// # Background Services + /// + /// Currently manages: + /// - Invoice payment monitoring across all configured payment processors + /// + /// Future services may include: + /// - Quote cleanup and expiration management + /// - Periodic database maintenance + /// - Health check monitoring + /// - Metrics collection + pub async fn start(&self) -> Result<(), Error> { + let mut task_state = self.task_state.lock().await; + + // Prevent starting if already running + if task_state.shutdown_notify.is_some() { + return Err(Error::Internal); // Already started + } + + // Create shutdown signal + let shutdown_notify = Arc::new(Notify::new()); + + // Clone required components for the background task + let payment_processors = self.payment_processors.clone(); + let localstore = Arc::clone(&self.localstore); + let pubsub_manager = Arc::clone(&self.pubsub_manager); + let shutdown_clone = shutdown_notify.clone(); + + // Spawn the supervisor task + let supervisor_handle = tokio::spawn(async move { + Self::wait_for_paid_invoices( + &payment_processors, + localstore, + pubsub_manager, + shutdown_clone, + ) + .await + }); + + // Store the handles + task_state.shutdown_notify = Some(shutdown_notify); + task_state.supervisor_handle = Some(supervisor_handle); + + // Give the background task a tiny bit of time to start waiting + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + tracing::info!("Mint background services started"); + Ok(()) + } + + /// Stop all background services and wait for graceful shutdown + /// + /// This function signals all background tasks to shut down and waits for them + /// to complete gracefully. It's safe to call multiple times. + /// + /// # Returns + /// + /// Returns `Ok(())` when all background services have shut down cleanly, or an + /// `Error` if there was an issue during shutdown. + pub async fn stop(&self) -> Result<(), Error> { + let mut task_state = self.task_state.lock().await; + + // Take the handles out of the state + let shutdown_notify = task_state.shutdown_notify.take(); + let supervisor_handle = task_state.supervisor_handle.take(); + + // If nothing to stop, return early + let (shutdown_notify, supervisor_handle) = match (shutdown_notify, supervisor_handle) { + (Some(notify), Some(handle)) => (notify, handle), + _ => { + tracing::debug!("Stop called but no background services were running"); + return Ok(()); // Nothing to stop + } + }; + + // Drop the lock before waiting + drop(task_state); + + tracing::info!("Stopping mint background services..."); + + // Signal shutdown + shutdown_notify.notify_waiters(); + + // Wait for supervisor to complete + match supervisor_handle.await { + Ok(result) => { + tracing::info!("Mint background services stopped"); + result + } + Err(join_error) => { + tracing::error!("Background service task panicked: {:?}", join_error); + Err(Error::Internal) + } + } + } + /// Get the payment processor for the given unit and payment method pub fn get_payment_processor( &self, @@ -262,81 +381,195 @@ impl Mint { Ok(tx.commit().await?) } - /// Wait for any invoice to be paid /// For each backend starts a task that waits for any invoice to be paid /// Once invoice is paid mint quote status is updated #[instrument(skip_all)] - pub async fn wait_for_paid_invoices(&self, shutdown: Arc) -> Result<(), Error> { - let mint_arc = Arc::new(self.clone()); - + async fn wait_for_paid_invoices( + payment_processors: &HashMap< + PaymentProcessorKey, + Arc + Send + Sync>, + >, + localstore: Arc + Send + Sync>, + pubsub_manager: Arc, + shutdown: Arc, + ) -> Result<(), Error> { let mut join_set = JoinSet::new(); - let mut processor_groups: Vec<( - Arc + Send + Sync>, - Vec, - )> = Vec::new(); - - for (key, ln) in self.payment_processors.iter() { - // Check if we already have this processor - let found = processor_groups.iter_mut().find(|(proc_ref, _)| { - // Compare Arc pointer equality using ptr_eq - Arc::ptr_eq(proc_ref, ln) - }); - - if let Some((_, keys)) = found { - // We found this processor, add the key to its group - keys.push(key.clone()); - } else { - // New processor, create a new group - processor_groups.push((Arc::clone(ln), vec![key.clone()])); + // Group processors by unique instance (using Arc pointer equality) + let mut seen_processors = Vec::new(); + for (key, processor) in payment_processors { + // Skip if processor is already active + if processor.is_wait_invoice_active() { + continue; } + + // Skip if we've already spawned a task for this processor instance + if seen_processors.iter().any(|p| Arc::ptr_eq(p, processor)) { + continue; + } + + seen_processors.push(Arc::clone(processor)); + + tracing::info!("Starting payment wait task for {:?}", key); + + // Clone for the spawned task + let processor = Arc::clone(processor); + let localstore = Arc::clone(&localstore); + let pubsub_manager = Arc::clone(&pubsub_manager); + let shutdown = Arc::clone(&shutdown); + + join_set.spawn(async move { + let result = Self::wait_for_processor_payments( + processor, + localstore, + pubsub_manager, + shutdown, + ) + .await; + + if let Err(e) = result { + tracing::error!("Payment processor task failed: {:?}", e); + } + }); } - for (ln, key) in processor_groups { - if !ln.is_wait_invoice_active() { - tracing::info!("Wait payment for {:?} inactive starting.", key); - let mint = Arc::clone(&mint_arc); - let ln = Arc::clone(&ln); - let shutdown = Arc::clone(&shutdown); - let key = key.clone(); - join_set.spawn(async move { + // If no payment processors, just wait for shutdown + if join_set.is_empty() { + shutdown.notified().await; + } else { + // Wait for shutdown or all tasks to complete loop { - tracing::info!("Restarting wait for: {:?}", key); tokio::select! { _ = shutdown.notified() => { - tracing::info!("Shutdown signal received, stopping task for {:?}", key); - ln.cancel_wait_invoice(); + println!("Shutting down payment processors"); break; } - result = ln.wait_any_incoming_payment() => { - match result { - Ok(mut stream) => { - while let Some(request_lookup_id) = stream.next().await { - if let Err(err) = mint.pay_mint_quote_for_request_id(request_lookup_id).await { - tracing::warn!("{:?}", err); - } - } - } - Err(err) => { - tracing::warn!("Could not get incoming payment stream for {:?}: {}",key, err); - - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } + Some(result) = join_set.join_next() => { + if let Err(e) = result { + tracing::warn!("Task panicked: {:?}", e); } } - } - } - }); + else => break, // All tasks completed + } } } - // Spawn a task to manage the JoinSet - while let Some(result) = join_set.join_next().await { - match result { - Ok(_) => tracing::info!("A task completed successfully."), - Err(err) => tracing::warn!("A task failed: {:?}", err), + join_set.shutdown().await; + Ok(()) + } + + /// Handles payment waiting for a single processor + async fn wait_for_processor_payments( + processor: Arc + Send + Sync>, + localstore: Arc + Send + Sync>, + pubsub_manager: Arc, + shutdown: Arc, + ) -> Result<(), Error> { + loop { + tokio::select! { + _ = shutdown.notified() => { + processor.cancel_wait_invoice(); + break; + } + result = processor.wait_any_incoming_payment() => { + match result { + Ok(mut stream) => { + while let Some(request_lookup_id) = stream.next().await { + if let Err(e) = Self::handle_payment_notification( + &localstore, + &pubsub_manager, + request_lookup_id, + ).await { + tracing::warn!("Payment notification error: {:?}", e); + } + } + } + Err(e) => { + tracing::warn!("Failed to get payment stream: {}", e); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } } } + Ok(()) + } + + /// Handle payment notification without needing full Mint instance + /// This is a helper function that can be called with just the required components + async fn handle_payment_notification( + localstore: &Arc + Send + Sync>, + pubsub_manager: &Arc, + wait_payment_response: WaitPaymentResponse, + ) -> Result<(), Error> { + if wait_payment_response.payment_amount == Amount::ZERO { + tracing::warn!( + "Received payment response with 0 amount with payment id {}.", + wait_payment_response.payment_id + ); + return Err(Error::AmountUndefined); + } + + let mut tx = localstore.begin_transaction().await?; + + if let Ok(Some(mint_quote)) = tx + .get_mint_quote_by_request_lookup_id(&wait_payment_response.payment_identifier) + .await + { + Self::handle_mint_quote_payment( + &mut tx, + &mint_quote, + wait_payment_response, + pubsub_manager, + ) + .await?; + } else { + tracing::warn!( + "Could not get request for request lookup id {:?}.", + wait_payment_response.payment_identifier + ); + } + + tx.commit().await?; + Ok(()) + } + + /// Handle payment for a specific mint quote (extracted from pay_mint_quote) + async fn handle_mint_quote_payment( + tx: &mut Box + Send + Sync + '_>, + mint_quote: &MintQuote, + wait_payment_response: WaitPaymentResponse, + pubsub_manager: &Arc, + ) -> Result<(), Error> { + tracing::debug!( + "Received payment notification of {} for mint quote {} with payment id {}", + wait_payment_response.payment_amount, + mint_quote.id, + wait_payment_response.payment_id + ); + + let quote_state = mint_quote.state(); + if !mint_quote + .payment_ids() + .contains(&&wait_payment_response.payment_id) + { + if mint_quote.payment_method == PaymentMethod::Bolt11 + && (quote_state == MintQuoteState::Issued || quote_state == MintQuoteState::Paid) + { + tracing::info!("Received payment notification for already issued quote."); + } else { + tx.increment_mint_quote_amount_paid( + &mint_quote.id, + wait_payment_response.payment_amount, + wait_payment_response.payment_id, + ) + .await?; + + pubsub_manager.mint_quote_bolt11_status(mint_quote.clone(), MintQuoteState::Paid); + } + } else { + tracing::info!("Received payment notification for already seen payment."); + } Ok(()) } @@ -557,11 +790,11 @@ impl Mint { /// Total redeemed for keyset #[instrument(skip_all)] pub async fn total_redeemed(&self) -> Result, Error> { - let keysets = self.keysets().keysets; + let keysets = self.signatory.keysets().await?; let mut total_redeemed = HashMap::new(); - for keyset in keysets { + for keyset in keysets.keysets { let (proofs, state) = self.localstore.get_proofs_by_keyset_id(&keyset.id).await?; let total_spent = @@ -581,6 +814,7 @@ impl Mint { #[cfg(test)] mod tests { + use std::str::FromStr; use cdk_sqlite::mint::memory::new_with_state; @@ -713,4 +947,31 @@ mod tests { assert_eq!(expected_keys, serde_json::to_string(&keys.clone()).unwrap()); } + + #[tokio::test] + async fn test_start_stop_lifecycle() { + let mut supported_units = HashMap::new(); + supported_units.insert(CurrencyUnit::default(), (0, 32)); + let config = MintConfig::<'_> { + supported_units, + ..Default::default() + }; + let mint = create_mint(config).await; + + // Start should succeed (async) + mint.start().await.expect("Failed to start mint"); + + // Starting again should fail (already running) + assert!(mint.start().await.is_err()); + + // Stop should succeed (still async) + mint.stop().await.expect("Failed to stop mint"); + + // Stopping again should succeed (idempotent) + mint.stop().await.expect("Second stop should be fine"); + + // Should be able to start again after stopping + mint.start().await.expect("Should be able to restart"); + mint.stop().await.expect("Final stop should work"); + } }