diff --git a/crates/cdk-phoenixd/Cargo.toml b/crates/cdk-phoenixd/Cargo.toml index e39ca5fa..cfeb98b8 100644 --- a/crates/cdk-phoenixd/Cargo.toml +++ b/crates/cdk-phoenixd/Cargo.toml @@ -17,6 +17,7 @@ bitcoin = { version = "0.32.2", default-features = false } cdk = { path = "../cdk", version = "0.4.0", default-features = false, features = ["mint"] } futures = { version = "0.3.28", default-features = false } tokio = { version = "1", default-features = false } +tokio-util = { version = "0.7.11", default-features = false } tracing = { version = "0.1", default-features = false, features = ["attributes", "log"] } thiserror = "1" # phoenixd-rs = "0.3.0" diff --git a/crates/cdk-phoenixd/src/lib.rs b/crates/cdk-phoenixd/src/lib.rs index f73efcea..22f9cda6 100644 --- a/crates/cdk-phoenixd/src/lib.rs +++ b/crates/cdk-phoenixd/src/lib.rs @@ -4,6 +4,7 @@ #![warn(rustdoc::bare_urls)] use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::anyhow; @@ -24,6 +25,7 @@ use futures::{Stream, StreamExt}; use phoenixd_rs::webhooks::WebhookResponse; use phoenixd_rs::{InvoiceRequest, Phoenixd as PhoenixdApi}; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; pub mod error; @@ -36,6 +38,8 @@ pub struct Phoenixd { fee_reserve: FeeReserve, receiver: Arc>>>, webhook_url: String, + wait_invoice_cancel_token: CancellationToken, + wait_invoice_is_active: Arc, } impl Phoenixd { @@ -57,6 +61,8 @@ impl Phoenixd { fee_reserve, receiver, webhook_url, + wait_invoice_cancel_token: CancellationToken::new(), + wait_invoice_is_active: Arc::new(AtomicBool::new(false)), }) } @@ -87,11 +93,11 @@ impl MintLightning for Phoenixd { } fn is_wait_invoice_active(&self) -> bool { - todo!() + self.wait_invoice_is_active.load(Ordering::SeqCst) } fn cancel_wait_invoice(&self) { - todo!() + self.wait_invoice_cancel_token.cancel() } async fn wait_any_invoice( @@ -106,29 +112,55 @@ impl MintLightning for Phoenixd { let phoenixd_api = self.phoenixd_api.clone(); - Ok(futures::stream::unfold( - (receiver, phoenixd_api), - |(mut receiver, phoenixd_api)| async move { - match receiver.recv().await { - Some(msg) => { - let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await; + let cancel_token = self.wait_invoice_cancel_token.clone(); - match check { - Ok(state) => { - if state.is_paid { - Some((msg.payment_hash, (receiver, phoenixd_api))) - } else { + Ok(futures::stream::unfold( + (receiver, phoenixd_api, cancel_token, + Arc::clone(&self.wait_invoice_is_active), + ), + |(mut receiver, phoenixd_api, cancel_token, is_active)| async move { + + is_active.store(true, Ordering::SeqCst); + tokio::select! { + _ = cancel_token.cancelled() => { + // Stream is cancelled + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for phonixd invoice ending"); + return None; + } + msg_option = receiver.recv() => { + match msg_option { + Some(msg) => { + let check = phoenixd_api.get_incoming_invoice(&msg.payment_hash).await; + + match check { + Ok(state) => { + if state.is_paid { + // Yield the payment hash and continue the stream + Some((msg.payment_hash, (receiver, phoenixd_api, cancel_token, is_active))) + } else { + // Invoice not paid yet, continue waiting + // We need to continue the stream, so we return the same state + None + } + } + Err(e) => { + // Log the error and continue + tracing::warn!("Error checking invoice state: {:?}", e); None } } - _ => None, + } + None => { + // The receiver stream has ended + None } } - None => None, } - }, - ) - .boxed()) + } + }, + ) + .boxed()) } async fn get_payment_quote(