diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs
index 673d85a67..60a2b9a59 100644
--- a/core/vdbe/execute.rs
+++ b/core/vdbe/execute.rs
@@ -3727,7 +3727,7 @@ pub fn op_sorter_insert(
             Register::Record(record) => record,
             _ => unreachable!("SorterInsert on non-record register"),
         };
-        cursor.insert(record)?;
+        return_if_io!(cursor.insert(record));
     }
     state.pc += 1;
     Ok(InsnFunctionStepResult::Step)
diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs
index f27b57f54..991803c10 100644
--- a/core/vdbe/sorter.rs
+++ b/core/vdbe/sorter.rs
@@ -7,6 +7,8 @@ use std::rc::Rc;
 use std::sync::Arc;
 use tempfile;
 
+use crate::return_if_io;
+use crate::util::IOExt;
 use crate::{
     error::LimboError,
     io::{Buffer, Completion, File, OpenFlags, IO},
@@ -17,6 +19,40 @@
     Result,
 };
 
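+// Each public operation below is a small resumable state machine: file IO is
+// asynchronous, so an operation returns `IOResult::IO` while a read or write
+// is in flight and resumes from its saved state on the next call (see
+// `return_if_io!` and `IOExt::block`).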
+#[derive(Debug, Clone, Copy)]
+enum SortState {
+    Start,
+    Flush,
+    InitHeap,
+    Next,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum InsertState {
+    Start,
+    Insert,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum InitChunkHeapState {
+    Start,
+    PushChunk,
+}
+
+struct TempFile {
+    // When `_temp_dir` is dropped, the directory and its contents are deleted.
+    _temp_dir: tempfile::TempDir,
+    file: Arc<dyn File>,
+}
+
+impl core::ops::Deref for TempFile {
+    type Target = Arc<dyn File>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.file
+    }
+}
+
 pub struct Sorter {
     /// The records in the in-memory buffer.
     records: Vec<SortableImmutableRecord>,
@@ -41,10 +77,16 @@
     max_payload_size_in_buffer: usize,
     /// The IO object.
     io: Arc<dyn IO>,
-    /// The indices of the chunks for which the read is not complete.
-    wait_for_read_complete: Vec<usize>,
-    /// The temporary directory for chunk files.
-    temp_dir: Option<tempfile::TempDir>,
+    /// The temporary file for chunks.
+    temp_file: Option<TempFile>,
+    /// Offset in `temp_file` at which the next chunk will be written.
+    next_chunk_offset: usize,
+    /// State machine for [`Sorter::sort`].
+    sort_state: SortState,
+    /// State machine for [`Sorter::insert`].
+    insert_state: InsertState,
+    /// State machine for [`Sorter::init_chunk_heap`].
+    init_chunk_heap_state: InitChunkHeapState,
 }
 
 impl Sorter {
@@ -77,8 +119,11 @@
             min_chunk_read_buffer_size: min_chunk_read_buffer_size_bytes,
             max_payload_size_in_buffer: 0,
             io,
-            wait_for_read_complete: Vec::new(),
-            temp_dir: None,
+            temp_file: None,
+            next_chunk_offset: 0,
+            sort_state: SortState::Start,
+            insert_state: InsertState::Start,
+            init_chunk_heap_state: InitChunkHeapState::Start,
         }
     }
 
@@ -92,16 +137,39 @@
 
     // We do the sorting here since this is what is called by the SorterSort instruction
     pub fn sort(&mut self) -> Result<IOResult<()>> {
-        if self.chunks.is_empty() {
-            self.records.sort();
-            self.records.reverse();
-        } else {
-            self.flush()?;
-            if let IOResult::IO = self.init_chunk_heap()? {
-                return Ok(IOResult::IO);
+        loop {
+            match self.sort_state {
+                SortState::Start => {
+                    if self.chunks.is_empty() {
+                        self.records.sort();
+                        self.records.reverse();
+                        self.sort_state = SortState::Next;
+                    } else {
+                        self.sort_state = SortState::Flush;
+                    }
+                }
+                SortState::Flush => {
+                    self.sort_state = SortState::InitHeap;
+                    if let Some(_c) = self.flush()? {
+                        return Ok(IOResult::IO);
+                    }
+                }
+                SortState::InitHeap => {
+                    if self.chunks.iter().any(|chunk| {
+                        matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
+                    }) {
+                        return Ok(IOResult::IO);
+                    }
+                    return_if_io!(self.init_chunk_heap());
+                    self.sort_state = SortState::Next;
+                }
+                SortState::Next => {
+                    return_if_io!(self.next());
+                    self.sort_state = SortState::Start;
+                    return Ok(IOResult::Done(()));
+                }
             }
         }
-        self.next()
     }
 
     pub fn next(&mut self) -> Result<IOResult<()>> {
@@ -132,82 +200,95 @@
         self.current.as_ref()
     }
 
-    pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> {
+    pub fn insert(&mut self, record: &ImmutableRecord) -> Result<IOResult<()>> {
         let payload_size = record.get_payload().len();
-        if self.current_buffer_size + payload_size > self.max_buffer_size {
-            self.flush()?;
+        loop {
+            match self.insert_state {
+                InsertState::Start => {
+                    self.insert_state = InsertState::Insert;
+                    if self.current_buffer_size + payload_size > self.max_buffer_size {
+                        if let Some(_c) = self.flush()? {
+                            return Ok(IOResult::IO);
+                        }
+                    }
+                }
+                InsertState::Insert => {
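+                    // A chunk write scheduled by an earlier `flush` may still
+                    // be in flight; yield to the caller until it completes
+                    // before buffering more records.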
+                    if self.chunks.iter().any(|chunk| {
+                        matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
+                    }) {
+                        return Ok(IOResult::IO);
+                    }
+                    self.records.push(SortableImmutableRecord::new(
+                        record.clone(),
+                        self.key_len,
+                        self.index_key_info.clone(),
+                    )?);
+                    self.current_buffer_size += payload_size;
+                    self.max_payload_size_in_buffer =
+                        self.max_payload_size_in_buffer.max(payload_size);
+                    self.insert_state = InsertState::Start;
+                    return Ok(IOResult::Done(()));
+                }
+            }
         }
-        self.records.push(SortableImmutableRecord::new(
-            record.clone(),
-            self.key_len,
-            self.index_key_info.clone(),
-        )?);
-        self.current_buffer_size += payload_size;
-        self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size);
-        Ok(())
     }
 
     fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
-        let mut all_read_complete = true;
-        // Make sure all chunks read at least one record into their buffer.
-        for chunk in self.chunks.iter_mut() {
-            match chunk.io_state.get() {
-                SortedChunkIOState::WriteComplete => {
-                    all_read_complete = false;
-                    // Write complete, we can now read from the chunk.
-                    chunk.read()?;
+        match self.init_chunk_heap_state {
+            InitChunkHeapState::Start => {
+                let mut completions = Vec::with_capacity(self.chunks.len());
+                for chunk in self.chunks.iter_mut() {
+                    let c = chunk.read()?;
+                    completions.push(c);
                 }
-                SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => {
-                    all_read_complete = false;
+                self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
+                Ok(IOResult::IO)
+            }
+            InitChunkHeapState::PushChunk => {
+                // Make sure all chunks read at least one record into their buffer.
+                if self
+                    .chunks
+                    .iter()
+                    .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
+                {
+                    return Ok(IOResult::IO);
                 }
-                SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {}
-                _ => {
-                    unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get())
+                self.chunk_heap.reserve(self.chunks.len());
+                // TODO: blocking will be unnecessary here with IO completions
+                let io = self.io.clone();
+                for chunk_idx in 0..self.chunks.len() {
+                    io.block(|| self.push_to_chunk_heap(chunk_idx))?;
                 }
+                self.init_chunk_heap_state = InitChunkHeapState::Start;
+                Ok(IOResult::Done(()))
             }
         }
-        if !all_read_complete {
-            return Ok(IOResult::IO);
-        }
-        self.chunk_heap.reserve(self.chunks.len());
-        for chunk_idx in 0..self.chunks.len() {
-            self.push_to_chunk_heap(chunk_idx)?;
-        }
-        Ok(IOResult::Done(()))
     }
 
     fn next_from_chunk_heap(&mut self) -> Result<IOResult<Option<ImmutableRecord>>> {
-        let mut all_read_complete = true;
-        for chunk_idx in self.wait_for_read_complete.iter() {
-            let chunk_io_state = self.chunks[*chunk_idx].io_state.get();
-            match chunk_io_state {
-                SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {}
-                SortedChunkIOState::WaitingForRead => {
-                    all_read_complete = false;
-                }
-                _ => {
-                    unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state)
-                }
-            }
-        }
-        if !all_read_complete {
+        // Make sure all chunks read at least one record into their buffer.
+        if self
+            .chunks
+            .iter()
+            .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
+        {
             return Ok(IOResult::IO);
         }
-        self.wait_for_read_complete.clear();
         if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() {
-            self.push_to_chunk_heap(next_chunk_idx)?;
+            // TODO: blocking will be unnecessary here with IO completions
+            let io = self.io.clone();
+            io.block(|| self.push_to_chunk_heap(next_chunk_idx))?;
             Ok(IOResult::Done(Some(next_record.0)))
         } else {
             Ok(IOResult::Done(None))
         }
     }
 
-    fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<()> {
+    fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<IOResult<()>> {
         let chunk = &mut self.chunks[chunk_idx];
-        if chunk.has_more() {
-            let record = chunk.next()?.unwrap();
+        if let Some(record) = return_if_io!(chunk.next()) {
             self.chunk_heap.push((
                 Reverse(SortableImmutableRecord::new(
                     record,
@@ -216,52 +297,76 @@
                 )?),
                 chunk_idx,
             ));
-            if let SortedChunkIOState::WaitingForRead = chunk.io_state.get() {
-                self.wait_for_read_complete.push(chunk_idx);
-            }
         }
-        Ok(())
+
+        Ok(IOResult::Done(()))
     }
 
-    fn flush(&mut self) -> Result<()> {
+    fn flush(&mut self) -> Result<Option<Completion>> {
         if self.records.is_empty() {
-            return Ok(());
+            // Nothing buffered, so there is no write completion to wait on.
+            return Ok(None);
         }
         self.records.sort();
 
-        if self.temp_dir.is_none() {
-            self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?);
-        }
-
-        let chunk_file_path = self
-            .temp_dir
-            .as_ref()
-            .unwrap()
-            .path()
-            .join(format!("chunk_{}", self.chunks.len()));
-        let chunk_file =
-            self.io
-                .open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?;
+        let chunk_file = match &self.temp_file {
+            Some(temp_file) => temp_file.file.clone(),
+            None => {
+                let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?;
+                let chunk_file_path = temp_dir.as_ref().join("chunk_file");
+                let chunk_file = self.io.open_file(
+                    chunk_file_path.to_str().unwrap(),
+                    OpenFlags::Create,
+                    false,
+                )?;
+                self.temp_file = Some(TempFile {
+                    _temp_dir: temp_dir,
+                    file: chunk_file.clone(),
+                });
+                chunk_file
+            }
+        };
 
         // Make sure the chunk buffer size can fit the largest record and its size varint.
         let chunk_buffer_size = self
             .min_chunk_read_buffer_size
             .max(self.max_payload_size_in_buffer + 9);
+
+        let mut chunk_size = 0;
+        // Pre-compute varint lengths for record sizes to determine the total buffer size.
+        let mut record_size_lengths = Vec::with_capacity(self.records.len());
+        for record in self.records.iter() {
+            let record_size = record.record.get_payload().len();
+            let size_len = varint_len(record_size as u64);
+            record_size_lengths.push(size_len);
+            chunk_size += size_len + record_size;
+        }
+
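+        // Chunks are laid out back to back in the shared temp file: this one
+        // will occupy `chunk_size` bytes starting at `next_chunk_offset`.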
+        let mut chunk = SortedChunk::new(chunk_file, self.next_chunk_offset, chunk_buffer_size);
+        let c = chunk.write(&mut self.records, record_size_lengths, chunk_size)?;
         self.chunks.push(chunk);
 
         self.current_buffer_size = 0;
         self.max_payload_size_in_buffer = 0;
+        // Advance the offset at which the next chunk will start.
+        self.next_chunk_offset += chunk_size;
 
-        Ok(())
+        Ok(Some(c))
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+enum NextState {
+    Start,
+    Finish,
+}
+
 struct SortedChunk {
     /// The chunk file.
     file: Arc<dyn File>,
+    /// Offset of the start of this chunk in the file.
+    start_offset: usize,
     /// The size of this chunk file in bytes.
     chunk_size: usize,
     /// The read buffer.
@@ -274,91 +379,104 @@
     io_state: Rc<Cell<SortedChunkIOState>>,
     /// The total number of bytes read from the chunk file.
     total_bytes_read: Rc<Cell<usize>>,
+    /// State machine for [`SortedChunk::next`].
+    next_state: NextState,
 }
 
 impl SortedChunk {
-    fn new(file: Arc<dyn File>, buffer_size: usize) -> Self {
+    fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
         Self {
             file,
+            start_offset,
             chunk_size: 0,
             buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
             buffer_len: Rc::new(Cell::new(0)),
             records: Vec::new(),
             io_state: Rc::new(Cell::new(SortedChunkIOState::None)),
             total_bytes_read: Rc::new(Cell::new(0)),
+            next_state: NextState::Start,
         }
     }
 
-    fn has_more(&self) -> bool {
-        !self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF
-    }
-
-    fn next(&mut self) -> Result<Option<ImmutableRecord>> {
-        let mut buffer_len = self.buffer_len.get();
-        if self.records.is_empty() && buffer_len == 0 {
-            return Ok(None);
-        }
-
-        if self.records.is_empty() {
-            let mut buffer_ref = self.buffer.borrow_mut();
-            let buffer = buffer_ref.as_mut_slice();
-            let mut buffer_offset = 0;
-            while buffer_offset < buffer_len {
-                // Extract records from the buffer until we run out of the buffer or we hit an incomplete record.
-                let (record_size, bytes_read) =
-                    match read_varint(&buffer[buffer_offset..buffer_len]) {
-                        Ok((record_size, bytes_read)) => (record_size as usize, bytes_read),
-                        Err(LimboError::Corrupt(_))
-                            if self.io_state.get() != SortedChunkIOState::ReadEOF =>
-                        {
-                            // Failed to decode a partial varint.
-                            break;
-                        }
-                        Err(e) => {
-                            return Err(e);
-                        }
-                    };
-                if record_size > buffer_len - (buffer_offset + bytes_read) {
-                    if self.io_state.get() == SortedChunkIOState::ReadEOF {
-                        crate::bail_corrupt_error!("Incomplete record");
+    fn next(&mut self) -> Result<IOResult<Option<ImmutableRecord>>> {
+        loop {
+            match self.next_state {
+                NextState::Start => {
+                    let mut buffer_len = self.buffer_len.get();
+                    if self.records.is_empty() && buffer_len == 0 {
+                        return Ok(IOResult::Done(None));
+                    }
+
+                    if self.records.is_empty() {
+                        let mut buffer_ref = self.buffer.borrow_mut();
+                        let buffer = buffer_ref.as_mut_slice();
+                        let mut buffer_offset = 0;
+                        while buffer_offset < buffer_len {
+                            // Extract records from the buffer until we run out of the buffer or we hit an incomplete record.
+                            let (record_size, bytes_read) =
+                                match read_varint(&buffer[buffer_offset..buffer_len]) {
+                                    Ok((record_size, bytes_read)) => {
+                                        (record_size as usize, bytes_read)
+                                    }
+                                    Err(LimboError::Corrupt(_))
+                                        if self.io_state.get() != SortedChunkIOState::ReadEOF =>
+                                    {
+                                        // Failed to decode a partial varint.
+                                        break;
+                                    }
+                                    Err(e) => {
+                                        return Err(e);
+                                    }
+                                };
+                            if record_size > buffer_len - (buffer_offset + bytes_read) {
+                                if self.io_state.get() == SortedChunkIOState::ReadEOF {
+                                    crate::bail_corrupt_error!("Incomplete record");
+                                }
+                                break;
+                            }
+                            buffer_offset += bytes_read;
+
+                            let mut record = ImmutableRecord::new(record_size);
+                            record.start_serialization(
+                                &buffer[buffer_offset..buffer_offset + record_size],
+                            );
+                            buffer_offset += record_size;
+
+                            self.records.push(record);
+                        }
+                        if buffer_offset < buffer_len {
+                            buffer.copy_within(buffer_offset..buffer_len, 0);
+                            buffer_len -= buffer_offset;
+                        } else {
+                            buffer_len = 0;
+                        }
+                        self.buffer_len.set(buffer_len);
+
+                        self.records.reverse();
+                    }
+
+                    self.next_state = NextState::Finish;
+                    // Check whether we need to read more from the chunk before popping the record.
+                    if self.records.len() == 1 && self.io_state.get() != SortedChunkIOState::ReadEOF
+                    {
+                        // We're about to hand out the last buffered record. Read more payload into the buffer first.
+                        if self.chunk_size - self.total_bytes_read.get() == 0 {
+                            self.io_state.set(SortedChunkIOState::ReadEOF);
+                        } else {
+                            let _c = self.read()?;
+                            return Ok(IOResult::IO);
+                        }
+                    }
                 }
-                buffer_offset += bytes_read;
-
-                let mut record = ImmutableRecord::new(record_size);
-                record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]);
-                buffer_offset += record_size;
-
-                self.records.push(record);
+                NextState::Finish => {
+                    self.next_state = NextState::Start;
+                    return Ok(IOResult::Done(self.records.pop()));
+                }
             }
-            if buffer_offset < buffer_len {
-                buffer.copy_within(buffer_offset..buffer_len, 0);
-                buffer_len -= buffer_offset;
-            } else {
-                buffer_len = 0;
-            }
-            self.buffer_len.set(buffer_len);
-
-            self.records.reverse();
         }
-
-        let record = self.records.pop();
-        if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF {
-            // We've consumed the last record. Read more payload into the buffer.
-            self.read()?;
-        }
-        Ok(record)
     }
 
-    fn read(&mut self) -> Result<()> {
-        if self.io_state.get() == SortedChunkIOState::ReadEOF {
-            return Ok(());
-        }
-        if self.chunk_size - self.total_bytes_read.get() == 0 {
-            self.io_state.set(SortedChunkIOState::ReadEOF);
-            return Ok(());
-        }
+    fn read(&mut self) -> Result<Completion> {
         self.io_state.set(SortedChunkIOState::WaitingForRead);
 
         let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get();
@@ -395,23 +513,21 @@
         });
 
         let c = Completion::new_read(read_buffer_ref, read_complete);
-        let _c = self.file.pread(self.total_bytes_read.get(), c)?;
-        Ok(())
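+        // Offset the read by `start_offset`: this chunk no longer begins at
+        // byte 0 now that all chunks share a single temp file.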
+        let c = self
+            .file
+            .pread(self.start_offset + self.total_bytes_read.get(), c)?;
+        Ok(c)
     }
 
-    fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<()> {
+    fn write(
+        &mut self,
+        records: &mut Vec<SortableImmutableRecord>,
+        record_size_lengths: Vec<usize>,
+        chunk_size: usize,
+    ) -> Result<Completion> {
         assert!(self.io_state.get() == SortedChunkIOState::None);
         self.io_state.set(SortedChunkIOState::WaitingForWrite);
-        self.chunk_size = 0;
-
-        // Pre-compute varint lengths for record sizes to determine the total buffer size.
-        let mut record_size_lengths = Vec::with_capacity(records.len());
-        for record in records.iter() {
-            let record_size = record.record.get_payload().len();
-            let size_len = varint_len(record_size as u64);
-            record_size_lengths.push(size_len);
-            self.chunk_size += size_len + record_size;
-        }
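+        // The caller (`Sorter::flush`) now pre-computes the record-size varint
+        // lengths and the total chunk size, so `write` only has to serialize
+        // the records and schedule the write.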
+        self.chunk_size = chunk_size;
 
         let buffer = Buffer::new_temporary(self.chunk_size);
 
@@ -440,8 +556,8 @@
         });
 
         let c = Completion::new_write(write_complete);
-        let _c = self.file.pwrite(0, buffer_ref, c)?;
-        Ok(())
+        let c = self.file.pwrite(self.start_offset, buffer_ref, c)?;
+        Ok(c)
     }
 }
 
@@ -564,9 +680,9 @@ impl Eq for SortableImmutableRecord {}
 enum SortedChunkIOState {
     WaitingForRead,
     ReadComplete,
-    ReadEOF,
     WaitingForWrite,
     WriteComplete,
+    ReadEOF,
     None,
 }
 
@@ -575,6 +691,7 @@ mod tests {
     use super::*;
     use crate::translate::collate::CollationSeq;
     use crate::types::{ImmutableRecord, RefValue, Value, ValueType};
+    use crate::util::IOExt;
     use crate::PlatformIO;
     use rand_chacha::{
         rand_core::{RngCore, SeedableRng},
@@ -600,16 +717,17 @@
         let mut rng = ChaCha8Rng::seed_from_u64(seed);
 
         let io = Arc::new(PlatformIO::new().unwrap());
-        let mut sorter = Sorter::new(
-            &[SortOrder::Asc],
-            vec![CollationSeq::Binary],
-            256,
-            64,
-            io.clone(),
-        );
 
         let attempts = 8;
 
         for _ in 0..attempts {
+            let mut sorter = Sorter::new(
+                &[SortOrder::Asc],
+                vec![CollationSeq::Binary],
+                256,
+                64,
+                io.clone(),
+            );
+
             let num_records = 1000 + rng.next_u64() % 2000;
             let num_records = num_records as i64;
 
@@ -622,17 +740,13 @@
                 values.append(&mut generate_values(&mut rng, &value_types));
 
                 let record = ImmutableRecord::from_values(&values, values.len());
-                sorter.insert(&record).expect("Failed to insert the record");
+                io.block(|| sorter.insert(&record))
+                    .expect("Failed to insert the record");
                 initial_records.push(record);
             }
 
-            loop {
-                if let IOResult::IO = sorter.sort().expect("Failed to sort the records") {
-                    io.run_once().expect("Failed to run the IO");
-                    continue;
-                }
-                break;
-            }
+            io.block(|| sorter.sort())
+                .expect("Failed to sort the records");
 
             assert!(!sorter.is_empty());
             assert!(!sorter.chunks.is_empty());
@@ -644,13 +758,8 @@
                 // Check that the record remained unchanged after sorting.
                 assert_eq!(record, &initial_records[(num_records - i - 1) as usize]);
 
-                loop {
-                    if let IOResult::IO = sorter.next().expect("Failed to get the next record") {
-                        io.run_once().expect("Failed to run the IO");
-                        continue;
-                    }
-                    break;
-                }
+                io.block(|| sorter.next())
+                    .expect("Failed to get the next record");
             }
             assert!(!sorter.has_more());
         }