From 4a3408003a611c2aeeeb7d5274e67b1c19e3170f Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sat, 9 Aug 2025 13:03:17 -0300 Subject: [PATCH] wait for flush to complete --- core/vdbe/sorter.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 03e1cda8c..075238343 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -71,6 +71,7 @@ pub struct Sorter { sort_state: SortState, /// State machine for [Sorter::insert] insert_state: InsertState, + /// State machine for [Sorter::init_chunk_heap] init_chunk_heap_state: InitChunkHeapState, } @@ -140,6 +141,11 @@ impl Sorter { } } SortState::InitHeap => { + if self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }) { + return Ok(IOResult::IO); + } return_if_io!(self.init_chunk_heap()); self.sort_state = SortState::Next; } @@ -193,6 +199,11 @@ impl Sorter { } } InsertState::Insert => { + if self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }) { + return Ok(IOResult::IO); + } self.records.push(SortableImmutableRecord::new( record.clone(), self.key_len, @@ -224,7 +235,7 @@ impl Sorter { if self .chunks .iter() - .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) { return Ok(IOResult::IO); } @@ -245,7 +256,7 @@ impl Sorter { if self .chunks .iter() - .any(|chunk| chunk.io_state.get() == SortedChunkIOState::WaitingForRead) + .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) { return Ok(IOResult::IO); } @@ -478,6 +489,7 @@ impl SortedChunk { fn write(&mut self, records: &mut Vec) -> Result { assert!(self.io_state.get() == SortedChunkIOState::None); + self.io_state.set(SortedChunkIOState::WaitingForWrite); self.chunk_size = 0; // Pre-compute varint lengths for record sizes to determine the total buffer size. @@ -506,7 +518,9 @@ impl SortedChunk { let buffer_ref = Arc::new(buffer); let buffer_ref_copy = buffer_ref.clone(); + let chunk_io_state_copy = self.io_state.clone(); let write_complete = Box::new(move |bytes_written: i32| { + chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); let buf_len = buffer_ref_copy.len(); if bytes_written < buf_len as i32 { tracing::error!("wrote({bytes_written}) less than expected({buf_len})"); @@ -638,6 +652,8 @@ impl Eq for SortableImmutableRecord {} enum SortedChunkIOState { WaitingForRead, ReadComplete, + WaitingForWrite, + WriteComplete, ReadEOF, None, }