refactor(payment): replace wait_any_incoming_payment with event-based system (#1019)

Rename wait_any_incoming_payment to wait_payment_event and change return type
from WaitPaymentResponse stream to Event stream. This introduces a new Event
enum that wraps payment responses, making the system more extensible for
future event types.

- Add Event enum with PaymentReceived variant
- Update MintPayment trait method signature
- Refactor all payment backend implementations (LND, CLN, LNBits, fake wallet)
- Update mint and payment processor to handle new event stream forma
This commit is contained in:
thesimplekid
2025-08-31 17:26:50 +01:00
committed by GitHub
parent df8b78043e
commit 7a71a37eab
10 changed files with 78 additions and 48 deletions

View File

@@ -6,6 +6,14 @@
## [Unreleased]
### Added
- cdk-common: New `Event` enum for payment event handling with `PaymentReceived` variant ([thesimplekid]).
### Changed
- cdk-common: Refactored `MintPayment` trait method `wait_any_incoming_payment` to `wait_payment_event` with event-driven architecture ([thesimplekid]).
- cdk-common: Updated `wait_payment_event` return type to stream `Event` enum instead of `WaitPaymentResponse` directly ([thesimplekid]).
- cdk: Updated mint payment handling to process payment events through new `Event` enum pattern ([thesimplekid]).
## [0.12.0](https://github.com/cashubtc/cdk/releases/tag/v0.12.0)
### Summary

View File

@@ -19,7 +19,7 @@ use cdk_common::common::FeeReserve;
use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
use cdk_common::payment::{
self, Bolt11IncomingPaymentOptions, Bolt11Settings, Bolt12IncomingPaymentOptions,
CreateIncomingPaymentResponse, IncomingPaymentOptions, MakePaymentResponse, MintPayment,
CreateIncomingPaymentResponse, Event, IncomingPaymentOptions, MakePaymentResponse, MintPayment,
OutgoingPaymentOptions, PaymentIdentifier, PaymentQuoteResponse, WaitPaymentResponse,
};
use cdk_common::util::{hex, unix_time};
@@ -89,9 +89,9 @@ impl MintPayment for Cln {
}
#[instrument(skip_all)]
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
tracing::info!(
"CLN: Starting wait_any_incoming_payment with socket: {:?}",
self.rpc_socket
@@ -243,8 +243,9 @@ impl MintPayment for Cln {
payment_id: payment_hash.to_string()
};
tracing::info!("CLN: Created WaitPaymentResponse with amount {} msats", amount_msats.msat());
let event = Event::PaymentReceived(response);
break Some((response, (cln_client, last_pay_idx, cancel_token, is_active)));
break Some((event, (cln_client, last_pay_idx, cancel_token, is_active)));
}
Err(e) => {
tracing::warn!("CLN: Error fetching invoice: {e}");

View File

@@ -295,9 +295,9 @@ pub trait MintPayment {
/// Listen for invoices to be paid to the mint
/// Returns a stream of request_lookup_id once invoices are paid
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err>;
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err>;
/// Is wait invoice active
fn is_wait_invoice_active(&self) -> bool;
@@ -318,6 +318,13 @@ pub trait MintPayment {
) -> Result<MakePaymentResponse, Self::Err>;
}
/// An event emitted which should be handled by the mint
#[derive(Debug, Clone, Hash)]
pub enum Event {
/// A payment has been received.
PaymentReceived(WaitPaymentResponse),
}
/// Wait any invoice response
#[derive(Debug, Clone, Hash, Serialize, Deserialize)]
pub struct WaitPaymentResponse {

View File

@@ -27,7 +27,7 @@ use cdk_common::common::FeeReserve;
use cdk_common::ensure_cdk;
use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
use cdk_common::payment::{
self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions,
self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions,
MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier,
PaymentQuoteResponse, WaitPaymentResponse,
};
@@ -295,9 +295,9 @@ impl MintPayment for FakeWallet {
}
#[instrument(skip_all)]
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
tracing::info!("Starting stream for fake invoices");
let receiver = self
.receiver
@@ -309,11 +309,14 @@ impl MintPayment for FakeWallet {
let unit = self.unit.clone();
let receiver_stream = ReceiverStream::new(receiver);
Ok(Box::pin(receiver_stream.map(
move |(request_lookup_id, payment_amount, payment_id)| WaitPaymentResponse {
payment_identifier: request_lookup_id.clone(),
payment_amount,
unit: unit.clone(),
payment_id,
move |(request_lookup_id, payment_amount, payment_id)| {
let wait_response = WaitPaymentResponse {
payment_identifier: request_lookup_id.clone(),
payment_amount,
unit: unit.clone(),
payment_id,
};
Event::PaymentReceived(wait_response)
},
)))
}

View File

@@ -823,9 +823,9 @@ impl MintPayment for CdkLdkNode {
/// Listen for invoices to be paid to the mint
/// Returns a stream of request_lookup_id once invoices are paid
#[instrument(skip(self))]
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
tracing::info!("Starting stream for invoices - wait_any_incoming_payment called");
// Set active flag to indicate stream is active
@@ -839,10 +839,10 @@ impl MintPayment for CdkLdkNode {
// Transform the String stream into a WaitPaymentResponse stream
let response_stream = BroadcastStream::new(receiver.resubscribe());
// Map the stream to handle BroadcastStreamRecvError
// Map the stream to handle BroadcastStreamRecvError and wrap in Event
let response_stream = response_stream.filter_map(|result| async move {
match result {
Ok(payment) => Some(payment),
Ok(payment) => Some(cdk_common::payment::Event::PaymentReceived(payment)),
Err(err) => {
tracing::warn!("Error in broadcast stream: {}", err);
None

View File

@@ -15,7 +15,7 @@ use cdk_common::amount::{to_unit, Amount, MSAT_IN_SAT};
use cdk_common::common::FeeReserve;
use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
use cdk_common::payment::{
self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions,
self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions,
MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier,
PaymentQuoteResponse, WaitPaymentResponse,
};
@@ -155,9 +155,9 @@ impl MintPayment for LNbits {
self.wait_invoice_cancel_token.cancel()
}
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
let api = self.lnbits_api.clone();
let cancel_token = self.wait_invoice_cancel_token.clone();
let is_active = Arc::clone(&self.wait_invoice_is_active);
@@ -179,7 +179,7 @@ impl MintPayment for LNbits {
msg_option = receiver.recv() => {
Self::process_message(msg_option, &api, &is_active)
.await
.map(|response| (response, (api, cancel_token, is_active)))
.map(|response| (Event::PaymentReceived(response), (api, cancel_token, is_active)))
}
}
},

View File

@@ -20,7 +20,7 @@ use cdk_common::bitcoin::hashes::Hash;
use cdk_common::common::FeeReserve;
use cdk_common::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState};
use cdk_common::payment::{
self, Bolt11Settings, CreateIncomingPaymentResponse, IncomingPaymentOptions,
self, Bolt11Settings, CreateIncomingPaymentResponse, Event, IncomingPaymentOptions,
MakePaymentResponse, MintPayment, OutgoingPaymentOptions, PaymentIdentifier,
PaymentQuoteResponse, WaitPaymentResponse,
};
@@ -137,9 +137,9 @@ impl MintPayment for Lnd {
}
#[instrument(skip_all)]
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = Event> + Send>>, Self::Err> {
let mut lnd_client = self.lnd_client.clone();
let stream_req = lnrpc::InvoiceSubscription {
@@ -195,7 +195,8 @@ impl MintPayment for Lnd {
};
tracing::info!("LND: Created WaitPaymentResponse with amount {} msat",
msg.amt_paid_msat);
Some((wait_response, (stream, cancel_token, is_active)))
let event = Event::PaymentReceived(wait_response);
Some((event, (stream, cancel_token, is_active)))
} else { None }
} else {
None

View File

@@ -263,9 +263,9 @@ impl MintPayment for PaymentProcessorClient {
}
#[instrument(skip_all)]
async fn wait_any_incoming_payment(
async fn wait_payment_event(
&self,
) -> Result<Pin<Box<dyn Stream<Item = WaitPaymentResponse> + Send>>, Self::Err> {
) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
self.wait_incoming_payment_stream_is_active
.store(true, Ordering::SeqCst);
tracing::debug!("Client waiting for payment");
@@ -288,7 +288,9 @@ impl MintPayment for PaymentProcessorClient {
.filter_map(|item| async {
match item {
Ok(value) => match value.try_into() {
Ok(payment_response) => Some(payment_response),
Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
payment_response,
)),
Err(e) => {
tracing::error!("Error converting payment response: {}", e);
None

View File

@@ -401,19 +401,23 @@ impl CdkPaymentProcessor for PaymentProcessorServer {
ln.cancel_wait_invoice();
break;
}
result = ln.wait_any_incoming_payment() => {
result = ln.wait_payment_event() => {
match result {
Ok(mut stream) => {
while let Some(payment_response) = stream.next().await {
match tx.send(Result::<_, Status>::Ok(payment_response.into()))
.await
{
Ok(_) => {
// Response was queued to be sent to client
}
Err(item) => {
tracing::error!("Error adding incoming payment to stream: {}", item);
break;
while let Some(event) = stream.next().await {
match event {
cdk_common::payment::Event::PaymentReceived(payment_response) => {
match tx.send(Result::<_, Status>::Ok(payment_response.into()))
.await
{
Ok(_) => {
// Response was queued to be sent to client
}
Err(item) => {
tracing::error!("Error adding incoming payment to stream: {}", item);
break;
}
}
}
}
}

View File

@@ -528,16 +528,20 @@ impl Mint {
processor.cancel_wait_invoice();
break;
}
result = processor.wait_any_incoming_payment() => {
result = processor.wait_payment_event() => {
match result {
Ok(mut stream) => {
while let Some(request_lookup_id) = stream.next().await {
if let Err(e) = Self::handle_payment_notification(
&localstore,
&pubsub_manager,
request_lookup_id,
).await {
tracing::warn!("Payment notification error: {:?}", e);
while let Some(event) = stream.next().await {
match event {
cdk_common::payment::Event::PaymentReceived(wait_payment_response) => {
if let Err(e) = Self::handle_payment_notification(
&localstore,
&pubsub_manager,
wait_payment_response,
).await {
tracing::warn!("Payment notification error: {:?}", e);
}
}
}
}
}