state machine for sort

This commit is contained in:
pedrocarlo
2025-08-06 13:50:33 -03:00
parent c91c22a6a8
commit 2ec58b0264

View File

@@ -18,6 +18,14 @@ use crate::{
Result,
};
#[derive(Debug, Clone, Copy)]
enum SortState {
Start,
Flush,
InitHeap,
Next,
}
pub struct Sorter {
/// The records in the in-memory buffer.
records: Vec<SortableImmutableRecord>,
@@ -46,6 +54,8 @@ pub struct Sorter {
wait_for_read_complete: Vec<usize>,
/// The temporary directory for chunk files.
temp_dir: Option<tempfile::TempDir>,
/// State machine for [Sorter::sort]
sort_state: SortState,
}
impl Sorter {
@@ -80,6 +90,7 @@ impl Sorter {
io,
wait_for_read_complete: Vec::new(),
temp_dir: None,
sort_state: SortState::Start,
}
}
@@ -93,16 +104,34 @@ impl Sorter {
// We do the sorting here since this is what is called by the SorterSort instruction
pub fn sort(&mut self) -> Result<IOResult<()>> {
if self.chunks.is_empty() {
self.records.sort();
self.records.reverse();
} else {
self.flush()?;
if let IOResult::IO = self.init_chunk_heap()? {
return Ok(IOResult::IO);
loop {
match self.sort_state {
SortState::Start => {
if self.chunks.is_empty() {
self.records.sort();
self.records.reverse();
self.sort_state = SortState::Next;
} else {
self.sort_state = SortState::Flush;
}
}
SortState::Flush => {
self.sort_state = SortState::InitHeap;
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
}
}
SortState::InitHeap => {
return_if_io!(self.init_chunk_heap());
self.sort_state = SortState::Next;
}
SortState::Next => {
return_if_io!(self.next());
self.sort_state = SortState::Start;
return Ok(IOResult::Done(()));
}
}
}
self.next()
}
pub fn next(&mut self) -> Result<IOResult<()>> {
@@ -136,7 +165,7 @@ impl Sorter {
pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> {
let payload_size = record.get_payload().len();
if self.current_buffer_size + payload_size > self.max_buffer_size {
self.flush()?;
let _c = self.flush()?;
}
self.records.push(SortableImmutableRecord::new(
record.clone(),
@@ -219,9 +248,10 @@ impl Sorter {
Ok(IOResult::Done(()))
}
fn flush(&mut self) -> Result<()> {
fn flush(&mut self) -> Result<Option<Completion>> {
if self.records.is_empty() {
return Ok(());
// Dummy completion to not complicate logic handling
return Ok(None);
}
self.records.sort();
@@ -245,13 +275,13 @@ impl Sorter {
.min_chunk_read_buffer_size
.max(self.max_payload_size_in_buffer + 9);
let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
let _c = chunk.write(&mut self.records)?;
let c = chunk.write(&mut self.records)?;
self.chunks.push(chunk);
self.current_buffer_size = 0;
self.max_payload_size_in_buffer = 0;
Ok(())
Ok(Some(c))
}
}