diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 366d59e1e..47f480890 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -7,7 +7,6 @@ use std::sync::{atomic, Arc, RwLock}; use tempfile; use crate::types::IOCompletions; -use crate::util::IOExt; use crate::{ error::LimboError, io::{Buffer, Completion, File, OpenFlags, IO}, @@ -88,6 +87,7 @@ pub struct Sorter { /// State machine for [Sorter::init_chunk_heap] init_chunk_heap_state: InitChunkHeapState, seq_count: i64, + pending_completions: Vec, } impl Sorter { @@ -126,6 +126,7 @@ impl Sorter { insert_state: InsertState::Start, init_chunk_heap_state: InitChunkHeapState::Start, seq_count: 0, + pending_completions: Vec::new(), } } @@ -284,43 +285,58 @@ impl Sorter { ); self.chunk_heap.reserve(self.chunks.len()); // TODO: blocking will be unnecessary here with IO completions - let io = self.io.clone(); + let mut completions = vec![]; for chunk_idx in 0..self.chunks.len() { - io.block(|| self.push_to_chunk_heap(chunk_idx))?; + match self.push_to_chunk_heap(chunk_idx)? { + Some(c) => completions.push(c), + None => (), + }; } self.init_chunk_heap_state = InitChunkHeapState::Start; + if !completions.is_empty() { + io_yield_many!(completions); + } Ok(IOResult::Done(())) } } } fn next_from_chunk_heap(&mut self) -> Result>> { + if !self.pending_completions.is_empty() { + return Ok(IOResult::IO(IOCompletions::Many( + self.pending_completions.drain(..).collect(), + ))); + } // Make sure all chunks read at least one record into their buffer. if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() { // TODO: blocking will be unnecessary here with IO completions - let io = self.io.clone(); - io.block(|| self.push_to_chunk_heap(next_chunk_idx))?; + if let Some(c) = self.push_to_chunk_heap(next_chunk_idx)? { + self.pending_completions.push(c); + } Ok(IOResult::Done(Some(next_record.0))) } else { Ok(IOResult::Done(None)) } } - fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result> { + fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result> { let chunk = &mut self.chunks[chunk_idx]; - if let Some(record) = return_if_io!(chunk.next()) { - self.chunk_heap.push(( - Reverse(SortableImmutableRecord::new( - record, - self.key_len, - self.index_key_info.clone(), - )?), - chunk_idx, - )); + match chunk.next()? { + ChunkNextResult::Done(Some(record)) => { + self.chunk_heap.push(( + Reverse(SortableImmutableRecord::new( + record, + self.key_len, + self.index_key_info.clone(), + )?), + chunk_idx, + )); + Ok(None) + } + ChunkNextResult::Done(None) => Ok(None), + ChunkNextResult::IO(io) => Ok(Some(io)), } - - Ok(IOResult::Done(())) } fn flush(&mut self) -> Result> { @@ -404,6 +420,11 @@ struct SortedChunk { next_state: NextState, } +enum ChunkNextResult { + Done(Option), + IO(Completion), +} + impl SortedChunk { fn new(file: Arc, start_offset: usize, buffer_size: usize) -> Self { Self { @@ -427,13 +448,13 @@ impl SortedChunk { self.buffer_len.store(len, atomic::Ordering::SeqCst); } - fn next(&mut self) -> Result>> { + fn next(&mut self) -> Result { loop { match self.next_state { NextState::Start => { let mut buffer_len = self.buffer_len(); if self.records.is_empty() && buffer_len == 0 { - return Ok(IOResult::Done(None)); + return Ok(ChunkNextResult::Done(None)); } if self.records.is_empty() { @@ -497,13 +518,15 @@ impl SortedChunk { *self.io_state.write().unwrap() = SortedChunkIOState::ReadEOF; } else { let c = self.read()?; - io_yield_one!(c); + if !c.succeeded() { + return Ok(ChunkNextResult::IO(c)); + } } } } NextState::Finish => { self.next_state = NextState::Start; - return Ok(IOResult::Done(self.records.pop())); + return Ok(ChunkNextResult::Done(self.records.pop())); } } }