bubble completions in Sorter

This commit is contained in:
pedrocarlo
2025-08-12 15:21:27 -03:00
committed by Jussi Saurio
parent 0b6ab9d969
commit c381fe3844

View File

@@ -8,6 +8,7 @@ use std::sync::Arc;
use tempfile;
use crate::return_if_io;
use crate::types::IOCompletions;
use crate::util::IOExt;
use crate::{
error::LimboError,
@@ -150,16 +151,17 @@ impl Sorter {
}
SortState::Flush => {
self.sort_state = SortState::InitHeap;
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
if let Some(c) = self.flush()? {
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
SortState::InitHeap => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
turso_assert!(
!self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}),
"chunks should been written"
);
return_if_io!(self.init_chunk_heap());
self.sort_state = SortState::Next;
}
@@ -204,17 +206,18 @@ impl Sorter {
InsertState::Start => {
self.insert_state = InsertState::Insert;
if self.current_buffer_size + payload_size > self.max_buffer_size {
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
if let Some(c) = self.flush()? {
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
}
InsertState::Insert => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
turso_assert!(
!self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}),
"chunks should have written"
);
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
@@ -239,17 +242,17 @@ impl Sorter {
completions.push(c);
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
Ok(IOResult::IO)
Ok(IOResult::IO(IOCompletions::Many(completions)))
}
InitChunkHeapState::PushChunk => {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
turso_assert!(
!self.chunks.iter().any(|chunk| matches!(
chunk.io_state.get(),
SortedChunkIOState::WaitingForRead
)),
"chunks should have been read"
);
self.chunk_heap.reserve(self.chunks.len());
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
@@ -264,13 +267,13 @@ impl Sorter {
fn next_from_chunk_heap(&mut self) -> Result<IOResult<Option<SortableImmutableRecord>>> {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
turso_assert!(
!self
.chunks
.iter()
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)),
"chunks should have been read"
);
if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() {
// TODO: blocking will be unnecessary here with IO completions
@@ -460,8 +463,8 @@ impl SortedChunk {
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
} else {
let _c = self.read()?;
return Ok(IOResult::IO);
let c = self.read()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
}