diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index ca5e25896..25dffd400 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use tempfile; use crate::return_if_io; +use crate::types::IOCompletions; use crate::util::IOExt; use crate::{ error::LimboError, @@ -150,16 +151,17 @@ impl Sorter { } SortState::Flush => { self.sort_state = SortState::InitHeap; - if let Some(_c) = self.flush()? { - return Ok(IOResult::IO); + if let Some(c) = self.flush()? { + return Ok(IOResult::IO(IOCompletions::Single(c))); } } SortState::InitHeap => { - if self.chunks.iter().any(|chunk| { - matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) - }) { - return Ok(IOResult::IO); - } + turso_assert!( + !self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }), + "chunks should been written" + ); return_if_io!(self.init_chunk_heap()); self.sort_state = SortState::Next; } @@ -204,17 +206,18 @@ impl Sorter { InsertState::Start => { self.insert_state = InsertState::Insert; if self.current_buffer_size + payload_size > self.max_buffer_size { - if let Some(_c) = self.flush()? { - return Ok(IOResult::IO); + if let Some(c) = self.flush()? { + return Ok(IOResult::IO(IOCompletions::Single(c))); } } } InsertState::Insert => { - if self.chunks.iter().any(|chunk| { - matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) - }) { - return Ok(IOResult::IO); - } + turso_assert!( + !self.chunks.iter().any(|chunk| { + matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + }), + "chunks should have written" + ); self.records.push(SortableImmutableRecord::new( record.clone(), self.key_len, @@ -239,17 +242,17 @@ impl Sorter { completions.push(c); } self.init_chunk_heap_state = InitChunkHeapState::PushChunk; - Ok(IOResult::IO) + Ok(IOResult::IO(IOCompletions::Many(completions))) } InitChunkHeapState::PushChunk => { // Make sure all chunks read at least one record into their buffer. - if self - .chunks - .iter() - .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) - { - return Ok(IOResult::IO); - } + turso_assert!( + !self.chunks.iter().any(|chunk| matches!( + chunk.io_state.get(), + SortedChunkIOState::WaitingForRead + )), + "chunks should have been read" + ); self.chunk_heap.reserve(self.chunks.len()); // TODO: blocking will be unnecessary here with IO completions let io = self.io.clone(); @@ -264,13 +267,13 @@ impl Sorter { fn next_from_chunk_heap(&mut self) -> Result>> { // Make sure all chunks read at least one record into their buffer. - if self - .chunks - .iter() - .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)) - { - return Ok(IOResult::IO); - } + turso_assert!( + !self + .chunks + .iter() + .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)), + "chunks should have been read" + ); if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { // TODO: blocking will be unnecessary here with IO completions @@ -460,8 +463,8 @@ impl SortedChunk { if self.chunk_size - self.total_bytes_read.get() == 0 { self.io_state.set(SortedChunkIOState::ReadEOF); } else { - let _c = self.read()?; - return Ok(IOResult::IO); + let c = self.read()?; + return Ok(IOResult::IO(IOCompletions::Single(c))); } } }