diff --git a/crates/cdk-integration-tests/tests/bolt12.rs b/crates/cdk-integration-tests/tests/bolt12.rs index 514a169f..45c7458d 100644 --- a/crates/cdk-integration-tests/tests/bolt12.rs +++ b/crates/cdk-integration-tests/tests/bolt12.rs @@ -1,13 +1,14 @@ use std::env; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use anyhow::{bail, Result}; use bip39::Mnemonic; use cashu::amount::SplitTarget; use cashu::nut23::Amountless; -use cashu::{Amount, CurrencyUnit, MintRequest, PreMintSecrets, ProofsMethods}; -use cdk::wallet::{HttpClient, MintConnector, Wallet}; +use cashu::{Amount, CurrencyUnit, MintRequest, MintUrl, PreMintSecrets, ProofsMethods}; +use cdk::wallet::{HttpClient, MintConnector, Wallet, WalletBuilder}; use cdk_integration_tests::get_mint_url_from_env; use cdk_integration_tests::init_regtest::{get_cln_dir, get_temp_dir}; use cdk_sqlite::wallet::memory; @@ -97,13 +98,16 @@ async fn test_regtest_bolt12_mint() { /// - Tests the functionality of reusing a quote for multiple payments #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_regtest_bolt12_mint_multiple() -> Result<()> { - let wallet = Wallet::new( - &get_mint_url_from_env(), - CurrencyUnit::Sat, - Arc::new(memory::empty().await?), - Mnemonic::generate(12)?.to_seed_normalized(""), - None, - )?; + let mint_url = MintUrl::from_str(&get_mint_url_from_env())?; + + let wallet = WalletBuilder::new() + .mint_url(mint_url) + .unit(CurrencyUnit::Sat) + .localstore(Arc::new(memory::empty().await?)) + .seed(Mnemonic::generate(12)?.to_seed_normalized("")) + .target_proof_count(3) + .use_http_subscription() + .build()?; let mint_quote = wallet.mint_bolt12_quote(None, None).await?; diff --git a/crates/cdk/src/wallet/builder.rs b/crates/cdk/src/wallet/builder.rs index 3caf2b27..2b2583d0 100644 --- a/crates/cdk/src/wallet/builder.rs +++ b/crates/cdk/src/wallet/builder.rs @@ -26,6 +26,7 @@ pub struct WalletBuilder { #[cfg(feature = "auth")] auth_wallet: Option, seed: Option<[u8; 64]>, + use_http_subscription: bool, client: Option>, } @@ -40,6 +41,7 @@ impl Default for WalletBuilder { auth_wallet: None, seed: None, client: None, + use_http_subscription: false, } } } @@ -50,6 +52,19 @@ impl WalletBuilder { Self::default() } + /// Use HTTP for wallet subscriptions to mint events + pub fn use_http_subscription(mut self) -> Self { + self.use_http_subscription = true; + self + } + + /// If WS is preferred (with fallback to HTTP is it is not supported by the mint) for the wallet + /// subscriptions to mint events + pub fn prefer_ws_subscription(mut self) -> Self { + self.use_http_subscription = false; + self + } + /// Set the mint URL pub fn mint_url(mut self, mint_url: MintUrl) -> Self { self.mint_url = Some(mint_url); @@ -150,7 +165,7 @@ impl WalletBuilder { auth_wallet: Arc::new(RwLock::new(self.auth_wallet)), seed, client: client.clone(), - subscription: SubscriptionManager::new(client), + subscription: SubscriptionManager::new(client, self.use_http_subscription), }) } } diff --git a/crates/cdk/src/wallet/subscription/http.rs b/crates/cdk/src/wallet/subscription/http.rs index fa8d69a5..22290e91 100644 --- a/crates/cdk/src/wallet/subscription/http.rs +++ b/crates/cdk/src/wallet/subscription/http.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use cdk_common::MintQuoteBolt12Response; use tokio::sync::{mpsc, RwLock}; use tokio::time; @@ -15,6 +16,7 @@ use crate::Wallet; #[derive(Debug, Hash, PartialEq, Eq)] enum UrlType { Mint(String), + MintBolt12(String), Melt(String), PublicKey(nut01::PublicKey), } @@ -22,6 +24,7 @@ enum UrlType { #[derive(Debug, Eq, PartialEq)] enum AnyState { MintQuoteState(nut23::QuoteState), + MintBolt12QuoteState(MintQuoteBolt12Response), MeltQuoteState(nut05::QuoteState), PublicKey(nut07::State), Empty, @@ -67,7 +70,12 @@ async fn convert_subscription( } } Kind::Bolt12MintQuote => { - for id in sub.1.filters.iter().map(|id| UrlType::Mint(id.clone())) { + for id in sub + .1 + .filters + .iter() + .map(|id| UrlType::MintBolt12(id.clone())) + { subscribed_to.insert(id, (sub.0.clone(), sub.1.id.clone(), AnyState::Empty)); } } @@ -98,6 +106,18 @@ pub async fn http_main>( for (url, (sender, _, last_state)) in subscribed_to.iter_mut() { tracing::debug!("Polling: {:?}", url); match url { + UrlType::MintBolt12(id) => { + let response = http_client.get_mint_quote_bolt12_status(id).await; + if let Ok(response) = response { + if *last_state == AnyState::MintBolt12QuoteState(response.clone()) { + continue; + } + *last_state = AnyState::MintBolt12QuoteState(response.clone()); + if let Err(err) = sender.try_send(NotificationPayload::MintQuoteBolt12Response(response)) { + tracing::error!("Error sending mint quote response: {:?}", err); + } + } + }, UrlType::Mint(id) => { let response = http_client.get_mint_quote_status(id).await; diff --git a/crates/cdk/src/wallet/subscription/mod.rs b/crates/cdk/src/wallet/subscription/mod.rs index 6acaed44..a2c4afc3 100644 --- a/crates/cdk/src/wallet/subscription/mod.rs +++ b/crates/cdk/src/wallet/subscription/mod.rs @@ -48,14 +48,16 @@ type WsSubscriptionBody = (mpsc::Sender, Params); pub struct SubscriptionManager { all_connections: Arc>>, http_client: Arc, + prefer_http: bool, } impl SubscriptionManager { /// Create a new subscription manager - pub fn new(http_client: Arc) -> Self { + pub fn new(http_client: Arc, prefer_http: bool) -> Self { Self { all_connections: Arc::new(RwLock::new(HashMap::new())), http_client, + prefer_http, } } @@ -93,6 +95,12 @@ impl SubscriptionManager { ))] let is_ws_support = false; + let is_ws_support = if self.prefer_http { + false + } else { + is_ws_support + }; + tracing::debug!( "Connect to {:?} to subscribe. WebSocket is supported ({})", mint_url, diff --git a/crates/cdk/src/wallet/subscription/ws.rs b/crates/cdk/src/wallet/subscription/ws.rs index 4eb78bad..1e3c834e 100644 --- a/crates/cdk/src/wallet/subscription/ws.rs +++ b/crates/cdk/src/wallet/subscription/ws.rs @@ -18,25 +18,6 @@ use crate::Wallet; const MAX_ATTEMPT_FALLBACK_HTTP: usize = 10; -async fn fallback_to_http>( - initial_state: S, - http_client: Arc, - subscriptions: Arc>>, - new_subscription_recv: mpsc::Receiver, - on_drop: mpsc::Receiver, - wallet: Arc, -) { - http_main( - initial_state, - http_client, - subscriptions, - new_subscription_recv, - on_drop, - wallet, - ) - .await -} - #[inline] pub async fn ws_main( http_client: Arc, @@ -72,7 +53,8 @@ pub async fn ws_main( tracing::error!( "Could not connect to server after {MAX_ATTEMPT_FALLBACK_HTTP} attempts, falling back to HTTP-subscription client" ); - return fallback_to_http( + + return http_main( active_subscriptions.into_keys(), http_client, subscriptions, @@ -169,17 +151,19 @@ pub async fn ws_main( WsMessageOrResponse::ErrorResponse(error) => { tracing::error!("Received error from server: {:?}", error); if subscription_requests.contains(&error.id) { - // If the server sends an error response to a subscription request, we should - // fallback to HTTP. - // TODO: Add some retry before giving up to HTTP. - return fallback_to_http( + tracing::error!( + "Falling back to HTTP client" + ); + + return http_main( active_subscriptions.into_keys(), http_client, subscriptions, new_subscription_recv, on_drop, - wallet - ).await; + wallet, + ) + .await; } } }