From 78f3bf3475667f459411c5a00bb7da638db7127d Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 07:10:00 +0200 Subject: [PATCH 01/10] Core: Introduce external sorting --- core/Cargo.toml | 2 +- core/storage/sqlite3_ondisk.rs | 24 ++ core/vdbe/execute.rs | 22 +- core/vdbe/sorter.rs | 524 +++++++++++++++++++++++++++++++-- 4 files changed, 539 insertions(+), 33 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 912bcd656..b6a1d5723 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,6 +69,7 @@ bitflags = "2.9.0" serde = { workspace = true , optional = true, features = ["derive"] } paste = "1.0.15" uuid = { version = "1.11.0", features = ["v4", "v7"], optional = true } +tempfile = "3.8.0" [build-dependencies] chrono = { version = "0.4.38", default-features = false } @@ -86,7 +87,6 @@ criterion = { version = "0.5", features = [ ] } rstest = "0.18.2" rusqlite = "0.34.0" -tempfile = "3.8.0" quickcheck = { version = "1.0", default-features = false } quickcheck_macros = { version = "1.0", default-features = false } rand = "0.8.5" # Required for quickcheck diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 150d10d85..36d5efb18 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1063,6 +1063,30 @@ impl Iterator for SmallVecIter<'_, T, N> { } } +pub fn read_record_size(payload: &[u8]) -> Result { + let mut offset = 0; + let mut record_size = 0; + + let (header_size, bytes_read) = read_varint(payload)?; + let header_size = header_size as usize; + if header_size > payload.len() { + crate::bail_corrupt_error!("Incomplete record header"); + } + + offset += bytes_read; + record_size += header_size; + + while offset < header_size && offset < payload.len() { + let (serial_type, bytes_read) = read_varint(&payload[offset..])?; + offset += bytes_read; + + let serial_type_obj = SerialType::try_from(serial_type)?; + record_size += serial_type_obj.size(); + } + + Ok(record_size) +} 
+ /// Reads a value that might reference the buffer it is reading from. Be sure to store RefValue with the buffer /// always. #[inline(always)] diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 29749594e..488d27da8 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3413,12 +3413,24 @@ pub fn op_sorter_open( else { unreachable!("unexpected Insn {:?}", insn) }; + let cache_size = program.connection.get_cache_size(); + // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. + let page_size = header_accessor::get_page_size(&pager) + .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as usize; + let max_buffer_size_bytes = if cache_size < 0 { + (cache_size.abs() * 1024) as usize + } else { + (cache_size as usize) * page_size + }; let cursor = Sorter::new( order, collations .iter() .map(|collation| collation.unwrap_or_default()) .collect(), + max_buffer_size_bytes, + page_size, + pager.io.clone(), ); let mut cursors = state.cursors.borrow_mut(); cursors @@ -3486,7 +3498,7 @@ pub fn op_sorter_insert( Register::Record(record) => record, _ => unreachable!("SorterInsert on non-record register"), }; - cursor.insert(record); + cursor.insert(record)?; } state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -3511,7 +3523,9 @@ pub fn op_sorter_sort( let cursor = cursor.as_sorter_mut(); let is_empty = cursor.is_empty(); if !is_empty { - cursor.sort(); + if let CursorResult::IO = cursor.sort()? { + return Ok(InsnFunctionStepResult::IO); + } } is_empty }; @@ -3541,7 +3555,9 @@ pub fn op_sorter_next( let has_more = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_sorter_mut(); - cursor.next(); + if let CursorResult::IO = cursor.next()? 
{ + return Ok(InsnFunctionStepResult::IO); + } cursor.has_more() }; if has_more { diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index b2b245d88..1ad61db4d 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -1,20 +1,61 @@ use turso_sqlite3_parser::ast::SortOrder; +use std::cell::{Cell, RefCell}; +use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd, Reverse}; +use std::collections::BinaryHeap; +use std::rc::Rc; +use std::sync::Arc; +use tempfile; + use crate::{ + error::LimboError, + io::{ + Buffer, BufferData, Completion, CompletionType, File, OpenFlags, ReadCompletion, + WriteCompletion, IO, + }, + storage::sqlite3_ondisk::read_record_size, translate::collate::CollationSeq, types::{compare_immutable, ImmutableRecord, KeyInfo}, }; pub struct Sorter { - records: Vec, + /// The records in the in-memory buffer. + records: Vec, + /// The current record. current: Option, + /// The number of values in the key. key_len: usize, - index_key_info: Vec, + /// The key info. + index_key_info: Rc>, + /// Sorted chunks stored on disk. + chunks: Vec, + /// The heap of records consumed from the chunks and their corresponding chunk index. + chunk_heap: BinaryHeap<(Reverse, usize)>, + /// The maximum size of the in-memory buffer in bytes before the records are flushed to a chunk file. + max_buffer_size: usize, + /// The current size of the in-memory buffer in bytes. + current_buffer_size: usize, + /// The minimum size of a chunk read buffer in bytes. The actual buffer size can be larger if the largest + /// record in the buffer is larger than this value. + min_chunk_read_buffer_size: usize, + /// The maximum record payload size in the in-memory buffer. + max_payload_size_in_buffer: usize, + /// The IO object. + io: Arc, + /// The indices of the chunks for which the read is not complete. + wait_for_read_complete: Vec, + /// The temporary directory for chunk files. 
+ temp_dir: Option, } impl Sorter { - pub fn new(order: &[SortOrder], collations: Vec) -> Self { - assert_eq!(order.len(), collations.len()); + pub fn new( + order: &[SortOrder], + collations: Vec, + max_buffer_size_bytes: usize, + min_chunk_read_buffer_size_bytes: usize, + io: Arc, + ) -> Self { Self { records: Vec::new(), current: None, @@ -27,10 +68,20 @@ impl Sorter { collation, }) .collect(), + chunks: Vec::new(), + chunk_heap: BinaryHeap::new(), + max_buffer_size: max_buffer_size_bytes, + current_buffer_size: 0, + min_chunk_read_buffer_size: min_chunk_read_buffer_size_bytes, + max_payload_size_in_buffer: 0, + io, + wait_for_read_complete: Vec::new(), + temp_dir: None, } } + pub fn is_empty(&self) -> bool { - self.records.is_empty() + self.records.is_empty() && self.chunks.is_empty() } pub fn has_more(&self) -> bool { @@ -38,36 +89,451 @@ impl Sorter { } // We do the sorting here since this is what is called by the SorterSort instruction - pub fn sort(&mut self) { - self.records.sort_by(|a, b| { - let a_values = a.get_values(); - let b_values = b.get_values(); - - let a_key = if a_values.len() >= self.key_len { - &a_values[..self.key_len] - } else { - &a_values[..] - }; - - let b_key = if b_values.len() >= self.key_len { - &b_values[..self.key_len] - } else { - &b_values[..] - }; - - compare_immutable(a_key, b_key, &self.index_key_info) - }); - self.records.reverse(); + pub fn sort(&mut self) -> Result> { + if self.chunks.is_empty() { + self.records.sort(); + self.records.reverse(); + } else { + self.flush()?; + if let CursorResult::IO = self.init_chunk_heap()? { + return Ok(CursorResult::IO); + } + } self.next() } - pub fn next(&mut self) { - self.current = self.records.pop(); + + pub fn next(&mut self) -> Result> { + if self.chunks.is_empty() { + // Serve from the in-memory buffer. + self.current = self.records.pop().map(|r| r.record); + } else { + // Serve from sorted chunk files. + match self.next_from_chunk_heap()? 
{ + CursorResult::IO => return Ok(CursorResult::IO), + CursorResult::Ok(record) => self.current = record, + } + } + Ok(CursorResult::Ok(())) } + pub fn record(&self) -> Option<&ImmutableRecord> { self.current.as_ref() } - pub fn insert(&mut self, record: &ImmutableRecord) { - self.records.push(record.clone()); + pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> { + let payload_size = record.get_payload().len(); + if self.current_buffer_size + payload_size > self.max_buffer_size { + self.flush()?; + } + self.records.push(SortableImmutableRecord::new( + record.clone(), + self.key_len, + self.index_key_info.clone(), + )); + self.current_buffer_size += payload_size; + self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size); + Ok(()) + } + + fn init_chunk_heap(&mut self) -> Result> { + let mut all_read_complete = true; + // Make sure all chunks read at least one record into their buffer. + for chunk in self.chunks.iter_mut() { + match chunk.io_state.get() { + SortedChunkIOState::WriteComplete => { + all_read_complete = false; + // Write complete, we can now read from the chunk. 
+ chunk.read()?; + } + SortedChunkIOState::WaitingForWrite => { + all_read_complete = false; + } + SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {} + _ => { + unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get()) + } + } + } + if !all_read_complete { + return Ok(CursorResult::IO); + } + self.chunk_heap.reserve(self.chunks.len()); + for chunk_idx in 0..self.chunks.len() { + self.push_to_chunk_heap(chunk_idx)?; + } + Ok(CursorResult::Ok(())) + } + + fn next_from_chunk_heap(&mut self) -> Result>> { + let mut all_read_complete = true; + for chunk_idx in self.wait_for_read_complete.iter() { + let chunk_io_state = self.chunks[*chunk_idx].io_state.get(); + match chunk_io_state { + SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {} + SortedChunkIOState::WaitingForRead => { + all_read_complete = false; + } + _ => { + unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state) + } + } + } + if !all_read_complete { + return Ok(CursorResult::IO); + } + self.wait_for_read_complete.clear(); + + if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { + self.push_to_chunk_heap(next_chunk_idx)?; + Ok(CursorResult::Ok(Some(next_record.0.record))) + } else { + Ok(CursorResult::Ok(None)) + } + } + + fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<()> { + let chunk = &mut self.chunks[chunk_idx]; + + if chunk.has_more() { + let record = chunk.next()?.unwrap(); + self.chunk_heap.push(( + Reverse(SortableImmutableRecord::new( + record, + self.key_len, + self.order, + self.collations.clone(), + )), + chunk_idx, + )); + if let SortedChunkIOState::WaitingForRead = chunk.io_state.get() { + self.wait_for_read_complete.push(chunk_idx); + } + } + Ok(()) + } + + fn flush(&mut self) -> Result<()> { + if self.records.is_empty() { + return Ok(()); + } + + self.records.sort(); + + if self.temp_dir.is_none() { + self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?); + } + + let chunk_file_path = self 
+ .temp_dir + .as_ref() + .unwrap() + .path() + .join(format!("chunk_{}", self.chunks.len())); + let chunk_file = + self.io + .open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?; + + // Make sure the chunk buffer size can fit the largest record. + let chunk_buffer_size = self + .min_chunk_read_buffer_size + .max(self.max_payload_size_in_buffer); + let mut chunk = SortedChunk::new( + chunk_file.clone(), + self.current_buffer_size, + chunk_buffer_size, + ); + chunk.write(&mut self.records, self.current_buffer_size)?; + self.chunks.push(chunk); + + self.current_buffer_size = 0; + self.max_payload_size_in_buffer = 0; + + Ok(()) + } +} + +struct SortedChunk { + /// The chunk file. + file: Arc, + /// The chunk size. + chunk_size: usize, + /// The read buffer. + buffer: Rc>>, + /// The current length of the buffer. + buffer_len: Rc>, + /// The records decoded from the chunk file. + records: Vec, + /// The current IO state of the chunk. + io_state: Rc>, + /// The total number of bytes read from the chunk file. + total_bytes_read: Rc>, +} + +impl SortedChunk { + fn new(file: Arc, chunk_size: usize, buffer_size: usize) -> Self { + Self { + file, + chunk_size, + buffer: Rc::new(RefCell::new(vec![0; buffer_size])), + buffer_len: Rc::new(Cell::new(0)), + records: Vec::new(), + io_state: Rc::new(Cell::new(SortedChunkIOState::None)), + total_bytes_read: Rc::new(Cell::new(0)), + } + } + + fn has_more(&self) -> bool { + !self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF + } + + fn next(&mut self) -> Result> { + let mut buffer_len = self.buffer_len.get(); + if self.records.is_empty() && buffer_len == 0 { + return Ok(None); + } + + if self.records.is_empty() { + let mut buffer_ref = self.buffer.borrow_mut(); + let buffer = buffer_ref.as_mut_slice(); + let mut buffer_offset = 0; + while buffer_offset < buffer_len { + // Decode records from the buffer until we run out of the buffer or we hit an incomplete record. 
+ let record_size = match read_record_size(&buffer[buffer_offset..buffer_len]) { + Ok(record_size) => record_size, + Err(LimboError::Corrupt(_)) + if self.io_state.get() != SortedChunkIOState::ReadEOF => + { + // Failed to decode a partial record. + break; + } + Err(e) => { + return Err(e); + } + }; + + if record_size > buffer_len - buffer_offset { + if self.io_state.get() == SortedChunkIOState::ReadEOF { + crate::bail_corrupt_error!("Incomplete record"); + } + break; + } + + let mut record = ImmutableRecord::new(record_size); + record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]); + buffer_offset += record_size; + + self.records.push(record); + } + if buffer_offset < buffer_len { + buffer.copy_within(buffer_offset..buffer_len, 0); + buffer_len -= buffer_offset; + } else { + buffer_len = 0; + } + self.buffer_len.set(buffer_len); + + self.records.reverse(); + } + + let record = self.records.pop(); + if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF { + // We've consumed the last record. Read more payload into the buffer. 
+ self.read()?; + } + Ok(record) + } + + fn read(&mut self) -> Result<()> { + if self.io_state.get() == SortedChunkIOState::ReadEOF { + return Ok(()); + } + self.io_state.set(SortedChunkIOState::WaitingForRead); + + let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get(); + let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get()); + + let drop_fn = Rc::new(|_buffer: BufferData| {}); + let read_buffer = Buffer::allocate(read_buffer_size, drop_fn); + #[allow(clippy::arc_with_non_send_sync)] + let read_buffer_ref = Arc::new(RefCell::new(read_buffer)); + + let chunk_io_state_copy = self.io_state.clone(); + let stored_buffer_copy = self.buffer.clone(); + let stored_buffer_len_copy = self.buffer_len.clone(); + let total_bytes_read_copy = self.total_bytes_read.clone(); + let read_complete = Box::new(move |buf: Arc>, bytes_read: i32| { + let read_buf_ref = buf.borrow(); + let read_buf = read_buf_ref.as_slice(); + + let bytes_read = bytes_read as usize; + if bytes_read == 0 { + chunk_io_state_copy.set(SortedChunkIOState::ReadEOF); + return; + } + chunk_io_state_copy.set(SortedChunkIOState::ReadComplete); + + let mut stored_buf_ref = stored_buffer_copy.borrow_mut(); + let stored_buf = stored_buf_ref.as_mut_slice(); + let mut stored_buf_len = stored_buffer_len_copy.get(); + + stored_buf[stored_buf_len..stored_buf_len + bytes_read] + .copy_from_slice(&read_buf[..bytes_read]); + stored_buf_len += bytes_read; + + stored_buffer_len_copy.set(stored_buf_len); + total_bytes_read_copy.set(total_bytes_read_copy.get() + bytes_read); + }); + + let c = Completion::new(CompletionType::Read(ReadCompletion::new( + read_buffer_ref, + read_complete, + ))); + self.file.pread(self.total_bytes_read.get(), c)?; + Ok(()) + } + + fn write( + &mut self, + records: &mut Vec, + total_size: usize, + ) -> Result<()> { + assert!(self.io_state.get() == SortedChunkIOState::None); + self.io_state.set(SortedChunkIOState::WaitingForWrite); + + let drop_fn = 
Rc::new(|_buffer: BufferData| {}); + let mut buffer = Buffer::allocate(total_size, drop_fn); + + let mut buf_pos = 0; + let buf = buffer.as_mut_slice(); + for record in records.drain(..) { + let payload = record.record.get_payload(); + buf[buf_pos..buf_pos + payload.len()].copy_from_slice(payload); + buf_pos += payload.len(); + } + + #[allow(clippy::arc_with_non_send_sync)] + let buffer_ref = Arc::new(RefCell::new(buffer)); + + let buffer_ref_copy = buffer_ref.clone(); + let chunk_io_state_copy = self.io_state.clone(); + let write_complete = Box::new(move |bytes_written: i32| { + chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); + let buf_len = buffer_ref_copy.borrow().len(); + if bytes_written < buf_len as i32 { + tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); + } + }); + + let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); + self.file.pwrite(0, buffer_ref, c)?; + Ok(()) + } +} + +struct SortableImmutableRecord { + record: ImmutableRecord, + key_len: usize, + index_key_info: Rc>, +} + +impl SortableImmutableRecord { + fn new( + record: ImmutableRecord, + key_len: usize, + index_key_info: Rc>, + ) -> Self { + Self { + record, + key_len, + index_key_info, + } + } +} + +impl Ord for SortableImmutableRecord { + fn cmp(&self, other: &Self) -> Ordering { + let this_values = self.record.get_values(); + let other_values = other.record.get_values(); + + let a_key = if this_values.len() >= self.key_len { + &this_values[..self.key_len] + } else { + &this_values[..] + }; + + let b_key = if other_values.len() >= self.key_len { + &other_values[..self.key_len] + } else { + &other_values[..] 
+ }; + + compare_immutable(a_key, b_key, self.index_key_info) + } +} + +impl PartialOrd for SortableImmutableRecord { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for SortableImmutableRecord { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for SortableImmutableRecord {} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum SortedChunkIOState { + WaitingForRead, + ReadComplete, + ReadEOF, + WaitingForWrite, + WriteComplete, + None, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{ImmutableRecord, RefValue, Value}; + use crate::PlatformIO; + + #[test] + fn test_external_sort() { + let io = Arc::new(PlatformIO::new().unwrap()); + let mut sorter = Sorter::new(&[SortOrder::Asc], vec![], 64, 13, io.clone()); + for i in (0..1024).rev() { + sorter + .insert(&ImmutableRecord::from_values(&[Value::Integer(i)], 1)) + .expect("Failed to insert the record"); + } + + loop { + if let CursorResult::IO = sorter.sort().expect("Failed to sort the records") { + io.run_once().expect("Failed to run the IO"); + continue; + } + break; + } + + assert!(!sorter.is_empty()); + assert_eq!(sorter.chunks.len(), 63); + + for i in 0..1024 { + assert!(sorter.has_more()); + let record = sorter.record().unwrap(); + assert_eq!(record.get_values()[0], RefValue::Integer(i)); + loop { + if let CursorResult::IO = sorter.next().expect("Failed to get the next record") { + io.run_once().expect("Failed to run the IO"); + continue; + } + break; + } + } + assert!(!sorter.has_more()); } } From fd042ac4c8ccb9851d09e6c6ec2f73e6e2fd7988 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 07:39:00 +0200 Subject: [PATCH 02/10] Use IOResult instead of CursorResult --- core/vdbe/execute.rs | 4 ++-- core/vdbe/sorter.rs | 35 ++++++++++++++++++----------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 
488d27da8..3f0157f34 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3523,7 +3523,7 @@ pub fn op_sorter_sort( let cursor = cursor.as_sorter_mut(); let is_empty = cursor.is_empty(); if !is_empty { - if let CursorResult::IO = cursor.sort()? { + if let IOResult::IO = cursor.sort()? { return Ok(InsnFunctionStepResult::IO); } } @@ -3555,7 +3555,7 @@ pub fn op_sorter_next( let has_more = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_sorter_mut(); - if let CursorResult::IO = cursor.next()? { + if let IOResult::IO = cursor.next()? { return Ok(InsnFunctionStepResult::IO); } cursor.has_more() diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 1ad61db4d..1f9658809 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -15,7 +15,8 @@ use crate::{ }, storage::sqlite3_ondisk::read_record_size, translate::collate::CollationSeq, - types::{compare_immutable, ImmutableRecord, KeyInfo}, + types::{compare_immutable, IOResult, ImmutableRecord, KeyInfo}, + Result, }; pub struct Sorter { @@ -89,31 +90,31 @@ impl Sorter { } // We do the sorting here since this is what is called by the SorterSort instruction - pub fn sort(&mut self) -> Result> { + pub fn sort(&mut self) -> Result> { if self.chunks.is_empty() { self.records.sort(); self.records.reverse(); } else { self.flush()?; - if let CursorResult::IO = self.init_chunk_heap()? { - return Ok(CursorResult::IO); + if let IOResult::IO = self.init_chunk_heap()? { + return Ok(IOResult::IO); } } self.next() } - pub fn next(&mut self) -> Result> { + pub fn next(&mut self) -> Result> { if self.chunks.is_empty() { // Serve from the in-memory buffer. self.current = self.records.pop().map(|r| r.record); } else { // Serve from sorted chunk files. match self.next_from_chunk_heap()? 
{ - CursorResult::IO => return Ok(CursorResult::IO), - CursorResult::Ok(record) => self.current = record, + IOResult::IO => return Ok(IOResult::IO), + IOResult::Done(record) => self.current = record, } } - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } pub fn record(&self) -> Option<&ImmutableRecord> { @@ -135,7 +136,7 @@ impl Sorter { Ok(()) } - fn init_chunk_heap(&mut self) -> Result> { + fn init_chunk_heap(&mut self) -> Result> { let mut all_read_complete = true; // Make sure all chunks read at least one record into their buffer. for chunk in self.chunks.iter_mut() { @@ -155,16 +156,16 @@ impl Sorter { } } if !all_read_complete { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } self.chunk_heap.reserve(self.chunks.len()); for chunk_idx in 0..self.chunks.len() { self.push_to_chunk_heap(chunk_idx)?; } - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } - fn next_from_chunk_heap(&mut self) -> Result>> { + fn next_from_chunk_heap(&mut self) -> Result>> { let mut all_read_complete = true; for chunk_idx in self.wait_for_read_complete.iter() { let chunk_io_state = self.chunks[*chunk_idx].io_state.get(); @@ -179,15 +180,15 @@ impl Sorter { } } if !all_read_complete { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } self.wait_for_read_complete.clear(); if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { self.push_to_chunk_heap(next_chunk_idx)?; - Ok(CursorResult::Ok(Some(next_record.0.record))) + Ok(IOResult::Done(Some(next_record.0.record))) } else { - Ok(CursorResult::Ok(None)) + Ok(IOResult::Done(None)) } } @@ -512,7 +513,7 @@ mod tests { } loop { - if let CursorResult::IO = sorter.sort().expect("Failed to sort the records") { + if let IOResult::IO = sorter.sort().expect("Failed to sort the records") { io.run_once().expect("Failed to run the IO"); continue; } @@ -527,7 +528,7 @@ mod tests { let record = sorter.record().unwrap(); assert_eq!(record.get_values()[0], RefValue::Integer(i)); loop { - if let CursorResult::IO = 
sorter.next().expect("Failed to get the next record") { + if let IOResult::IO = sorter.next().expect("Failed to get the next record") { io.run_once().expect("Failed to run the IO"); continue; } From a88b828268906d35f3b1e1abeb0b049259229031 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 07:47:35 +0200 Subject: [PATCH 03/10] Fix clippy --- core/vdbe/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3f0157f34..00fd2a17a 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3415,7 +3415,7 @@ pub fn op_sorter_open( }; let cache_size = program.connection.get_cache_size(); // Set the buffer size threshold to be roughly the same as the limit configured for the page-cache. - let page_size = header_accessor::get_page_size(&pager) + let page_size = header_accessor::get_page_size(pager) .unwrap_or(storage::sqlite3_ondisk::DEFAULT_PAGE_SIZE) as usize; let max_buffer_size_bytes = if cache_size < 0 { (cache_size.abs() * 1024) as usize From 6a609398fe3b730eba0faf2daa1b9f62b2580b6b Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 08:03:39 +0200 Subject: [PATCH 04/10] cosmetic fix --- core/vdbe/sorter.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 1f9658809..90a6e2654 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -243,7 +243,7 @@ impl Sorter { self.current_buffer_size, chunk_buffer_size, ); - chunk.write(&mut self.records, self.current_buffer_size)?; + chunk.write(&mut self.records)?; self.chunks.push(chunk); self.current_buffer_size = 0; @@ -393,16 +393,12 @@ impl SortedChunk { Ok(()) } - fn write( - &mut self, - records: &mut Vec, - total_size: usize, - ) -> Result<()> { + fn write(&mut self, records: &mut Vec) -> Result<()> { assert!(self.io_state.get() == SortedChunkIOState::None); self.io_state.set(SortedChunkIOState::WaitingForWrite); let drop_fn 
= Rc::new(|_buffer: BufferData| {}); - let mut buffer = Buffer::allocate(total_size, drop_fn); + let mut buffer = Buffer::allocate(self.chunk_size, drop_fn); let mut buf_pos = 0; let buf = buffer.as_mut_slice(); From edf2be1432f3fa2b4d32b29e07f31f33eec39386 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 19:03:02 +0200 Subject: [PATCH 05/10] fix conflicts --- core/vdbe/sorter.rs | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 90a6e2654..9cd585d40 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -57,18 +57,21 @@ impl Sorter { min_chunk_read_buffer_size_bytes: usize, io: Arc, ) -> Self { + assert_eq!(order.len(), collations.len()); Self { records: Vec::new(), current: None, key_len: order.len(), - index_key_info: order - .iter() - .zip(collations) - .map(|(order, collation)| KeyInfo { - sort_order: *order, - collation, - }) - .collect(), + index_key_info: Rc::new( + order + .iter() + .zip(collations) + .map(|(order, collation)| KeyInfo { + sort_order: *order, + collation, + }) + .collect(), + ), chunks: Vec::new(), chunk_heap: BinaryHeap::new(), max_buffer_size: max_buffer_size_bytes, @@ -201,8 +204,7 @@ impl Sorter { Reverse(SortableImmutableRecord::new( record, self.key_len, - self.order, - self.collations.clone(), + self.index_key_info.clone(), )), chunk_idx, )); @@ -434,11 +436,7 @@ struct SortableImmutableRecord { } impl SortableImmutableRecord { - fn new( - record: ImmutableRecord, - key_len: usize, - index_key_info: Rc>, - ) -> Self { + fn new(record: ImmutableRecord, key_len: usize, index_key_info: Rc>) -> Self { Self { record, key_len, @@ -464,7 +462,7 @@ impl Ord for SortableImmutableRecord { &other_values[..] 
}; - compare_immutable(a_key, b_key, self.index_key_info) + compare_immutable(a_key, b_key, self.index_key_info.as_ref()) } } @@ -495,13 +493,20 @@ enum SortedChunkIOState { #[cfg(test)] mod tests { use super::*; + use crate::translate::collate::CollationSeq; use crate::types::{ImmutableRecord, RefValue, Value}; use crate::PlatformIO; #[test] fn test_external_sort() { let io = Arc::new(PlatformIO::new().unwrap()); - let mut sorter = Sorter::new(&[SortOrder::Asc], vec![], 64, 13, io.clone()); + let mut sorter = Sorter::new( + &[SortOrder::Asc], + vec![CollationSeq::Binary], + 64, + 13, + io.clone(), + ); for i in (0..1024).rev() { sorter .insert(&ImmutableRecord::from_values(&[Value::Integer(i)], 1)) From 20bdbd5ca5b5f599e773c40ca350969a3dbc6f86 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Thu, 17 Jul 2025 19:41:58 +0200 Subject: [PATCH 06/10] address suggestions --- core/storage/sqlite3_ondisk.rs | 2 +- core/vdbe/sorter.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 36d5efb18..c1001cf54 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1076,7 +1076,7 @@ pub fn read_record_size(payload: &[u8]) -> Result { offset += bytes_read; record_size += header_size; - while offset < header_size && offset < payload.len() { + while offset < header_size { let (serial_type, bytes_read) = read_varint(&payload[offset..])?; offset += bytes_read; diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 9cd585d40..0b75d6d06 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -357,7 +357,6 @@ impl SortedChunk { let drop_fn = Rc::new(|_buffer: BufferData| {}); let read_buffer = Buffer::allocate(read_buffer_size, drop_fn); - #[allow(clippy::arc_with_non_send_sync)] let read_buffer_ref = Arc::new(RefCell::new(read_buffer)); let chunk_io_state_copy = self.io_state.clone(); @@ -410,7 +409,6 @@ impl SortedChunk { buf_pos += payload.len(); } - 
#[allow(clippy::arc_with_non_send_sync)] let buffer_ref = Arc::new(RefCell::new(buffer)); let buffer_ref_copy = buffer_ref.clone(); From d9751212d78abdbf3956fc7acd593a5729ca707f Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 18 Jul 2025 07:15:20 +0200 Subject: [PATCH 07/10] make a fuzz sorter test --- core/vdbe/sorter.rs | 116 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 22 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 0b75d6d06..c7206d92e 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -492,11 +492,31 @@ enum SortedChunkIOState { mod tests { use super::*; use crate::translate::collate::CollationSeq; - use crate::types::{ImmutableRecord, RefValue, Value}; + use crate::types::{ImmutableRecord, RefValue, Value, ValueType}; use crate::PlatformIO; + use rand_chacha::{ + rand_core::{RngCore, SeedableRng}, + ChaCha8Rng, + }; + + fn get_seed() -> u64 { + std::env::var("SEED").map_or( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis(), + |v| { + v.parse() + .expect("Failed to parse SEED environment variable as u64") + }, + ) as u64 + } #[test] - fn test_external_sort() { + fn fuzz_external_sort() { + let seed = get_seed(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + let io = Arc::new(PlatformIO::new().unwrap()); let mut sorter = Sorter::new( &[SortOrder::Asc], @@ -505,35 +525,87 @@ mod tests { 13, io.clone(), ); - for i in (0..1024).rev() { - sorter - .insert(&ImmutableRecord::from_values(&[Value::Integer(i)], 1)) - .expect("Failed to insert the record"); - } - loop { - if let IOResult::IO = sorter.sort().expect("Failed to sort the records") { - io.run_once().expect("Failed to run the IO"); - continue; + let attempts = 8; + for _ in 0..attempts { + let num_records = 1000 + rng.next_u64() % 2000; + let num_records = num_records as i64; + + let num_values = 1 + rng.next_u64() % 4; + let value_types = generate_value_types(&mut rng, 
num_values as usize); + + for i in (0..num_records).rev() { + let mut values = vec![Value::Integer(i)]; + values.append(&mut generate_values(&mut rng, &value_types)); + sorter + .insert(&ImmutableRecord::from_values(&values, values.len())) + .expect("Failed to insert the record"); } - break; - } - assert!(!sorter.is_empty()); - assert_eq!(sorter.chunks.len(), 63); - - for i in 0..1024 { - assert!(sorter.has_more()); - let record = sorter.record().unwrap(); - assert_eq!(record.get_values()[0], RefValue::Integer(i)); loop { - if let IOResult::IO = sorter.next().expect("Failed to get the next record") { + if let IOResult::IO = sorter.sort().expect("Failed to sort the records") { io.run_once().expect("Failed to run the IO"); continue; } break; } + + assert!(!sorter.is_empty()); + assert!(!sorter.chunks.is_empty()); + + for i in 0..num_records { + assert!(sorter.has_more()); + let record = sorter.record().unwrap(); + assert_eq!(record.get_values()[0], RefValue::Integer(i)); + loop { + if let IOResult::IO = sorter.next().expect("Failed to get the next record") { + io.run_once().expect("Failed to run the IO"); + continue; + } + break; + } + } + assert!(!sorter.has_more()); } - assert!(!sorter.has_more()); + } + + fn generate_value_types(rng: &mut R, num_values: usize) -> Vec { + let mut value_types = Vec::with_capacity(num_values); + + for _ in 0..num_values { + let value_type: ValueType = match rng.next_u64() % 4 { + 0 => ValueType::Integer, + 1 => ValueType::Float, + 2 => ValueType::Blob, + 3 => ValueType::Null, + _ => unreachable!(), + }; + value_types.push(value_type); + } + + value_types + } + + fn generate_values(rng: &mut R, value_types: &[ValueType]) -> Vec { + let mut values = Vec::with_capacity(value_types.len()); + for value_type in value_types { + let value = match value_type { + ValueType::Integer => Value::Integer(rng.next_u64() as i64), + ValueType::Float => { + let numerator = rng.next_u64() as f64; + let denominator = rng.next_u64() as f64; + 
Value::Float(numerator / denominator) + } + ValueType::Blob => { + let mut blob = vec![0u8; (rng.next_u64() % 2047 + 1) as usize]; // zero-filled so fill_bytes has bytes to overwrite + rng.fill_bytes(&mut blob); + Value::Blob(blob) + } + ValueType::Null => Value::Null, + _ => unreachable!(), + }; + values.push(value); + } + values } } From f6f1d076da8f605b20cb4afa5a1aa12c60d6515e Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 18 Jul 2025 07:26:11 +0200 Subject: [PATCH 08/10] verify that records remain unchanged after sorting --- core/vdbe/sorter.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index c7206d92e..b2b3a8429 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -534,12 +534,14 @@ mod tests { let num_values = 1 + rng.next_u64() % 4; let value_types = generate_value_types(&mut rng, num_values as usize); + let mut initial_records = Vec::with_capacity(num_records as usize); for i in (0..num_records).rev() { let mut values = vec![Value::Integer(i)]; values.append(&mut generate_values(&mut rng, &value_types)); - sorter - .insert(&ImmutableRecord::from_values(&values, values.len())) - .expect("Failed to insert the record"); + let record = ImmutableRecord::from_values(&values, values.len()); + + sorter.insert(&record).expect("Failed to insert the record"); + initial_records.push(record); } loop { @@ -557,6 +559,9 @@ mod tests { assert!(sorter.has_more()); let record = sorter.record().unwrap(); assert_eq!(record.get_values()[0], RefValue::Integer(i)); + // Check that the record remained unchanged after sorting. 
+ assert_eq!(record, &initial_records[(num_records - i - 1) as usize]); + loop { if let IOResult::IO = sorter.next().expect("Failed to get the next record") { io.run_once().expect("Failed to run the IO"); From 76e748146b8cf5d9ca89944c12df6c635cc16c19 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 18 Jul 2025 07:30:08 +0200 Subject: [PATCH 09/10] rebase --- core/vdbe/sorter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index b2b3a8429..21a0f1bea 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -390,7 +390,7 @@ impl SortedChunk { read_buffer_ref, read_complete, ))); - self.file.pread(self.total_bytes_read.get(), c)?; + self.file.pread(self.total_bytes_read.get(), Arc::new(c))?; Ok(()) } @@ -422,7 +422,7 @@ impl SortedChunk { }); let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - self.file.pwrite(0, buffer_ref, c)?; + self.file.pwrite(0, buffer_ref, Arc::new(c))?; Ok(()) } } From 28ff170e149eb37c0f4f1c73d5e20266232a1b77 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 18 Jul 2025 07:41:15 +0200 Subject: [PATCH 10/10] improve sorter settings in the fuzz test --- core/vdbe/sorter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 21a0f1bea..49d486810 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -521,8 +521,8 @@ mod tests { let mut sorter = Sorter::new( &[SortOrder::Asc], vec![CollationSeq::Binary], + 256, 64, - 13, io.clone(), );