diff --git a/crates/cdk-cli/src/sub_commands/mint.rs b/crates/cdk-cli/src/sub_commands/mint.rs index 51a90a6a..670e72dc 100644 --- a/crates/cdk-cli/src/sub_commands/mint.rs +++ b/crates/cdk-cli/src/sub_commands/mint.rs @@ -1,5 +1,4 @@ use std::str::FromStr; -use std::time::Duration; use anyhow::{anyhow, Result}; use cdk::amount::SplitTarget; @@ -7,7 +6,7 @@ use cdk::mint_url::MintUrl; use cdk::nuts::nut00::ProofsMethods; use cdk::nuts::{CurrencyUnit, PaymentMethod}; use cdk::wallet::MultiMintWallet; -use cdk::Amount; +use cdk::{Amount, StreamExt}; use clap::Args; use serde::{Deserialize, Serialize}; @@ -96,26 +95,17 @@ pub async fn mint( let mut amount_minted = Amount::ZERO; - loop { - let proofs = wallet - .wait_and_mint_quote( - quote.clone(), - SplitTarget::default(), - None, - Duration::from_secs(sub_command_args.wait_duration), - ) - .await?; + let mut proof_streams = wallet.proof_stream(quote, SplitTarget::default(), None); + while let Some(proofs) = proof_streams.next().await { + let proofs = match proofs { + Ok(proofs) => proofs, + Err(err) => { + tracing::error!("Proof streams ended with {:?}", err); + break; + } + }; amount_minted += proofs.total_amount()?; - - if sub_command_args.quote_id.is_none() || quote.payment_method == PaymentMethod::Bolt11 { - break; - } else { - println!( - "Minted {} waiting for next payment.", - proofs.total_amount()? - ); - } } println!("Received {amount_minted} from mint {mint_url}"); diff --git a/crates/cdk-common/src/error.rs b/crates/cdk-common/src/error.rs index 0009bcc6..cffd8d88 100644 --- a/crates/cdk-common/src/error.rs +++ b/crates/cdk-common/src/error.rs @@ -111,13 +111,14 @@ pub enum Error { #[error("Could not parse bolt12")] Bolt12parse, + /// BIP353 address parsing error + #[error("Failed to parse BIP353 address: {0}")] + Bip353Parse(String), + /// Operation timeout #[error("Operation timeout")] Timeout, - /// BIP353 address parsing error - #[error("Failed to parse BIP353 address: {0}")] - Bip353Parse(String), /// BIP353 address resolution error #[error("Failed to resolve BIP353 address: {0}")] Bip353Resolve(String), diff --git a/crates/cdk-integration-tests/src/init_pure_tests.rs b/crates/cdk-integration-tests/src/init_pure_tests.rs index 91101834..c4a27ab9 100644 --- a/crates/cdk-integration-tests/src/init_pure_tests.rs +++ b/crates/cdk-integration-tests/src/init_pure_tests.rs @@ -3,7 +3,6 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; use std::{env, fs}; use anyhow::{anyhow, bail, Result}; @@ -23,7 +22,7 @@ use cdk::nuts::{ use cdk::types::{FeeReserve, QuoteTTL}; use cdk::util::unix_time; use cdk::wallet::{AuthWallet, MintConnector, Wallet, WalletBuilder}; -use cdk::{Amount, Error, Mint}; +use cdk::{Amount, Error, Mint, StreamExt}; use cdk_fake_wallet::FakeWallet; use tokio::sync::RwLock; use tracing_subscriber::EnvFilter; @@ -361,12 +360,10 @@ pub async fn fund_wallet( let desired_amount = Amount::from(amount); let quote = wallet.mint_quote(desired_amount, None).await?; - wallet - .wait_for_payment("e, Duration::from_secs(60)) - .await?; - Ok(wallet - .mint("e.id, split_target.unwrap_or_default(), None) - .await? + .proof_stream(quote, split_target.unwrap_or_default(), None) + .next() + .await + .expect("proofs")? .total_amount()?) } diff --git a/crates/cdk-integration-tests/src/lib.rs b/crates/cdk-integration-tests/src/lib.rs index 675e9402..cd5f4538 100644 --- a/crates/cdk-integration-tests/src/lib.rs +++ b/crates/cdk-integration-tests/src/lib.rs @@ -24,11 +24,10 @@ use anyhow::{anyhow, bail, Result}; use cashu::Bolt11Invoice; use cdk::amount::{Amount, SplitTarget}; use cdk::nuts::State; -use cdk::Wallet; +use cdk::{StreamExt, Wallet}; use cdk_fake_wallet::create_fake_invoice; use init_regtest::{get_lnd_dir, LND_RPC_ADDR}; use ln_regtest_rs::ln_client::{ClnClient, LightningClient, LndClient}; -use tokio::time::Duration; use crate::init_regtest::get_cln_dir; @@ -44,15 +43,12 @@ pub async fn fund_wallet(wallet: Arc, amount: Amount) { .await .expect("Could not get mint quote"); - wallet - .wait_for_payment("e, Duration::from_secs(60)) - .await - .expect("wait for mint failed"); - let _proofs = wallet - .mint("e.id, SplitTarget::default(), None) + .proof_stream(quote, SplitTarget::default(), None) + .next() .await - .expect("Could not mint"); + .expect("proofs") + .expect("proofs with no error"); } pub fn get_mint_url_from_env() -> String { diff --git a/crates/cdk-integration-tests/tests/bolt12.rs b/crates/cdk-integration-tests/tests/bolt12.rs index 68e34027..44873f74 100644 --- a/crates/cdk-integration-tests/tests/bolt12.rs +++ b/crates/cdk-integration-tests/tests/bolt12.rs @@ -1,7 +1,6 @@ use std::env; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; use anyhow::{bail, Result}; use bip39::Mnemonic; @@ -116,16 +115,14 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await?; - - wallet.mint_bolt12_quote_state(&mint_quote.id).await?; - let proofs = wallet - .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None) - .await - .unwrap(); + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) + .await?; assert_eq!(proofs.total_amount().unwrap(), 10.into()); @@ -134,16 +131,14 @@ async fn test_regtest_bolt12_mint_multiple() -> Result<()> { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await?; - - wallet.mint_bolt12_quote_state(&mint_quote.id).await?; - let proofs = wallet - .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None) - .await - .unwrap(); + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) + .await?; assert_eq!(proofs.total_amount().unwrap(), 11.into()); @@ -187,12 +182,13 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> { .pay_bolt12_offer(None, quote_one.request.clone()) .await?; - wallet_one - .wait_for_payment("e_one, Duration::from_secs(60)) - .await?; - let proofs_one = wallet_one - .mint_bolt12("e_one.id, None, SplitTarget::default(), None) + .wait_and_mint_quote( + quote_one.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await?; assert_eq!(proofs_one.total_amount()?, 10_000.into()); @@ -205,13 +201,15 @@ async fn test_regtest_bolt12_multiple_wallets() -> Result<()> { .pay_bolt12_offer(None, quote_two.request.clone()) .await?; - wallet_two - .wait_for_payment("e_two, Duration::from_secs(60)) + let proofs_two = wallet_two + .wait_and_mint_quote( + quote_two.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await?; - let proofs_two = wallet_two - .mint_bolt12("e_two.id, None, SplitTarget::default(), None) - .await?; assert_eq!(proofs_two.total_amount()?, 15_000.into()); let offer = cln_client @@ -280,20 +278,19 @@ async fn test_regtest_bolt12_melt() -> Result<()> { .pay_bolt12_offer(None, mint_quote.request.clone()) .await?; - // Wait for payment to be processed - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await?; let offer = cln_client .get_bolt12_offer(Some(10_000), true, "hhhhhhhh".to_string()) .await?; - let _proofs = wallet - .mint_bolt12(&mint_quote.id, None, SplitTarget::default(), None) - .await - .unwrap(); - let quote = wallet.melt_bolt12_quote(offer.to_string(), None).await?; let melt = wallet.melt("e.id).await?; @@ -340,12 +337,14 @@ async fn test_regtest_bolt12_mint_extra() -> Result<()> { .pay_bolt12_offer(Some(pay_amount_msats), mint_quote.request.clone()) .await?; - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(10)) - .await?; + let payment = wallet + .wait_for_payment(&mint_quote, tokio::time::Duration::from_secs(15)) + .await? + .unwrap(); let state = wallet.mint_bolt12_quote_state(&mint_quote.id).await?; + assert_eq!(payment, state.amount_paid); assert_eq!(state.amount_paid, (pay_amount_msats / 1_000).into()); assert_eq!(state.amount_issued, Amount::ZERO); diff --git a/crates/cdk-integration-tests/tests/fake_auth.rs b/crates/cdk-integration-tests/tests/fake_auth.rs index 26f089e3..ddf700a6 100644 --- a/crates/cdk-integration-tests/tests/fake_auth.rs +++ b/crates/cdk-integration-tests/tests/fake_auth.rs @@ -1,7 +1,6 @@ use std::env; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; use bip39::Mnemonic; use cashu::{MintAuthRequest, MintInfo}; @@ -331,15 +330,16 @@ async fn test_mint_with_auth() { let mint_amount: Amount = 100.into(); let quote = wallet.mint_quote(mint_amount, None).await.unwrap(); + let proofs = wallet .wait_and_mint_quote( - quote, - Default::default(), - Default::default(), - Duration::from_secs(10), + quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), ) .await - .unwrap(); + .expect("payment"); assert!(proofs.total_amount().expect("Could not get proofs amount") == mint_amount); } diff --git a/crates/cdk-integration-tests/tests/fake_wallet.rs b/crates/cdk-integration-tests/tests/fake_wallet.rs index 6d3cb3f2..1e01eed0 100644 --- a/crates/cdk-integration-tests/tests/fake_wallet.rs +++ b/crates/cdk-integration-tests/tests/fake_wallet.rs @@ -15,7 +15,6 @@ //! - Duplicate proof detection use std::sync::Arc; -use std::time::Duration; use bip39::Mnemonic; use cashu::Amount; @@ -27,6 +26,7 @@ use cdk::nuts::{ }; use cdk::wallet::types::TransactionDirection; use cdk::wallet::{HttpClient, MintConnector, Wallet}; +use cdk::StreamExt; use cdk_fake_wallet::{create_fake_invoice, FakeInvoiceDescription}; use cdk_integration_tests::attempt_to_swap_pending; use cdk_sqlite::wallet::memory; @@ -47,15 +47,13 @@ async fn test_fake_tokens_pending() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Pending, @@ -90,15 +88,13 @@ async fn test_fake_melt_payment_fail() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Unknown, @@ -156,15 +152,13 @@ async fn test_fake_melt_payment_fail_and_check() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Unknown, @@ -205,15 +199,13 @@ async fn test_fake_melt_payment_return_fail_status() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Failed, @@ -269,15 +261,13 @@ async fn test_fake_melt_payment_error_unknown() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Failed, @@ -333,15 +323,13 @@ async fn test_fake_melt_payment_err_paid() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let fake_description = FakeInvoiceDescription { pay_invoice_state: MeltQuoteState::Failed, @@ -375,15 +363,13 @@ async fn test_fake_melt_change_in_quote() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let _proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let transaction = wallet .list_transactions(Some(TransactionDirection::Incoming)) @@ -445,15 +431,13 @@ async fn test_fake_mint_with_witness() { .expect("failed to create new wallet"); let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let mint_amount = proofs.total_amount().unwrap(); @@ -474,10 +458,13 @@ async fn test_fake_mint_without_witness() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let mut payment_streams = wallet.payment_stream(&mint_quote); + + payment_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None); @@ -515,10 +502,13 @@ async fn test_fake_mint_with_wrong_witness() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let mut payment_streams = wallet.payment_stream(&mint_quote); + + payment_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let http_client = HttpClient::new(MINT_URL.parse().unwrap(), None); @@ -562,10 +552,13 @@ async fn test_fake_mint_inflated() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let mut payment_streams = wallet.payment_stream(&mint_quote); + + payment_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; @@ -621,10 +614,13 @@ async fn test_fake_mint_multiple_units() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let mut payment_streams = wallet.payment_stream(&mint_quote); + + payment_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; @@ -701,15 +697,13 @@ async fn test_fake_mint_multiple_unit_swap() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let wallet_usd = Wallet::new( MINT_URL, @@ -723,15 +717,14 @@ async fn test_fake_mint_multiple_unit_swap() { let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = + wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let usd_proofs = wallet_usd - .mint(&mint_quote.id, SplitTarget::None, None) + let usd_proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; @@ -817,15 +810,13 @@ async fn test_fake_mint_multiple_unit_melt() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); println!("Minted sat"); @@ -841,15 +832,14 @@ async fn test_fake_mint_multiple_unit_melt() { let mint_quote = wallet_usd.mint_quote(100.into(), None).await.unwrap(); println!("Minted quote usd"); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = + wallet_usd.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let usd_proofs = wallet_usd - .mint(&mint_quote.id, SplitTarget::None, None) + let usd_proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); { let inputs: Proofs = vec![ @@ -937,15 +927,13 @@ async fn test_fake_mint_input_output_mismatch() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let wallet_usd = Wallet::new( MINT_URL, @@ -996,15 +984,14 @@ async fn test_fake_mint_swap_inflated() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); + let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; let pre_mint = PreMintSecrets::random(active_keyset_id, 101.into(), &SplitTarget::None).unwrap(); @@ -1041,15 +1028,14 @@ async fn test_fake_mint_swap_spend_after_fail() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); + let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; let pre_mint = @@ -1113,15 +1099,14 @@ async fn test_fake_mint_melt_spend_after_fail() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); + let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; let pre_mint = @@ -1186,15 +1171,13 @@ async fn test_fake_mint_duplicate_proofs_swap() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; @@ -1267,15 +1250,13 @@ async fn test_fake_mint_duplicate_proofs_melt() { let mint_quote = wallet.mint_quote(100.into(), None).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); + let mut proof_streams = wallet.proof_stream(mint_quote.clone(), SplitTarget::default(), None); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::None, None) + let proofs = proof_streams + .next() .await - .unwrap(); + .expect("payment") + .expect("no error"); let inputs = vec![proofs[0].clone(), proofs[0].clone()]; diff --git a/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs b/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs index 1372c09d..0db6cf25 100644 --- a/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs +++ b/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs @@ -109,15 +109,15 @@ async fn test_happy_mint_melt_round_trip() { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); + .expect("payment"); let mint_amount = proofs.total_amount().unwrap(); @@ -231,15 +231,15 @@ async fn test_happy_mint() { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); + .expect("payment"); let mint_amount = proofs.total_amount().unwrap(); @@ -279,15 +279,15 @@ async fn test_restore() { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); assert_eq!(wallet.total_balance().await.unwrap(), 100.into()); @@ -359,15 +359,15 @@ async fn test_fake_melt_change_in_quote() { pay_if_regtest(&get_test_temp_dir(), &bolt11).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); let invoice = create_invoice_for_env(Some(9)).await.unwrap(); @@ -429,15 +429,15 @@ async fn test_pay_invoice_twice() { .await .unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) - .await - .unwrap(); - let proofs = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); + .expect("payment"); let mint_amount = proofs.total_amount().unwrap(); diff --git a/crates/cdk-integration-tests/tests/regtest.rs b/crates/cdk-integration-tests/tests/regtest.rs index 0061423c..fb66ce88 100644 --- a/crates/cdk-integration-tests/tests/regtest.rs +++ b/crates/cdk-integration-tests/tests/regtest.rs @@ -51,15 +51,15 @@ async fn test_internal_payment() { .await .expect("failed to pay invoice"); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); assert!(wallet.total_balance().await.unwrap() == 100.into()); @@ -83,15 +83,15 @@ async fn test_internal_payment() { let _melted = wallet.melt(&melt.id).await.unwrap(); - wallet_2 - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet_2 + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _wallet_2_mint = wallet_2 - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); // let check_paid = match get_mint_port("0") { // 8085 => { @@ -230,28 +230,32 @@ async fn test_multimint_melt() { .pay_invoice(quote.request.clone()) .await .expect("failed to pay invoice"); - wallet1 - .wait_for_payment("e, Duration::from_secs(60)) + + let _proofs = wallet1 + .wait_and_mint_quote( + quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - wallet1 - .mint("e.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); let quote = wallet2.mint_quote(mint_amount, None).await.unwrap(); ln_client .pay_invoice(quote.request.clone()) .await .expect("failed to pay invoice"); - wallet2 - .wait_for_payment("e, Duration::from_secs(60)) + + let _proofs = wallet2 + .wait_and_mint_quote( + quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - wallet2 - .mint("e.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); // Get an invoice let invoice = ln_client.create_invoice(Some(50)).await.unwrap(); @@ -305,10 +309,10 @@ async fn test_cached_mint() { .await .expect("failed to pay invoice"); - wallet - .wait_for_payment("e, Duration::from_secs(60)) + let _proofs = wallet + .wait_for_payment("e, tokio::time::Duration::from_secs(15)) .await - .unwrap(); + .expect("payment"); let active_keyset_id = wallet.fetch_active_keyset().await.unwrap().id; let http_client = HttpClient::new(get_mint_url_from_env().parse().unwrap(), None); diff --git a/crates/cdk-integration-tests/tests/test_fees.rs b/crates/cdk-integration-tests/tests/test_fees.rs index 458c3495..b6fca782 100644 --- a/crates/cdk-integration-tests/tests/test_fees.rs +++ b/crates/cdk-integration-tests/tests/test_fees.rs @@ -1,6 +1,5 @@ use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; use bip39::Mnemonic; use cashu::{Bolt11Invoice, ProofsMethods}; @@ -28,23 +27,15 @@ async fn test_swap() { let invoice = Bolt11Invoice::from_str(&mint_quote.request).unwrap(); pay_if_regtest(&get_temp_dir(), &invoice).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); - - let proofs: Vec = wallet - .get_unspent_proofs() - .await - .unwrap() - .iter() - .map(|p| p.amount) - .collect(); + .expect("payment"); println!("{:?}", proofs); @@ -96,15 +87,15 @@ async fn test_fake_melt_change_in_quote() { pay_if_regtest(&get_temp_dir(), &bolt11).await.unwrap(); - wallet - .wait_for_payment(&mint_quote, Duration::from_secs(60)) + let _proofs = wallet + .wait_and_mint_quote( + mint_quote.clone(), + SplitTarget::default(), + None, + tokio::time::Duration::from_secs(15), + ) .await - .unwrap(); - - let _mint_amount = wallet - .mint(&mint_quote.id, SplitTarget::default(), None) - .await - .unwrap(); + .expect("payment"); let invoice_amount = 9; diff --git a/crates/cdk/Cargo.toml b/crates/cdk/Cargo.toml index 97d3f45e..77a5de43 100644 --- a/crates/cdk/Cargo.toml +++ b/crates/cdk/Cargo.toml @@ -50,6 +50,7 @@ sync_wrapper = "0.1.2" bech32 = "0.9.1" arc-swap = "1.7.1" zeroize = "1" +tokio-util.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { workspace = true, features = [ @@ -102,6 +103,13 @@ required-features = ["wallet", "auth"] name = "bip353" required-features = ["wallet", "bip353"] +[[example]] +name = "mint-token-bolt12-with-stream" +required-features = ["wallet"] +[[example]] +name = "mint-token-bolt12" +required-features = ["wallet"] + [dev-dependencies] rand.workspace = true cdk-sqlite.workspace = true diff --git a/crates/cdk/examples/mint-token-bolt12-with-stream.rs b/crates/cdk/examples/mint-token-bolt12-with-stream.rs new file mode 100644 index 00000000..2e83f5ff --- /dev/null +++ b/crates/cdk/examples/mint-token-bolt12-with-stream.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use cdk::error::Error; +use cdk::nuts::nut00::ProofsMethods; +use cdk::nuts::CurrencyUnit; +use cdk::wallet::{SendOptions, Wallet}; +use cdk::{Amount, StreamExt}; +use cdk_sqlite::wallet::memory; +use rand::random; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let default_filter = "debug"; + + let sqlx_filter = "sqlx=warn,hyper_util=warn,reqwest=warn,rustls=warn"; + + let env_filter = EnvFilter::new(format!("{},{}", default_filter, sqlx_filter)); + + // Parse input + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + + // Initialize the memory store for the wallet + let localstore = Arc::new(memory::empty().await?); + + // Generate a random seed for the wallet + let seed = random::<[u8; 64]>(); + + // Define the mint URL and currency unit + let mint_url = "https://fake.thesimplekid.dev"; + let unit = CurrencyUnit::Sat; + let amount = Amount::from(10); + + // Create a new wallet + let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?; + + let quotes = vec![ + wallet.mint_bolt12_quote(None, None).await?, + wallet.mint_bolt12_quote(None, None).await?, + wallet.mint_bolt12_quote(None, None).await?, + ]; + + let mut stream = wallet.mints_proof_stream(quotes, Default::default(), None); + + let stop = stream.get_cancel_token(); + + let mut processed = 0; + + while let Some(proofs) = stream.next().await { + let (mint_quote, proofs) = proofs?; + + // Mint the received amount + let receive_amount = proofs.total_amount()?; + println!("Received {} from mint {}", receive_amount, mint_quote.id); + + // Send a token with the specified amount + let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?; + let token = prepared_send.confirm(None).await?; + println!("Token:"); + println!("{}", token); + + processed += 1; + + if processed == 3 { + stop.cancel() + } + } + + println!("Stopped the loop after {} quotes being minted", processed); + + Ok(()) +} diff --git a/crates/cdk/examples/mint-token-bolt12.rs b/crates/cdk/examples/mint-token-bolt12.rs new file mode 100644 index 00000000..275d73a6 --- /dev/null +++ b/crates/cdk/examples/mint-token-bolt12.rs @@ -0,0 +1,59 @@ +use std::sync::Arc; +use std::time::Duration; + +use cdk::error::Error; +use cdk::nuts::nut00::ProofsMethods; +use cdk::nuts::CurrencyUnit; +use cdk::wallet::{SendOptions, Wallet}; +use cdk::Amount; +use cdk_sqlite::wallet::memory; +use rand::random; +use tracing_subscriber::EnvFilter; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let default_filter = "debug"; + + let sqlx_filter = "sqlx=warn,hyper_util=warn,reqwest=warn,rustls=warn"; + + let env_filter = EnvFilter::new(format!("{},{}", default_filter, sqlx_filter)); + + // Parse input + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + + // Initialize the memory store for the wallet + let localstore = Arc::new(memory::empty().await?); + + // Generate a random seed for the wallet + let seed = random::<[u8; 64]>(); + + // Define the mint URL and currency unit + let mint_url = "https://fake.thesimplekid.dev"; + let unit = CurrencyUnit::Sat; + let amount = Amount::from(10); + + // Create a new wallet + let wallet = Wallet::new(mint_url, unit, localstore, seed, None)?; + + let quote = wallet.mint_bolt12_quote(None, None).await?; + let proofs = wallet + .wait_and_mint_quote( + quote, + Default::default(), + Default::default(), + Duration::from_secs(10), + ) + .await?; + + // Mint the received amount + let receive_amount = proofs.total_amount()?; + println!("Received {} from mint {}", receive_amount, mint_url); + + // Send a token with the specified amount + let prepared_send = wallet.prepare_send(amount, SendOptions::default()).await?; + let token = prepared_send.confirm(None).await?; + println!("Token:"); + println!("{}", token); + + Ok(()) +} diff --git a/crates/cdk/src/lib.rs b/crates/cdk/src/lib.rs index aea22fcc..ae18ae3c 100644 --- a/crates/cdk/src/lib.rs +++ b/crates/cdk/src/lib.rs @@ -64,3 +64,7 @@ pub use self::wallet::HttpClient; /// Result #[doc(hidden)] pub type Result> = std::result::Result; + +/// Re-export futures::Stream +#[cfg(any(feature = "wallet", feature = "mint"))] +pub use futures::{Stream, StreamExt}; diff --git a/crates/cdk/src/mint/subscription/manager.rs b/crates/cdk/src/mint/subscription/manager.rs index 96f484e3..b723974d 100644 --- a/crates/cdk/src/mint/subscription/manager.rs +++ b/crates/cdk/src/mint/subscription/manager.rs @@ -107,8 +107,8 @@ impl PubSubManager { amount_issued: Amount, ) { if let Ok(mut event) = quote.try_into() { - event.amount_paid += amount_paid; - event.amount_issued += amount_issued; + event.amount_paid = amount_paid; + event.amount_issued = amount_issued; self.broadcast(event.into()); } else { diff --git a/crates/cdk/src/wallet/mod.rs b/crates/cdk/src/wallet/mod.rs index 28364189..1bb41c32 100644 --- a/crates/cdk/src/wallet/mod.rs +++ b/crates/cdk/src/wallet/mod.rs @@ -42,11 +42,12 @@ pub mod multi_mint_wallet; mod proofs; mod receive; mod send; +#[cfg(not(target_arch = "wasm32"))] +mod streams; pub mod subscription; mod swap; mod transactions; pub mod util; -mod wait; #[cfg(feature = "auth")] pub use auth::{AuthMintConnector, AuthWallet}; diff --git a/crates/cdk/src/wallet/streams/mod.rs b/crates/cdk/src/wallet/streams/mod.rs new file mode 100644 index 00000000..fda0945e --- /dev/null +++ b/crates/cdk/src/wallet/streams/mod.rs @@ -0,0 +1,122 @@ +//! Wallet waiter APIn +use std::future::Future; +use std::pin::Pin; + +use cdk_common::amount::SplitTarget; +use cdk_common::wallet::{MeltQuote, MintQuote}; +use cdk_common::{PaymentMethod, SpendingConditions}; +use payment::PaymentStream; +use proof::{MultipleMintQuoteProofStream, SingleMintQuoteProofStream}; + +use super::{Wallet, WalletSubscription}; + +pub mod payment; +pub mod proof; +mod wait; + +/// Shared type +type RecvFuture<'a, Ret> = Pin + Send + 'a>>; + +#[allow(private_bounds)] +#[allow(clippy::enum_variant_names)] +enum WaitableEvent { + MeltQuote(Vec), + MintQuote(Vec<(String, PaymentMethod)>), +} + +impl From<&[MeltQuote]> for WaitableEvent { + fn from(events: &[MeltQuote]) -> Self { + WaitableEvent::MeltQuote(events.iter().map(|event| event.id.to_owned()).collect()) + } +} + +impl From<&MeltQuote> for WaitableEvent { + fn from(event: &MeltQuote) -> Self { + WaitableEvent::MeltQuote(vec![event.id.to_owned()]) + } +} + +impl From<&[MintQuote]> for WaitableEvent { + fn from(events: &[MintQuote]) -> Self { + WaitableEvent::MintQuote( + events + .iter() + .map(|event| (event.id.to_owned(), event.payment_method.clone())) + .collect(), + ) + } +} + +impl From<&MintQuote> for WaitableEvent { + fn from(event: &MintQuote) -> Self { + WaitableEvent::MintQuote(vec![(event.id.to_owned(), event.payment_method.clone())]) + } +} + +impl WaitableEvent { + fn into_subscription(self) -> Vec { + match self { + WaitableEvent::MeltQuote(quotes) => { + vec![WalletSubscription::Bolt11MeltQuoteState(quotes)] + } + WaitableEvent::MintQuote(quotes) => { + let (bolt11, bolt12) = quotes.into_iter().fold( + (Vec::new(), Vec::new()), + |mut acc, (quote_id, payment_method)| { + match payment_method { + PaymentMethod::Bolt11 => acc.0.push(quote_id), + PaymentMethod::Bolt12 => acc.1.push(quote_id), + PaymentMethod::Custom(_) => acc.0.push(quote_id), + } + acc + }, + ); + + let mut subscriptions = Vec::new(); + + if !bolt11.is_empty() { + subscriptions.push(WalletSubscription::Bolt11MintQuoteState(bolt11)); + } + + if !bolt12.is_empty() { + subscriptions.push(WalletSubscription::Bolt12MintQuoteState(bolt12)); + } + + subscriptions + } + } + } +} + +impl Wallet { + /// Streams all proofs from a single mint quote + #[inline(always)] + pub fn proof_stream( + &self, + quote: MintQuote, + amount_split_target: SplitTarget, + spending_conditions: Option, + ) -> SingleMintQuoteProofStream<'_> { + SingleMintQuoteProofStream::new(self, quote, amount_split_target, spending_conditions) + } + + /// Streams all new proofs for a set of mints + #[inline(always)] + pub fn mints_proof_stream( + &self, + quotes: Vec, + amount_split_target: SplitTarget, + spending_conditions: Option, + ) -> MultipleMintQuoteProofStream<'_> { + MultipleMintQuoteProofStream::new(self, quotes, amount_split_target, spending_conditions) + } + + /// Returns a BoxFuture that will wait for payment on the given event with a timeout check + #[allow(private_bounds)] + pub fn payment_stream(&self, events: T) -> PaymentStream<'_> + where + T: Into, + { + PaymentStream::new(self, events.into().into_subscription()) + } +} diff --git a/crates/cdk/src/wallet/streams/payment.rs b/crates/cdk/src/wallet/streams/payment.rs new file mode 100644 index 00000000..8139570a --- /dev/null +++ b/crates/cdk/src/wallet/streams/payment.rs @@ -0,0 +1,205 @@ +//! Payment Stream +//! +//! This future Stream will wait events for a Mint Quote be paid. If it is for a Bolt12 it will not stop +//! but it will eventually error on a Timeout. +//! +//! Bolt11 will emit a single event. +use std::task::Poll; + +use cdk_common::{Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload}; +use futures::future::join_all; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt}; +use tokio_util::sync::CancellationToken; + +use super::RecvFuture; +use crate::wallet::subscription::ActiveSubscription; +use crate::{Wallet, WalletSubscription}; + +type SubscribeReceived = (Option>, Vec); +type PaymentValue = (String, Option); + +/// PaymentWaiter +pub struct PaymentStream<'a> { + wallet: Option<(&'a Wallet, Vec)>, + is_finalized: bool, + active_subscription: Option>, + + cancel_token: CancellationToken, + + // Future events + subscriber_future: Option>>, + subscription_receiver_future: Option>, + cancellation_future: Option>, +} + +impl<'a> PaymentStream<'a> { + /// Creates a new instance of the + pub fn new(wallet: &'a Wallet, filters: Vec) -> Self { + Self { + wallet: Some((wallet, filters)), + is_finalized: false, + active_subscription: None, + cancel_token: Default::default(), + subscriber_future: None, + subscription_receiver_future: None, + cancellation_future: None, + } + } + + /// Get cancellation token + pub fn get_cancel_token(&self) -> CancellationToken { + self.cancel_token.clone() + } + + /// Creating a wallet subscription is an async event, this may change in the future, but for now, + /// creating a new Subscription should be polled, as any other async event. This function will + /// return None if the subscription is already active, Some(()) otherwise + fn poll_init_subscription(&mut self, cx: &mut std::task::Context<'_>) -> Option<()> { + if let Some((wallet, filters)) = self.wallet.take() { + self.subscriber_future = Some(Box::pin(async move { + join_all(filters.into_iter().map(|w| wallet.subscribe(w))).await + })); + } + + let mut subscriber_future = self.subscriber_future.take()?; + + match subscriber_future.poll_unpin(cx) { + Poll::Pending => { + self.subscriber_future = Some(subscriber_future); + Some(()) + } + Poll::Ready(active_subscription) => { + self.active_subscription = Some(active_subscription); + None + } + } + } + + /// Checks if the stream has been externally cancelled + fn poll_cancel(&mut self, cx: &mut std::task::Context<'_>) -> bool { + let mut cancellation_future = self.cancellation_future.take().unwrap_or_else(|| { + let cancel_token = self.cancel_token.clone(); + Box::pin(async move { cancel_token.cancelled().await }) + }); + + if cancellation_future.poll_unpin(cx).is_ready() { + self.subscription_receiver_future = None; + true + } else { + self.cancellation_future = Some(cancellation_future); + false + } + } + + /// Polls the subscription for any new event + fn poll_event( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + let (subscription_receiver_future, active_subscription) = ( + self.subscription_receiver_future.take(), + self.active_subscription.take(), + ); + + if subscription_receiver_future.is_none() && active_subscription.is_none() { + // Unexpected state, we should have an in-flight future or the active_subscription to + // create the future to read an event + return Poll::Ready(Some(Err(Error::Internal))); + } + + let mut receiver = subscription_receiver_future.unwrap_or_else(|| { + let mut subscription_receiver = + active_subscription.expect("active subscription object"); + + Box::pin(async move { + let mut futures: FuturesUnordered<_> = subscription_receiver + .iter_mut() + .map(|sub| sub.recv()) + .collect(); + + if let Some(Some(winner)) = futures.next().await { + drop(futures); + return (Some(winner), subscription_receiver); + } + + drop(futures); + (None, subscription_receiver) + }) + }); + + match receiver.poll_unpin(cx) { + Poll::Pending => { + self.subscription_receiver_future = Some(receiver); + Poll::Pending + } + Poll::Ready((notification, subscription)) => { + tracing::debug!("Receive payment notification {:?}", notification); + // This future is now fulfilled, put the active_subscription again back to object. Next time next().await is called, + // the future will be created in subscription_receiver_future. + self.active_subscription = Some(subscription); + self.cancellation_future = None; // resets timeout + match notification { + None => { + self.is_finalized = true; + Poll::Ready(None) + } + Some(info) => { + match info { + NotificationPayload::MintQuoteBolt11Response(info) => { + if info.state == MintQuoteState::Paid { + self.is_finalized = true; + return Poll::Ready(Some(Ok((info.quote, None)))); + } + } + NotificationPayload::MintQuoteBolt12Response(info) => { + let to_be_issued = info.amount_paid - info.amount_issued; + if to_be_issued > Amount::ZERO { + return Poll::Ready(Some(Ok((info.quote, Some(to_be_issued))))); + } + } + NotificationPayload::MeltQuoteBolt11Response(info) => { + if info.state == MeltQuoteState::Paid { + self.is_finalized = true; + return Poll::Ready(Some(Ok((info.quote, None)))); + } + } + _ => {} + } + + // We got an event but it is not what was expected, we need to call `recv` + // again, and to copy-paste this is a recursive call that should be resolved + // to a Poll::Pending *but* will trigger the future execution + self.poll_event(cx) + } + } + } + } + } +} + +impl Stream for PaymentStream<'_> { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + if this.is_finalized { + // end of stream + return Poll::Ready(None); + } + + if this.poll_cancel(cx) { + return Poll::Ready(None); + } + + if this.poll_init_subscription(cx).is_some() { + return Poll::Pending; + } + + this.poll_event(cx) + } +} diff --git a/crates/cdk/src/wallet/streams/proof.rs b/crates/cdk/src/wallet/streams/proof.rs new file mode 100644 index 00000000..a9547228 --- /dev/null +++ b/crates/cdk/src/wallet/streams/proof.rs @@ -0,0 +1,185 @@ +//! Mint Stream +//! +//! This will mint after a mint quote has been paid. If the quote is for a Bolt12 it will keep minting until a timeout is reached. +//! +//! Bolt11 will mint once + +use std::collections::HashMap; +use std::task::Poll; + +use cdk_common::amount::SplitTarget; +use cdk_common::wallet::MintQuote; +use cdk_common::{Error, PaymentMethod, Proofs, SpendingConditions}; +use futures::{FutureExt, Stream, StreamExt}; +use tokio_util::sync::CancellationToken; + +use super::payment::PaymentStream; +use super::{RecvFuture, WaitableEvent}; +use crate::Wallet; + +/// Proofs for many mint quotes, as they are minted, in streams +pub struct MultipleMintQuoteProofStream<'a> { + payment_stream: PaymentStream<'a>, + wallet: &'a Wallet, + quotes: HashMap, + amount_split_target: SplitTarget, + spending_conditions: Option, + minting_future: Option>>, +} + +impl<'a> MultipleMintQuoteProofStream<'a> { + /// Create a new Stream + pub fn new( + wallet: &'a Wallet, + quotes: Vec, + amount_split_target: SplitTarget, + spending_conditions: Option, + ) -> Self { + let filter: WaitableEvent = quotes.as_slice().into(); + + Self { + payment_stream: PaymentStream::new(wallet, filter.into_subscription()), + wallet, + amount_split_target, + spending_conditions, + quotes: quotes + .into_iter() + .map(|mint_quote| (mint_quote.id.clone(), mint_quote)) + .collect(), + minting_future: None, + } + } + + /// Get cancellation token + pub fn get_cancel_token(&self) -> CancellationToken { + self.payment_stream.get_cancel_token() + } +} + +impl Stream for MultipleMintQuoteProofStream<'_> { + type Item = Result<(MintQuote, Proofs), Error>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + if let Some(mut minting_future) = this.minting_future.take() { + return match minting_future.poll_unpin(cx) { + Poll::Pending => { + this.minting_future = Some(minting_future); + Poll::Pending + } + Poll::Ready(proofs) => Poll::Ready(Some(proofs)), + }; + } + + match this.payment_stream.poll_next_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => match result { + None => Poll::Ready(None), + Some(result) => { + let (quote_id, amount) = match result { + Err(err) => { + tracing::error!( + "Error while waiting for payment for {:?}", + this.quotes.keys().collect::>() + ); + return Poll::Ready(Some(Err(err))); + } + Ok(amount) => amount, + }; + + let mint_quote = if let Some(quote) = this.quotes.get("e_id) { + quote.clone() + } else { + tracing::error!("Cannot find mint_quote {} internally", quote_id); + return Poll::Ready(Some(Err(Error::UnknownQuote))); + }; + + let amount_split_target = this.amount_split_target.clone(); + let spending_conditions = this.spending_conditions.clone(); + let wallet = this.wallet; + + tracing::debug!( + "Received payment ({:?}) notification for {}. Minting...", + amount, + mint_quote.id + ); + + let mut minting_future = Box::pin(async move { + match mint_quote.payment_method { + PaymentMethod::Bolt11 => wallet + .mint(&mint_quote.id, amount_split_target, spending_conditions) + .await + .map(|proofs| (mint_quote, proofs)), + PaymentMethod::Bolt12 => wallet + .mint_bolt12( + &mint_quote.id, + amount, + amount_split_target, + spending_conditions, + ) + .await + .map(|proofs| (mint_quote, proofs)), + _ => Err(Error::UnsupportedPaymentMethod), + } + }); + + match minting_future.poll_unpin(cx) { + Poll::Pending => { + this.minting_future = Some(minting_future); + Poll::Pending + } + Poll::Ready(result) => Poll::Ready(Some(result)), + } + } + }, + } + } +} + +/// Proofs for a single mint quote +pub struct SingleMintQuoteProofStream<'a>(MultipleMintQuoteProofStream<'a>); + +impl<'a> SingleMintQuoteProofStream<'a> { + /// Create a new Stream + pub fn new( + wallet: &'a Wallet, + quote: MintQuote, + amount_split_target: SplitTarget, + spending_conditions: Option, + ) -> Self { + Self(MultipleMintQuoteProofStream::new( + wallet, + vec![quote], + amount_split_target, + spending_conditions, + )) + } + + /// Get cancellation token + pub fn get_cancel_token(&self) -> CancellationToken { + self.0.payment_stream.get_cancel_token() + } +} + +impl Stream for SingleMintQuoteProofStream<'_> { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + match this.0.poll_next_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => match result { + None => Poll::Ready(None), + Some(Err(err)) => Poll::Ready(Some(Err(err))), + Some(Ok((_, proofs))) => Poll::Ready(Some(Ok(proofs))), + }, + } + } +} diff --git a/crates/cdk/src/wallet/streams/wait.rs b/crates/cdk/src/wallet/streams/wait.rs new file mode 100644 index 00000000..61f7e332 --- /dev/null +++ b/crates/cdk/src/wallet/streams/wait.rs @@ -0,0 +1,50 @@ +use cdk_common::amount::SplitTarget; +use cdk_common::wallet::MintQuote; +use cdk_common::{Amount, Error, Proofs, SpendingConditions}; +use futures::future::BoxFuture; +use futures::StreamExt; +use tokio::time::{timeout, Duration}; + +use super::Wallet; + +impl Wallet { + #[inline(always)] + /// Mints a mint quote once it is paid + pub async fn wait_and_mint_quote( + &self, + quote: MintQuote, + amount_split_target: SplitTarget, + spending_conditions: Option, + timeout_duration: Duration, + ) -> Result { + let mut stream = self.proof_stream(quote, amount_split_target, spending_conditions); + + timeout(timeout_duration, async move { + stream.next().await.ok_or(Error::Internal)? + }) + .await + .map_err(|_| Error::Timeout)? + } + + /// Returns a BoxFuture that will wait for payment on the given event with a timeout check + #[allow(private_bounds)] + pub fn wait_for_payment( + &self, + event: &MintQuote, + timeout_duration: Duration, + ) -> BoxFuture<'_, Result, Error>> { + let mut stream = self.payment_stream(event); + + Box::pin(async move { + timeout(timeout_duration, async { + stream + .next() + .await + .ok_or(Error::Internal)? + .map(|(_quote, amount)| amount) + }) + .await + .map_err(|_| Error::Timeout)? + }) + } +} diff --git a/crates/cdk/src/wallet/wait.rs b/crates/cdk/src/wallet/wait.rs deleted file mode 100644 index f5d04173..00000000 --- a/crates/cdk/src/wallet/wait.rs +++ /dev/null @@ -1,119 +0,0 @@ -use cdk_common::amount::SplitTarget; -use cdk_common::wallet::{MeltQuote, MintQuote}; -use cdk_common::{ - Amount, Error, MeltQuoteState, MintQuoteState, NotificationPayload, PaymentMethod, Proofs, - SpendingConditions, -}; -use futures::future::BoxFuture; -use tokio::time::{timeout, Duration}; - -use super::{Wallet, WalletSubscription}; - -#[allow(private_bounds)] -#[allow(clippy::enum_variant_names)] -enum WaitableEvent { - MeltQuote(String), - MintQuote(String), - Bolt12MintQuote(String), -} - -impl From<&MeltQuote> for WaitableEvent { - fn from(event: &MeltQuote) -> Self { - WaitableEvent::MeltQuote(event.id.to_owned()) - } -} - -impl From<&MintQuote> for WaitableEvent { - fn from(event: &MintQuote) -> Self { - match event.payment_method { - PaymentMethod::Bolt11 => WaitableEvent::MintQuote(event.id.to_owned()), - PaymentMethod::Bolt12 => WaitableEvent::Bolt12MintQuote(event.id.to_owned()), - PaymentMethod::Custom(_) => WaitableEvent::MintQuote(event.id.to_owned()), - } - } -} - -impl From for WalletSubscription { - fn from(val: WaitableEvent) -> Self { - match val { - WaitableEvent::MeltQuote(quote_id) => { - WalletSubscription::Bolt11MeltQuoteState(vec![quote_id]) - } - WaitableEvent::MintQuote(quote_id) => { - WalletSubscription::Bolt11MintQuoteState(vec![quote_id]) - } - WaitableEvent::Bolt12MintQuote(quote_id) => { - WalletSubscription::Bolt12MintQuoteState(vec![quote_id]) - } - } - } -} - -impl Wallet { - #[inline(always)] - /// Mints a mint quote once it is paid - pub async fn wait_and_mint_quote( - &self, - quote: MintQuote, - amount_split_target: SplitTarget, - spending_conditions: Option, - timeout_duration: Duration, - ) -> Result { - let amount = self.wait_for_payment("e, timeout_duration).await?; - - tracing::debug!("Received payment notification for {}. Minting...", quote.id); - - match quote.payment_method { - PaymentMethod::Bolt11 => { - self.mint("e.id, amount_split_target, spending_conditions) - .await - } - PaymentMethod::Bolt12 => { - self.mint_bolt12("e.id, amount, amount_split_target, spending_conditions) - .await - } - _ => Err(Error::UnsupportedPaymentMethod), - } - } - - /// Returns a BoxFuture that will wait for payment on the given event with a timeout check - #[allow(private_bounds)] - pub fn wait_for_payment( - &self, - event: T, - timeout_duration: Duration, - ) -> BoxFuture<'_, Result, Error>> - where - T: Into, - { - let subs = self.subscribe::(event.into().into()); - - Box::pin(async move { - timeout(timeout_duration, async { - let mut subscription = subs.await; - loop { - match subscription.recv().await.ok_or(Error::Internal)? { - NotificationPayload::MintQuoteBolt11Response(info) => { - if info.state == MintQuoteState::Paid { - return Ok(None); - } - } - NotificationPayload::MintQuoteBolt12Response(info) => { - if info.amount_paid - info.amount_issued > Amount::ZERO { - return Ok(Some(info.amount_paid - info.amount_issued)); - } - } - NotificationPayload::MeltQuoteBolt11Response(info) => { - if info.state == MeltQuoteState::Paid { - return Ok(None); - } - } - _ => {} - } - } - }) - .await - .map_err(|_| Error::Timeout)? - }) - } -}