avoid expensive Arc cloning

This commit is contained in:
Jussi Saurio
2025-10-09 17:43:28 +03:00
parent 1f310a4738
commit 1c35d5b342

View File

@@ -7,7 +7,6 @@ use std::sync::{atomic, Arc, RwLock};
use tempfile;
use crate::types::IOCompletions;
use crate::util::IOExt;
use crate::{
error::LimboError,
io::{Buffer, Completion, File, OpenFlags, IO},
@@ -88,6 +87,7 @@ pub struct Sorter {
/// State machine for [Sorter::init_chunk_heap]
init_chunk_heap_state: InitChunkHeapState,
seq_count: i64,
pending_completions: Vec<Completion>,
}
impl Sorter {
@@ -126,6 +126,7 @@ impl Sorter {
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
seq_count: 0,
pending_completions: Vec::new(),
}
}
@@ -284,43 +285,58 @@ impl Sorter {
);
self.chunk_heap.reserve(self.chunks.len());
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
let mut completions = vec![];
for chunk_idx in 0..self.chunks.len() {
io.block(|| self.push_to_chunk_heap(chunk_idx))?;
match self.push_to_chunk_heap(chunk_idx)? {
Some(c) => completions.push(c),
None => (),
};
}
self.init_chunk_heap_state = InitChunkHeapState::Start;
if !completions.is_empty() {
io_yield_many!(completions);
}
Ok(IOResult::Done(()))
}
}
}
fn next_from_chunk_heap(&mut self) -> Result<IOResult<Option<SortableImmutableRecord>>> {
if !self.pending_completions.is_empty() {
return Ok(IOResult::IO(IOCompletions::Many(
self.pending_completions.drain(..).collect(),
)));
}
// Make sure all chunks read at least one record into their buffer.
if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() {
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
io.block(|| self.push_to_chunk_heap(next_chunk_idx))?;
if let Some(c) = self.push_to_chunk_heap(next_chunk_idx)? {
self.pending_completions.push(c);
}
Ok(IOResult::Done(Some(next_record.0)))
} else {
Ok(IOResult::Done(None))
}
}
fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<IOResult<()>> {
fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<Option<Completion>> {
let chunk = &mut self.chunks[chunk_idx];
if let Some(record) = return_if_io!(chunk.next()) {
self.chunk_heap.push((
Reverse(SortableImmutableRecord::new(
record,
self.key_len,
self.index_key_info.clone(),
)?),
chunk_idx,
));
match chunk.next()? {
ChunkNextResult::Done(Some(record)) => {
self.chunk_heap.push((
Reverse(SortableImmutableRecord::new(
record,
self.key_len,
self.index_key_info.clone(),
)?),
chunk_idx,
));
Ok(None)
}
ChunkNextResult::Done(None) => Ok(None),
ChunkNextResult::IO(io) => Ok(Some(io)),
}
Ok(IOResult::Done(()))
}
fn flush(&mut self) -> Result<Option<Completion>> {
@@ -404,6 +420,11 @@ struct SortedChunk {
next_state: NextState,
}
enum ChunkNextResult {
Done(Option<ImmutableRecord>),
IO(Completion),
}
impl SortedChunk {
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
Self {
@@ -427,13 +448,13 @@ impl SortedChunk {
self.buffer_len.store(len, atomic::Ordering::SeqCst);
}
fn next(&mut self) -> Result<IOResult<Option<ImmutableRecord>>> {
fn next(&mut self) -> Result<ChunkNextResult> {
loop {
match self.next_state {
NextState::Start => {
let mut buffer_len = self.buffer_len();
if self.records.is_empty() && buffer_len == 0 {
return Ok(IOResult::Done(None));
return Ok(ChunkNextResult::Done(None));
}
if self.records.is_empty() {
@@ -497,13 +518,15 @@ impl SortedChunk {
*self.io_state.write().unwrap() = SortedChunkIOState::ReadEOF;
} else {
let c = self.read()?;
io_yield_one!(c);
if !c.succeeded() {
return Ok(ChunkNextResult::IO(c));
}
}
}
}
NextState::Finish => {
self.next_state = NextState::Start;
return Ok(IOResult::Done(self.records.pop()));
return Ok(ChunkNextResult::Done(self.records.pop()));
}
}
}