From 2ffc5ee423d490d58b6a1ca039b6d131e1dd09ab Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 12:32:52 -0300 Subject: [PATCH 1/9] SorterChunk `read` and `write` should return completions --- core/vdbe/sorter.rs | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index f27b57f54..52771e5b5 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -155,7 +155,7 @@ impl Sorter { SortedChunkIOState::WriteComplete => { all_read_complete = false; // Write complete, we can now read from the chunk. - chunk.read()?; + let _c = chunk.read()?; } SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => { all_read_complete = false; @@ -249,7 +249,7 @@ impl Sorter { .min_chunk_read_buffer_size .max(self.max_payload_size_in_buffer + 9); let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size); - chunk.write(&mut self.records)?; + let _c = chunk.write(&mut self.records)?; self.chunks.push(chunk); self.current_buffer_size = 0; @@ -346,19 +346,16 @@ impl SortedChunk { let record = self.records.pop(); if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF { // We've consumed the last record. Read more payload into the buffer. - self.read()?; + if self.chunk_size - self.total_bytes_read.get() == 0 { + self.io_state.set(SortedChunkIOState::ReadEOF); + } else { + let _c = self.read()?; + } } Ok(record) } - fn read(&mut self) -> Result<()> { - if self.io_state.get() == SortedChunkIOState::ReadEOF { - return Ok(()); - } - if self.chunk_size - self.total_bytes_read.get() == 0 { - self.io_state.set(SortedChunkIOState::ReadEOF); - return Ok(()); - } + fn read(&mut self) -> Result { self.io_state.set(SortedChunkIOState::WaitingForRead); let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get(); @@ -395,11 +392,11 @@ impl SortedChunk { }); let c = Completion::new_read(read_buffer_ref, read_complete); - let _c = self.file.pread(self.total_bytes_read.get(), c)?; - Ok(()) + let c = self.file.pread(self.total_bytes_read.get(), c)?; + Ok(c) } - fn write(&mut self, records: &mut Vec) -> Result<()> { + fn write(&mut self, records: &mut Vec) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); self.io_state.set(SortedChunkIOState::WaitingForWrite); self.chunk_size = 0; @@ -440,8 +437,8 @@ impl SortedChunk { }); let c = Completion::new_write(write_complete); - let _c = self.file.pwrite(0, buffer_ref, c)?; - Ok(()) + let c = self.file.pwrite(0, buffer_ref, c)?; + Ok(c) } } From c91c22a6a84af216e7945d9c0b2e23888d98d174 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 13:12:45 -0300 Subject: [PATCH 2/9] state machine for `next` --- core/vdbe/sorter.rs | 153 ++++++++++++++++++++++++-------------------- 1 file changed, 85 insertions(+), 68 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 52771e5b5..9ce453372 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -7,6 +7,7 @@ use std::rc::Rc; use std::sync::Arc; use tempfile; +use crate::return_if_io; use crate::{ error::LimboError, io::{Buffer, Completion, File, OpenFlags, IO}, @@ -152,11 +153,6 @@ impl Sorter { // Make sure all chunks read at least one record into their buffer. for chunk in self.chunks.iter_mut() { match chunk.io_state.get() { - SortedChunkIOState::WriteComplete => { - all_read_complete = false; - // Write complete, we can now read from the chunk. - let _c = chunk.read()?; - } SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => { all_read_complete = false; } @@ -203,11 +199,10 @@ impl Sorter { } } - fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<()> { + fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result> { let chunk = &mut self.chunks[chunk_idx]; - if chunk.has_more() { - let record = chunk.next()?.unwrap(); + if let Some(record) = return_if_io!(chunk.next()) { self.chunk_heap.push(( Reverse(SortableImmutableRecord::new( record, @@ -220,7 +215,8 @@ impl Sorter { self.wait_for_read_complete.push(chunk_idx); } } - Ok(()) + + Ok(IOResult::Done(())) } fn flush(&mut self) -> Result<()> { @@ -259,6 +255,12 @@ impl Sorter { } } +#[derive(Debug, Clone, Copy)] +enum NextState { + Start, + Finish, +} + struct SortedChunk { /// The chunk file. file: Arc, @@ -274,6 +276,8 @@ struct SortedChunk { io_state: Rc>, /// The total number of bytes read from the chunk file. total_bytes_read: Rc>, + /// State machine for [SortedChunk::next] + next_state: NextState, } impl SortedChunk { @@ -286,6 +290,7 @@ impl SortedChunk { records: Vec::new(), io_state: Rc::new(Cell::new(SortedChunkIOState::None)), total_bytes_read: Rc::new(Cell::new(0)), + next_state: NextState::Start, } } @@ -293,66 +298,81 @@ impl SortedChunk { !self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF } - fn next(&mut self) -> Result> { - let mut buffer_len = self.buffer_len.get(); - if self.records.is_empty() && buffer_len == 0 { - return Ok(None); - } - - if self.records.is_empty() { - let mut buffer_ref = self.buffer.borrow_mut(); - let buffer = buffer_ref.as_mut_slice(); - let mut buffer_offset = 0; - while buffer_offset < buffer_len { - // Extract records from the buffer until we run out of the buffer or we hit an incomplete record. - let (record_size, bytes_read) = - match read_varint(&buffer[buffer_offset..buffer_len]) { - Ok((record_size, bytes_read)) => (record_size as usize, bytes_read), - Err(LimboError::Corrupt(_)) - if self.io_state.get() != SortedChunkIOState::ReadEOF => - { - // Failed to decode a partial varint. - break; - } - Err(e) => { - return Err(e); - } - }; - if record_size > buffer_len - (buffer_offset + bytes_read) { - if self.io_state.get() == SortedChunkIOState::ReadEOF { - crate::bail_corrupt_error!("Incomplete record"); + fn next(&mut self) -> Result>> { + loop { + match self.next_state { + NextState::Start => { + let mut buffer_len = self.buffer_len.get(); + if self.records.is_empty() && buffer_len == 0 { + return Ok(IOResult::Done(None)); + } + + if self.records.is_empty() { + let mut buffer_ref = self.buffer.borrow_mut(); + let buffer = buffer_ref.as_mut_slice(); + let mut buffer_offset = 0; + while buffer_offset < buffer_len { + // Extract records from the buffer until we run out of the buffer or we hit an incomplete record. + let (record_size, bytes_read) = + match read_varint(&buffer[buffer_offset..buffer_len]) { + Ok((record_size, bytes_read)) => { + (record_size as usize, bytes_read) + } + Err(LimboError::Corrupt(_)) + if self.io_state.get() != SortedChunkIOState::ReadEOF => + { + // Failed to decode a partial varint. + break; + } + Err(e) => { + return Err(e); + } + }; + if record_size > buffer_len - (buffer_offset + bytes_read) { + if self.io_state.get() == SortedChunkIOState::ReadEOF { + crate::bail_corrupt_error!("Incomplete record"); + } + break; + } + buffer_offset += bytes_read; + + let mut record = ImmutableRecord::new(record_size); + record.start_serialization( + &buffer[buffer_offset..buffer_offset + record_size], + ); + buffer_offset += record_size; + + self.records.push(record); + } + if buffer_offset < buffer_len { + buffer.copy_within(buffer_offset..buffer_len, 0); + buffer_len -= buffer_offset; + } else { + buffer_len = 0; + } + self.buffer_len.set(buffer_len); + + self.records.reverse(); + } + + self.next_state = NextState::Finish; + if self.records.len() == 1 && self.io_state.get() != SortedChunkIOState::ReadEOF + { + // We've consumed the last record. Read more payload into the buffer. + if self.chunk_size - self.total_bytes_read.get() == 0 { + self.io_state.set(SortedChunkIOState::ReadEOF); + } else { + let _c = self.read()?; + return Ok(IOResult::IO); + } } - break; } - buffer_offset += bytes_read; - - let mut record = ImmutableRecord::new(record_size); - record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]); - buffer_offset += record_size; - - self.records.push(record); - } - if buffer_offset < buffer_len { - buffer.copy_within(buffer_offset..buffer_len, 0); - buffer_len -= buffer_offset; - } else { - buffer_len = 0; - } - self.buffer_len.set(buffer_len); - - self.records.reverse(); - } - - let record = self.records.pop(); - if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF { - // We've consumed the last record. Read more payload into the buffer. - if self.chunk_size - self.total_bytes_read.get() == 0 { - self.io_state.set(SortedChunkIOState::ReadEOF); - } else { - let _c = self.read()?; + NextState::Finish => { + self.next_state = NextState::Start; + return Ok(IOResult::Done(self.records.pop())); + } } } - Ok(record) } fn read(&mut self) -> Result { @@ -427,9 +447,7 @@ impl SortedChunk { let buffer_ref = Arc::new(buffer); let buffer_ref_copy = buffer_ref.clone(); - let chunk_io_state_copy = self.io_state.clone(); let write_complete = Box::new(move |bytes_written: i32| { - chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); let buf_len = buffer_ref_copy.len(); if bytes_written < buf_len as i32 { tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); @@ -563,7 +581,6 @@ enum SortedChunkIOState { ReadComplete, ReadEOF, WaitingForWrite, - WriteComplete, None, } From 2ec58b02642f9b3d47b57e2b5fef0ded1ff57655 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 13:50:33 -0300 Subject: [PATCH 3/9] state machine for `sort` --- core/vdbe/sorter.rs | 56 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 9ce453372..30a24e629 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -18,6 +18,14 @@ use crate::{ Result, }; +#[derive(Debug, Clone, Copy)] +enum SortState { + Start, + Flush, + InitHeap, + Next, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -46,6 +54,8 @@ pub struct Sorter { wait_for_read_complete: Vec, /// The temporary directory for chunk files. temp_dir: Option, + /// State machine for [Sorter::sort] + sort_state: SortState, } impl Sorter { @@ -80,6 +90,7 @@ impl Sorter { io, wait_for_read_complete: Vec::new(), temp_dir: None, + sort_state: SortState::Start, } } @@ -93,16 +104,34 @@ impl Sorter { // We do the sorting here since this is what is called by the SorterSort instruction pub fn sort(&mut self) -> Result> { - if self.chunks.is_empty() { - self.records.sort(); - self.records.reverse(); - } else { - self.flush()?; - if let IOResult::IO = self.init_chunk_heap()? { - return Ok(IOResult::IO); + loop { + match self.sort_state { + SortState::Start => { + if self.chunks.is_empty() { + self.records.sort(); + self.records.reverse(); + self.sort_state = SortState::Next; + } else { + self.sort_state = SortState::Flush; + } + } + SortState::Flush => { + self.sort_state = SortState::InitHeap; + if let Some(_c) = self.flush()? { + return Ok(IOResult::IO); + } + } + SortState::InitHeap => { + return_if_io!(self.init_chunk_heap()); + self.sort_state = SortState::Next; + } + SortState::Next => { + return_if_io!(self.next()); + self.sort_state = SortState::Start; + return Ok(IOResult::Done(())); + } } } - self.next() } pub fn next(&mut self) -> Result> { @@ -136,7 +165,7 @@ impl Sorter { pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> { let payload_size = record.get_payload().len(); if self.current_buffer_size + payload_size > self.max_buffer_size { - self.flush()?; + let _c = self.flush()?; } self.records.push(SortableImmutableRecord::new( record.clone(), @@ -219,9 +248,10 @@ impl Sorter { Ok(IOResult::Done(())) } - fn flush(&mut self) -> Result<()> { + fn flush(&mut self) -> Result> { if self.records.is_empty() { - return Ok(()); + // Dummy completion to not complicate logic handling + return Ok(None); } self.records.sort(); @@ -245,13 +275,13 @@ impl Sorter { .min_chunk_read_buffer_size .max(self.max_payload_size_in_buffer + 9); let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size); - let _c = chunk.write(&mut self.records)?; + let c = chunk.write(&mut self.records)?; self.chunks.push(chunk); self.current_buffer_size = 0; self.max_payload_size_in_buffer = 0; - Ok(()) + Ok(Some(c)) } } From c02936eb3032cebd655c8355270987a4541c12f3 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 14:07:11 -0300 Subject: [PATCH 4/9] state machine for `insert` --- core/vdbe/execute.rs | 2 +- core/vdbe/sorter.rs | 48 +++++++++++++++++++++++++++++++++----------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f87253f4e..871c1a0ad 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3727,7 +3727,7 @@ pub fn op_sorter_insert( Register::Record(record) => record, _ => unreachable!("SorterInsert on non-record register"), }; - cursor.insert(record)?; + return_if_io!(cursor.insert(record)); } state.pc += 1; Ok(InsnFunctionStepResult::Step) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 30a24e629..ecbecf3af 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -26,6 +26,12 @@ enum SortState { Next, } +#[derive(Debug, Clone, Copy)] +enum InsertState { + Start, + Insert, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -56,6 +62,8 @@ pub struct Sorter { temp_dir: Option, /// State machine for [Sorter::sort] sort_state: SortState, + /// State machine for [Sorter::insert] + insert_state: InsertState, } impl Sorter { @@ -91,6 +99,7 @@ impl Sorter { wait_for_read_complete: Vec::new(), temp_dir: None, sort_state: SortState::Start, + insert_state: InsertState::Start, } } @@ -162,19 +171,32 @@ impl Sorter { self.current.as_ref() } - pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> { + pub fn insert(&mut self, record: &ImmutableRecord) -> Result> { let payload_size = record.get_payload().len(); - if self.current_buffer_size + payload_size > self.max_buffer_size { - let _c = self.flush()?; + loop { + match self.insert_state { + InsertState::Start => { + self.insert_state = InsertState::Insert; + if self.current_buffer_size + payload_size > self.max_buffer_size { + if let Some(_c) = self.flush()? { + return Ok(IOResult::IO); + } + } + } + InsertState::Insert => { + self.records.push(SortableImmutableRecord::new( + record.clone(), + self.key_len, + self.index_key_info.clone(), + )?); + self.current_buffer_size += payload_size; + self.max_payload_size_in_buffer = + self.max_payload_size_in_buffer.max(payload_size); + self.insert_state = InsertState::Start; + return Ok(IOResult::Done(())); + } + } } - self.records.push(SortableImmutableRecord::new( - record.clone(), - self.key_len, - self.index_key_info.clone(), - )?); - self.current_buffer_size += payload_size; - self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size); - Ok(()) } fn init_chunk_heap(&mut self) -> Result> { @@ -619,6 +641,7 @@ mod tests { use super::*; use crate::translate::collate::CollationSeq; use crate::types::{ImmutableRecord, RefValue, Value, ValueType}; + use crate::util::IOExt; use crate::PlatformIO; use rand_chacha::{ rand_core::{RngCore, SeedableRng}, @@ -666,7 +689,8 @@ mod tests { values.append(&mut generate_values(&mut rng, &value_types)); let record = ImmutableRecord::from_values(&values, values.len()); - sorter.insert(&record).expect("Failed to insert the record"); + io.block(|| sorter.insert(&record)) + .expect("Failed to insert the record"); initial_records.push(record); } From 6fe19e4ef4c91724b6d056be94edab2aa385fa2e Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 14:59:45 -0300 Subject: [PATCH 5/9] adjust external sort `init_chunk_heap` and `next_from_chunk_heap` --- core/vdbe/sorter.rs | 116 +++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index ecbecf3af..a53821716 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use tempfile; use crate::return_if_io; +use crate::util::IOExt; use crate::{ error::LimboError, io::{Buffer, Completion, File, OpenFlags, IO}, @@ -32,6 +33,12 @@ enum InsertState { Insert, } +#[derive(Debug, Clone, Copy)] +enum InitChunkHeapState { + Start, + PushChunk, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -64,6 +71,7 @@ pub struct Sorter { sort_state: SortState, /// State machine for [Sorter::insert] insert_state: InsertState, + init_chunk_heap_state: InitChunkHeapState, } impl Sorter { @@ -100,6 +108,7 @@ impl Sorter { temp_dir: None, sort_state: SortState::Start, insert_state: InsertState::Start, + init_chunk_heap_state: InitChunkHeapState::Start, } } @@ -125,10 +134,10 @@ impl Sorter { } } SortState::Flush => { - self.sort_state = SortState::InitHeap; if let Some(_c) = self.flush()? { return Ok(IOResult::IO); } + self.sort_state = SortState::InitHeap; } SortState::InitHeap => { return_if_io!(self.init_chunk_heap()); @@ -200,50 +209,52 @@ impl Sorter { } fn init_chunk_heap(&mut self) -> Result> { - let mut all_read_complete = true; - // Make sure all chunks read at least one record into their buffer. - for chunk in self.chunks.iter_mut() { - match chunk.io_state.get() { - SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => { - all_read_complete = false; + match self.init_chunk_heap_state { + InitChunkHeapState::Start => { + let mut completions = Vec::with_capacity(self.chunks.len()); + for chunk in self.chunks.iter_mut() { + let c = chunk.read()?; + completions.push(c); } - SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {} - _ => { - unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get()) + self.init_chunk_heap_state = InitChunkHeapState::PushChunk; + Ok(IOResult::IO) + } + InitChunkHeapState::PushChunk => { + // Make sure all chunks read at least one record into their buffer. + if self + .chunks + .iter() + .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + { + return Ok(IOResult::IO); } + self.chunk_heap.reserve(self.chunks.len()); + // TODO: blocking will be unnecessary here with IO completions + let io = self.io.clone(); + for chunk_idx in 0..self.chunks.len() { + io.block(|| self.push_to_chunk_heap(chunk_idx))?; + } + self.init_chunk_heap_state = InitChunkHeapState::Start; + Ok(IOResult::Done(())) } } - if !all_read_complete { - return Ok(IOResult::IO); - } - self.chunk_heap.reserve(self.chunks.len()); - for chunk_idx in 0..self.chunks.len() { - self.push_to_chunk_heap(chunk_idx)?; - } - Ok(IOResult::Done(())) } fn next_from_chunk_heap(&mut self) -> Result>> { - let mut all_read_complete = true; - for chunk_idx in self.wait_for_read_complete.iter() { - let chunk_io_state = self.chunks[*chunk_idx].io_state.get(); - match chunk_io_state { - SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {} - SortedChunkIOState::WaitingForRead => { - all_read_complete = false; - } - _ => { - unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state) - } - } - } - if !all_read_complete { + // Make sure all chunks read at least one record into their buffer. + if self + .chunks + .iter() + .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + { return Ok(IOResult::IO); } self.wait_for_read_complete.clear(); if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { - self.push_to_chunk_heap(next_chunk_idx)?; + // TODO: blocking will be unnecessary here with IO completions + let io = self.io.clone(); + io.block(|| self.push_to_chunk_heap(next_chunk_idx))?; Ok(IOResult::Done(Some(next_record.0))) } else { Ok(IOResult::Done(None)) @@ -346,10 +357,6 @@ impl SortedChunk { } } - fn has_more(&self) -> bool { - !self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF - } - fn next(&mut self) -> Result>> { loop { match self.next_state { @@ -470,7 +477,6 @@ impl SortedChunk { fn write(&mut self, records: &mut Vec) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); - self.io_state.set(SortedChunkIOState::WaitingForWrite); self.chunk_size = 0; // Pre-compute varint lengths for record sizes to determine the total buffer size. @@ -632,7 +638,6 @@ enum SortedChunkIOState { WaitingForRead, ReadComplete, ReadEOF, - WaitingForWrite, None, } @@ -667,16 +672,17 @@ mod tests { let mut rng = ChaCha8Rng::seed_from_u64(seed); let io = Arc::new(PlatformIO::new().unwrap()); - let mut sorter = Sorter::new( - &[SortOrder::Asc], - vec![CollationSeq::Binary], - 256, - 64, - io.clone(), - ); let attempts = 8; for _ in 0..attempts { + let mut sorter = Sorter::new( + &[SortOrder::Asc], + vec![CollationSeq::Binary], + 256, + 64, + io.clone(), + ); + let num_records = 1000 + rng.next_u64() % 2000; let num_records = num_records as i64; @@ -694,13 +700,8 @@ mod tests { initial_records.push(record); } - loop { - if let IOResult::IO = sorter.sort().expect("Failed to sort the records") { - io.run_once().expect("Failed to run the IO"); - continue; - } - break; - } + io.block(|| sorter.sort()) + .expect("Failed to sort the records"); assert!(!sorter.is_empty()); assert!(!sorter.chunks.is_empty()); @@ -712,13 +713,8 @@ mod tests { // Check that the record remained unchanged after sorting. assert_eq!(record, &initial_records[(num_records - i - 1) as usize]); - loop { - if let IOResult::IO = sorter.next().expect("Failed to get the next record") { - io.run_once().expect("Failed to run the IO"); - continue; - } - break; - } + io.block(|| sorter.next()) + .expect("Failed to get the next record"); } assert!(!sorter.has_more()); } From 5924274d6121fa510b9f093f239bc81f9e003278 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 7 Aug 2025 12:00:06 -0300 Subject: [PATCH 6/9] adjust state machine transition --- core/vdbe/sorter.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index a53821716..03e1cda8c 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -134,10 +134,10 @@ impl Sorter { } } SortState::Flush => { + self.sort_state = SortState::InitHeap; if let Some(_c) = self.flush()? { return Ok(IOResult::IO); } - self.sort_state = SortState::InitHeap; } SortState::InitHeap => { return_if_io!(self.init_chunk_heap()); @@ -415,6 +415,7 @@ impl SortedChunk { } self.next_state = NextState::Finish; + // This check is done to see if we need to read more from the chunk before popping the record if self.records.len() == 1 && self.io_state.get() != SortedChunkIOState::ReadEOF { // We've consumed the last record. Read more payload into the buffer. From 4a3408003a611c2aeeeb7d5274e67b1c19e3170f Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sat, 9 Aug 2025 13:03:17 -0300 Subject: [PATCH 7/9] wait for flush to complete --- core/vdbe/sorter.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 03e1cda8c..075238343 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -71,6 +71,7 @@ pub struct Sorter { sort_state: SortState, /// State machine for [Sorter::insert] insert_state: InsertState, + /// State machine for [Sorter::init_chunk_heap] init_chunk_heap_state: InitChunkHeapState, } @@ -140,6 +141,11 @@ impl Sorter { } } SortState::InitHeap => { + if self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }) { + return Ok(IOResult::IO); + } return_if_io!(self.init_chunk_heap()); self.sort_state = SortState::Next; } @@ -193,6 +199,11 @@ impl Sorter { } } InsertState::Insert => { + if self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }) { + return Ok(IOResult::IO); + } self.records.push(SortableImmutableRecord::new( record.clone(), self.key_len, @@ -224,7 +235,7 @@ impl Sorter { if self .chunks .iter() - .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) { return Ok(IOResult::IO); } @@ -245,7 +256,7 @@ impl Sorter { if self .chunks .iter() - .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) { return Ok(IOResult::IO); } @@ -478,6 +489,7 @@ impl SortedChunk { fn write(&mut self, records: &mut Vec) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); + self.io_state.set(SortedChunkIOState::WaitingForWrite); self.chunk_size = 0; // Pre-compute varint lengths for record sizes to determine the total buffer size. @@ -506,7 +518,9 @@ impl SortedChunk { let buffer_ref = Arc::new(buffer); let buffer_ref_copy = buffer_ref.clone(); + let chunk_io_state_copy = self.io_state.clone(); let write_complete = Box::new(move |bytes_written: i32| { + chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); let buf_len = buffer_ref_copy.len(); if bytes_written < buf_len as i32 { tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); @@ -638,6 +652,8 @@ impl Eq for SortableImmutableRecord {} enum SortedChunkIOState { WaitingForRead, ReadComplete, + WaitingForWrite, + WriteComplete, ReadEOF, None, } From 76d6c4a28d750a3a5e0ed86d2eff6a10ba97c3ca Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sat, 9 Aug 2025 14:04:28 -0300 Subject: [PATCH 8/9] only open 1 file for sorter so chunks just reuse that file --- core/vdbe/sorter.rs | 99 ++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 075238343..ee28162bc 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -39,6 +39,20 @@ enum InitChunkHeapState { PushChunk, } +struct TempFile { + // When temp_dir is dropped the folder is deleted + _temp_dir: tempfile::TempDir, + file: Arc, +} + +impl core::ops::Deref for TempFile { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.file + } +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -65,8 +79,10 @@ pub struct Sorter { io: Arc, /// The indices of the chunks for which the read is not complete. wait_for_read_complete: Vec, - /// The temporary directory for chunk files. - temp_dir: Option, + /// The temporary file for chunks. + temp_file: Option, + /// Offset where the next chunk will be placed in the `temp_fil` + next_chunk_offset: usize, /// State machine for [Sorter::sort] sort_state: SortState, /// State machine for [Sorter::insert] @@ -106,7 +122,8 @@ impl Sorter { max_payload_size_in_buffer: 0, io, wait_for_read_complete: Vec::new(), - temp_dir: None, + temp_file: None, + next_chunk_offset: 0, sort_state: SortState::Start, insert_state: InsertState::Start, init_chunk_heap_state: InitChunkHeapState::Start, @@ -300,30 +317,47 @@ impl Sorter { self.records.sort(); - if self.temp_dir.is_none() { - self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?); - } - - let chunk_file_path = self - .temp_dir - .as_ref() - .unwrap() - .path() - .join(format!("chunk_{}", self.chunks.len())); - let chunk_file = - self.io - .open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?; + let chunk_file = match &self.temp_file { + Some(temp_file) => temp_file.file.clone(), + None => { + let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?; + let chunk_file_path = temp_dir.as_ref().join("chunk_file"); + let chunk_file = self.io.open_file( + chunk_file_path.to_str().unwrap(), + OpenFlags::Create, + false, + )?; + self.temp_file = Some(TempFile { + _temp_dir: temp_dir, + file: chunk_file.clone(), + }); + chunk_file + } + }; // Make sure the chunk buffer size can fit the largest record and its size varint. let chunk_buffer_size = self .min_chunk_read_buffer_size .max(self.max_payload_size_in_buffer + 9); - let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size); - let c = chunk.write(&mut self.records)?; + + let mut chunk_size = 0; + // Pre-compute varint lengths for record sizes to determine the total buffer size. + let mut record_size_lengths = Vec::with_capacity(self.records.len()); + for record in self.records.iter() { + let record_size = record.record.get_payload().len(); + let size_len = varint_len(record_size as u64); + record_size_lengths.push(size_len); + chunk_size += size_len + record_size; + } + + let mut chunk = SortedChunk::new(chunk_file, self.next_chunk_offset, chunk_buffer_size); + let c = chunk.write(&mut self.records, record_size_lengths, chunk_size)?; self.chunks.push(chunk); self.current_buffer_size = 0; self.max_payload_size_in_buffer = 0; + // increase offset start for next chunk + self.next_chunk_offset += chunk_size; Ok(Some(c)) } @@ -338,6 +372,8 @@ enum NextState { struct SortedChunk { /// The chunk file. file: Arc, + /// Offset of the start of chunk in file + start_offset: usize, /// The size of this chunk file in bytes. chunk_size: usize, /// The read buffer. @@ -355,9 +391,10 @@ struct SortedChunk { } impl SortedChunk { - fn new(file: Arc, buffer_size: usize) -> Self { + fn new(file: Arc, start_offset: usize, buffer_size: usize) -> Self { Self { file, + start_offset, chunk_size: 0, buffer: Rc::new(RefCell::new(vec![0; buffer_size])), buffer_len: Rc::new(Cell::new(0)), @@ -483,23 +520,21 @@ impl SortedChunk { }); let c = Completion::new_read(read_buffer_ref, read_complete); - let c = self.file.pread(self.total_bytes_read.get(), c)?; + let c = self + .file + .pread(self.start_offset + self.total_bytes_read.get(), c)?; Ok(c) } - fn write(&mut self, records: &mut Vec) -> Result { + fn write( + &mut self, + records: &mut Vec, + record_size_lengths: Vec, + chunk_size: usize, + ) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); self.io_state.set(SortedChunkIOState::WaitingForWrite); - self.chunk_size = 0; - - // Pre-compute varint lengths for record sizes to determine the total buffer size. - let mut record_size_lengths = Vec::with_capacity(records.len()); - for record in records.iter() { - let record_size = record.record.get_payload().len(); - let size_len = varint_len(record_size as u64); - record_size_lengths.push(size_len); - self.chunk_size += size_len + record_size; - } + self.chunk_size = chunk_size; let buffer = Buffer::new_temporary(self.chunk_size); @@ -528,7 +563,7 @@ impl SortedChunk { }); let c = Completion::new_write(write_complete); - let c = self.file.pwrite(0, buffer_ref, c)?; + let c = self.file.pwrite(self.start_offset, buffer_ref, c)?; Ok(c) } } From f25f51b8aa902a9caf1b50350accfaccf741a19e Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sat, 9 Aug 2025 15:56:18 -0300 Subject: [PATCH 9/9] do not need `wait_for_read_complete` anymore --- core/vdbe/sorter.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index ee28162bc..991803c10 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -77,11 +77,9 @@ pub struct Sorter { max_payload_size_in_buffer: usize, /// The IO object. io: Arc, - /// The indices of the chunks for which the read is not complete. - wait_for_read_complete: Vec, /// The temporary file for chunks. temp_file: Option, - /// Offset where the next chunk will be placed in the `temp_fil` + /// Offset where the next chunk will be placed in the `temp_file` next_chunk_offset: usize, /// State machine for [Sorter::sort] sort_state: SortState, @@ -121,7 +119,6 @@ impl Sorter { min_chunk_read_buffer_size: min_chunk_read_buffer_size_bytes, max_payload_size_in_buffer: 0, io, - wait_for_read_complete: Vec::new(), temp_file: None, next_chunk_offset: 0, sort_state: SortState::Start, @@ -277,7 +274,6 @@ impl Sorter { { return Ok(IOResult::IO); } - self.wait_for_read_complete.clear(); if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { // TODO: blocking will be unnecessary here with IO completions @@ -301,9 +297,6 @@ impl Sorter { )?), chunk_idx, )); - if let SortedChunkIOState::WaitingForRead = chunk.io_state.get() { - self.wait_for_read_complete.push(chunk_idx); - } } Ok(IOResult::Done(()))