From 76d6c4a28d750a3a5e0ed86d2eff6a10ba97c3ca Mon Sep 17 00:00:00 2001
From: pedrocarlo
Date: Sat, 9 Aug 2025 14:04:28 -0300
Subject: [PATCH] only open 1 file for sorter so chunks just reuse that file

---
 core/vdbe/sorter.rs | 99 ++++++++++++++++++++++++++++++---------------
 1 file changed, 67 insertions(+), 32 deletions(-)

diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs
index 075238343..ee28162bc 100644
--- a/core/vdbe/sorter.rs
+++ b/core/vdbe/sorter.rs
@@ -39,6 +39,20 @@ enum InitChunkHeapState {
     PushChunk,
 }
 
+struct TempFile {
+    // When temp_dir is dropped the folder is deleted
+    _temp_dir: tempfile::TempDir,
+    file: Arc<dyn File>,
+}
+
+impl core::ops::Deref for TempFile {
+    type Target = Arc<dyn File>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.file
+    }
+}
+
 pub struct Sorter {
     /// The records in the in-memory buffer.
     records: Vec<SortableImmutableRecord>,
@@ -65,8 +79,10 @@ pub struct Sorter {
     io: Arc<dyn IO>,
     /// The indices of the chunks for which the read is not complete.
    wait_for_read_complete: Vec<usize>,
-    /// The temporary directory for chunk files.
-    temp_dir: Option<tempfile::TempDir>,
+    /// The temporary file for chunks.
+    temp_file: Option<TempFile>,
+    /// Offset where the next chunk will be placed in the `temp_file`
+    next_chunk_offset: usize,
     /// State machine for [Sorter::sort]
     sort_state: SortState,
     /// State machine for [Sorter::insert]
@@ -106,7 +122,8 @@ impl Sorter {
             max_payload_size_in_buffer: 0,
             io,
             wait_for_read_complete: Vec::new(),
-            temp_dir: None,
+            temp_file: None,
+            next_chunk_offset: 0,
             sort_state: SortState::Start,
             insert_state: InsertState::Start,
             init_chunk_heap_state: InitChunkHeapState::Start,
@@ -300,30 +317,47 @@ impl Sorter {
 
         self.records.sort();
 
-        if self.temp_dir.is_none() {
-            self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?);
-        }
-
-        let chunk_file_path = self
-            .temp_dir
-            .as_ref()
-            .unwrap()
-            .path()
-            .join(format!("chunk_{}", self.chunks.len()));
-        let chunk_file =
-            self.io
-                .open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?;
+        let chunk_file = match &self.temp_file {
+            Some(temp_file) => temp_file.file.clone(),
+            None => {
+                let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?;
+                let chunk_file_path = temp_dir.as_ref().join("chunk_file");
+                let chunk_file = self.io.open_file(
+                    chunk_file_path.to_str().unwrap(),
+                    OpenFlags::Create,
+                    false,
+                )?;
+                self.temp_file = Some(TempFile {
+                    _temp_dir: temp_dir,
+                    file: chunk_file.clone(),
+                });
+                chunk_file
+            }
+        };
 
         // Make sure the chunk buffer size can fit the largest record and its size varint.
         let chunk_buffer_size = self
             .min_chunk_read_buffer_size
             .max(self.max_payload_size_in_buffer + 9);
-        let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
-        let c = chunk.write(&mut self.records)?;
+
+        let mut chunk_size = 0;
+        // Pre-compute varint lengths for record sizes to determine the total buffer size.
+        let mut record_size_lengths = Vec::with_capacity(self.records.len());
+        for record in self.records.iter() {
+            let record_size = record.record.get_payload().len();
+            let size_len = varint_len(record_size as u64);
+            record_size_lengths.push(size_len);
+            chunk_size += size_len + record_size;
+        }
+
+        let mut chunk = SortedChunk::new(chunk_file, self.next_chunk_offset, chunk_buffer_size);
+        let c = chunk.write(&mut self.records, record_size_lengths, chunk_size)?;
         self.chunks.push(chunk);
 
         self.current_buffer_size = 0;
         self.max_payload_size_in_buffer = 0;
+        // increase offset start for next chunk
+        self.next_chunk_offset += chunk_size;
 
         Ok(Some(c))
     }
@@ -338,6 +372,8 @@ enum NextState {
 struct SortedChunk {
     /// The chunk file.
     file: Arc<dyn File>,
+    /// Offset of the start of chunk in file
+    start_offset: usize,
     /// The size of this chunk file in bytes.
     chunk_size: usize,
     /// The read buffer.
@@ -355,9 +391,10 @@ struct SortedChunk {
 }
 
 impl SortedChunk {
-    fn new(file: Arc<dyn File>, buffer_size: usize) -> Self {
+    fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
         Self {
             file,
+            start_offset,
             chunk_size: 0,
             buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
             buffer_len: Rc::new(Cell::new(0)),
@@ -483,23 +520,21 @@ impl SortedChunk {
         });
 
         let c = Completion::new_read(read_buffer_ref, read_complete);
-        let c = self.file.pread(self.total_bytes_read.get(), c)?;
+        let c = self
+            .file
+            .pread(self.start_offset + self.total_bytes_read.get(), c)?;
         Ok(c)
     }
 
-    fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<Completion> {
+    fn write(
+        &mut self,
+        records: &mut Vec<SortableImmutableRecord>,
+        record_size_lengths: Vec<usize>,
+        chunk_size: usize,
+    ) -> Result<Completion> {
         assert!(self.io_state.get() == SortedChunkIOState::None);
         self.io_state.set(SortedChunkIOState::WaitingForWrite);
-        self.chunk_size = 0;
-
-        // Pre-compute varint lengths for record sizes to determine the total buffer size.
-        let mut record_size_lengths = Vec::with_capacity(records.len());
-        for record in records.iter() {
-            let record_size = record.record.get_payload().len();
-            let size_len = varint_len(record_size as u64);
-            record_size_lengths.push(size_len);
-            self.chunk_size += size_len + record_size;
-        }
+        self.chunk_size = chunk_size;
 
         let buffer = Buffer::new_temporary(self.chunk_size);
 
@@ -528,7 +563,7 @@ impl SortedChunk {
         });
 
         let c = Completion::new_write(write_complete);
-        let c = self.file.pwrite(0, buffer_ref, c)?;
+        let c = self.file.pwrite(self.start_offset, buffer_ref, c)?;
         Ok(c)
     }
 }