From 5d47502e3adb6549d02727d81b38492b3c3d48b9 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Sat, 19 Jul 2025 06:08:27 +0200 Subject: [PATCH 1/2] Avoid redundant decoding of record headers when reading sorted chunk files --- core/storage/sqlite3_ondisk.rs | 44 ++++++++++----------- core/vdbe/sorter.rs | 70 +++++++++++++++++++++------------- 2 files changed, 63 insertions(+), 51 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 3151136c6..60cbbcae3 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1059,30 +1059,6 @@ impl Iterator for SmallVecIter<'_, T, N> { } } -pub fn read_record_size(payload: &[u8]) -> Result { - let mut offset = 0; - let mut record_size = 0; - - let (header_size, bytes_read) = read_varint(payload)?; - let header_size = header_size as usize; - if header_size > payload.len() { - crate::bail_corrupt_error!("Incomplete record header"); - } - - offset += bytes_read; - record_size += header_size; - - while offset < header_size { - let (serial_type, bytes_read) = read_varint(&payload[offset..])?; - offset += bytes_read; - - let serial_type_obj = SerialType::try_from(serial_type)?; - record_size += serial_type_obj.size(); - } - - Ok(record_size) -} - /// Reads a value that might reference the buffer it is reading from. Be sure to store RefValue with the buffer /// always. #[inline(always)] @@ -1281,6 +1257,26 @@ pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { Ok((v, 9)) } +pub fn varint_len(value: u64) -> usize { + if value <= 0x7f { + return 1; + } + if value <= 0x3fff { + return 2; + } + if (value & ((0xff000000_u64) << 32)) > 0 { + return 9; + } + + let mut bytes = value; + let mut n = 0; + while bytes != 0 { + bytes >>= 7; + n += 1; + } + n +} + pub fn write_varint(buf: &mut [u8], value: u64) -> usize { if value <= 0x7f { buf[0] = (value & 0x7f) as u8; diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 49d486810..4924d9a8f 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -13,7 +13,7 @@ use crate::{ Buffer, BufferData, Completion, CompletionType, File, OpenFlags, ReadCompletion, WriteCompletion, IO, }, - storage::sqlite3_ondisk::read_record_size, + storage::sqlite3_ondisk::{read_varint, varint_len, write_varint}, translate::collate::CollationSeq, types::{compare_immutable, IOResult, ImmutableRecord, KeyInfo}, Result, @@ -236,15 +236,11 @@ impl Sorter { self.io .open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?; - // Make sure the chunk buffer size can fit the largest record. + // Make sure the chunk buffer size can fit the largest record and its size varint. let chunk_buffer_size = self .min_chunk_read_buffer_size - .max(self.max_payload_size_in_buffer); - let mut chunk = SortedChunk::new( - chunk_file.clone(), - self.current_buffer_size, - chunk_buffer_size, - ); + .max(self.max_payload_size_in_buffer + 9); + let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size); chunk.write(&mut self.records)?; self.chunks.push(chunk); @@ -258,7 +254,7 @@ impl Sorter { struct SortedChunk { /// The chunk file. file: Arc, - /// The chunk size. + /// The size of this chunk file in bytes. chunk_size: usize, /// The read buffer. buffer: Rc>>, @@ -273,10 +269,10 @@ struct SortedChunk { } impl SortedChunk { - fn new(file: Arc, chunk_size: usize, buffer_size: usize) -> Self { + fn new(file: Arc, buffer_size: usize) -> Self { Self { file, - chunk_size, + chunk_size: 0, buffer: Rc::new(RefCell::new(vec![0; buffer_size])), buffer_len: Rc::new(Cell::new(0)), records: Vec::new(), @@ -300,26 +296,27 @@ impl SortedChunk { let buffer = buffer_ref.as_mut_slice(); let mut buffer_offset = 0; while buffer_offset < buffer_len { - // Decode records from the buffer until we run out of the buffer or we hit an incomplete record. - let record_size = match read_record_size(&buffer[buffer_offset..buffer_len]) { - Ok(record_size) => record_size, - Err(LimboError::Corrupt(_)) - if self.io_state.get() != SortedChunkIOState::ReadEOF => - { - // Failed to decode a partial record. - break; - } - Err(e) => { - return Err(e); - } - }; - - if record_size > buffer_len - buffer_offset { + // Extract records from the buffer until we run out of the buffer or we hit an incomplete record. + let (record_size, bytes_read) = + match read_varint(&buffer[buffer_offset..buffer_len]) { + Ok((record_size, bytes_read)) => (record_size as usize, bytes_read), + Err(LimboError::Corrupt(_)) + if self.io_state.get() != SortedChunkIOState::ReadEOF => + { + // Failed to decode a partial varint. + break; + } + Err(e) => { + return Err(e); + } + }; + if record_size > buffer_len - (buffer_offset + bytes_read) { if self.io_state.get() == SortedChunkIOState::ReadEOF { crate::bail_corrupt_error!("Incomplete record"); } break; } + buffer_offset += bytes_read; let mut record = ImmutableRecord::new(record_size); record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]); @@ -350,6 +347,10 @@ impl SortedChunk { if self.io_state.get() == SortedChunkIOState::ReadEOF { return Ok(()); } + if self.chunk_size - self.total_bytes_read.get() == 0 { + self.io_state.set(SortedChunkIOState::ReadEOF); + return Ok(()); + } self.io_state.set(SortedChunkIOState::WaitingForRead); let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get(); @@ -397,14 +398,29 @@ impl SortedChunk { fn write(&mut self, records: &mut Vec) -> Result<()> { assert!(self.io_state.get() == SortedChunkIOState::None); self.io_state.set(SortedChunkIOState::WaitingForWrite); + self.chunk_size = 0; + + // Pre-compute varint lengths for record sizes to determine the total buffer size. + let mut record_size_lengths = Vec::with_capacity(records.len()); + for record in records.iter() { + let record_size = record.record.get_payload().len(); + let size_len = varint_len(record_size as u64); + record_size_lengths.push(size_len); + self.chunk_size += size_len + record_size; + } let drop_fn = Rc::new(|_buffer: BufferData| {}); let mut buffer = Buffer::allocate(self.chunk_size, drop_fn); let mut buf_pos = 0; let buf = buffer.as_mut_slice(); - for record in records.drain(..) { + for (idx, record) in records.drain(..).enumerate() { let payload = record.record.get_payload(); + // Write the record size varint. + let size_len = record_size_lengths[idx]; + write_varint(&mut buf[buf_pos..buf_pos + size_len], payload.len() as u64); + buf_pos += size_len; + // Write the record payload. buf[buf_pos..buf_pos + payload.len()].copy_from_slice(payload); buf_pos += payload.len(); } From 10a848fbc5b9eced5657b2444830f312150e96a4 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Sat, 19 Jul 2025 18:40:43 +0200 Subject: [PATCH 2/2] address nit --- core/vdbe/sorter.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 4924d9a8f..0acd06360 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -414,10 +414,9 @@ impl SortedChunk { let mut buf_pos = 0; let buf = buffer.as_mut_slice(); - for (idx, record) in records.drain(..).enumerate() { + for (record, size_len) in records.drain(..).zip(record_size_lengths) { let payload = record.record.get_payload(); // Write the record size varint. - let size_len = record_size_lengths[idx]; write_varint(&mut buf[buf_pos..buf_pos + size_len], payload.len() as u64); buf_pos += size_len; // Write the record payload.