From 81a47d5d12f99b1c2fa64698f28aacbff4f93471 Mon Sep 17 00:00:00 2001 From: C Date: Thu, 2 Oct 2025 06:12:17 -0300 Subject: [PATCH] Fix bug with websocket close (#1144) * Fix bug with websocket close Fixes #1111 * Do not connect to ws if there are no active subscription --- crates/cdk/src/wallet/subscription/ws.rs | 26 +++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/crates/cdk/src/wallet/subscription/ws.rs b/crates/cdk/src/wallet/subscription/ws.rs index f0204a90..0fbed754 100644 --- a/crates/cdk/src/wallet/subscription/ws.rs +++ b/crates/cdk/src/wallet/subscription/ws.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::time::Duration; use cdk_common::subscription::Params; use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscribeRequest}; @@ -8,6 +9,7 @@ use cdk_common::ws::{WsMessageOrResponse, WsMethodRequest, WsRequest, WsUnsubscr use cdk_common::{Method, RoutePath}; use futures::{SinkExt, StreamExt}; use tokio::sync::{mpsc, RwLock}; +use tokio::time::sleep; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::Message; @@ -61,6 +63,12 @@ pub async fn ws_main( let mut failure_count = 0; loop { + if subscriptions.read().await.is_empty() { + // No active subscription + sleep(Duration::from_millis(100)).await; + continue; + } + let mut request_clone = request.clone(); #[cfg(feature = "auth")] { @@ -177,7 +185,12 @@ pub async fn ws_main( Some(msg) = read.next() => { let msg = match msg { Ok(msg) => msg, - Err(_) => break, + Err(_) => { + if let Err(err) = write.send(Message::Close(None)).await { + tracing::error!("Closing error {err:?}"); + } + break + }, }; let msg = match msg { Message::Text(msg) => msg, @@ -222,6 +235,10 @@ pub async fn ws_main( .await; } + if let Err(err) = write.send(Message::Close(None)).await { + tracing::error!("Closing error {err:?}"); + } + break; // break connection to force a reconnection, to attempt to recover form this error } } @@ -251,6 +268,13 @@ pub async fn ws_main( if let Some(json) = get_unsub_request(subid) { let _ = write.send(Message::Text(json.into())).await; } + + if subscription.is_empty() { + if let Err(err) = write.send(Message::Close(None)).await { + tracing::error!("Closing error {err:?}"); + } + break; + } } } }