core/io: Never skip a completion in CompletionGroup::add()

The previous implementation of CompletionGroup::add() would filter out
successfully-finished completions:

    if !completion.finished() || completion.failed() {
        self.completions.push(completion.clone());
    }

This caused a problem when combined with drain() in the calling code.
Completions that were already finished would be removed from the source
vector by drain() but not added to the group, effectively losing track
of them.

This breaks the invariant that all completions passed to a group must
be tracked, regardless of their state. The build() method already
handles finished completions correctly by not including them in the
outstanding count.

The fix is to always add all completions and let build() handle their
state appropriately, matching the behavior of the old io_yield_many!()
macro.
This commit is contained in:
Pekka Enberg
2025-10-13 15:30:31 +03:00
parent 0d8a3dda8c
commit d3e8285d93

View File

@@ -176,10 +176,7 @@ impl CompletionGroup {
}
pub fn add(&mut self, completion: &Completion) {
if !completion.finished() || completion.failed() {
self.completions.push(completion.clone());
}
// Skip successfully finished completions
self.completions.push(completion.clone());
}
pub fn build(self) -> Completion {
@@ -962,4 +959,99 @@ mod tests {
assert!(!group.succeeded());
assert_eq!(group.get_error(), Some(CompletionError::Aborted));
}
#[test]
fn test_completion_group_tracks_all_completions() {
// This test verifies the fix for the bug where CompletionGroup::add()
// would skip successfully-finished completions. This caused problems
// when code used drain() to move completions into a group, because
// finished completions would be removed from the source but not tracked
// by the group, effectively losing them.
use std::sync::atomic::{AtomicUsize, Ordering};
let callback_count = Arc::new(AtomicUsize::new(0));
let callback_count_clone = callback_count.clone();
// Simulate the pattern: create multiple completions, complete some,
// then add ALL of them to a group (like drain() would do)
let mut completions = Vec::new();
// Create 4 completions
for _ in 0..4 {
completions.push(Completion::new_write(|_| {}));
}
// Complete 2 of them before adding to group (simulate async completion)
completions[0].complete(0);
completions[2].complete(0);
// Now create a group and add ALL completions (like drain() would do)
let mut group = CompletionGroup::new(move |_| {
callback_count_clone.fetch_add(1, Ordering::SeqCst);
});
// Add all completions to the group
for c in &completions {
group.add(c);
}
let group = group.build();
// The group should track all 4 completions:
// - c[0] and c[2] are already finished
// - c[1] and c[3] are still pending
// So the group should not be finished yet
assert!(!group.finished());
assert_eq!(callback_count.load(Ordering::SeqCst), 0);
// Complete the first pending completion
completions[1].complete(0);
assert!(!group.finished());
assert_eq!(callback_count.load(Ordering::SeqCst), 0);
// Complete the last pending completion - now group should finish
completions[3].complete(0);
assert!(group.finished());
assert!(group.succeeded());
assert_eq!(callback_count.load(Ordering::SeqCst), 1);
// Verify no errors
assert!(group.get_error().is_none());
}
#[test]
fn test_completion_group_with_all_finished_successfully() {
// Edge case: all completions are already successfully finished
// when added to the group. The group should complete immediately.
use std::sync::atomic::{AtomicBool, Ordering};
let callback_called = Arc::new(AtomicBool::new(false));
let callback_called_clone = callback_called.clone();
let mut completions = Vec::new();
// Create and immediately complete 3 completions
for _ in 0..3 {
let c = Completion::new_write(|_| {});
c.complete(0);
completions.push(c);
}
// Add all already-completed completions to group
let mut group = CompletionGroup::new(move |_| {
callback_called_clone.store(true, Ordering::SeqCst);
});
for c in &completions {
group.add(c);
}
let group = group.build();
// Group should be immediately finished since all completions were done
assert!(group.finished());
assert!(group.succeeded());
assert!(callback_called.load(Ordering::SeqCst));
assert!(group.get_error().is_none());
}
}