diff --git a/core/incremental/view.rs b/core/incremental/view.rs index 0987178fa..e3572c189 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -6,7 +6,7 @@ use super::operator::{ use crate::schema::{BTreeTable, Column, Schema}; use crate::types::{IOCompletions, IOResult, Value}; use crate::util::{extract_column_name_from_expr, extract_view_columns}; -use crate::{Completion, LimboError, Result, Statement}; +use crate::{io_yield_one, Completion, LimboError, Result, Statement}; use fallible_iterator::FallibleIterator; use std::collections::BTreeMap; use std::fmt; @@ -562,9 +562,7 @@ impl IncrementalView { // Yield control after processing a batch // TODO: currently this inner statement is the one that is tracking completions // so as a stop gap we can just return a dummy completion here - return Ok(IOResult::IO( - IOCompletions::Single(Completion::new_dummy()), - )); + io_yield_one!(Completion::new_dummy()); } // This step() call resumes from where the statement left off @@ -611,9 +609,7 @@ impl IncrementalView { // Process current batch before yielding self.merge_delta(&batch_delta); // The Statement needs to wait for IO - return Ok(IOResult::IO(IOCompletions::Single( - Completion::new_dummy(), - ))); + io_yield_one!(Completion::new_dummy()); } } } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 214dfde9a..57253366d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,6 +1,7 @@ use tracing::{instrument, Level}; use crate::{ + io_yield_many, io_yield_one, schema::Index, storage::{ pager::{BtreePageAllocMode, Pager}, @@ -679,7 +680,7 @@ impl BTreeCursor { } let (page, c) = self.pager.read_page(self.root_page)?; *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } EmptyTableState::ReadPage { page } => { turso_assert!(page.is_loaded(), "page should be loaded"); @@ -713,7 +714,7 @@ impl BTreeCursor { self.stack.set_cell_index(past_rightmost_pointer); let (page, c) = self.read_page(rightmost_pointer as usize)?; self.stack.push_backwards(page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } if cell_idx >= cell_count as i32 { @@ -777,7 +778,7 @@ impl BTreeCursor { let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push_backwards(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } @@ -798,7 +799,7 @@ impl BTreeCursor { remaining_to_read: payload_size as usize - payload.len(), page, }); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } let mut read_overflow_state = self.read_overflow_state.borrow_mut(); let ReadPayloadOverflow { @@ -831,7 +832,7 @@ impl BTreeCursor { })?; *page_btree = new_page; *next_page = next; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } turso_assert!( *remaining_to_read == 0 && next == 0, @@ -1005,7 +1006,7 @@ impl BTreeCursor { is_write, }); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } Ok(IOResult::Done(())) } @@ -1065,7 +1066,7 @@ impl BTreeCursor { }, ); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { mut remaining_to_read, @@ -1130,7 +1131,7 @@ impl BTreeCursor { is_write, }); // Return IO to allow other operations - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } _ => { return Err(LimboError::InternalError( @@ -1239,7 +1240,7 @@ impl BTreeCursor { self.stack.advance(); let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } _ => { if self.ancestor_pages_have_more_children() { @@ -1277,7 +1278,7 @@ impl BTreeCursor { let left_child_page = contents.cell_interior_read_left_child_page(cell_idx); let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } @@ -1330,7 +1331,7 @@ impl BTreeCursor { let rightmost_page_id = *rightmost_page_id; let c = self.move_to_root()?; self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } MoveToRightState::ProcessPage => { let mem_page = self.stack.top(); @@ -1351,7 +1352,7 @@ impl BTreeCursor { self.stack.set_cell_index(contents.cell_count() as i32 + 1); let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } None => { @@ -1420,7 +1421,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } self.stack.set_cell_index(cell_count as i32 + 1); match contents.rightmost_pointer() { @@ -1430,7 +1431,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } None => { unreachable!("we shall not go back up! The only way is down the slope"); @@ -1558,7 +1559,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } None => { unreachable!("we shall not go back up! The only way is down the slope"); @@ -1596,7 +1597,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } let cur_cell_idx = (min + max) >> 1; // rustc generates extra insns for (min+max)/2 due to them being isize. we know min&max are >=0 here. @@ -2093,7 +2094,7 @@ impl BTreeCursor { self.move_to_state = MoveToState::MoveToPage; if matches!(self.seek_state, CursorSeekState::Start) { let c = self.move_to_root()?; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } MoveToState::MoveToPage => { @@ -2623,7 +2624,7 @@ impl BTreeCursor { *sub_state = BalanceSubState::NonRootDoBalancing; if !completions.is_empty() { // TODO: when tracking IO return all the completions here - return Ok(IOResult::IO(IOCompletions::Many(completions))); + io_yield_many!(completions); } } BalanceSubState::NonRootDoBalancing => { @@ -4100,7 +4101,7 @@ impl BTreeCursor { SeekEndState::Start => { let c = self.move_to_root()?; self.seek_end_state = SeekEndState::ProcessPage; - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } SeekEndState::ProcessPage => { let mem_page = self.stack.top(); @@ -4118,7 +4119,7 @@ impl BTreeCursor { self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior let (child, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(child); - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } None => unreachable!("interior page must have rightmost pointer"), } @@ -4173,7 +4174,7 @@ impl BTreeCursor { mv_cursor.rewind(); } else { let c = self.move_to_root()?; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } RewindState::NextRecord => { @@ -4914,7 +4915,7 @@ impl BTreeCursor { self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } else { self.overflow_state = OverflowState::Done; } @@ -4946,7 +4947,7 @@ impl BTreeCursor { self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } else { self.overflow_state = OverflowState::Done; } @@ -4981,7 +4982,7 @@ impl BTreeCursor { self.state = CursorState::Destroy(DestroyInfo { state: DestroyState::Start, }); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } loop { @@ -5038,7 +5039,7 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } else { let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5096,7 +5097,7 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } }, } @@ -5114,7 +5115,7 @@ impl BTreeCursor { .mut_destroy_info() .expect("unable to get a mut reference to destroy state in cursor"); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } // For any leaf cell, advance the index now that overflow pages have been cleared BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { @@ -5299,7 +5300,7 @@ impl BTreeCursor { CountState::Start => { let c = self.move_to_root()?; self.count_state = CountState::Loop; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } CountState::Loop => { mem_page_rc = self.stack.top(); @@ -5325,7 +5326,7 @@ impl BTreeCursor { // All pages of the b-tree have been visited. Return successfully let c = self.move_to_root()?; self.count_state = CountState::Finish; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } // Move to parent @@ -5356,7 +5357,7 @@ impl BTreeCursor { self.stack.advance(); let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } else { // Move to child left page let cell = contents.cell_get(cell_idx, self.usable_space())?; @@ -5371,7 +5372,7 @@ impl BTreeCursor { self.stack.advance(); let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } _ => unreachable!(), } @@ -5419,7 +5420,7 @@ impl BTreeCursor { self.valid_state = CursorValidState::RequireAdvance(IterationDirection::Forwards); self.context = Some(ctx); - return Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy()))); + io_yield_one!(Completion::new_dummy()); } self.valid_state = CursorValidState::Valid; Ok(IOResult::Done(())) @@ -5565,7 +5566,7 @@ pub fn integrity_check( None => { let (page, c) = btree_read_page(pager, page_idx)?; state.page = Some(page.get()); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } }; turso_assert!(page.is_loaded(), "page should be loaded"); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2e6c2bf8a..e256d41de 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -10,6 +10,7 @@ use crate::storage::{ }; use crate::types::IOCompletions; use crate::util::IOExt as _; +use crate::{io_yield_many, io_yield_one}; use crate::{ return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError, Result, TransactionState, @@ -49,7 +50,7 @@ impl HeaderRef { let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } HeaderRefState::CreateHeader { page } => { turso_assert!(page.is_loaded(), "page should be loaded"); @@ -85,7 +86,7 @@ impl HeaderRefMut { let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } HeaderRefState::CreateHeader { page } => { turso_assert!(page.is_loaded(), "page should be loaded"); @@ -605,7 +606,7 @@ impl Pager { ptrmap_page, offset_in_ptrmap_page, }); - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } PtrMapGetState::Deserialize { ptrmap_page, @@ -704,7 +705,7 @@ impl Pager { ptrmap_page, offset_in_ptrmap_page, }); - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } PtrMapPutState::Deserialize { ptrmap_page, @@ -926,21 +927,20 @@ impl Pager { match (self.db_state.get(), self.allocating_page1()) { // In case of being empty or (allocating and this connection is performing allocation) then allocate the first page (DbState::Uninitialized, false) | (DbState::Initializing, true) => { - match self.allocate_page1()? { - IOResult::Done(_) => Ok(IOResult::Done(())), - IOResult::IO(io) => Ok(IOResult::IO(io)), + if let IOResult::IO(c) = self.allocate_page1()? { + return Ok(IOResult::IO(c)); + } else { + return Ok(IOResult::Done(())); } } // Give a chance for the allocation to happen elsewhere - _ => Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy()))), + _ => {} } - } else { - // Give a chance for the allocation to happen elsewhere - Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy()))) } - } else { - Ok(IOResult::Done(())) + // Give a chance for the allocation to happen elsewhere + io_yield_one!(Completion::new_dummy()); } + Ok(IOResult::Done(())) } #[inline(always)] @@ -1224,13 +1224,13 @@ impl Pager { return Ok(IOResult::Done(PagerCommitResult::WalWritten)); } else { self.commit_info.borrow_mut().state = CommitState::SyncWal; - return Ok(IOResult::IO(IOCompletions::Many(completions))); + io_yield_many!(completions); } } CommitState::SyncWal => { self.commit_info.borrow_mut().state = CommitState::AfterSyncWal; let c = wal.borrow_mut().sync()?; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } CommitState::AfterSyncWal => { if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { @@ -1246,7 +1246,7 @@ impl Pager { CommitState::SyncDbFile => { let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; self.commit_info.borrow_mut().state = CommitState::AfterSyncDbFile; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } CommitState::AfterSyncDbFile => { turso_assert!(!*self.syncing.borrow(), "should have finished syncing"); @@ -1338,7 +1338,7 @@ impl Pager { let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; self.checkpoint_state .replace(CheckpointState::CheckpointDone { res }); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } CheckpointState::CheckpointDone { res } => { turso_assert!(!*self.syncing.borrow(), "syncing should be done"); @@ -1505,7 +1505,7 @@ impl Pager { // Add as leaf to current trunk let (page, c) = self.read_page(trunk_page_id as usize)?; trunk_page.replace(page); - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } let trunk_page = trunk_page.as_ref().unwrap(); turso_assert!(trunk_page.is_loaded(), "trunk_page should be loaded"); @@ -1602,7 +1602,7 @@ impl Pager { self.allocate_page1_state .replace(AllocatePage1State::Writing { page: page1 }); - Ok(IOResult::IO(IOCompletions::Single(c))) + io_yield_one!(c); } AllocatePage1State::Writing { page } => { let page1_ref = page.get(); @@ -1697,7 +1697,7 @@ impl Pager { trunk_page, current_db_size: new_db_size, }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } AllocatePageState::SearchAvailableFreeListLeaf { trunk_page, @@ -1733,7 +1733,7 @@ impl Pager { leaf_page, number_of_freelist_leaves, }; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } // No freelist leaves on this trunk page. diff --git a/core/storage/wal.rs b/core/storage/wal.rs index a8f72b963..624a1156e 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -19,7 +19,9 @@ use crate::storage::sqlite3_ondisk::{ write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::{IOCompletions, IOResult}; -use crate::{bail_corrupt_error, turso_assert, Buffer, LimboError, Result}; +use crate::{ + bail_corrupt_error, io_yield_many, io_yield_one, turso_assert, Buffer, LimboError, Result, +}; use crate::{Completion, Page}; use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -1423,7 +1425,7 @@ impl WalFile { self.buffer_pool.clone(), )?; self.ongoing_checkpoint.state = CheckpointState::AccumulatePage; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } self.ongoing_checkpoint.current_page += 1; @@ -1461,7 +1463,7 @@ impl WalFile { )?; // batch is queued self.ongoing_checkpoint.state = CheckpointState::AfterFlush; - return Ok(IOResult::IO(IOCompletions::Many(completions))); + io_yield_many!(completions); } CheckpointState::AfterFlush => { turso_assert!( diff --git a/core/util.rs b/core/util.rs index 04f314bff..27ea6c994 100644 --- a/core/util.rs +++ b/core/util.rs @@ -18,6 +18,19 @@ use turso_sqlite3_parser::ast::{ }; use turso_sqlite3_parser::lexer::sql::Parser; +#[macro_export] +macro_rules! io_yield_one { + ($c:expr) => { + return Ok(IOResult::IO(IOCompletions::Single($c))); + }; +} +#[macro_export] +macro_rules! io_yield_many { + ($v:expr) => { + return Ok(IOResult::IO(IOCompletions::Many($v))); + }; +} + pub trait IOExt { fn block(&self, f: impl FnMut() -> Result>) -> Result; } diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 25dffd400..0c06336ae 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -7,7 +7,6 @@ use std::rc::Rc; use std::sync::Arc; use tempfile; -use crate::return_if_io; use crate::types::IOCompletions; use crate::util::IOExt; use crate::{ @@ -19,6 +18,7 @@ use crate::{ types::{IOResult, ImmutableRecord, KeyInfo, RecordCursor, RefValue}, Result, }; +use crate::{io_yield_many, io_yield_one, return_if_io}; #[derive(Debug, Clone, Copy)] enum SortState { @@ -152,7 +152,7 @@ impl Sorter { SortState::Flush => { self.sort_state = SortState::InitHeap; if let Some(c) = self.flush()? { - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } SortState::InitHeap => { @@ -207,7 +207,7 @@ impl Sorter { self.insert_state = InsertState::Insert; if self.current_buffer_size + payload_size > self.max_buffer_size { if let Some(c) = self.flush()? { - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } } @@ -242,7 +242,7 @@ impl Sorter { completions.push(c); } self.init_chunk_heap_state = InitChunkHeapState::PushChunk; - Ok(IOResult::IO(IOCompletions::Many(completions))) + io_yield_many!(completions); } InitChunkHeapState::PushChunk => { // Make sure all chunks read at least one record into their buffer. @@ -464,7 +464,7 @@ impl SortedChunk { self.io_state.set(SortedChunkIOState::ReadEOF); } else { let c = self.read()?; - return Ok(IOResult::IO(IOCompletions::Single(c))); + io_yield_one!(c); } } }