Merge 'Avoid redundant decoding of record headers when reading sorted chunk files' from Iaroslav Zeigerman

Currently, each record header is decoded at least twice: once to
determine the record size within the read buffer (in order to construct
the `ImmutableRecord` instance), and again later when decoding the
record for comparison. This redundant decoding can have a noticeable
negative impact on performance when records are wide (e.g., contain
multiple columns).
This update modifies the (de)serialization format for sorted chunk files
by prepending a record size varint to each record payload. As a result,
only a single varint needs to be decoded to determine the record size,
eliminating the need to decode the full record header during reads.

Closes #2176
This commit is contained in:
Jussi Saurio
2025-07-20 23:54:54 +03:00
2 changed files with 62 additions and 51 deletions

View File

@@ -1059,30 +1059,6 @@ impl<T: Default + Copy, const N: usize> Iterator for SmallVecIter<'_, T, N> {
}
}
/// Computes the total on-disk size of a serialized record (header + body)
/// by decoding only its header.
///
/// `payload` must begin at the record header. The header starts with a
/// varint giving the header length in bytes, followed by one serial-type
/// varint per column; each serial type determines that column's body size.
///
/// Returns the combined header + body length, or a corruption error if the
/// declared header length exceeds the available payload.
pub fn read_record_size(payload: &[u8]) -> Result<usize> {
    // Leading varint: total length of the header itself.
    let (header_len, varint_bytes) = read_varint(payload)?;
    let header_len = header_len as usize;
    if header_len > payload.len() {
        crate::bail_corrupt_error!("Incomplete record header");
    }
    // Start with the header length, then add each column's body size as
    // derived from its serial-type entry.
    let mut total = header_len;
    let mut cursor = varint_bytes;
    while cursor < header_len {
        let (serial_type, consumed) = read_varint(&payload[cursor..])?;
        cursor += consumed;
        total += SerialType::try_from(serial_type)?.size();
    }
    Ok(total)
}
/// Reads a value that might reference the buffer it is reading from. Be sure to store RefValue with the buffer
/// always.
#[inline(always)]
@@ -1281,6 +1257,26 @@ pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
Ok((v, 9))
}
/// Returns the number of bytes the SQLite varint encoding of `value`
/// occupies (1 through 9).
///
/// Mirrors `write_varint`: values fitting in 7 or 14 bits take 1 or 2
/// bytes; values that need any of the top 8 bits always take the full
/// 9-byte form; everything else takes one byte per 7-bit group.
pub fn varint_len(value: u64) -> usize {
    if value <= 0x7f {
        1
    } else if value <= 0x3fff {
        2
    } else if value >> 56 != 0 {
        // Any of bits 56..=63 set: SQLite uses the maximal 9-byte encoding.
        9
    } else {
        // Count the 7-bit groups required to represent the value.
        let mut remaining = value;
        let mut len = 0;
        while remaining != 0 {
            remaining >>= 7;
            len += 1;
        }
        len
    }
}
pub fn write_varint(buf: &mut [u8], value: u64) -> usize {
if value <= 0x7f {
buf[0] = (value & 0x7f) as u8;

View File

@@ -13,7 +13,7 @@ use crate::{
Buffer, BufferData, Completion, CompletionType, File, OpenFlags, ReadCompletion,
WriteCompletion, IO,
},
storage::sqlite3_ondisk::read_record_size,
storage::sqlite3_ondisk::{read_varint, varint_len, write_varint},
translate::collate::CollationSeq,
types::{compare_immutable, IOResult, ImmutableRecord, KeyInfo},
Result,
@@ -236,15 +236,11 @@ impl Sorter {
self.io
.open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?;
// Make sure the chunk buffer size can fit the largest record.
// Make sure the chunk buffer size can fit the largest record and its size varint.
let chunk_buffer_size = self
.min_chunk_read_buffer_size
.max(self.max_payload_size_in_buffer);
let mut chunk = SortedChunk::new(
chunk_file.clone(),
self.current_buffer_size,
chunk_buffer_size,
);
.max(self.max_payload_size_in_buffer + 9);
let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
chunk.write(&mut self.records)?;
self.chunks.push(chunk);
@@ -258,7 +254,7 @@ impl Sorter {
struct SortedChunk {
/// The chunk file.
file: Arc<dyn File>,
/// The chunk size.
/// The size of this chunk file in bytes.
chunk_size: usize,
/// The read buffer.
buffer: Rc<RefCell<Vec<u8>>>,
@@ -273,10 +269,10 @@ struct SortedChunk {
}
impl SortedChunk {
fn new(file: Arc<dyn File>, chunk_size: usize, buffer_size: usize) -> Self {
fn new(file: Arc<dyn File>, buffer_size: usize) -> Self {
Self {
file,
chunk_size,
chunk_size: 0,
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
buffer_len: Rc::new(Cell::new(0)),
records: Vec::new(),
@@ -300,26 +296,27 @@ impl SortedChunk {
let buffer = buffer_ref.as_mut_slice();
let mut buffer_offset = 0;
while buffer_offset < buffer_len {
// Decode records from the buffer until we run out of the buffer or we hit an incomplete record.
let record_size = match read_record_size(&buffer[buffer_offset..buffer_len]) {
Ok(record_size) => record_size,
Err(LimboError::Corrupt(_))
if self.io_state.get() != SortedChunkIOState::ReadEOF =>
{
// Failed to decode a partial record.
break;
}
Err(e) => {
return Err(e);
}
};
if record_size > buffer_len - buffer_offset {
// Extract records from the buffer until we run out of the buffer or we hit an incomplete record.
let (record_size, bytes_read) =
match read_varint(&buffer[buffer_offset..buffer_len]) {
Ok((record_size, bytes_read)) => (record_size as usize, bytes_read),
Err(LimboError::Corrupt(_))
if self.io_state.get() != SortedChunkIOState::ReadEOF =>
{
// Failed to decode a partial varint.
break;
}
Err(e) => {
return Err(e);
}
};
if record_size > buffer_len - (buffer_offset + bytes_read) {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
crate::bail_corrupt_error!("Incomplete record");
}
break;
}
buffer_offset += bytes_read;
let mut record = ImmutableRecord::new(record_size);
record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]);
@@ -350,6 +347,10 @@ impl SortedChunk {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
return Ok(());
}
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
return Ok(());
}
self.io_state.set(SortedChunkIOState::WaitingForRead);
let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get();
@@ -397,14 +398,28 @@ impl SortedChunk {
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<()> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
let mut record_size_lengths = Vec::with_capacity(records.len());
for record in records.iter() {
let record_size = record.record.get_payload().len();
let size_len = varint_len(record_size as u64);
record_size_lengths.push(size_len);
self.chunk_size += size_len + record_size;
}
let drop_fn = Rc::new(|_buffer: BufferData| {});
let mut buffer = Buffer::allocate(self.chunk_size, drop_fn);
let mut buf_pos = 0;
let buf = buffer.as_mut_slice();
for record in records.drain(..) {
for (record, size_len) in records.drain(..).zip(record_size_lengths) {
let payload = record.record.get_payload();
// Write the record size varint.
write_varint(&mut buf[buf_pos..buf_pos + size_len], payload.len() as u64);
buf_pos += size_len;
// Write the record payload.
buf[buf_pos..buf_pos + payload.len()].copy_from_slice(payload);
buf_pos += payload.len();
}