From 7a71a37eab51d2ee8d6bee6d6637050f7851ec03 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Sun, 31 Aug 2025 17:26:50 +0100 Subject: [PATCH] refactor(payment): replace wait_any_incoming_payment with event-based system (#1019) Rename wait_any_incoming_payment to wait_payment_event and change return type from WaitPaymentResponse stream to Event stream. This introduces a new Event enum that wraps payment responses, making the system more extensible for future event types. - Add Event enum with PaymentReceived variant - Update MintPayment trait method signature - Refactor all payment backend implementations (LND, CLN, LNBits, fake wallet) - Update mint and payment processor to handle new event stream forma --- CHANGELOG.md | 8 ++++++ crates/cdk-cln/src/lib.rs | 9 ++++--- crates/cdk-common/src/payment.rs | 11 ++++++-- crates/cdk-fake-wallet/src/lib.rs | 19 ++++++++------ crates/cdk-ldk-node/src/lib.rs | 8 +++--- crates/cdk-lnbits/src/lib.rs | 8 +++--- crates/cdk-lnd/src/lib.rs | 9 ++++--- .../cdk-payment-processor/src/proto/client.rs | 8 +++--- .../cdk-payment-processor/src/proto/server.rs | 26 +++++++++++-------- crates/cdk/src/mint/mod.rs | 20 ++++++++------ 10 files changed, 78 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74ce7a94..11809dcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,14 @@ ## [Unreleased] +### Added +- cdk-common: New `Event` enum for payment event handling with `PaymentReceived` variant ([thesimplekid]). + +### Changed +- cdk-common: Refactored `MintPayment` trait method `wait_any_incoming_payment` to `wait_payment_event` with event-driven architecture ([thesimplekid]). +- cdk-common: Updated `wait_payment_event` return type to stream `Event` enum instead of `WaitPaymentResponse` directly ([thesimplekid]). +- cdk: Updated mint payment handling to process payment events through new `Event` enum pattern ([thesimplekid]). + ## [0.12.0](https://github.com/cashubtc/cdk/releases/tag/v0.12.0) ### Summary diff --git a/crates/cdk-cln/src/lib.rs b/crates/cdk-cln/src/lib.rs index 66e142bd..0bd797d4 100644 --- a/crates/cdk-cln/src/lib.rs +++ b/crates/cdk-cln/src/lib.rs @@ -19,7 +19,7 @@ use cdk_common::common::FeeReserve; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ self, Bolt11IncomingPaymentOptions, Bolt11Settings, Bolt12IncomingPaymentOptions, - CreateIncomingPaymentResponse, IncomingPaymentOptions, MakePaymentResponse, MintPayment, + CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse, }; use cdk_common::util::{hex, unix_time}; @@ -89,9 +89,9 @@ impl MintPayment for Cln { } #[instrument(skip_all)] - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { tracing::info!( "CLN: Starting wait_any_incoming_payment with socket: {:?}", self.rpc_socket @@ -243,8 +243,9 @@ impl MintPayment for Cln { payment_id: payment_hash.to_string() }; tracing::info!("CLN: Created WaitPaymentResponse with amount {} msats", amount_msats.msat()); + let event = Event::PaymentReceived(response); - break Some((response, (cln_client, last_pay_idx, cancel_token, is_active))); + break Some((event, (cln_client, last_pay_idx, cancel_token, is_active))); } Err(e) => { tracing::warn!("CLN: Error fetching invoice: {e}"); diff --git a/crates/cdk-common/src/payment.rs b/crates/cdk-common/src/payment.rs index afd07ee2..1bcabfcd 100644 --- a/crates/cdk-common/src/payment.rs +++ b/crates/cdk-common/src/payment.rs @@ -295,9 +295,9 @@ pub trait MintPayment { /// Listen for invoices to be paid to the mint /// Returns a stream of request_lookup_id once invoices are paid - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err>; + ) -> Result + Send>>, Self::Err>; /// Is wait invoice active fn is_wait_invoice_active(&self) -> bool; @@ -318,6 +318,13 @@ pub trait MintPayment { ) -> Result; } +/// An event emitted which should be handled by the mint +#[derive(Debug, Clone, Hash)] +pub enum Event { + /// A payment has been received. + PaymentReceived(WaitPaymentResponse), +} + /// Wait any invoice response #[derive(Debug, Clone, Hash, Serialize, Deserialize)] pub struct WaitPaymentResponse { diff --git a/crates/cdk-fake-wallet/src/lib.rs b/crates/cdk-fake-wallet/src/lib.rs index 13a5fee9..a5b0f344 100644 --- a/crates/cdk-fake-wallet/src/lib.rs +++ b/crates/cdk-fake-wallet/src/lib.rs @@ -27,7 +27,7 @@ use cdk_common::common::FeeReserve; use cdk_common::ensure_cdk; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ - self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions, + self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse, }; @@ -295,9 +295,9 @@ impl MintPayment for FakeWallet { } #[instrument(skip_all)] - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { tracing::info!("Starting stream for fake invoices"); let receiver = self .receiver @@ -309,11 +309,14 @@ impl MintPayment for FakeWallet { let unit = self.unit.clone(); let receiver_stream = ReceiverStream::new(receiver); Ok(Box::pin(receiver_stream.map( - move |(request_lookup_id, payment_amount, payment_id)| WaitPaymentResponse { - payment_identifier: request_lookup_id.clone(), - payment_amount, - unit: unit.clone(), - payment_id, + move |(request_lookup_id, payment_amount, payment_id)| { + let wait_response = WaitPaymentResponse { + payment_identifier: request_lookup_id.clone(), + payment_amount, + unit: unit.clone(), + payment_id, + }; + Event::PaymentReceived(wait_response) }, ))) } diff --git a/crates/cdk-ldk-node/src/lib.rs b/crates/cdk-ldk-node/src/lib.rs index dde3b5ed..7f1e7805 100644 --- a/crates/cdk-ldk-node/src/lib.rs +++ b/crates/cdk-ldk-node/src/lib.rs @@ -823,9 +823,9 @@ impl MintPayment for CdkLdkNode { /// Listen for invoices to be paid to the mint /// Returns a stream of request_lookup_id once invoices are paid #[instrument(skip(self))] - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { tracing::info!("Starting stream for invoices - wait_any_incoming_payment called"); // Set active flag to indicate stream is active @@ -839,10 +839,10 @@ impl MintPayment for CdkLdkNode { // Transform the String stream into a WaitPaymentResponse stream let response_stream = BroadcastStream::new(receiver.resubscribe()); - // Map the stream to handle BroadcastStreamRecvError + // Map the stream to handle BroadcastStreamRecvError and wrap in Event let response_stream = response_stream.filter_map(|result| async move { match result { - Ok(payment) => Some(payment), + Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)), Err(err) => { tracing::warn!("Error in broadcast stream: {}", err); None diff --git a/crates/cdk-lnbits/src/lib.rs b/crates/cdk-lnbits/src/lib.rs index bda71b2a..2f5f84bb 100644 --- a/crates/cdk-lnbits/src/lib.rs +++ b/crates/cdk-lnbits/src/lib.rs @@ -15,7 +15,7 @@ use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT}; use cdk_common::common::FeeReserve; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ - self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions, + self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse, }; @@ -155,9 +155,9 @@ impl MintPayment for LNbits { self.wait_invoice_cancel_token.cancel() } - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { let api = self.lnbits_api.clone(); let cancel_token = self.wait_invoice_cancel_token.clone(); let is_active = Arc::clone(&self.wait_invoice_is_active); @@ -179,7 +179,7 @@ impl MintPayment for LNbits { msg_option = receiver.recv() => { Self::process_message(msg_option, &api, &is_active) .await - .map(|response| (response, (api, cancel_token, is_active))) + .map(|response| (Event::PaymentReceived(response), (api, cancel_token, is_active))) } } }, diff --git a/crates/cdk-lnd/src/lib.rs b/crates/cdk-lnd/src/lib.rs index 0dfe7ca0..fc6e49df 100644 --- a/crates/cdk-lnd/src/lib.rs +++ b/crates/cdk-lnd/src/lib.rs @@ -20,7 +20,7 @@ use cdk_common::bitcoin::hashes::Hash; use cdk_common::common::FeeReserve; use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState}; use cdk_common::payment::{ - self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions, + self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse, }; @@ -137,9 +137,9 @@ impl MintPayment for Lnd { } #[instrument(skip_all)] - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { let mut lnd_client = self.lnd_client.clone(); let stream_req = lnrpc::InvoiceSubscription { @@ -195,7 +195,8 @@ impl MintPayment for Lnd { }; tracing::info!("LND: Created WaitPaymentResponse with amount {} msat", msg.amt_paid_msat); - Some((wait_response, (stream, cancel_token, is_active))) + let event = Event::PaymentReceived(wait_response); + Some((event, (stream, cancel_token, is_active))) } else { None } } else { None diff --git a/crates/cdk-payment-processor/src/proto/client.rs b/crates/cdk-payment-processor/src/proto/client.rs index cad434d1..d88876af 100644 --- a/crates/cdk-payment-processor/src/proto/client.rs +++ b/crates/cdk-payment-processor/src/proto/client.rs @@ -263,9 +263,9 @@ impl MintPayment for PaymentProcessorClient { } #[instrument(skip_all)] - async fn wait_any_incoming_payment( + async fn wait_payment_event( &self, - ) -> Result + Send>>, Self::Err> { + ) -> Result + Send>>, Self::Err> { self.wait_incoming_payment_stream_is_active .store(true, Ordering::SeqCst); tracing::debug!("Client waiting for payment"); @@ -288,7 +288,9 @@ impl MintPayment for PaymentProcessorClient { .filter_map(|item| async { match item { Ok(value) => match value.try_into() { - Ok(payment_response) => Some(payment_response), + Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived( + payment_response, + )), Err(e) => { tracing::error!("Error converting payment response: {}", e); None diff --git a/crates/cdk-payment-processor/src/proto/server.rs b/crates/cdk-payment-processor/src/proto/server.rs index 9085c3fd..81231a8e 100644 --- a/crates/cdk-payment-processor/src/proto/server.rs +++ b/crates/cdk-payment-processor/src/proto/server.rs @@ -401,19 +401,23 @@ impl CdkPaymentProcessor for PaymentProcessorServer { ln.cancel_wait_invoice(); break; } - result = ln.wait_any_incoming_payment() => { + result = ln.wait_payment_event() => { match result { Ok(mut stream) => { - while let Some(payment_response) = stream.next().await { - match tx.send(Result::<_, Status>::Ok(payment_response.into())) - .await - { - Ok(_) => { - // Response was queued to be sent to client - } - Err(item) => { - tracing::error!("Error adding incoming payment to stream: {}", item); - break; + while let Some(event) = stream.next().await { + match event { + cdk_common::payment::Event::PaymentReceived(payment_response) => { + match tx.send(Result::<_, Status>::Ok(payment_response.into())) + .await + { + Ok(_) => { + // Response was queued to be sent to client + } + Err(item) => { + tracing::error!("Error adding incoming payment to stream: {}", item); + break; + } + } } } } diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 0c08e7d3..a4b5c1d4 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -528,16 +528,20 @@ impl Mint { processor.cancel_wait_invoice(); break; } - result = processor.wait_any_incoming_payment() => { + result = processor.wait_payment_event() => { match result { Ok(mut stream) => { - while let Some(request_lookup_id) = stream.next().await { - if let Err(e) = Self::handle_payment_notification( - &localstore, - &pubsub_manager, - request_lookup_id, - ).await { - tracing::warn!("Payment notification error: {:?}", e); + while let Some(event) = stream.next().await { + match event { + cdk_common::payment::Event::PaymentReceived(wait_payment_response) => { + if let Err(e) = Self::handle_payment_notification( + &localstore, + &pubsub_manager, + wait_payment_response, + ).await { + tracing::warn!("Payment notification error: {:?}", e); + } + } } } }