Merge 'Add io_yield macros to reduce boilerplate' from Preston Thorpe

```rust
io_yield_one!(c);

// instead of

return Ok(IOResult::IO(IOCompletions::Single(c)));
```
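
For reference, the macros simply wrap the `return Ok(IOResult::IO(...))` pattern; their definitions appear in the util diff below. The sketch that follows shows how they read at a call site. It is self-contained and hedged: `Completion`, `IOResult`, and `IOCompletions` here are simplified stand-ins for the real crate types, and `step` is a hypothetical state-machine function in the style of the cursor/pager methods touched in this commit.

```rust
// Stand-ins for the crate types (simplified for the sketch).
struct Completion;

enum IOCompletions {
    Single(Completion),
    Many(Vec<Completion>),
}

enum IOResult<T> {
    Done(T),
    IO(IOCompletions),
}

type Result<T> = std::result::Result<T, String>;

// Macro bodies copied from the util diff below.
macro_rules! io_yield_one {
    ($c:expr) => {
        return Ok(IOResult::IO(IOCompletions::Single($c)));
    };
}

macro_rules! io_yield_many {
    ($v:expr) => {
        return Ok(IOResult::IO(IOCompletions::Many($v)));
    };
}

// Hypothetical state-machine step: yield pending completions back to the
// caller, or report that the operation is done.
fn step(mut pending: Vec<Completion>) -> Result<IOResult<()>> {
    if pending.len() == 1 {
        io_yield_one!(pending.pop().expect("len checked above"));
    }
    if !pending.is_empty() {
        io_yield_many!(pending);
    }
    Ok(IOResult::Done(()))
}

fn main() {
    assert!(matches!(step(vec![Completion]), Ok(IOResult::IO(_))));
    assert!(matches!(step(vec![]), Ok(IOResult::Done(()))));
}
```

Because each macro expands to a `return`, it only makes sense inside functions that return `Result<IOResult<T>>`, which is exactly the shape of the state-machine methods changed throughout this commit.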

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2630
Authored by Preston Thorpe on 2025-08-16 17:12:45 -04:00, committed by GitHub
6 changed files with 81 additions and 69 deletions

View File

@@ -6,7 +6,7 @@ use super::operator::{
use crate::schema::{BTreeTable, Column, Schema};
use crate::types::{IOCompletions, IOResult, Value};
use crate::util::{extract_column_name_from_expr, extract_view_columns};
use crate::{Completion, LimboError, Result, Statement};
use crate::{io_yield_one, Completion, LimboError, Result, Statement};
use fallible_iterator::FallibleIterator;
use std::collections::BTreeMap;
use std::fmt;
@@ -562,9 +562,7 @@ impl IncrementalView {
// Yield control after processing a batch
// TODO: currently this inner statement is the one that is tracking completions
// so as a stop gap we can just return a dummy completion here
return Ok(IOResult::IO(
IOCompletions::Single(Completion::new_dummy()),
));
io_yield_one!(Completion::new_dummy());
}
// This step() call resumes from where the statement left off
@@ -611,9 +609,7 @@ impl IncrementalView {
// Process current batch before yielding
self.merge_delta(&batch_delta);
// The Statement needs to wait for IO
return Ok(IOResult::IO(IOCompletions::Single(
Completion::new_dummy(),
)));
io_yield_one!(Completion::new_dummy());
}
}
}

View File

@@ -1,6 +1,7 @@
use tracing::{instrument, Level};
use crate::{
io_yield_many, io_yield_one,
schema::Index,
storage::{
pager::{BtreePageAllocMode, Pager},
@@ -679,7 +680,7 @@ impl BTreeCursor {
}
let (page, c) = self.pager.read_page(self.root_page)?;
*self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page };
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
EmptyTableState::ReadPage { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
@@ -713,7 +714,7 @@ impl BTreeCursor {
self.stack.set_cell_index(past_rightmost_pointer);
let (page, c) = self.read_page(rightmost_pointer as usize)?;
self.stack.push_backwards(page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
if cell_idx >= cell_count as i32 {
@@ -777,7 +778,7 @@ impl BTreeCursor {
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push_backwards(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
@@ -798,7 +799,7 @@ impl BTreeCursor {
remaining_to_read: payload_size as usize - payload.len(),
page,
});
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
let mut read_overflow_state = self.read_overflow_state.borrow_mut();
let ReadPayloadOverflow {
@@ -831,7 +832,7 @@ impl BTreeCursor {
})?;
*page_btree = new_page;
*next_page = next;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
turso_assert!(
*remaining_to_read == 0 && next == 0,
@@ -1005,7 +1006,7 @@ impl BTreeCursor {
is_write,
});
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
Ok(IOResult::Done(()))
}
@@ -1065,7 +1066,7 @@ impl BTreeCursor {
},
);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
mut remaining_to_read,
@@ -1130,7 +1131,7 @@ impl BTreeCursor {
is_write,
});
// Return IO to allow other operations
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
_ => {
return Err(LimboError::InternalError(
@@ -1239,7 +1240,7 @@ impl BTreeCursor {
self.stack.advance();
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
_ => {
if self.ancestor_pages_have_more_children() {
@@ -1277,7 +1278,7 @@ impl BTreeCursor {
let left_child_page = contents.cell_interior_read_left_child_page(cell_idx);
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
@@ -1330,7 +1331,7 @@ impl BTreeCursor {
let rightmost_page_id = *rightmost_page_id;
let c = self.move_to_root()?;
self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
MoveToRightState::ProcessPage => {
let mem_page = self.stack.top();
@@ -1351,7 +1352,7 @@ impl BTreeCursor {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
None => {
@@ -1420,7 +1421,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
self.stack.set_cell_index(cell_count as i32 + 1);
match contents.rightmost_pointer() {
@@ -1430,7 +1431,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
None => {
unreachable!("we shall not go back up! The only way is down the slope");
@@ -1558,7 +1559,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
None => {
unreachable!("we shall not go back up! The only way is down the slope");
@@ -1596,7 +1597,7 @@ impl BTreeCursor {
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
let cur_cell_idx = (min + max) >> 1; // rustc generates extra insns for (min+max)/2 due to them being isize. we know min&max are >=0 here.
@@ -2093,7 +2094,7 @@ impl BTreeCursor {
self.move_to_state = MoveToState::MoveToPage;
if matches!(self.seek_state, CursorSeekState::Start) {
let c = self.move_to_root()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
MoveToState::MoveToPage => {
@@ -2623,7 +2624,7 @@ impl BTreeCursor {
*sub_state = BalanceSubState::NonRootDoBalancing;
if !completions.is_empty() {
// TODO: when tracking IO return all the completions here
return Ok(IOResult::IO(IOCompletions::Many(completions)));
io_yield_many!(completions);
}
}
BalanceSubState::NonRootDoBalancing => {
@@ -4100,7 +4101,7 @@ impl BTreeCursor {
SeekEndState::Start => {
let c = self.move_to_root()?;
self.seek_end_state = SeekEndState::ProcessPage;
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
SeekEndState::ProcessPage => {
let mem_page = self.stack.top();
@@ -4118,7 +4119,7 @@ impl BTreeCursor {
self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior
let (child, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(child);
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
None => unreachable!("interior page must have rightmost pointer"),
}
@@ -4173,7 +4174,7 @@ impl BTreeCursor {
mv_cursor.rewind();
} else {
let c = self.move_to_root()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
RewindState::NextRecord => {
@@ -4914,7 +4915,7 @@ impl BTreeCursor {
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
} else {
self.overflow_state = OverflowState::Done;
}
@@ -4946,7 +4947,7 @@ impl BTreeCursor {
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
} else {
self.overflow_state = OverflowState::Done;
}
@@ -4981,7 +4982,7 @@ impl BTreeCursor {
self.state = CursorState::Destroy(DestroyInfo {
state: DestroyState::Start,
});
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
loop {
@@ -5038,7 +5039,7 @@ impl BTreeCursor {
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
} else {
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5096,7 +5097,7 @@ impl BTreeCursor {
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
},
}
@@ -5114,7 +5115,7 @@ impl BTreeCursor {
.mut_destroy_info()
.expect("unable to get a mut reference to destroy state in cursor");
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
// For any leaf cell, advance the index now that overflow pages have been cleared
BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => {
@@ -5299,7 +5300,7 @@ impl BTreeCursor {
CountState::Start => {
let c = self.move_to_root()?;
self.count_state = CountState::Loop;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
CountState::Loop => {
mem_page_rc = self.stack.top();
@@ -5325,7 +5326,7 @@ impl BTreeCursor {
// All pages of the b-tree have been visited. Return successfully
let c = self.move_to_root()?;
self.count_state = CountState::Finish;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
// Move to parent
@@ -5356,7 +5357,7 @@ impl BTreeCursor {
self.stack.advance();
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
} else {
// Move to child left page
let cell = contents.cell_get(cell_idx, self.usable_space())?;
@@ -5371,7 +5372,7 @@ impl BTreeCursor {
self.stack.advance();
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
_ => unreachable!(),
}
@@ -5419,7 +5420,7 @@ impl BTreeCursor {
self.valid_state =
CursorValidState::RequireAdvance(IterationDirection::Forwards);
self.context = Some(ctx);
return Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy())));
io_yield_one!(Completion::new_dummy());
}
self.valid_state = CursorValidState::Valid;
Ok(IOResult::Done(()))
@@ -5565,7 +5566,7 @@ pub fn integrity_check(
None => {
let (page, c) = btree_read_page(pager, page_idx)?;
state.page = Some(page.get());
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
};
turso_assert!(page.is_loaded(), "page should be loaded");

View File

@@ -10,6 +10,7 @@ use crate::storage::{
};
use crate::types::IOCompletions;
use crate::util::IOExt as _;
use crate::{io_yield_many, io_yield_one};
use crate::{
return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError,
Result, TransactionState,
@@ -49,7 +50,7 @@ impl HeaderRef {
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
@@ -85,7 +86,7 @@ impl HeaderRefMut {
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
@@ -605,7 +606,7 @@ impl Pager {
ptrmap_page,
offset_in_ptrmap_page,
});
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
PtrMapGetState::Deserialize {
ptrmap_page,
@@ -704,7 +705,7 @@ impl Pager {
ptrmap_page,
offset_in_ptrmap_page,
});
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
PtrMapPutState::Deserialize {
ptrmap_page,
@@ -926,21 +927,20 @@ impl Pager {
match (self.db_state.get(), self.allocating_page1()) {
// In case of being empty or (allocating and this connection is performing allocation) then allocate the first page
(DbState::Uninitialized, false) | (DbState::Initializing, true) => {
match self.allocate_page1()? {
IOResult::Done(_) => Ok(IOResult::Done(())),
IOResult::IO(io) => Ok(IOResult::IO(io)),
if let IOResult::IO(c) = self.allocate_page1()? {
return Ok(IOResult::IO(c));
} else {
return Ok(IOResult::Done(()));
}
}
// Give a chance for the allocation to happen elsewhere
_ => Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy()))),
_ => {}
}
} else {
// Give a chance for the allocation to happen elsewhere
Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy())))
}
} else {
Ok(IOResult::Done(()))
// Give a chance for the allocation to happen elsewhere
io_yield_one!(Completion::new_dummy());
}
Ok(IOResult::Done(()))
}
#[inline(always)]
@@ -1224,13 +1224,13 @@ impl Pager {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
} else {
self.commit_info.borrow_mut().state = CommitState::SyncWal;
return Ok(IOResult::IO(IOCompletions::Many(completions)));
io_yield_many!(completions);
}
}
CommitState::SyncWal => {
self.commit_info.borrow_mut().state = CommitState::AfterSyncWal;
let c = wal.borrow_mut().sync()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
CommitState::AfterSyncWal => {
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
@@ -1246,7 +1246,7 @@ impl Pager {
CommitState::SyncDbFile => {
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.commit_info.borrow_mut().state = CommitState::AfterSyncDbFile;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
CommitState::AfterSyncDbFile => {
turso_assert!(!*self.syncing.borrow(), "should have finished syncing");
@@ -1338,7 +1338,7 @@ impl Pager {
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.checkpoint_state
.replace(CheckpointState::CheckpointDone { res });
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
CheckpointState::CheckpointDone { res } => {
turso_assert!(!*self.syncing.borrow(), "syncing should be done");
@@ -1505,7 +1505,7 @@ impl Pager {
// Add as leaf to current trunk
let (page, c) = self.read_page(trunk_page_id as usize)?;
trunk_page.replace(page);
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
let trunk_page = trunk_page.as_ref().unwrap();
turso_assert!(trunk_page.is_loaded(), "trunk_page should be loaded");
@@ -1602,7 +1602,7 @@ impl Pager {
self.allocate_page1_state
.replace(AllocatePage1State::Writing { page: page1 });
Ok(IOResult::IO(IOCompletions::Single(c)))
io_yield_one!(c);
}
AllocatePage1State::Writing { page } => {
let page1_ref = page.get();
@@ -1697,7 +1697,7 @@ impl Pager {
trunk_page,
current_db_size: new_db_size,
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page,
@@ -1733,7 +1733,7 @@ impl Pager {
leaf_page,
number_of_freelist_leaves,
};
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
// No freelist leaves on this trunk page.

View File

@@ -19,7 +19,9 @@ use crate::storage::sqlite3_ondisk::{
write_pages_vectored, PageSize, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::types::{IOCompletions, IOResult};
use crate::{bail_corrupt_error, turso_assert, Buffer, LimboError, Result};
use crate::{
bail_corrupt_error, io_yield_many, io_yield_one, turso_assert, Buffer, LimboError, Result,
};
use crate::{Completion, Page};
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
@@ -1423,7 +1425,7 @@ impl WalFile {
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::AccumulatePage;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
self.ongoing_checkpoint.current_page += 1;
@@ -1461,7 +1463,7 @@ impl WalFile {
)?;
// batch is queued
self.ongoing_checkpoint.state = CheckpointState::AfterFlush;
return Ok(IOResult::IO(IOCompletions::Many(completions)));
io_yield_many!(completions);
}
CheckpointState::AfterFlush => {
turso_assert!(

View File

@@ -18,6 +18,19 @@ use turso_sqlite3_parser::ast::{
};
use turso_sqlite3_parser::lexer::sql::Parser;
#[macro_export]
macro_rules! io_yield_one {
($c:expr) => {
return Ok(IOResult::IO(IOCompletions::Single($c)));
};
}
#[macro_export]
macro_rules! io_yield_many {
($v:expr) => {
return Ok(IOResult::IO(IOCompletions::Many($v)));
};
}
pub trait IOExt {
fn block<T>(&self, f: impl FnMut() -> Result<IOResult<T>>) -> Result<T>;
}

View File

@@ -7,7 +7,6 @@ use std::rc::Rc;
use std::sync::Arc;
use tempfile;
use crate::return_if_io;
use crate::types::IOCompletions;
use crate::util::IOExt;
use crate::{
@@ -19,6 +18,7 @@ use crate::{
types::{IOResult, ImmutableRecord, KeyInfo, RecordCursor, RefValue},
Result,
};
use crate::{io_yield_many, io_yield_one, return_if_io};
#[derive(Debug, Clone, Copy)]
enum SortState {
@@ -152,7 +152,7 @@ impl Sorter {
SortState::Flush => {
self.sort_state = SortState::InitHeap;
if let Some(c) = self.flush()? {
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
SortState::InitHeap => {
@@ -207,7 +207,7 @@ impl Sorter {
self.insert_state = InsertState::Insert;
if self.current_buffer_size + payload_size > self.max_buffer_size {
if let Some(c) = self.flush()? {
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
}
@@ -242,7 +242,7 @@ impl Sorter {
completions.push(c);
}
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
Ok(IOResult::IO(IOCompletions::Many(completions)))
io_yield_many!(completions);
}
InitChunkHeapState::PushChunk => {
// Make sure all chunks read at least one record into their buffer.
@@ -464,7 +464,7 @@ impl SortedChunk {
self.io_state.set(SortedChunkIOState::ReadEOF);
} else {
let c = self.read()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
io_yield_one!(c);
}
}
}