core/vdbe/sorter: Replace io_yield_many with completion groups

This commit is contained in:
Pekka Enberg
2025-10-13 10:38:20 +03:00
parent 80876148cf
commit 07ba7276b2

View File

@@ -9,14 +9,14 @@ use tempfile;
use crate::types::IOCompletions;
use crate::{
error::LimboError,
io::{Buffer, Completion, File, OpenFlags, IO},
io::{Buffer, Completion, CompletionGroup, File, OpenFlags, IO},
storage::sqlite3_ondisk::{read_varint, varint_len, write_varint},
translate::collate::CollationSeq,
turso_assert,
types::{IOResult, ImmutableRecord, KeyInfo, RecordCursor, ValueRef},
Result,
};
use crate::{io_yield_many, io_yield_one, return_if_io, CompletionError};
use crate::{io_yield_one, return_if_io, CompletionError};
#[derive(Debug, Clone, Copy)]
enum SortState {
@@ -252,20 +252,21 @@ impl Sorter {
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
match self.init_chunk_heap_state {
InitChunkHeapState::Start => {
let mut completions: Vec<Completion> = Vec::with_capacity(self.chunks.len());
let mut group = CompletionGroup::new(|_| {});
for chunk in self.chunks.iter_mut() {
match chunk.read() {
Err(e) => {
tracing::error!("Failed to read chunk: {e}");
self.io.cancel(&completions)?;
group.cancel();
self.io.drain()?;
return Err(e);
}
Ok(c) => completions.push(c),
Ok(c) => group.add(&c),
};
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
io_yield_many!(completions);
let completion = group.build();
io_yield_one!(completion);
}
InitChunkHeapState::PushChunk => {
// Make sure all chunks read at least one record into their buffer.
@@ -278,17 +279,19 @@ impl Sorter {
);
self.chunk_heap.reserve(self.chunks.len());
// TODO: blocking will be unnecessary here with IO completions
let mut completions = vec![];
let mut group = CompletionGroup::new(|_| {});
for chunk_idx in 0..self.chunks.len() {
if let Some(c) = self.push_to_chunk_heap(chunk_idx)? {
completions.push(c);
group.add(&c);
};
}
self.init_chunk_heap_state = InitChunkHeapState::Start;
if !completions.is_empty() {
io_yield_many!(completions);
let completion = group.build();
if completion.finished() {
Ok(IOResult::Done(()))
} else {
io_yield_one!(completion);
}
Ok(IOResult::Done(()))
}
}
}