fix: Don't break from consuming subprocess output in shell tool until both streams are done (#2771)

Co-authored-by: Max Novich <maksymstepanenko1990@gmail.com>
This commit is contained in:
Jack Amadeo
2025-06-05 11:20:11 +10:00
committed by GitHub
parent a1ebd2f703
commit 1d557161d0
3 changed files with 55 additions and 40 deletions

View File

@@ -727,7 +727,6 @@ impl Session {
loop {
tokio::select! {
result = stream.next() => {
let _ = progress_bars.hide();
match result {
Some(Ok(AgentEvent::Message(message))) => {
// If it's a confirmation request, get approval but otherwise do not render/persist
@@ -865,6 +864,7 @@ impl Session {
session::persist_messages(&self.session_file, &self.messages, None).await?;
if interactive {output::hide_thinking()};
let _ = progress_bars.hide();
output::render_message(&message, self.debug);
if interactive {output::show_thinking()};
}
@@ -891,7 +891,6 @@ impl Session {
v.to_string()
},
};
// output::render_text_no_newlines(&message, None, true);
progress_bars.log(&message);
},
"notifications/progress" => {

View File

@@ -660,6 +660,12 @@ impl McpSpinners {
}
pub fn hide(&mut self) -> Result<(), Error> {
self.bars.iter_mut().for_each(|(_, bar)| {
bar.disable_steady_tick();
});
if let Some(spinner) = self.log_spinner.as_mut() {
spinner.disable_steady_tick();
}
self.multi_bar.clear()
}
}

View File

@@ -531,52 +531,62 @@ impl DeveloperRouter {
let mut stdout_buf = Vec::new();
let mut stderr_buf = Vec::new();
let mut stdout_done = false;
let mut stderr_done = false;
loop {
tokio::select! {
n = stdout_reader.read_until(b'\n', &mut stdout_buf) => {
n = stdout_reader.read_until(b'\n', &mut stdout_buf), if !stdout_done => {
if n? == 0 {
break;
stdout_done = true;
} else {
let line = String::from_utf8_lossy(&stdout_buf);
notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(json!({
"data": {
"type": "shell",
"stream": "stdout",
"output": line.to_string(),
}
})),
})).ok();
combined_output.push_str(&line);
stdout_buf.clear();
}
let line = String::from_utf8_lossy(&stdout_buf);
notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(json!({
"data": {
"type": "shell",
"stream": "stdout",
"output": line.to_string(),
}
})),
}))
.ok();
combined_output.push_str(&line);
stdout_buf.clear();
}
n = stderr_reader.read_until(b'\n', &mut stderr_buf) => {
n = stderr_reader.read_until(b'\n', &mut stderr_buf), if !stderr_done => {
if n? == 0 {
break;
stderr_done = true;
} else {
let line = String::from_utf8_lossy(&stderr_buf);
notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(json!({
"data": {
"type": "shell",
"stream": "stderr",
"output": line.to_string(),
}
})),
})).ok();
combined_output.push_str(&line);
stderr_buf.clear();
}
let line = String::from_utf8_lossy(&stderr_buf);
notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(json!({
"data": {
"type": "shell",
"stream": "stderr",
"output": line.to_string(),
}
})),
}))
.ok();
combined_output.push_str(&line);
stderr_buf.clear();
}
else => break,
}
if stdout_done && stderr_done {
break;
}
}
Ok::<_, std::io::Error>(combined_output)