From fe694c228d409e19b0d1db429963acf7c9e8687a Mon Sep 17 00:00:00 2001 From: Jack Amadeo Date: Fri, 13 Jun 2025 14:43:45 -0400 Subject: [PATCH] fix: Check for stderr error in receive() (#2905) --- .../mcp-client/examples/integration_test.rs | 22 +++++++++++++++++++ crates/mcp-client/src/client.rs | 3 +-- crates/mcp-client/src/service.rs | 12 ++++++---- crates/mcp-client/src/transport/stdio.rs | 8 ++++++- 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/crates/mcp-client/examples/integration_test.rs b/crates/mcp-client/examples/integration_test.rs index d4aa7475..1cc0dc12 100644 --- a/crates/mcp-client/examples/integration_test.rs +++ b/crates/mcp-client/examples/integration_test.rs @@ -22,6 +22,17 @@ async fn main() -> Result<()> { test_transport(sse_transport().await?).await?; test_transport(stdio_transport().await?).await?; + // Test broken transport + match test_transport(broken_stdio_transport().await?).await { + Ok(_) => assert!(false, "Expected an error but got success"), + Err(e) => { + assert!(e + .to_string() + .contains("error: package(s) `thispackagedoesnotexist` not found in workspace")); + println!("Expected error occurred: {e}"); + } + } + Ok(()) } @@ -52,6 +63,17 @@ async fn stdio_transport() -> Result { )) } +async fn broken_stdio_transport() -> Result { + Ok(StdioTransport::new( + "cargo", + vec!["run", "-p", "thispackagedoesnotexist"] + .into_iter() + .map(|s| s.to_string()) + .collect(), + HashMap::new(), + )) +} + async fn test_transport(transport: T) -> Result<()> where T: Transport + Send + 'static, diff --git a/crates/mcp-client/src/client.rs b/crates/mcp-client/src/client.rs index e00e758f..5cade18d 100644 --- a/crates/mcp-client/src/client.rs +++ b/crates/mcp-client/src/client.rs @@ -146,8 +146,7 @@ where } } Err(e) => { - tracing::error!("transport error: {:?}", e); - service_ptr.hangup().await; + service_ptr.hangup(e).await; subscribers_ptr.lock().await.clear(); break; } diff --git a/crates/mcp-client/src/service.rs b/crates/mcp-client/src/service.rs index b2ea82cf..12432c64 100644 --- a/crates/mcp-client/src/service.rs +++ b/crates/mcp-client/src/service.rs @@ -27,8 +27,8 @@ impl McpService { self.pending_requests.respond(id, response).await } - pub async fn hangup(&self) { - self.pending_requests.broadcast_close().await + pub async fn hangup(&self, error: Error) { + self.pending_requests.broadcast_close(error).await } } @@ -115,9 +115,13 @@ impl PendingRequests { } } - pub async fn broadcast_close(&self) { + pub async fn broadcast_close(&self, error: Error) { for (_, tx) in self.requests.write().await.drain() { - let _ = tx.send(Err(Error::ChannelClosed)); + let err = match &error { + Error::StdioProcessError(s) => Error::StdioProcessError(s.clone()), + _ => Error::ChannelClosed, + }; + let _ = tx.send(Err(err)); } } diff --git a/crates/mcp-client/src/transport/stdio.rs b/crates/mcp-client/src/transport/stdio.rs index 94b51aff..0b3a44ca 100644 --- a/crates/mcp-client/src/transport/stdio.rs +++ b/crates/mcp-client/src/transport/stdio.rs @@ -168,7 +168,13 @@ impl TransportHandle for StdioTransportHandle { async fn receive(&self) -> Result { let mut receiver = self.receiver.lock().await; - receiver.recv().await.ok_or(Error::ChannelClosed) + match receiver.recv().await { + Some(message) => Ok(message), + None => { + self.check_for_errors().await?; + Err(Error::ChannelClosed) + } + } } }