adjust external sort init_chunk_heap and next_from_chunk_heap

This commit is contained in:
pedrocarlo
2025-08-06 14:59:45 -03:00
parent c02936eb30
commit 6fe19e4ef4

View File

@@ -8,6 +8,7 @@ use std::sync::Arc;
use tempfile;
use crate::return_if_io;
use crate::util::IOExt;
use crate::{
error::LimboError,
io::{Buffer, Completion, File, OpenFlags, IO},
@@ -32,6 +33,12 @@ enum InsertState {
Insert,
}
#[derive(Debug, Clone, Copy)]
enum InitChunkHeapState {
Start,
PushChunk,
}
pub struct Sorter {
/// The records in the in-memory buffer.
records: Vec<SortableImmutableRecord>,
@@ -64,6 +71,7 @@ pub struct Sorter {
sort_state: SortState,
/// State machine for [Sorter::insert]
insert_state: InsertState,
init_chunk_heap_state: InitChunkHeapState,
}
impl Sorter {
@@ -100,6 +108,7 @@ impl Sorter {
temp_dir: None,
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
}
}
@@ -125,10 +134,10 @@ impl Sorter {
}
}
SortState::Flush => {
self.sort_state = SortState::InitHeap;
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
}
self.sort_state = SortState::InitHeap;
}
SortState::InitHeap => {
return_if_io!(self.init_chunk_heap());
@@ -200,50 +209,52 @@ impl Sorter {
}
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
let mut all_read_complete = true;
// Make sure all chunks read at least one record into their buffer.
for chunk in self.chunks.iter_mut() {
match chunk.io_state.get() {
SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => {
all_read_complete = false;
match self.init_chunk_heap_state {
InitChunkHeapState::Start => {
let mut completions = Vec::with_capacity(self.chunks.len());
for chunk in self.chunks.iter_mut() {
let c = chunk.read()?;
completions.push(c);
}
SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {}
_ => {
unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get())
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
Ok(IOResult::IO)
}
InitChunkHeapState::PushChunk => {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead)
{
return Ok(IOResult::IO);
}
self.chunk_heap.reserve(self.chunks.len());
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
for chunk_idx in 0..self.chunks.len() {
io.block(|| self.push_to_chunk_heap(chunk_idx))?;
}
self.init_chunk_heap_state = InitChunkHeapState::Start;
Ok(IOResult::Done(()))
}
}
if !all_read_complete {
return Ok(IOResult::IO);
}
self.chunk_heap.reserve(self.chunks.len());
for chunk_idx in 0..self.chunks.len() {
self.push_to_chunk_heap(chunk_idx)?;
}
Ok(IOResult::Done(()))
}
fn next_from_chunk_heap(&mut self) -> Result<IOResult<Option<SortableImmutableRecord>>> {
let mut all_read_complete = true;
for chunk_idx in self.wait_for_read_complete.iter() {
let chunk_io_state = self.chunks[*chunk_idx].io_state.get();
match chunk_io_state {
SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {}
SortedChunkIOState::WaitingForRead => {
all_read_complete = false;
}
_ => {
unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state)
}
}
}
if !all_read_complete {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead)
{
return Ok(IOResult::IO);
}
self.wait_for_read_complete.clear();
if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() {
self.push_to_chunk_heap(next_chunk_idx)?;
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
io.block(|| self.push_to_chunk_heap(next_chunk_idx))?;
Ok(IOResult::Done(Some(next_record.0)))
} else {
Ok(IOResult::Done(None))
@@ -346,10 +357,6 @@ impl SortedChunk {
}
}
fn has_more(&self) -> bool {
!self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF
}
fn next(&mut self) -> Result<IOResult<Option<ImmutableRecord>>> {
loop {
match self.next_state {
@@ -470,7 +477,6 @@ impl SortedChunk {
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<Completion> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
@@ -632,7 +638,6 @@ enum SortedChunkIOState {
WaitingForRead,
ReadComplete,
ReadEOF,
WaitingForWrite,
None,
}
@@ -667,16 +672,17 @@ mod tests {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let io = Arc::new(PlatformIO::new().unwrap());
let mut sorter = Sorter::new(
&[SortOrder::Asc],
vec![CollationSeq::Binary],
256,
64,
io.clone(),
);
let attempts = 8;
for _ in 0..attempts {
let mut sorter = Sorter::new(
&[SortOrder::Asc],
vec![CollationSeq::Binary],
256,
64,
io.clone(),
);
let num_records = 1000 + rng.next_u64() % 2000;
let num_records = num_records as i64;
@@ -694,13 +700,8 @@ mod tests {
initial_records.push(record);
}
loop {
if let IOResult::IO = sorter.sort().expect("Failed to sort the records") {
io.run_once().expect("Failed to run the IO");
continue;
}
break;
}
io.block(|| sorter.sort())
.expect("Failed to sort the records");
assert!(!sorter.is_empty());
assert!(!sorter.chunks.is_empty());
@@ -712,13 +713,8 @@ mod tests {
// Check that the record remained unchanged after sorting.
assert_eq!(record, &initial_records[(num_records - i - 1) as usize]);
loop {
if let IOResult::IO = sorter.next().expect("Failed to get the next record") {
io.run_once().expect("Failed to run the IO");
continue;
}
break;
}
io.block(|| sorter.next())
.expect("Failed to get the next record");
}
assert!(!sorter.has_more());
}