diff --git a/crates/cdk-cln/src/error.rs b/crates/cdk-cln/src/error.rs index 2f9d8e0e..85025b86 100644 --- a/crates/cdk-cln/src/error.rs +++ b/crates/cdk-cln/src/error.rs @@ -32,6 +32,9 @@ pub enum Error { /// Bolt12 Error #[error("Bolt12 error: {0}")] Bolt12(String), + /// Database Error + #[error("Database error: {0}")] + Database(String), } impl From for cdk_common::payment::Error { diff --git a/crates/cdk-cln/src/lib.rs b/crates/cdk-cln/src/lib.rs index 0bd797d4..392c7d4e 100644 --- a/crates/cdk-cln/src/lib.rs +++ b/crates/cdk-cln/src/lib.rs @@ -16,6 +16,7 @@ use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; use cdk_common::amount::{to_unit, Amount}; use cdk_common::common::FeeReserve; +use cdk_common::database::mint::DynMintKVStore; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ self, Bolt11IncomingPaymentOptions, Bolt11Settings, Bolt12IncomingPaymentOptions, @@ -43,6 +44,11 @@ use uuid::Uuid; pub mod error; +// KV Store constants for CLN +const CLN_KV_PRIMARY_NAMESPACE: &str = "cdk_cln_lightning_backend"; +const CLN_KV_SECONDARY_NAMESPACE: &str = "payment_indices"; +const LAST_PAY_INDEX_KV_KEY: &str = "last_pay_index"; + /// CLN mint backend #[derive(Clone)] pub struct Cln { @@ -50,16 +56,22 @@ pub struct Cln { fee_reserve: FeeReserve, wait_invoice_cancel_token: CancellationToken, wait_invoice_is_active: Arc, + kv_store: DynMintKVStore, } impl Cln { /// Create new [`Cln`] - pub async fn new(rpc_socket: PathBuf, fee_reserve: FeeReserve) -> Result { + pub async fn new( + rpc_socket: PathBuf, + fee_reserve: FeeReserve, + kv_store: DynMintKVStore, + ) -> Result { Ok(Self { rpc_socket, fee_reserve, wait_invoice_cancel_token: CancellationToken::new(), wait_invoice_is_active: Arc::new(AtomicBool::new(false)), + kv_store, }) } } @@ -114,14 +126,16 @@ impl MintPayment for Cln { }; tracing::debug!("CLN: Creating stream processing pipeline"); + let kv_store = self.kv_store.clone(); let stream = futures::stream::unfold( ( cln_client, last_pay_index, self.wait_invoice_cancel_token.clone(), Arc::clone(&self.wait_invoice_is_active), + kv_store, ), - |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move { + |(mut cln_client, mut last_pay_idx, cancel_token, is_active, kv_store)| async move { // Set the stream as active is_active.store(true, Ordering::SeqCst); tracing::debug!("CLN: Stream is now active, waiting for invoice events with lastpay_index: {:?}", last_pay_idx); @@ -179,6 +193,23 @@ impl MintPayment for Cln { last_pay_idx = wait_any_response.pay_index; tracing::debug!("CLN: Updated last_pay_idx to {:?}", last_pay_idx); + + // Store the updated pay index in KV store for persistence + if let Some(pay_index) = last_pay_idx { + let index_str = pay_index.to_string(); + if let Ok(mut tx) = kv_store.begin_transaction().await { + if let Err(e) = tx.kv_write(CLN_KV_PRIMARY_NAMESPACE, CLN_KV_SECONDARY_NAMESPACE, LAST_PAY_INDEX_KV_KEY, index_str.as_bytes()).await { + tracing::warn!("CLN: Failed to write last pay index {} to KV store: {}", pay_index, e); + } else if let Err(e) = tx.commit().await { + tracing::warn!("CLN: Failed to commit last pay index {} to KV store: {}", pay_index, e); + } else { + tracing::debug!("CLN: Stored last pay index {} in KV store", pay_index); + } + } else { + tracing::warn!("CLN: Failed to begin KV transaction for storing pay index {}", pay_index); + } + } + let payment_hash = wait_any_response.payment_hash; tracing::debug!("CLN: Payment hash: {}", payment_hash); @@ -245,7 +276,7 @@ impl MintPayment for Cln { tracing::info!("CLN: Created WaitPaymentResponse with amount {} msats", amount_msats.msat()); let event = Event::PaymentReceived(response); - break Some((event, (cln_client, last_pay_idx, cancel_token, is_active))); + break Some((event, (cln_client, last_pay_idx, cancel_token, is_active, kv_store))); } Err(e) => { tracing::warn!("CLN: Error fetching invoice: {e}"); @@ -733,6 +764,27 @@ impl Cln { /// Get last pay index for cln async fn get_last_pay_index(&self) -> Result, Error> { + // First try to read from KV store + if let Some(stored_index) = self + .kv_store + .kv_read( + CLN_KV_PRIMARY_NAMESPACE, + CLN_KV_SECONDARY_NAMESPACE, + LAST_PAY_INDEX_KV_KEY, + ) + .await + .map_err(|e| Error::Database(e.to_string()))? + { + if let Ok(index_str) = std::str::from_utf8(&stored_index) { + if let Ok(index) = index_str.parse::() { + tracing::debug!("CLN: Retrieved last pay index {} from KV store", index); + return Ok(Some(index)); + } + } + } + + // Fall back to querying CLN directly + tracing::debug!("CLN: No stored last pay index found in KV store, querying CLN directly"); let mut cln_client = self.cln_client().await?; let listinvoices_response = cln_client .call_typed(&ListinvoicesRequest { @@ -753,7 +805,7 @@ impl Cln { } } - /// Decode string + /// Decode string #[instrument(skip(self))] async fn decode_string(&self, string: String) -> Result { let mut cln_client = self.cln_client().await?; diff --git a/crates/cdk-common/src/database/mint/auth/mod.rs b/crates/cdk-common/src/database/mint/auth/mod.rs index a71257ee..26f25819 100644 --- a/crates/cdk-common/src/database/mint/auth/mod.rs +++ b/crates/cdk-common/src/database/mint/auth/mod.rs @@ -88,3 +88,7 @@ pub trait MintAuthDatabase { &self, ) -> Result>, Self::Err>; } + +/// Type alias for trait objects +pub type DynMintAuthDatabase = + std::sync::Arc + Send + Sync>; diff --git a/crates/cdk-common/src/database/mint/mod.rs b/crates/cdk-common/src/database/mint/mod.rs index 02a26aee..f54a9b8c 100644 --- a/crates/cdk-common/src/database/mint/mod.rs +++ b/crates/cdk-common/src/database/mint/mod.rs @@ -22,7 +22,7 @@ mod auth; pub mod test; #[cfg(feature = "auth")] -pub use auth::{MintAuthDatabase, MintAuthTransaction}; +pub use auth::{DynMintAuthDatabase, MintAuthDatabase, MintAuthTransaction}; /// Valid ASCII characters for namespace and key strings in KV store pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -442,6 +442,9 @@ pub trait KVStore: KVStoreDatabase { ) -> Result + Send + Sync + 'a>, Error>; } +/// Type alias for Mint Kv store +pub type DynMintKVStore = std::sync::Arc + Send + Sync>; + /// Mint Database trait #[async_trait] pub trait Database: @@ -461,3 +464,6 @@ pub trait Database: /// Get [`QuoteTTL`] async fn get_quote_ttl(&self) -> Result; } + +/// Type alias for Mint Database +pub type DynMintDatabase = std::sync::Arc + Send + Sync>; diff --git a/crates/cdk-common/src/database/mod.rs b/crates/cdk-common/src/database/mod.rs index 6abcfe28..4b808a11 100644 --- a/crates/cdk-common/src/database/mod.rs +++ b/crates/cdk-common/src/database/mod.rs @@ -7,7 +7,7 @@ mod wallet; #[cfg(feature = "mint")] pub use mint::{ - Database as MintDatabase, DbTransactionFinalizer as MintDbWriterFinalizer, + Database as MintDatabase, DbTransactionFinalizer as MintDbWriterFinalizer, DynMintDatabase, KVStore as MintKVStore, KVStoreDatabase as MintKVStoreDatabase, KVStoreTransaction as MintKVStoreTransaction, KeysDatabase as MintKeysDatabase, KeysDatabaseTransaction as MintKeyDatabaseTransaction, ProofsDatabase as MintProofsDatabase, @@ -16,7 +16,7 @@ pub use mint::{ SignaturesTransaction as MintSignatureTransaction, Transaction as MintTransaction, }; #[cfg(all(feature = "mint", feature = "auth"))] -pub use mint::{MintAuthDatabase, MintAuthTransaction}; +pub use mint::{DynMintAuthDatabase, MintAuthDatabase, MintAuthTransaction}; #[cfg(feature = "wallet")] pub use wallet::Database as WalletDatabase; diff --git a/crates/cdk-common/src/payment.rs b/crates/cdk-common/src/payment.rs index 15f18427..8ee371d6 100644 --- a/crates/cdk-common/src/payment.rs +++ b/crates/cdk-common/src/payment.rs @@ -327,6 +327,19 @@ pub enum Event { PaymentReceived(WaitPaymentResponse), } +impl Default for Event { + fn default() -> Self { + // We use this as a sentinel value for no-op events + // The actual processing will filter these out + Event::PaymentReceived(WaitPaymentResponse { + payment_identifier: PaymentIdentifier::CustomId("default".to_string()), + payment_amount: Amount::from(0), + unit: CurrencyUnit::Msat, + payment_id: "default".to_string(), + }) + } +} + /// Wait any invoice response #[derive(Debug, Clone, Hash, Serialize, Deserialize)] pub struct WaitPaymentResponse { @@ -599,3 +612,6 @@ where result } } + +/// Type alias for Mint Payment trait +pub type DynMintPayment = std::sync::Arc + Send + Sync>; diff --git a/crates/cdk-integration-tests/src/init_regtest.rs b/crates/cdk-integration-tests/src/init_regtest.rs index faa61b58..8bb66865 100644 --- a/crates/cdk-integration-tests/src/init_regtest.rs +++ b/crates/cdk-integration-tests/src/init_regtest.rs @@ -6,7 +6,9 @@ use std::sync::Arc; use anyhow::Result; use cdk::types::FeeReserve; use cdk_cln::Cln as CdkCln; +use cdk_common::database::mint::DynMintKVStore; use cdk_lnd::Lnd as CdkLnd; +use cdk_sqlite::mint::memory; use ldk_node::lightning::ln::msgs::SocketAddress; use ldk_node::Node; use ln_regtest_rs::bitcoin_client::BitcoinClient; @@ -164,7 +166,8 @@ pub async fn create_cln_backend(cln_client: &ClnClient) -> Result { percent_fee_reserve: 1.0, }; - Ok(CdkCln::new(rpc_path, fee_reserve).await?) + let kv_store: DynMintKVStore = Arc::new(memory::empty().await?); + Ok(CdkCln::new(rpc_path, fee_reserve, kv_store).await?) } pub async fn create_lnd_backend(lnd_client: &LndClient) -> Result { @@ -173,11 +176,14 @@ pub async fn create_lnd_backend(lnd_client: &LndClient) -> Result { percent_fee_reserve: 1.0, }; + let kv_store: DynMintKVStore = Arc::new(memory::empty().await?); + Ok(CdkLnd::new( lnd_client.address.clone(), lnd_client.cert_file.clone(), lnd_client.macaroon_file.clone(), fee_reserve, + kv_store, ) .await?) } diff --git a/crates/cdk-lnd/src/error.rs b/crates/cdk-lnd/src/error.rs index 5d1b10f0..ba546f53 100644 --- a/crates/cdk-lnd/src/error.rs +++ b/crates/cdk-lnd/src/error.rs @@ -39,6 +39,9 @@ pub enum Error { /// Could not read file #[error("Could not read file")] ReadFile, + /// Database Error + #[error("Database error: {0}")] + Database(String), } impl From for cdk_common::payment::Error { diff --git a/crates/cdk-lnd/src/lib.rs b/crates/cdk-lnd/src/lib.rs index fc6e49df..4066508d 100644 --- a/crates/cdk-lnd/src/lib.rs +++ b/crates/cdk-lnd/src/lib.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT}; use cdk_common::bitcoin::hashes::Hash; use cdk_common::common::FeeReserve; +use cdk_common::database::mint::DynMintKVStore; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, @@ -42,6 +43,12 @@ pub(crate) use proto::{lnrpc, routerrpc}; use crate::lnrpc::invoice::InvoiceState; +/// LND KV Store constants +const LND_KV_PRIMARY_NAMESPACE: &str = "cdk_lnd_lightning_backend"; +const LND_KV_SECONDARY_NAMESPACE: &str = "payment_indices"; +const LAST_ADD_INDEX_KV_KEY: &str = "last_add_index"; +const LAST_SETTLE_INDEX_KV_KEY: &str = "last_settle_index"; + /// Lnd mint backend #[derive(Clone)] pub struct Lnd { @@ -50,6 +57,7 @@ pub struct Lnd { _macaroon_file: PathBuf, lnd_client: client::Client, fee_reserve: FeeReserve, + kv_store: DynMintKVStore, wait_invoice_cancel_token: CancellationToken, wait_invoice_is_active: Arc, settings: Bolt11Settings, @@ -65,6 +73,7 @@ impl Lnd { cert_file: PathBuf, macaroon_file: PathBuf, fee_reserve: FeeReserve, + kv_store: DynMintKVStore, ) -> Result { // Validate address is not empty if address.is_empty() { @@ -104,6 +113,7 @@ impl Lnd { _macaroon_file: macaroon_file, lnd_client, fee_reserve, + kv_store, wait_invoice_cancel_token: CancellationToken::new(), wait_invoice_is_active: Arc::new(AtomicBool::new(false)), settings: Bolt11Settings { @@ -115,6 +125,55 @@ impl Lnd { }, }) } + + /// Get last add and settle indices from KV store + #[instrument(skip_all)] + async fn get_last_indices(&self) -> Result<(Option, Option), Error> { + let add_index = if let Some(stored_index) = self + .kv_store + .kv_read( + LND_KV_PRIMARY_NAMESPACE, + LND_KV_SECONDARY_NAMESPACE, + LAST_ADD_INDEX_KV_KEY, + ) + .await + .map_err(|e| Error::Database(e.to_string()))? + { + if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) { + index_str.parse::().ok() + } else { + None + } + } else { + None + }; + + let settle_index = if let Some(stored_index) = self + .kv_store + .kv_read( + LND_KV_PRIMARY_NAMESPACE, + LND_KV_SECONDARY_NAMESPACE, + LAST_SETTLE_INDEX_KV_KEY, + ) + .await + .map_err(|e| Error::Database(e.to_string()))? + { + if let Ok(index_str) = std::str::from_utf8(stored_index.as_slice()) { + index_str.parse::().ok() + } else { + None + } + } else { + None + }; + + tracing::debug!( + "LND: Retrieved last indices from KV store - add_index: {:?}, settle_index: {:?}", + add_index, + settle_index + ); + Ok((add_index, settle_index)) + } } #[async_trait] @@ -142,11 +201,21 @@ impl MintPayment for Lnd { ) -> Result + Send>>, Self::Err> { let mut lnd_client = self.lnd_client.clone(); + // Get last indices from KV store + let (last_add_index, last_settle_index) = + self.get_last_indices().await.unwrap_or((None, None)); + let stream_req = lnrpc::InvoiceSubscription { - add_index: 0, - settle_index: 0, + add_index: last_add_index.unwrap_or(0), + settle_index: last_settle_index.unwrap_or(0), }; + tracing::debug!( + "LND: Starting invoice subscription with add_index: {}, settle_index: {}", + stream_req.add_index, + stream_req.settle_index + ); + let stream = lnd_client .lightning() .subscribe_invoices(stream_req) @@ -158,68 +227,119 @@ impl MintPayment for Lnd { .into_inner(); let cancel_token = self.wait_invoice_cancel_token.clone(); + let kv_store = self.kv_store.clone(); - Ok(futures::stream::unfold( + let event_stream = futures::stream::unfold( ( stream, cancel_token, Arc::clone(&self.wait_invoice_is_active), + kv_store, + last_add_index.unwrap_or(0), + last_settle_index.unwrap_or(0), ), - |(mut stream, cancel_token, is_active)| async move { + |( + mut stream, + cancel_token, + is_active, + kv_store, + mut current_add_index, + mut current_settle_index, + )| async move { is_active.store(true, Ordering::SeqCst); - tokio::select! { - _ = cancel_token.cancelled() => { - // Stream is cancelled - is_active.store(false, Ordering::SeqCst); - tracing::info!("Waiting for lnd invoice ending"); - None + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for lnd invoice ending"); + return None; + } + msg = stream.message() => { + match msg { + Ok(Some(msg)) => { + // Update indices based on the message + current_add_index = current_add_index.max(msg.add_index); + current_settle_index = current_settle_index.max(msg.settle_index); - } - msg = stream.message() => { + // Store the updated indices in KV store regardless of settlement status + let add_index_str = current_add_index.to_string(); + let settle_index_str = current_settle_index.to_string(); - match msg { - Ok(Some(msg)) => { - if msg.state() == InvoiceState::Settled { + if let Ok(mut tx) = kv_store.begin_transaction().await { + let mut has_error = false; - let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into(); + if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_ADD_INDEX_KV_KEY, add_index_str.as_bytes()).await { + tracing::warn!("LND: Failed to write add_index {} to KV store: {}", current_add_index, e); + has_error = true; + } - if let Ok(hash_slice) = hash_slice { - let hash = hex::encode(hash_slice); + if let Err(e) = tx.kv_write(LND_KV_PRIMARY_NAMESPACE, LND_KV_SECONDARY_NAMESPACE, LAST_SETTLE_INDEX_KV_KEY, settle_index_str.as_bytes()).await { + tracing::warn!("LND: Failed to write settle_index {} to KV store: {}", current_settle_index, e); + has_error = true; + } + + if !has_error { + if let Err(e) = tx.commit().await { + tracing::warn!("LND: Failed to commit indices to KV store: {}", e); + } else { + tracing::debug!("LND: Stored updated indices - add_index: {}, settle_index: {}", current_add_index, current_settle_index); + } + } + } else { + tracing::warn!("LND: Failed to begin KV transaction for storing indices"); + } + + // Only emit event for settled invoices + if msg.state() == InvoiceState::Settled { + let hash_slice: Result<[u8;32], _> = msg.r_hash.try_into(); + + if let Ok(hash_slice) = hash_slice { + let hash = hex::encode(hash_slice); + + tracing::info!("LND: Payment for {} with amount {} msat", hash, msg.amt_paid_msat); - tracing::info!("LND: Processing payment with hash: {}", hash); let wait_response = WaitPaymentResponse { - payment_identifier: PaymentIdentifier::PaymentHash(hash_slice), payment_amount: Amount::from(msg.amt_paid_msat as u64), + payment_identifier: PaymentIdentifier::PaymentHash(hash_slice), + payment_amount: Amount::from(msg.amt_paid_msat as u64), unit: CurrencyUnit::Msat, payment_id: hash, }; - tracing::info!("LND: Created WaitPaymentResponse with amount {} msat", - msg.amt_paid_msat); let event = Event::PaymentReceived(wait_response); - Some((event, (stream, cancel_token, is_active))) - } else { None } - } else { - None + return Some((event, (stream, cancel_token, is_active, kv_store, current_add_index, current_settle_index))); + } else { + // Invalid hash, skip this message but continue streaming + tracing::error!("LND returned invalid payment hash"); + // Continue the loop without yielding + continue; + } + } else { + // Not a settled invoice, continue but don't emit event + tracing::debug!("LND: Received non-settled invoice, continuing to wait for settled invoices"); + // Continue the loop without yielding + continue; + } + } + Ok(None) => { + is_active.store(false, Ordering::SeqCst); + tracing::info!("LND invoice stream ended."); + return None; + } + Err(err) => { + is_active.store(false, Ordering::SeqCst); + tracing::warn!("Encountered error in LND invoice stream. Stream ending"); + tracing::error!("{:?}", err); + return None; + } + } } } - Ok(None) => { - is_active.store(false, Ordering::SeqCst); - tracing::info!("LND invoice stream ended."); - None - }, // End of stream - Err(err) => { - is_active.store(false, Ordering::SeqCst); - tracing::warn!("Encountered error in LND invoice stream. Stream ending"); - tracing::error!("{:?}", err); - None - - }, // Handle errors gracefully, ends the stream on error - } - } } }, - ) - .boxed()) + ); + + Ok(Box::pin(event_stream)) } #[instrument(skip_all)] diff --git a/crates/cdk-mintd/src/lib.rs b/crates/cdk-mintd/src/lib.rs index 80eb6337..02203056 100644 --- a/crates/cdk-mintd/src/lib.rs +++ b/crates/cdk-mintd/src/lib.rs @@ -38,6 +38,7 @@ use cdk::nuts::{AuthRequired, Method, ProtectedEndpoint, RoutePath}; use cdk::nuts::{ContactInfo, MintVersion, PaymentMethod}; use cdk::types::QuoteTTL; use cdk_axum::cache::HttpCache; +use cdk_common::database::DynMintDatabase; // internal crate modules #[cfg(feature = "prometheus")] use cdk_common::payment::MetricsMintPayment; @@ -97,7 +98,7 @@ async fn initial_setup( settings: &config::Settings, db_password: Option, ) -> Result<( - Arc + Send + Sync>, + DynMintDatabase, Arc + Send + Sync>, Arc + Send + Sync>, )> { @@ -257,7 +258,7 @@ async fn setup_database( _work_dir: &Path, _db_password: Option, ) -> Result<( - Arc + Send + Sync>, + DynMintDatabase, Arc + Send + Sync>, Arc + Send + Sync>, )> { @@ -426,7 +427,7 @@ async fn configure_lightning_backend( .clone() .expect("Config checked at load that cln is some"); let cln = cln_settings - .setup(settings, CurrencyUnit::Msat, None, work_dir, None) + .setup(settings, CurrencyUnit::Msat, None, work_dir, _kv_store) .await?; #[cfg(feature = "prometheus")] let cln = MetricsMintPayment::new(cln); @@ -462,7 +463,7 @@ async fn configure_lightning_backend( LnBackend::Lnd => { let lnd_settings = settings.clone().lnd.expect("Checked at config load"); let lnd = lnd_settings - .setup(settings, CurrencyUnit::Msat, None, work_dir, None) + .setup(settings, CurrencyUnit::Msat, None, work_dir, _kv_store) .await?; #[cfg(feature = "prometheus")] let lnd = MetricsMintPayment::new(lnd); @@ -634,10 +635,10 @@ async fn setup_authentication( _password: Option, ) -> Result { if let Some(auth_settings) = settings.auth.clone() { + use cdk_common::database::DynMintAuthDatabase; + tracing::info!("Auth settings are defined. {:?}", auth_settings); - let auth_localstore: Arc< - dyn cdk_database::MintAuthDatabase + Send + Sync, - > = match settings.database.engine { + let auth_localstore: DynMintAuthDatabase = match settings.database.engine { #[cfg(feature = "sqlite")] DatabaseEngine::Sqlite => { #[cfg(feature = "sqlite")] diff --git a/crates/cdk-mintd/src/setup.rs b/crates/cdk-mintd/src/setup.rs index 127e5142..24e380bb 100644 --- a/crates/cdk-mintd/src/setup.rs +++ b/crates/cdk-mintd/src/setup.rs @@ -47,7 +47,7 @@ impl LnBackendSetup for config::Cln { _unit: CurrencyUnit, _runtime: Option>, _work_dir: &Path, - _kv_store: Option + Send + Sync>>, + kv_store: Option + Send + Sync>>, ) -> anyhow::Result { let cln_socket = expand_path( self.rpc_path @@ -61,7 +61,12 @@ impl LnBackendSetup for config::Cln { percent_fee_reserve: self.fee_percent, }; - let cln = cdk_cln::Cln::new(cln_socket, fee_reserve).await?; + let cln = cdk_cln::Cln::new( + cln_socket, + fee_reserve, + kv_store.expect("Cln needs kv store"), + ) + .await?; Ok(cln) } @@ -110,7 +115,7 @@ impl LnBackendSetup for config::Lnd { _unit: CurrencyUnit, _runtime: Option>, _work_dir: &Path, - _kv_store: Option + Send + Sync>>, + kv_store: Option + Send + Sync>>, ) -> anyhow::Result { let address = &self.address; let cert_file = &self.cert_file; @@ -126,6 +131,7 @@ impl LnBackendSetup for config::Lnd { cert_file.clone(), macaroon_file.clone(), fee_reserve, + kv_store.expect("Lnd needs kv store"), ) .await?; diff --git a/crates/cdk-payment-processor/Cargo.toml b/crates/cdk-payment-processor/Cargo.toml index c123705b..e19e1ccf 100644 --- a/crates/cdk-payment-processor/Cargo.toml +++ b/crates/cdk-payment-processor/Cargo.toml @@ -17,7 +17,7 @@ path = "src/bin/payment_processor.rs" [features] default = ["cln", "fake", "lnd"] bench = [] -cln = ["dep:cdk-cln"] +cln = ["dep:cdk-cln", "dep:cdk-sqlite"] fake = ["dep:cdk-fake-wallet"] lnd = ["dep:cdk-lnd"] @@ -30,6 +30,7 @@ cdk-common = { workspace = true, features = ["mint"] } cdk-cln = { workspace = true, optional = true } cdk-lnd = { workspace = true, optional = true } cdk-fake-wallet = { workspace = true, optional = true } +cdk-sqlite = { workspace = true, optional = true } clap = { workspace = true, features = ["derive"] } serde.workspace = true thiserror.workspace = true diff --git a/crates/cdk-payment-processor/src/bin/payment_processor.rs b/crates/cdk-payment-processor/src/bin/payment_processor.rs index 0ea98775..aecfa250 100644 --- a/crates/cdk-payment-processor/src/bin/payment_processor.rs +++ b/crates/cdk-payment-processor/src/bin/payment_processor.rs @@ -14,6 +14,8 @@ use cdk_common::payment::{self, MintPayment}; use cdk_common::Amount; #[cfg(feature = "fake")] use cdk_fake_wallet::FakeWallet; +#[cfg(feature = "cln")] +use cdk_sqlite::MintSqliteDatabase; use clap::Parser; use serde::{Deserialize, Serialize}; #[cfg(any(feature = "cln", feature = "lnd", feature = "fake"))] @@ -106,7 +108,8 @@ async fn main() -> anyhow::Result<()> { percent_fee_reserve: cln_settings.fee_percent, }; - Arc::new(cdk_cln::Cln::new(cln_settings.rpc_path, fee_reserve).await?) + let kv_store = Arc::new(MintSqliteDatabase::new(":memory:").await?); + Arc::new(cdk_cln::Cln::new(cln_settings.rpc_path, fee_reserve, kv_store).await?) } #[cfg(feature = "fake")] "FAKEWALLET" => { @@ -136,12 +139,14 @@ async fn main() -> anyhow::Result<()> { percent_fee_reserve: lnd_settings.fee_percent, }; + let kv_store = Arc::new(MintSqliteDatabase::new(":memory:").await?); Arc::new( cdk_lnd::Lnd::new( lnd_settings.address, lnd_settings.cert_file, lnd_settings.macaroon_file, fee_reserve, + kv_store, ) .await?, ) diff --git a/crates/cdk/src/mint/builder.rs b/crates/cdk/src/mint/builder.rs index 765ecc9f..be9ed4c5 100644 --- a/crates/cdk/src/mint/builder.rs +++ b/crates/cdk/src/mint/builder.rs @@ -4,23 +4,20 @@ use std::collections::HashMap; use std::sync::Arc; use bitcoin::bip32::DerivationPath; -use cdk_common::database::{self, MintDatabase, MintKeysDatabase}; +use cdk_common::database::{DynMintDatabase, MintKeysDatabase}; use cdk_common::error::Error; use cdk_common::nut04::MintMethodOptions; use cdk_common::nut05::MeltMethodOptions; -use cdk_common::payment::Bolt11Settings; +use cdk_common::payment::{Bolt11Settings, DynMintPayment}; #[cfg(feature = "auth")] -use cdk_common::{nut21, nut22}; +use cdk_common::{database::DynMintAuthDatabase, nut21, nut22}; use cdk_signatory::signatory::Signatory; use super::nut17::SupportedMethods; use super::nut19::{self, CachedEndpoint}; -#[cfg(feature = "auth")] -use super::MintAuthDatabase; use super::Nuts; use crate::amount::Amount; use crate::cdk_database; -use crate::cdk_payment::{self, MintPayment}; use crate::mint::Mint; #[cfg(feature = "auth")] use crate::nuts::ProtectedEndpoint; @@ -33,18 +30,17 @@ use crate::types::PaymentProcessorKey; /// Cashu Mint Builder pub struct MintBuilder { mint_info: MintInfo, - localstore: Arc + Send + Sync>, + localstore: DynMintDatabase, #[cfg(feature = "auth")] - auth_localstore: Option + Send + Sync>>, - payment_processors: - HashMap + Send + Sync>>, + auth_localstore: Option, + payment_processors: HashMap, supported_units: HashMap, custom_paths: HashMap, } impl MintBuilder { /// New [`MintBuilder`] - pub fn new(localstore: Arc + Send + Sync>) -> MintBuilder { + pub fn new(localstore: DynMintDatabase) -> MintBuilder { let mint_info = MintInfo { nuts: Nuts::new() .nut07(true) @@ -72,7 +68,7 @@ impl MintBuilder { #[cfg(feature = "auth")] pub fn with_auth( mut self, - auth_localstore: Arc + Send + Sync>, + auth_localstore: DynMintAuthDatabase, openid_discovery: String, client_id: String, protected_endpoints: Vec, @@ -211,7 +207,7 @@ impl MintBuilder { unit: CurrencyUnit, method: PaymentMethod, limits: MintMeltLimits, - payment_processor: Arc + Send + Sync>, + payment_processor: DynMintPayment, ) -> Result<(), Error> { let key = PaymentProcessorKey { unit: unit.clone(), diff --git a/crates/cdk/src/mint/melt.rs b/crates/cdk/src/mint/melt.rs index 40baa430..014871a5 100644 --- a/crates/cdk/src/mint/melt.rs +++ b/crates/cdk/src/mint/melt.rs @@ -8,8 +8,8 @@ use cdk_common::melt::MeltQuoteRequest; use cdk_common::mint::MeltPaymentRequest; use cdk_common::nut05::MeltMethodOptions; use cdk_common::payment::{ - Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, OutgoingPaymentOptions, - PaymentIdentifier, + Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, DynMintPayment, + OutgoingPaymentOptions, PaymentIdentifier, }; use cdk_common::quote_id::QuoteId; use cdk_common::{MeltOptions, MeltQuoteBolt12Request}; @@ -23,7 +23,7 @@ use super::{ PaymentMethod, PublicKey, State, }; use crate::amount::to_unit; -use crate::cdk_payment::{MakePaymentResponse, MintPayment}; +use crate::cdk_payment::MakePaymentResponse; use crate::mint::proof_writer::ProofWriter; use crate::mint::verification::Verification; use crate::mint::SigFlag; @@ -586,7 +586,7 @@ impl Mint { use std::sync::Arc; async fn check_payment_state( - ln: Arc + Send + Sync>, + ln: DynMintPayment, lookup_id: &PaymentIdentifier, ) -> anyhow::Result { match ln.check_outgoing_payment(lookup_id).await { diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 2918fcbe..df6c52c1 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -8,10 +8,10 @@ use arc_swap::ArcSwap; use cdk_common::amount::to_unit; use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; #[cfg(feature = "auth")] -use cdk_common::database::MintAuthDatabase; -use cdk_common::database::{self, MintDatabase, MintTransaction}; +use cdk_common::database::DynMintAuthDatabase; +use cdk_common::database::{self, DynMintDatabase, MintTransaction}; use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind}; -use cdk_common::payment::WaitPaymentResponse; +use cdk_common::payment::{DynMintPayment, WaitPaymentResponse}; pub use cdk_common::quote_id::QuoteId; use cdk_common::secret; #[cfg(feature = "prometheus")] @@ -25,7 +25,6 @@ use tokio::sync::{Mutex, Notify}; use tokio::task::{JoinHandle, JoinSet}; use tracing::instrument; -use crate::cdk_payment::{self, MintPayment}; use crate::error::Error; use crate::fees::calculate_fee; use crate::nuts::*; @@ -60,13 +59,12 @@ pub struct Mint { /// be a gRPC client to a remote signatory server. signatory: Arc, /// Mint Storage backend - localstore: Arc + Send + Sync>, + localstore: DynMintDatabase, /// Auth Storage backend (only available with auth feature) #[cfg(feature = "auth")] - auth_localstore: Option + Send + Sync>>, + auth_localstore: Option, /// Payment processors for mint - payment_processors: - HashMap + Send + Sync>>, + payment_processors: HashMap, /// Subscription manager pubsub_manager: Arc, #[cfg(feature = "auth")] @@ -91,11 +89,8 @@ impl Mint { pub async fn new( mint_info: MintInfo, signatory: Arc, - localstore: Arc + Send + Sync>, - payment_processors: HashMap< - PaymentProcessorKey, - Arc + Send + Sync>, - >, + localstore: DynMintDatabase, + payment_processors: HashMap, ) -> Result { Self::new_internal( mint_info, @@ -113,12 +108,9 @@ impl Mint { pub async fn new_with_auth( mint_info: MintInfo, signatory: Arc, - localstore: Arc + Send + Sync>, - auth_localstore: Arc + Send + Sync>, - payment_processors: HashMap< - PaymentProcessorKey, - Arc + Send + Sync>, - >, + localstore: DynMintDatabase, + auth_localstore: DynMintAuthDatabase, + payment_processors: HashMap, ) -> Result { Self::new_internal( mint_info, @@ -135,14 +127,9 @@ impl Mint { async fn new_internal( mint_info: MintInfo, signatory: Arc, - localstore: Arc + Send + Sync>, - #[cfg(feature = "auth")] auth_localstore: Option< - Arc + Send + Sync>, - >, - payment_processors: HashMap< - PaymentProcessorKey, - Arc + Send + Sync>, - >, + localstore: DynMintDatabase, + #[cfg(feature = "auth")] auth_localstore: Option, + payment_processors: HashMap, ) -> Result { let keysets = signatory.keysets().await?; if !keysets @@ -361,7 +348,7 @@ impl Mint { &self, unit: CurrencyUnit, payment_method: PaymentMethod, - ) -> Result + Send + Sync>, Error> { + ) -> Result { let key = PaymentProcessorKey::new(unit.clone(), payment_method.clone()); self.payment_processors.get(&key).cloned().ok_or_else(|| { tracing::info!( @@ -374,7 +361,7 @@ impl Mint { } /// Localstore - pub fn localstore(&self) -> Arc + Send + Sync> { + pub fn localstore(&self) -> DynMintDatabase { Arc::clone(&self.localstore) } @@ -451,11 +438,8 @@ impl Mint { /// Once invoice is paid mint quote status is updated #[instrument(skip_all)] async fn wait_for_paid_invoices( - payment_processors: &HashMap< - PaymentProcessorKey, - Arc + Send + Sync>, - >, - localstore: Arc + Send + Sync>, + payment_processors: &HashMap, + localstore: DynMintDatabase, pubsub_manager: Arc, shutdown: Arc, ) -> Result<(), Error> { @@ -527,8 +511,8 @@ impl Mint { /// Handles payment waiting for a single processor #[instrument(skip_all)] async fn wait_for_processor_payments( - processor: Arc + Send + Sync>, - localstore: Arc + Send + Sync>, + processor: DynMintPayment, + localstore: DynMintDatabase, pubsub_manager: Arc, shutdown: Arc, ) -> Result<(), Error> { @@ -570,7 +554,7 @@ impl Mint { /// This is a helper function that can be called with just the required components #[instrument(skip_all)] async fn handle_payment_notification( - localstore: &Arc + Send + Sync>, + localstore: &DynMintDatabase, pubsub_manager: &Arc, wait_payment_response: WaitPaymentResponse, ) -> Result<(), Error> { diff --git a/crates/cdk/src/mint/proof_writer.rs b/crates/cdk/src/mint/proof_writer.rs index a7d184b7..835fec7f 100644 --- a/crates/cdk/src/mint/proof_writer.rs +++ b/crates/cdk/src/mint/proof_writer.rs @@ -2,12 +2,11 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use cdk_common::database::{self, MintDatabase, MintTransaction}; +use cdk_common::database::{self, DynMintDatabase, MintTransaction}; use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, QuoteId, State}; use super::subscription::PubSubManager; -type Db = Arc + Send + Sync>; type Tx<'a, 'b> = Box + Send + Sync + 'b>; /// Proof writer @@ -22,14 +21,14 @@ type Tx<'a, 'b> = Box + Send + Sync + ' /// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function /// cannot be run, the reset process should reset the state. pub struct ProofWriter { - db: Option, + db: Option, pubsub_manager: Arc, proof_original_states: Option>>, } impl ProofWriter { /// Creates a new ProofWriter on top of the database - pub fn new(db: Db, pubsub_manager: Arc) -> Self { + pub fn new(db: DynMintDatabase, pubsub_manager: Arc) -> Self { Self { db: Some(db), pubsub_manager, @@ -203,7 +202,7 @@ async fn reset_proofs_to_original_state( #[inline(always)] async fn rollback( - db: Arc + Send + Sync>, + db: DynMintDatabase, ys: Vec, original_states: Vec>, ) -> Result<(), Error> { diff --git a/crates/cdk/src/mint/subscription/manager.rs b/crates/cdk/src/mint/subscription/manager.rs index 7b8e26c6..d4c3b974 100644 --- a/crates/cdk/src/mint/subscription/manager.rs +++ b/crates/cdk/src/mint/subscription/manager.rs @@ -1,8 +1,7 @@ //! Specific Subscription for the cdk crate use std::ops::Deref; -use std::sync::Arc; -use cdk_common::database::{self, MintDatabase}; +use cdk_common::database::DynMintDatabase; use cdk_common::mint::MintQuote; use cdk_common::nut17::Notification; use cdk_common::quote_id::QuoteId; @@ -31,8 +30,8 @@ impl Default for PubSubManager { } } -impl From + Send + Sync>> for PubSubManager { - fn from(val: Arc + Send + Sync>) -> Self { +impl From for PubSubManager { + fn from(val: DynMintDatabase) -> Self { PubSubManager(OnSubscription(Some(val)).into()) } } diff --git a/crates/cdk/src/mint/subscription/on_subscription.rs b/crates/cdk/src/mint/subscription/on_subscription.rs index ed971a4c..1e331db4 100644 --- a/crates/cdk/src/mint/subscription/on_subscription.rs +++ b/crates/cdk/src/mint/subscription/on_subscription.rs @@ -1,9 +1,8 @@ //! On Subscription //! //! This module contains the code that is triggered when a new subscription is created. -use std::sync::Arc; -use cdk_common::database::{self, MintDatabase}; +use cdk_common::database::DynMintDatabase; use cdk_common::nut17::Notification; use cdk_common::pub_sub::OnNewSubscription; use cdk_common::quote_id::QuoteId; @@ -17,7 +16,7 @@ use crate::nuts::{MeltQuoteBolt11Response, MintQuoteBolt11Response, ProofState, /// This struct triggers code when a new subscription is created. /// /// It is used to send the initial state of the subscription to the client. -pub struct OnSubscription(pub(crate) Option + Send + Sync>>); +pub struct OnSubscription(pub(crate) Option); #[async_trait::async_trait] impl OnNewSubscription for OnSubscription {