diff --git a/core/io/mod.rs b/core/io/mod.rs index 0c0baa807..54e045d68 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -176,10 +176,7 @@ impl CompletionGroup { } pub fn add(&mut self, completion: &Completion) { - if !completion.finished() || completion.failed() { - self.completions.push(completion.clone()); - } - // Skip successfully finished completions + self.completions.push(completion.clone()); } pub fn build(self) -> Completion { @@ -962,4 +959,99 @@ mod tests { assert!(!group.succeeded()); assert_eq!(group.get_error(), Some(CompletionError::Aborted)); } + + #[test] + fn test_completion_group_tracks_all_completions() { + // This test verifies the fix for the bug where CompletionGroup::add() + // would skip successfully-finished completions. This caused problems + // when code used drain() to move completions into a group, because + // finished completions would be removed from the source but not tracked + // by the group, effectively losing them. + use std::sync::atomic::{AtomicUsize, Ordering}; + + let callback_count = Arc::new(AtomicUsize::new(0)); + let callback_count_clone = callback_count.clone(); + + // Simulate the pattern: create multiple completions, complete some, + // then add ALL of them to a group (like drain() would do) + let mut completions = Vec::new(); + + // Create 4 completions + for _ in 0..4 { + completions.push(Completion::new_write(|_| {})); + } + + // Complete 2 of them before adding to group (simulate async completion) + completions[0].complete(0); + completions[2].complete(0); + + // Now create a group and add ALL completions (like drain() would do) + let mut group = CompletionGroup::new(move |_| { + callback_count_clone.fetch_add(1, Ordering::SeqCst); + }); + + // Add all completions to the group + for c in &completions { + group.add(c); + } + + let group = group.build(); + + // The group should track all 4 completions: + // - c[0] and c[2] are already finished + // - c[1] and c[3] are still pending + // So the group should not be finished yet + assert!(!group.finished()); + assert_eq!(callback_count.load(Ordering::SeqCst), 0); + + // Complete the first pending completion + completions[1].complete(0); + assert!(!group.finished()); + assert_eq!(callback_count.load(Ordering::SeqCst), 0); + + // Complete the last pending completion - now group should finish + completions[3].complete(0); + assert!(group.finished()); + assert!(group.succeeded()); + assert_eq!(callback_count.load(Ordering::SeqCst), 1); + + // Verify no errors + assert!(group.get_error().is_none()); + } + + #[test] + fn test_completion_group_with_all_finished_successfully() { + // Edge case: all completions are already successfully finished + // when added to the group. The group should complete immediately. + use std::sync::atomic::{AtomicBool, Ordering}; + + let callback_called = Arc::new(AtomicBool::new(false)); + let callback_called_clone = callback_called.clone(); + + let mut completions = Vec::new(); + + // Create and immediately complete 3 completions + for _ in 0..3 { + let c = Completion::new_write(|_| {}); + c.complete(0); + completions.push(c); + } + + // Add all already-completed completions to group + let mut group = CompletionGroup::new(move |_| { + callback_called_clone.store(true, Ordering::SeqCst); + }); + + for c in &completions { + group.add(c); + } + + let group = group.build(); + + // Group should be immediately finished since all completions were done + assert!(group.finished()); + assert!(group.succeeded()); + assert!(callback_called.load(Ordering::SeqCst)); + assert!(group.get_error().is_none()); + } }