From ad4df779e3267c4875e58dc154cdbe921471f939 Mon Sep 17 00:00:00 2001 From: tsk Date: Tue, 28 Oct 2025 21:33:46 -0400 Subject: [PATCH] feat(cdk-lnbits): add websocket reconnection with exponential backoff (#1237) Implement automatic reconnection logic when LNbits websocket connection is lost, using exponential backoff strategy (1s to 30s max) with automatic resubscription --- crates/cdk-lnbits/Cargo.toml | 2 +- crates/cdk-lnbits/src/lib.rs | 62 ++++++++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/crates/cdk-lnbits/Cargo.toml b/crates/cdk-lnbits/Cargo.toml index 5429a037..9b0050bc 100644 --- a/crates/cdk-lnbits/Cargo.toml +++ b/crates/cdk-lnbits/Cargo.toml @@ -20,6 +20,6 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true thiserror.workspace = true -lnbits-rs = "0.8.0" +lnbits-rs = "0.9.1" serde_json.workspace = true rustls.workspace = true diff --git a/crates/cdk-lnbits/src/lib.rs b/crates/cdk-lnbits/src/lib.rs index fec078c3..02324604 100644 --- a/crates/cdk-lnbits/src/lib.rs +++ b/crates/cdk-lnbits/src/lib.rs @@ -163,23 +163,57 @@ impl MintPayment for LNbits { let is_active = Arc::clone(&self.wait_invoice_is_active); Ok(Box::pin(futures::stream::unfold( - (api, cancel_token, is_active), - |(api, cancel_token, is_active)| async move { + (api, cancel_token, is_active, 0u32), + |(api, cancel_token, is_active, mut retry_count)| async move { is_active.store(true, Ordering::SeqCst); - let receiver = api.receiver(); - let mut receiver = receiver.lock().await; + loop { + tracing::debug!("LNbits: Starting wait loop, attempting to get receiver"); + let receiver = api.receiver(); + let mut receiver = receiver.lock().await; + tracing::debug!("LNbits: Got receiver lock, waiting for messages"); - tokio::select! { - _ = cancel_token.cancelled() => { - is_active.store(false, Ordering::SeqCst); - tracing::info!("Waiting for lnbits invoice ending"); - None - } - msg_option = receiver.recv() => { - Self::process_message(msg_option, &api, &is_active) - .await - .map(|response| (Event::PaymentReceived(response), (api, cancel_token, is_active))) + tokio::select! { + _ = cancel_token.cancelled() => { + is_active.store(false, Ordering::SeqCst); + tracing::info!("Waiting for lnbits invoice ending"); + return None; + } + msg_option = receiver.recv() => { + tracing::debug!("LNbits: Received message from websocket: {:?}", msg_option.as_ref().map(|_| "Some(message)")); + match msg_option { + Some(_) => { + // Successfully received a message, reset retry count + retry_count = 0; + let result = Self::process_message(msg_option, &api, &is_active).await; + return result.map(|response| { + (Event::PaymentReceived(response), (api, cancel_token, is_active, retry_count)) + }); + } + None => { + // Connection lost, need to reconnect + drop(receiver); // Drop the lock before reconnecting + + tracing::warn!("LNbits websocket connection lost (receiver returned None), attempting to reconnect..."); + + // Exponential backoff: 1s, 2s, 4s, 8s, max 10s + let backoff_secs = std::cmp::min(2u64.pow(retry_count), 10); + tracing::info!("Retrying in {} seconds (attempt {})", backoff_secs, retry_count + 1); + tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await; + + // Attempt to resubscribe + if let Err(err) = api.subscribe_to_websocket().await { + tracing::error!("Failed to resubscribe to LNbits websocket: {:?}", err); + } else { + tracing::info!("Successfully reconnected to LNbits websocket"); + } + + retry_count += 1; + // Continue the loop to try again + continue; + } + } + } } } },