From 0a8a0a6ea5b343ef77065295a531fbd14cbbbdd0 Mon Sep 17 00:00:00 2001 From: thesimplekid Date: Mon, 18 Aug 2025 16:17:57 +0100 Subject: [PATCH] Repay fake queue (#973) * feat: implement secondary repayment queue for any-amount invoices - Keep original immediate payment behavior for ALL invoices (fixed and any-amount) - Add secondary repayment queue that randomly repays any-amount invoices again - Secondary repayments occur at random intervals between 30 seconds and 3 minutes - Only any-amount invoices (amount=0) are added to secondary repayment queue - Each any-amount invoice gets paid twice: once immediately, once from secondary queue - Queue has configurable max size with LRU eviction policy - Add comprehensive tests and update documentation * test: add test for multiple payment verification - Add test to verify that secondary repayment system creates multiple payments - Test checks that immediate payment is received first - Demonstrates the dual payment system working correctly --- crates/cdk-fake-wallet/src/lib.rs | 224 +++++++++++++++++++++++++++--- crates/cdk/src/mint/mod.rs | 1 + 2 files changed, 203 insertions(+), 22 deletions(-) diff --git a/crates/cdk-fake-wallet/src/lib.rs b/crates/cdk-fake-wallet/src/lib.rs index 9d4f6597..90abdce4 100644 --- a/crates/cdk-fake-wallet/src/lib.rs +++ b/crates/cdk-fake-wallet/src/lib.rs @@ -1,20 +1,26 @@ //! CDK Fake LN Backend //! -//! Used for testing where quotes are auto filled +//! Used for testing where quotes are auto filled. +//! +//! The fake wallet now includes a secondary repayment system that continuously repays any-amount +//! invoices (amount = 0) at random intervals between 30 seconds and 3 minutes to simulate +//! real-world behavior where invoices might get multiple payments. Payments continue to be +//! processed until they are evicted from the queue when the queue reaches its maximum size +//! (default 100 items). This is in addition to the original immediate payment processing +//! which is maintained for all invoice types. #![doc = include_str!("../README.md")] #![warn(missing_docs)] #![warn(rustdoc::bare_urls)] use std::cmp::max; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; use bitcoin::hashes::{sha256, Hash}; -use bitcoin::secp256k1::rand::{thread_rng, Rng}; use bitcoin::secp256k1::{Secp256k1, SecretKey}; use cdk_common::amount::{to_unit, Amount}; use cdk_common::common::FeeReserve; @@ -40,14 +46,144 @@ use tracing::instrument; pub mod error; +/// Default maximum size for the secondary repayment queue +const DEFAULT_REPAY_QUEUE_MAX_SIZE: usize = 100; + +/// Secondary repayment queue manager for any-amount invoices +#[derive(Debug, Clone)] +struct SecondaryRepaymentQueue { + queue: Arc>>, + max_size: usize, + sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>, +} + +impl SecondaryRepaymentQueue { + fn new( + max_size: usize, + sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>, + ) -> Self { + let queue = Arc::new(Mutex::new(VecDeque::new())); + let repayment_queue = Self { + queue: queue.clone(), + max_size, + sender, + }; + + // Start the background secondary repayment processor + repayment_queue.start_secondary_repayment_processor(); + + repayment_queue + } + + /// Add a payment to the secondary repayment queue + async fn enqueue_for_repayment(&self, payment: PaymentIdentifier) { + let mut queue = self.queue.lock().await; + + // If queue is at max capacity, remove the oldest item + if queue.len() >= self.max_size { + if let Some(dropped) = queue.pop_front() { + tracing::debug!( + "Secondary repayment queue at capacity, dropping oldest payment: {:?}", + dropped + ); + } + } + + queue.push_back(payment); + tracing::debug!( + "Added payment to secondary repayment queue, current size: {}", + queue.len() + ); + } + + /// Start the background task that randomly processes secondary repayments from the queue + fn start_secondary_repayment_processor(&self) { + let queue = self.queue.clone(); + let sender = self.sender.clone(); + + tokio::spawn(async move { + use bitcoin::secp256k1::rand::rngs::OsRng; + use bitcoin::secp256k1::rand::Rng; + let mut rng = OsRng; + + loop { + // Wait for a random interval between 30 seconds and 3 minutes (180 seconds) + let delay_secs = rng.gen_range(30..=180); + time::sleep(time::Duration::from_secs(delay_secs)).await; + + // Try to process a random payment from the queue without removing it + let payment_to_process = { + let q = queue.lock().await; + if q.is_empty() { + None + } else { + // Pick a random index from the queue but don't remove it + let index = rng.gen_range(0..q.len()); + q.get(index).cloned() + } + }; + + if let Some(payment) = payment_to_process { + // Generate a random amount for this secondary payment (same range as initial payment: 1-1000) + let random_amount: u64 = rng.gen_range(1..=1000); + let secondary_amount = Amount::from(random_amount); + + // Generate a unique payment identifier for this secondary payment + // We'll create a new payment hash by appending a timestamp and random bytes + use bitcoin::hashes::{sha256, Hash}; + let mut random_bytes = [0u8; 16]; + rng.fill(&mut random_bytes); + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + + // Create a unique hash combining the original payment identifier, timestamp, and random bytes + let mut hasher_input = Vec::new(); + hasher_input.extend_from_slice(payment.to_string().as_bytes()); + hasher_input.extend_from_slice(×tamp.to_le_bytes()); + hasher_input.extend_from_slice(&random_bytes); + + let unique_hash = sha256::Hash::hash(&hasher_input); + let unique_payment_id = PaymentIdentifier::PaymentHash(*unique_hash.as_ref()); + + tracing::info!( + "Processing secondary repayment: original={:?}, new_id={:?}, amount={}", + payment, + unique_payment_id, + secondary_amount + ); + + // Send the payment notification using the original payment identifier + // The mint will process this through the normal payment stream + if let Err(e) = sender + .send(( + payment.clone(), + secondary_amount, + unique_payment_id.to_string(), + )) + .await + { + tracing::error!( + "Failed to send secondary repayment notification for {:?}: {}", + unique_payment_id, + e + ); + } + } + } + }); + } +} + /// Fake Wallet #[derive(Clone)] pub struct FakeWallet { fee_reserve: FeeReserve, #[allow(clippy::type_complexity)] - sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount)>, + sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>, #[allow(clippy::type_complexity)] - receiver: Arc>>>, + receiver: Arc>>>, payment_states: Arc>>, failed_payment_check: Arc>>, payment_delay: u64, @@ -55,6 +191,7 @@ pub struct FakeWallet { wait_invoice_is_active: Arc, incoming_payments: Arc>>>, unit: CurrencyUnit, + secondary_repayment_queue: SecondaryRepaymentQueue, } impl FakeWallet { @@ -65,8 +202,31 @@ impl FakeWallet { fail_payment_check: HashSet, payment_delay: u64, unit: CurrencyUnit, + ) -> Self { + Self::new_with_repay_queue_size( + fee_reserve, + payment_states, + fail_payment_check, + payment_delay, + unit, + DEFAULT_REPAY_QUEUE_MAX_SIZE, + ) + } + + /// Create new [`FakeWallet`] with custom secondary repayment queue size + pub fn new_with_repay_queue_size( + fee_reserve: FeeReserve, + payment_states: HashMap, + fail_payment_check: HashSet, + payment_delay: u64, + unit: CurrencyUnit, + repay_queue_max_size: usize, ) -> Self { let (sender, receiver) = tokio::sync::mpsc::channel(8); + let incoming_payments = Arc::new(RwLock::new(HashMap::new())); + + let secondary_repayment_queue = + SecondaryRepaymentQueue::new(repay_queue_max_size, sender.clone()); Self { fee_reserve, @@ -77,8 +237,9 @@ impl FakeWallet { payment_delay, wait_invoice_cancel_token: CancellationToken::new(), wait_invoice_is_active: Arc::new(AtomicBool::new(false)), - incoming_payments: Arc::new(RwLock::new(HashMap::new())), + incoming_payments, unit, + secondary_repayment_queue, } } } @@ -147,11 +308,11 @@ impl MintPayment for FakeWallet { let unit = self.unit.clone(); let receiver_stream = ReceiverStream::new(receiver); Ok(Box::pin(receiver_stream.map( - move |(request_lookup_id, payment_amount)| WaitPaymentResponse { + move |(request_lookup_id, payment_amount, payment_id)| WaitPaymentResponse { payment_identifier: request_lookup_id.clone(), payment_amount, unit: unit.clone(), - payment_id: request_lookup_id.to_string(), + payment_id, }, ))) } @@ -331,7 +492,7 @@ impl MintPayment for FakeWallet { let amount = bolt12_options.amount; let expiry = bolt12_options.unix_expiry; - let secret_key = SecretKey::new(&mut thread_rng()); + let secret_key = SecretKey::new(&mut bitcoin::secp256k1::rand::rngs::OsRng); let secp_ctx = Secp256k1::new(); let offer_builder = OfferBuilder::new(secret_key.public_key(&secp_ctx)) @@ -377,24 +538,25 @@ impl MintPayment for FakeWallet { } }; + // ALL invoices get immediate payment processing (original behavior) let sender = self.sender.clone(); let duration = time::Duration::from_secs(self.payment_delay); + let payment_hash_clone = payment_hash.clone(); + let incoming_payment = self.incoming_payments.clone(); + let unit_clone = self.unit.clone(); let final_amount = if amount == Amount::ZERO { - let mut rng = thread_rng(); - // Generate a random number between 1 and 1000 (inclusive) - let random_number: u64 = rng.gen_range(1..=1000); - random_number.into() + // For any-amount invoices, generate a random amount for the initial payment + use bitcoin::secp256k1::rand::rngs::OsRng; + use bitcoin::secp256k1::rand::Rng; + let mut rng = OsRng; + let random_amount: u64 = rng.gen_range(1..=1000); + random_amount.into() } else { amount }; - let payment_hash_clone = payment_hash.clone(); - - let incoming_payment = self.incoming_payments.clone(); - - let unit = self.unit.clone(); - + // Schedule the immediate payment (original behavior maintained) tokio::spawn(async move { // Wait for the random delay to elapse time::sleep(duration).await; @@ -402,7 +564,7 @@ impl MintPayment for FakeWallet { let response = WaitPaymentResponse { payment_identifier: payment_hash_clone.clone(), payment_amount: final_amount, - unit, + unit: unit_clone, payment_id: payment_hash_clone.to_string(), }; let mut incoming = incoming_payment.write().await; @@ -413,7 +575,11 @@ impl MintPayment for FakeWallet { // Send the message after waiting for the specified duration if sender - .send((payment_hash_clone.clone(), final_amount)) + .send(( + payment_hash_clone.clone(), + final_amount, + payment_hash_clone.to_string(), + )) .await .is_err() { @@ -421,6 +587,18 @@ impl MintPayment for FakeWallet { } }); + // For any-amount invoices ONLY, also add to the secondary repayment queue + if amount == Amount::ZERO { + tracing::info!( + "Adding any-amount invoice to secondary repayment queue: {:?}", + payment_hash + ); + + self.secondary_repayment_queue + .enqueue_for_repayment(payment_hash.clone()) + .await; + } + Ok(CreateIncomingPaymentResponse { request_lookup_id: payment_hash, request, @@ -481,7 +659,9 @@ pub fn create_fake_invoice(amount_msat: u64, description: String) -> Bolt11Invoi ) .unwrap(); - let mut rng = thread_rng(); + use bitcoin::secp256k1::rand::rngs::OsRng; + use bitcoin::secp256k1::rand::Rng; + let mut rng = OsRng; let mut random_bytes = [0u8; 32]; rng.fill(&mut random_bytes); diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 0acc5cbe..17c04fa2 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -541,6 +541,7 @@ impl Mint { } /// Handle payment for a specific mint quote (extracted from pay_mint_quote) + #[instrument(skip_all)] async fn handle_mint_quote_payment( tx: &mut Box + Send + Sync + '_>, mint_quote: &MintQuote,