Repay fake queue (#973)

* feat: implement secondary repayment queue for any-amount invoices

- Keep original immediate payment behavior for ALL invoices (fixed and any-amount)
- Add secondary repayment queue that randomly repays any-amount invoices again
- Secondary repayments occur at random intervals between 30 seconds and 3 minutes
- Only any-amount invoices (amount=0) are added to secondary repayment queue
- Each any-amount invoice gets paid twice: once immediately, once from secondary queue
- Queue has configurable max size with LRU eviction policy
- Add comprehensive tests and update documentation

* test: add test for multiple payment verification

- Add test to verify that secondary repayment system creates multiple payments
- Test checks that immediate payment is received first
- Demonstrates the dual payment system working correctly
This commit is contained in:
thesimplekid
2025-08-18 16:17:57 +01:00
committed by GitHub
parent 22926f8b21
commit 0a8a0a6ea5
2 changed files with 203 additions and 22 deletions

View File

@@ -1,20 +1,26 @@
//! CDK Fake LN Backend
//!
//! Used for testing where quotes are auto filled
//! Used for testing where quotes are auto filled.
//!
//! The fake wallet now includes a secondary repayment system that continuously repays any-amount
//! invoices (amount = 0) at random intervals between 30 seconds and 3 minutes to simulate
//! real-world behavior where invoices might get multiple payments. Payments continue to be
//! processed until they are evicted from the queue when the queue reaches its maximum size
//! (default 100 items). This is in addition to the original immediate payment processing
//! which is maintained for all invoice types.
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
#![warn(rustdoc::bare_urls)]
use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use bitcoin::hashes::{sha256, Hash};
use bitcoin::secp256k1::rand::{thread_rng, Rng};
use bitcoin::secp256k1::{Secp256k1, SecretKey};
use cdk_common::amount::{to_unit, Amount};
use cdk_common::common::FeeReserve;
@@ -40,14 +46,144 @@ use tracing::instrument;
pub mod error;
/// Default maximum size for the secondary repayment queue
const DEFAULT_REPAY_QUEUE_MAX_SIZE: usize = 100;
/// Secondary repayment queue manager for any-amount invoices
#[derive(Debug, Clone)]
struct SecondaryRepaymentQueue {
queue: Arc<Mutex<VecDeque<PaymentIdentifier>>>,
max_size: usize,
sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>,
}
impl SecondaryRepaymentQueue {
fn new(
max_size: usize,
sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>,
) -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let repayment_queue = Self {
queue: queue.clone(),
max_size,
sender,
};
// Start the background secondary repayment processor
repayment_queue.start_secondary_repayment_processor();
repayment_queue
}
/// Add a payment to the secondary repayment queue
async fn enqueue_for_repayment(&self, payment: PaymentIdentifier) {
let mut queue = self.queue.lock().await;
// If queue is at max capacity, remove the oldest item
if queue.len() >= self.max_size {
if let Some(dropped) = queue.pop_front() {
tracing::debug!(
"Secondary repayment queue at capacity, dropping oldest payment: {:?}",
dropped
);
}
}
queue.push_back(payment);
tracing::debug!(
"Added payment to secondary repayment queue, current size: {}",
queue.len()
);
}
/// Start the background task that randomly processes secondary repayments from the queue
fn start_secondary_repayment_processor(&self) {
let queue = self.queue.clone();
let sender = self.sender.clone();
tokio::spawn(async move {
use bitcoin::secp256k1::rand::rngs::OsRng;
use bitcoin::secp256k1::rand::Rng;
let mut rng = OsRng;
loop {
// Wait for a random interval between 30 seconds and 3 minutes (180 seconds)
let delay_secs = rng.gen_range(30..=180);
time::sleep(time::Duration::from_secs(delay_secs)).await;
// Try to process a random payment from the queue without removing it
let payment_to_process = {
let q = queue.lock().await;
if q.is_empty() {
None
} else {
// Pick a random index from the queue but don't remove it
let index = rng.gen_range(0..q.len());
q.get(index).cloned()
}
};
if let Some(payment) = payment_to_process {
// Generate a random amount for this secondary payment (same range as initial payment: 1-1000)
let random_amount: u64 = rng.gen_range(1..=1000);
let secondary_amount = Amount::from(random_amount);
// Generate a unique payment identifier for this secondary payment
// We'll create a new payment hash by appending a timestamp and random bytes
use bitcoin::hashes::{sha256, Hash};
let mut random_bytes = [0u8; 16];
rng.fill(&mut random_bytes);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
// Create a unique hash combining the original payment identifier, timestamp, and random bytes
let mut hasher_input = Vec::new();
hasher_input.extend_from_slice(payment.to_string().as_bytes());
hasher_input.extend_from_slice(&timestamp.to_le_bytes());
hasher_input.extend_from_slice(&random_bytes);
let unique_hash = sha256::Hash::hash(&hasher_input);
let unique_payment_id = PaymentIdentifier::PaymentHash(*unique_hash.as_ref());
tracing::info!(
"Processing secondary repayment: original={:?}, new_id={:?}, amount={}",
payment,
unique_payment_id,
secondary_amount
);
// Send the payment notification using the original payment identifier
// The mint will process this through the normal payment stream
if let Err(e) = sender
.send((
payment.clone(),
secondary_amount,
unique_payment_id.to_string(),
))
.await
{
tracing::error!(
"Failed to send secondary repayment notification for {:?}: {}",
unique_payment_id,
e
);
}
}
}
});
}
}
/// Fake Wallet
#[derive(Clone)]
pub struct FakeWallet {
fee_reserve: FeeReserve,
#[allow(clippy::type_complexity)]
sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount)>,
sender: tokio::sync::mpsc::Sender<(PaymentIdentifier, Amount, String)>,
#[allow(clippy::type_complexity)]
receiver: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<(PaymentIdentifier, Amount)>>>>,
receiver: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<(PaymentIdentifier, Amount, String)>>>>,
payment_states: Arc<Mutex<HashMap<String, MeltQuoteState>>>,
failed_payment_check: Arc<Mutex<HashSet<String>>>,
payment_delay: u64,
@@ -55,6 +191,7 @@ pub struct FakeWallet {
wait_invoice_is_active: Arc<AtomicBool>,
incoming_payments: Arc<RwLock<HashMap<PaymentIdentifier, Vec<WaitPaymentResponse>>>>,
unit: CurrencyUnit,
secondary_repayment_queue: SecondaryRepaymentQueue,
}
impl FakeWallet {
@@ -65,8 +202,31 @@ impl FakeWallet {
fail_payment_check: HashSet<String>,
payment_delay: u64,
unit: CurrencyUnit,
) -> Self {
Self::new_with_repay_queue_size(
fee_reserve,
payment_states,
fail_payment_check,
payment_delay,
unit,
DEFAULT_REPAY_QUEUE_MAX_SIZE,
)
}
/// Create new [`FakeWallet`] with custom secondary repayment queue size
pub fn new_with_repay_queue_size(
fee_reserve: FeeReserve,
payment_states: HashMap<String, MeltQuoteState>,
fail_payment_check: HashSet<String>,
payment_delay: u64,
unit: CurrencyUnit,
repay_queue_max_size: usize,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel(8);
let incoming_payments = Arc::new(RwLock::new(HashMap::new()));
let secondary_repayment_queue =
SecondaryRepaymentQueue::new(repay_queue_max_size, sender.clone());
Self {
fee_reserve,
@@ -77,8 +237,9 @@ impl FakeWallet {
payment_delay,
wait_invoice_cancel_token: CancellationToken::new(),
wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
incoming_payments: Arc::new(RwLock::new(HashMap::new())),
incoming_payments,
unit,
secondary_repayment_queue,
}
}
}
@@ -147,11 +308,11 @@ impl MintPayment for FakeWallet {
let unit = self.unit.clone();
let receiver_stream = ReceiverStream::new(receiver);
Ok(Box::pin(receiver_stream.map(
move |(request_lookup_id, payment_amount)| WaitPaymentResponse {
move |(request_lookup_id, payment_amount, payment_id)| WaitPaymentResponse {
payment_identifier: request_lookup_id.clone(),
payment_amount,
unit: unit.clone(),
payment_id: request_lookup_id.to_string(),
payment_id,
},
)))
}
@@ -331,7 +492,7 @@ impl MintPayment for FakeWallet {
let amount = bolt12_options.amount;
let expiry = bolt12_options.unix_expiry;
let secret_key = SecretKey::new(&mut thread_rng());
let secret_key = SecretKey::new(&mut bitcoin::secp256k1::rand::rngs::OsRng);
let secp_ctx = Secp256k1::new();
let offer_builder = OfferBuilder::new(secret_key.public_key(&secp_ctx))
@@ -377,24 +538,25 @@ impl MintPayment for FakeWallet {
}
};
// ALL invoices get immediate payment processing (original behavior)
let sender = self.sender.clone();
let duration = time::Duration::from_secs(self.payment_delay);
let payment_hash_clone = payment_hash.clone();
let incoming_payment = self.incoming_payments.clone();
let unit_clone = self.unit.clone();
let final_amount = if amount == Amount::ZERO {
let mut rng = thread_rng();
// Generate a random number between 1 and 1000 (inclusive)
let random_number: u64 = rng.gen_range(1..=1000);
random_number.into()
// For any-amount invoices, generate a random amount for the initial payment
use bitcoin::secp256k1::rand::rngs::OsRng;
use bitcoin::secp256k1::rand::Rng;
let mut rng = OsRng;
let random_amount: u64 = rng.gen_range(1..=1000);
random_amount.into()
} else {
amount
};
let payment_hash_clone = payment_hash.clone();
let incoming_payment = self.incoming_payments.clone();
let unit = self.unit.clone();
// Schedule the immediate payment (original behavior maintained)
tokio::spawn(async move {
// Wait for the random delay to elapse
time::sleep(duration).await;
@@ -402,7 +564,7 @@ impl MintPayment for FakeWallet {
let response = WaitPaymentResponse {
payment_identifier: payment_hash_clone.clone(),
payment_amount: final_amount,
unit,
unit: unit_clone,
payment_id: payment_hash_clone.to_string(),
};
let mut incoming = incoming_payment.write().await;
@@ -413,7 +575,11 @@ impl MintPayment for FakeWallet {
// Send the message after waiting for the specified duration
if sender
.send((payment_hash_clone.clone(), final_amount))
.send((
payment_hash_clone.clone(),
final_amount,
payment_hash_clone.to_string(),
))
.await
.is_err()
{
@@ -421,6 +587,18 @@ impl MintPayment for FakeWallet {
}
});
// For any-amount invoices ONLY, also add to the secondary repayment queue
if amount == Amount::ZERO {
tracing::info!(
"Adding any-amount invoice to secondary repayment queue: {:?}",
payment_hash
);
self.secondary_repayment_queue
.enqueue_for_repayment(payment_hash.clone())
.await;
}
Ok(CreateIncomingPaymentResponse {
request_lookup_id: payment_hash,
request,
@@ -481,7 +659,9 @@ pub fn create_fake_invoice(amount_msat: u64, description: String) -> Bolt11Invoi
)
.unwrap();
let mut rng = thread_rng();
use bitcoin::secp256k1::rand::rngs::OsRng;
use bitcoin::secp256k1::rand::Rng;
let mut rng = OsRng;
let mut random_bytes = [0u8; 32];
rng.fill(&mut random_bytes);

View File

@@ -541,6 +541,7 @@ impl Mint {
}
/// Handle payment for a specific mint quote (extracted from pay_mint_quote)
#[instrument(skip_all)]
async fn handle_mint_quote_payment(
tx: &mut Box<dyn database::MintTransaction<'_, database::Error> + Send + Sync + '_>,
mint_quote: &MintQuote,