wait for flush to complete

This commit is contained in:
pedrocarlo
2025-08-09 13:03:17 -03:00
parent 5924274d61
commit 4a3408003a

View File

@@ -71,6 +71,7 @@ pub struct Sorter {
sort_state: SortState,
/// State machine for [Sorter::insert]
insert_state: InsertState,
/// State machine for [Sorter::init_chunk_heap]
init_chunk_heap_state: InitChunkHeapState,
}
@@ -140,6 +141,11 @@ impl Sorter {
}
}
SortState::InitHeap => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
return_if_io!(self.init_chunk_heap());
self.sort_state = SortState::Next;
}
@@ -193,6 +199,11 @@ impl Sorter {
}
}
InsertState::Insert => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
@@ -224,7 +235,7 @@ impl Sorter {
if self
.chunks
.iter()
.any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead)
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
@@ -245,7 +256,7 @@ impl Sorter {
if self
.chunks
.iter()
.any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead)
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
@@ -478,6 +489,7 @@ impl SortedChunk {
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<Completion> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
@@ -506,7 +518,9 @@ impl SortedChunk {
let buffer_ref = Arc::new(buffer);
let buffer_ref_copy = buffer_ref.clone();
let chunk_io_state_copy = self.io_state.clone();
let write_complete = Box::new(move |bytes_written: i32| {
chunk_io_state_copy.set(SortedChunkIOState::WriteComplete);
let buf_len = buffer_ref_copy.len();
if bytes_written < buf_len as i32 {
tracing::error!("wrote({bytes_written}) less than expected({buf_len})");
@@ -638,6 +652,8 @@ impl Eq for SortableImmutableRecord {}
enum SortedChunkIOState {
WaitingForRead,
ReadComplete,
WaitingForWrite,
WriteComplete,
ReadEOF,
None,
}