From 6fe19e4ef4c91724b6d056be94edab2aa385fa2e Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 14:59:45 -0300 Subject: [PATCH] adjust external sort `init_chunk_heap` and `next_from_chunk_heap` --- core/vdbe/sorter.rs | 116 +++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index ecbecf3af..a53821716 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use tempfile; use crate::return_if_io; +use crate::util::IOExt; use crate::{ error::LimboError, io::{Buffer, Completion, File, OpenFlags, IO}, @@ -32,6 +33,12 @@ enum InsertState { Insert, } +#[derive(Debug, Clone, Copy)] +enum InitChunkHeapState { + Start, + PushChunk, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -64,6 +71,7 @@ pub struct Sorter { sort_state: SortState, /// State machine for [Sorter::insert] insert_state: InsertState, + init_chunk_heap_state: InitChunkHeapState, } impl Sorter { @@ -100,6 +108,7 @@ impl Sorter { temp_dir: None, sort_state: SortState::Start, insert_state: InsertState::Start, + init_chunk_heap_state: InitChunkHeapState::Start, } } @@ -125,10 +134,10 @@ impl Sorter { } } SortState::Flush => { - self.sort_state = SortState::InitHeap; if let Some(_c) = self.flush()? { return Ok(IOResult::IO); } + self.sort_state = SortState::InitHeap; } SortState::InitHeap => { return_if_io!(self.init_chunk_heap()); @@ -200,50 +209,52 @@ impl Sorter { } fn init_chunk_heap(&mut self) -> Result> { - let mut all_read_complete = true; - // Make sure all chunks read at least one record into their buffer. - for chunk in self.chunks.iter_mut() { - match chunk.io_state.get() { - SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => { - all_read_complete = false; + match self.init_chunk_heap_state { + InitChunkHeapState::Start => { + let mut completions = Vec::with_capacity(self.chunks.len()); + for chunk in self.chunks.iter_mut() { + let c = chunk.read()?; + completions.push(c); } - SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {} - _ => { - unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get()) + self.init_chunk_heap_state = InitChunkHeapState::PushChunk; + Ok(IOResult::IO) + } + InitChunkHeapState::PushChunk => { + // Make sure all chunks read at least one record into their buffer. + if self + .chunks + .iter() + .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + { + return Ok(IOResult::IO); } + self.chunk_heap.reserve(self.chunks.len()); + // TODO: blocking will be unnecessary here with IO completions + let io = self.io.clone(); + for chunk_idx in 0..self.chunks.len() { + io.block(|| self.push_to_chunk_heap(chunk_idx))?; + } + self.init_chunk_heap_state = InitChunkHeapState::Start; + Ok(IOResult::Done(())) } } - if !all_read_complete { - return Ok(IOResult::IO); - } - self.chunk_heap.reserve(self.chunks.len()); - for chunk_idx in 0..self.chunks.len() { - self.push_to_chunk_heap(chunk_idx)?; - } - Ok(IOResult::Done(())) } fn next_from_chunk_heap(&mut self) -> Result>> { - let mut all_read_complete = true; - for chunk_idx in self.wait_for_read_complete.iter() { - let chunk_io_state = self.chunks[*chunk_idx].io_state.get(); - match chunk_io_state { - SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {} - SortedChunkIOState::WaitingForRead => { - all_read_complete = false; - } - _ => { - unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state) - } - } - } - if !all_read_complete { + // Make sure all chunks read at least one record into their buffer. + if self + .chunks + .iter() + .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + { return Ok(IOResult::IO); } self.wait_for_read_complete.clear(); if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { - self.push_to_chunk_heap(next_chunk_idx)?; + // TODO: blocking will be unnecessary here with IO completions + let io = self.io.clone(); + io.block(|| self.push_to_chunk_heap(next_chunk_idx))?; Ok(IOResult::Done(Some(next_record.0))) } else { Ok(IOResult::Done(None)) @@ -346,10 +357,6 @@ impl SortedChunk { } } - fn has_more(&self) -> bool { - !self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF - } - fn next(&mut self) -> Result>> { loop { match self.next_state { @@ -470,7 +477,6 @@ impl SortedChunk { fn write(&mut self, records: &mut Vec) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); - self.io_state.set(SortedChunkIOState::WaitingForWrite); self.chunk_size = 0; // Pre-compute varint lengths for record sizes to determine the total buffer size. @@ -632,7 +638,6 @@ enum SortedChunkIOState { WaitingForRead, ReadComplete, ReadEOF, - WaitingForWrite, None, } @@ -667,16 +672,17 @@ mod tests { let mut rng = ChaCha8Rng::seed_from_u64(seed); let io = Arc::new(PlatformIO::new().unwrap()); - let mut sorter = Sorter::new( - &[SortOrder::Asc], - vec![CollationSeq::Binary], - 256, - 64, - io.clone(), - ); let attempts = 8; for _ in 0..attempts { + let mut sorter = Sorter::new( + &[SortOrder::Asc], + vec![CollationSeq::Binary], + 256, + 64, + io.clone(), + ); + let num_records = 1000 + rng.next_u64() % 2000; let num_records = num_records as i64; @@ -694,13 +700,8 @@ mod tests { initial_records.push(record); } - loop { - if let IOResult::IO = sorter.sort().expect("Failed to sort the records") { - io.run_once().expect("Failed to run the IO"); - continue; - } - break; - } + io.block(|| sorter.sort()) + .expect("Failed to sort the records"); assert!(!sorter.is_empty()); assert!(!sorter.chunks.is_empty()); @@ -712,13 +713,8 @@ mod tests { // Check that the record remained unchanged after sorting. assert_eq!(record, &initial_records[(num_records - i - 1) as usize]); - loop { - if let IOResult::IO = sorter.next().expect("Failed to get the next record") { - io.run_once().expect("Failed to run the IO"); - continue; - } - break; - } + io.block(|| sorter.next()) + .expect("Failed to get the next record"); } assert!(!sorter.has_more()); }