From 07ba7276b2f23f8f0ee71f1fe5b05da39ce1a24b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 13 Oct 2025 10:38:20 +0300 Subject: [PATCH] core/vdbe/sorter: Replace io_yield_many with completion groups --- core/vdbe/sorter.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index ac7e07ed4..0e9eec00b 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -9,14 +9,14 @@ use tempfile; use crate::types::IOCompletions; use crate::{ error::LimboError, - io::{Buffer, Completion, File, OpenFlags, IO}, + io::{Buffer, Completion, CompletionGroup, File, OpenFlags, IO}, storage::sqlite3_ondisk::{read_varint, varint_len, write_varint}, translate::collate::CollationSeq, turso_assert, types::{IOResult, ImmutableRecord, KeyInfo, RecordCursor, ValueRef}, Result, }; -use crate::{io_yield_many, io_yield_one, return_if_io, CompletionError}; +use crate::{io_yield_one, return_if_io, CompletionError}; #[derive(Debug, Clone, Copy)] enum SortState { @@ -252,20 +252,21 @@ impl Sorter { fn init_chunk_heap(&mut self) -> Result> { match self.init_chunk_heap_state { InitChunkHeapState::Start => { - let mut completions: Vec = Vec::with_capacity(self.chunks.len()); + let mut group = CompletionGroup::new(|_| {}); for chunk in self.chunks.iter_mut() { match chunk.read() { Err(e) => { tracing::error!("Failed to read chunk: {e}"); - self.io.cancel(&completions)?; + group.cancel(); self.io.drain()?; return Err(e); } - Ok(c) => completions.push(c), + Ok(c) => group.add(&c), }; } self.init_chunk_heap_state = InitChunkHeapState::PushChunk; - io_yield_many!(completions); + let completion = group.build(); + io_yield_one!(completion); } InitChunkHeapState::PushChunk => { // Make sure all chunks read at least one record into their buffer. @@ -278,17 +279,19 @@ impl Sorter { ); self.chunk_heap.reserve(self.chunks.len()); // TODO: blocking will be unnecessary here with IO completions - let mut completions = vec![]; + let mut group = CompletionGroup::new(|_| {}); for chunk_idx in 0..self.chunks.len() { if let Some(c) = self.push_to_chunk_heap(chunk_idx)? { - completions.push(c); + group.add(&c); }; } self.init_chunk_heap_state = InitChunkHeapState::Start; - if !completions.is_empty() { - io_yield_many!(completions); + let completion = group.build(); + if completion.finished() { + Ok(IOResult::Done(())) + } else { + io_yield_one!(completion); } - Ok(IOResult::Done(())) } } }