only open 1 file for sorter so chunks just reuse that file
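This replaces the file-per-chunk scheme: previously every flushed chunk got its own chunk_N file inside a shared temporary directory, each requiring a separate open_file call. Now the first flush creates the temporary directory, opens a single chunk file, and stores both in a TempFile wrapper; later flushes reuse that handle and append at next_chunk_offset. Each SortedChunk records its start_offset and rebases its reads and writes against it.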

pedrocarlo
2025-08-09 14:04:28 -03:00
parent 4a3408003a
commit 76d6c4a28d


@@ -39,6 +39,20 @@ enum InitChunkHeapState {
PushChunk,
}
struct TempFile {
// When temp_dir is dropped the folder is deleted
_temp_dir: tempfile::TempDir,
file: Arc<dyn File>,
}
impl core::ops::Deref for TempFile {
type Target = Arc<dyn File>;
fn deref(&self) -> &Self::Target {
&self.file
}
}
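Editor's note: the Deref impl above lets the rest of the sorter call file methods directly on the wrapper, while the owned TempDir keeps the directory (and the chunk file inside it) alive until the sorter drops the wrapper. A minimal standalone sketch of the pattern, not part of the diff; the File trait and pwrite signature below are simplified stand-ins for the crate's own:

    use std::ops::Deref;
    use std::sync::Arc;

    // Simplified stand-in for the crate's `File` trait and `pwrite` signature.
    trait File {
        fn pwrite(&self, offset: usize, data: &[u8]);
    }

    struct StubFile;

    impl File for StubFile {
        fn pwrite(&self, offset: usize, data: &[u8]) {
            println!("write {} bytes at offset {}", data.len(), offset);
        }
    }

    // Owning the TempDir alongside the handle ties the directory's lifetime to
    // the file: dropping the wrapper deletes the directory and the chunk file.
    struct TempFile {
        _temp_dir: tempfile::TempDir,
        file: Arc<dyn File>,
    }

    impl Deref for TempFile {
        type Target = Arc<dyn File>;
        fn deref(&self) -> &Self::Target {
            &self.file
        }
    }

    fn main() -> std::io::Result<()> {
        let temp_file = TempFile {
            _temp_dir: tempfile::tempdir()?,
            file: Arc::new(StubFile),
        };
        // Deref coercion lets the wrapper be used like the file handle itself.
        temp_file.pwrite(0, b"chunk bytes");
        Ok(())
    }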
pub struct Sorter {
/// The records in the in-memory buffer.
records: Vec<SortableImmutableRecord>,
@@ -65,8 +79,10 @@ pub struct Sorter {
io: Arc<dyn IO>,
/// The indices of the chunks for which the read is not complete.
wait_for_read_complete: Vec<usize>,
/// The temporary directory for chunk files.
temp_dir: Option<tempfile::TempDir>,
/// The temporary file for chunks.
temp_file: Option<TempFile>,
/// Offset where the next chunk will be placed in the `temp_file`
next_chunk_offset: usize,
/// State machine for [Sorter::sort]
sort_state: SortState,
/// State machine for [Sorter::insert]
@@ -106,7 +122,8 @@ impl Sorter {
max_payload_size_in_buffer: 0,
io,
wait_for_read_complete: Vec::new(),
temp_dir: None,
temp_file: None,
next_chunk_offset: 0,
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
@@ -300,30 +317,47 @@ impl Sorter {
self.records.sort();
if self.temp_dir.is_none() {
self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?);
}
let chunk_file_path = self
.temp_dir
.as_ref()
.unwrap()
.path()
.join(format!("chunk_{}", self.chunks.len()));
let chunk_file =
self.io
.open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?;
let chunk_file = match &self.temp_file {
Some(temp_file) => temp_file.file.clone(),
None => {
let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?;
let chunk_file_path = temp_dir.as_ref().join("chunk_file");
let chunk_file = self.io.open_file(
chunk_file_path.to_str().unwrap(),
OpenFlags::Create,
false,
)?;
self.temp_file = Some(TempFile {
_temp_dir: temp_dir,
file: chunk_file.clone(),
});
chunk_file
}
};
// Make sure the chunk buffer size can fit the largest record and its size varint.
let chunk_buffer_size = self
.min_chunk_read_buffer_size
.max(self.max_payload_size_in_buffer + 9);
let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
let c = chunk.write(&mut self.records)?;
let mut chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
let mut record_size_lengths = Vec::with_capacity(self.records.len());
for record in self.records.iter() {
let record_size = record.record.get_payload().len();
let size_len = varint_len(record_size as u64);
record_size_lengths.push(size_len);
chunk_size += size_len + record_size;
}
let mut chunk = SortedChunk::new(chunk_file, self.next_chunk_offset, chunk_buffer_size);
let c = chunk.write(&mut self.records, record_size_lengths, chunk_size)?;
self.chunks.push(chunk);
self.current_buffer_size = 0;
self.max_payload_size_in_buffer = 0;
// Advance the start offset for the next chunk
self.next_chunk_offset += chunk_size;
Ok(Some(c))
}
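Editor's note: with a single shared file, the flush path now pre-computes how many bytes the chunk will occupy on disk (each record is written as a size varint followed by its payload), hands that to SortedChunk::write together with the per-record varint lengths, and then advances next_chunk_offset so the next chunk starts exactly where this one ends. A rough sketch of that bookkeeping, not part of the diff, using a simplified varint_len stand-in and made-up payload sizes:

    // Simplified stand-in for the crate's varint_len (SQLite-style, 1..=9 bytes).
    fn varint_len(value: u64) -> usize {
        let mut len = 1;
        let mut v = value >> 7;
        while v != 0 && len < 9 {
            len += 1;
            v >>= 7;
        }
        len
    }

    fn main() {
        // Made-up record payload sizes for two consecutive flushes.
        let flushes = [vec![40usize, 200, 1_000], vec![64, 5_000]];
        let mut next_chunk_offset = 0usize;

        for (i, payload_sizes) in flushes.iter().enumerate() {
            // chunk_size = sum over records of (size-varint length + payload length).
            let chunk_size: usize = payload_sizes
                .iter()
                .map(|&size| varint_len(size as u64) + size)
                .sum();
            println!("chunk {i}: start_offset={next_chunk_offset} chunk_size={chunk_size}");
            // The next chunk is appended right after this one in the shared file.
            next_chunk_offset += chunk_size;
        }
    }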
@@ -338,6 +372,8 @@ enum NextState {
struct SortedChunk {
/// The chunk file.
file: Arc<dyn File>,
/// Offset of the start of the chunk in the file
start_offset: usize,
/// The size of this chunk file in bytes.
chunk_size: usize,
/// The read buffer.
@@ -355,9 +391,10 @@ struct SortedChunk {
}
impl SortedChunk {
fn new(file: Arc<dyn File>, buffer_size: usize) -> Self {
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
Self {
file,
start_offset,
chunk_size: 0,
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
buffer_len: Rc::new(Cell::new(0)),
@@ -483,23 +520,21 @@ impl SortedChunk {
});
let c = Completion::new_read(read_buffer_ref, read_complete);
let c = self.file.pread(self.total_bytes_read.get(), c)?;
let c = self
.file
.pread(self.start_offset + self.total_bytes_read.get(), c)?;
Ok(c)
}
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<Completion> {
fn write(
&mut self,
records: &mut Vec<SortableImmutableRecord>,
record_size_lengths: Vec<usize>,
chunk_size: usize,
) -> Result<Completion> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
let mut record_size_lengths = Vec::with_capacity(records.len());
for record in records.iter() {
let record_size = record.record.get_payload().len();
let size_len = varint_len(record_size as u64);
record_size_lengths.push(size_len);
self.chunk_size += size_len + record_size;
}
self.chunk_size = chunk_size;
let buffer = Buffer::new_temporary(self.chunk_size);
@@ -528,7 +563,7 @@ impl SortedChunk {
});
let c = Completion::new_write(write_complete);
let c = self.file.pwrite(0, buffer_ref, c)?;
let c = self.file.pwrite(self.start_offset, buffer_ref, c)?;
Ok(c)
}
}
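Editor's note: the net effect on I/O is symmetric: writes land at the chunk's start_offset instead of 0, and incremental reads at start_offset + total_bytes_read, so offsets within a chunk stay zero-based even though all chunks now share one file.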