From 2ec58b02642f9b3d47b57e2b5fef0ded1ff57655 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 13:50:33 -0300 Subject: [PATCH] state machine for `sort` --- core/vdbe/sorter.rs | 56 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 9ce453372..30a24e629 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -18,6 +18,14 @@ use crate::{ Result, }; +#[derive(Debug, Clone, Copy)] +enum SortState { + Start, + Flush, + InitHeap, + Next, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -46,6 +54,8 @@ pub struct Sorter { wait_for_read_complete: Vec, /// The temporary directory for chunk files. temp_dir: Option, + /// Current step of the [Sorter::sort] state machine; kept on the struct so a call that returned `IOResult::IO` resumes at the pending step + sort_state: SortState, } impl Sorter { @@ -80,6 +90,7 @@ impl Sorter { io, wait_for_read_complete: Vec::new(), temp_dir: None, + sort_state: SortState::Start, } } @@ -93,16 +104,34 @@ impl Sorter { // We do the sorting here since this is what is called by the SorterSort instruction pub fn sort(&mut self) -> Result> { - if self.chunks.is_empty() { - self.records.sort(); - self.records.reverse(); - } else { - self.flush()?; - if let IOResult::IO = self.init_chunk_heap()? { - return Ok(IOResult::IO); + loop { + match self.sort_state { + SortState::Start => { + if self.chunks.is_empty() { + self.records.sort(); + self.records.reverse(); + self.sort_state = SortState::Next; + } else { + self.sort_state = SortState::Flush; + } + } + SortState::Flush => { + self.sort_state = SortState::InitHeap; + if let Some(_c) = self.flush()? 
{ + return Ok(IOResult::IO); + } + } + SortState::InitHeap => { + return_if_io!(self.init_chunk_heap()); + self.sort_state = SortState::Next; + } + SortState::Next => { + return_if_io!(self.next()); + self.sort_state = SortState::Start; + return Ok(IOResult::Done(())); + } } } - self.next() } pub fn next(&mut self) -> Result> { @@ -136,7 +165,7 @@ impl Sorter { pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> { let payload_size = record.get_payload().len(); if self.current_buffer_size + payload_size > self.max_buffer_size { - self.flush()?; + let _c = self.flush()?; } self.records.push(SortableImmutableRecord::new( record.clone(), @@ -219,9 +248,10 @@ impl Sorter { Ok(IOResult::Done(())) } - fn flush(&mut self) -> Result<()> { + fn flush(&mut self) -> Result> { if self.records.is_empty() { - return Ok(()); + // Nothing is buffered, so no chunk is written; `None` tells the caller there is no `chunk.write` result to handle + return Ok(None); } self.records.sort(); @@ -245,13 +275,13 @@ impl Sorter { .min_chunk_read_buffer_size .max(self.max_payload_size_in_buffer + 9); let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size); - let _c = chunk.write(&mut self.records)?; + let c = chunk.write(&mut self.records)?; self.chunks.push(chunk); self.current_buffer_size = 0; self.max_payload_size_in_buffer = 0; - Ok(()) + Ok(Some(c)) } }