From d3e8285d93e152a12a233dd794d05e81d64de8a1 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 13 Oct 2025 15:30:31 +0300 Subject: [PATCH] core/io: Never skip a completion in CompletionGroup::add() The previous implementation of CompletionGroup::add() would filter out successfully-finished completions: if !completion.finished() || completion.failed() { self.completions.push(completion.clone()); } This caused a problem when combined with drain() in the calling code. Completions that were already finished would be removed from the source vector by drain() but not added to the group, effectively losing track of them. This breaks the invariant that all completions passed to a group must be tracked, regardless of their state. The build() method already handles finished completions correctly by not including them in the outstanding count. The fix is to always add all completions and let build() handle their state appropriately, matching the behavior of the old io_yield_many!() macro. --- core/io/mod.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 0c0baa807..54e045d68 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -176,10 +176,7 @@ impl CompletionGroup { } pub fn add(&mut self, completion: &Completion) { - if !completion.finished() || completion.failed() { - self.completions.push(completion.clone()); - } - // Skip successfully finished completions + self.completions.push(completion.clone()); } pub fn build(self) -> Completion { @@ -962,4 +959,99 @@ mod tests { assert!(!group.succeeded()); assert_eq!(group.get_error(), Some(CompletionError::Aborted)); } + + #[test] + fn test_completion_group_tracks_all_completions() { + // This test verifies the fix for the bug where CompletionGroup::add() + // would skip successfully-finished completions. This caused problems + // when code used drain() to move completions into a group, because + // finished completions would be removed from the source but not tracked + // by the group, effectively losing them. + use std::sync::atomic::{AtomicUsize, Ordering}; + + let callback_count = Arc::new(AtomicUsize::new(0)); + let callback_count_clone = callback_count.clone(); + + // Simulate the pattern: create multiple completions, complete some, + // then add ALL of them to a group (like drain() would do) + let mut completions = Vec::new(); + + // Create 4 completions + for _ in 0..4 { + completions.push(Completion::new_write(|_| {})); + } + + // Complete 2 of them before adding to group (simulate async completion) + completions[0].complete(0); + completions[2].complete(0); + + // Now create a group and add ALL completions (like drain() would do) + let mut group = CompletionGroup::new(move |_| { + callback_count_clone.fetch_add(1, Ordering::SeqCst); + }); + + // Add all completions to the group + for c in &completions { + group.add(c); + } + + let group = group.build(); + + // The group should track all 4 completions: + // - c[0] and c[2] are already finished + // - c[1] and c[3] are still pending + // So the group should not be finished yet + assert!(!group.finished()); + assert_eq!(callback_count.load(Ordering::SeqCst), 0); + + // Complete the first pending completion + completions[1].complete(0); + assert!(!group.finished()); + assert_eq!(callback_count.load(Ordering::SeqCst), 0); + + // Complete the last pending completion - now group should finish + completions[3].complete(0); + assert!(group.finished()); + assert!(group.succeeded()); + assert_eq!(callback_count.load(Ordering::SeqCst), 1); + + // Verify no errors + assert!(group.get_error().is_none()); + } + + #[test] + fn test_completion_group_with_all_finished_successfully() { + // Edge case: all completions are already successfully finished + // when added to the group. The group should complete immediately. + use std::sync::atomic::{AtomicBool, Ordering}; + + let callback_called = Arc::new(AtomicBool::new(false)); + let callback_called_clone = callback_called.clone(); + + let mut completions = Vec::new(); + + // Create and immediately complete 3 completions + for _ in 0..3 { + let c = Completion::new_write(|_| {}); + c.complete(0); + completions.push(c); + } + + // Add all already-completed completions to group + let mut group = CompletionGroup::new(move |_| { + callback_called_clone.store(true, Ordering::SeqCst); + }); + + for c in &completions { + group.add(c); + } + + let group = group.build(); + + // Group should be immediately finished since all completions were done + assert!(group.finished()); + assert!(group.succeeded()); + assert!(callback_called.load(Ordering::SeqCst)); + assert!(group.get_error().is_none()); + } }