diff --git a/core/lib.rs b/core/lib.rs index b74d3ff24..dcd83574f 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -77,18 +77,18 @@ use std::{ #[cfg(feature = "fs")] use storage::database::DatabaseFile; use storage::page_cache::DumbLruPageCache; -pub use storage::pager::PagerCacheflushStatus; -use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; +use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, pager::PageRef, pager::{Page, Pager}, - wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared}, + wal::{CheckpointMode, CheckpointResult, Wal, WalFile, WalFileShared}, }; use tracing::{instrument, Level}; use translate::select::prepare_select_plan; use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser}; +use types::IOResult; pub use types::RefValue; pub use types::Value; use util::parse_schema_rows; @@ -755,7 +755,7 @@ impl Connection { /// This will write the dirty pages to the WAL and then fsync the WAL. /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. - pub fn cacheflush(&self) -> Result { + pub fn cacheflush(&self) -> Result> { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } diff --git a/core/schema.rs b/core/schema.rs index 7ebfcb08f..89fafec67 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -2,7 +2,7 @@ use crate::result::LimboResult; use crate::storage::btree::BTreeCursor; use crate::translate::collate::CollationSeq; use crate::translate::plan::SelectPlan; -use crate::types::CursorResult; +use crate::types::IOResult; use crate::util::{module_args_from_sql, module_name_from_sql, UnparsedFromSqlIndex}; use crate::{util::normalize_ident, Result}; use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable}; @@ -158,24 +158,24 @@ impl Schema { HashMap::with_capacity(10); match pager.begin_read_tx()? { - CursorResult::Ok(v) => { + IOResult::Done(v) => { if matches!(v, LimboResult::Busy) { return Err(LimboError::Busy); } } - CursorResult::IO => pager.io.run_once()?, + IOResult::IO => pager.io.run_once()?, } match cursor.rewind()? { - CursorResult::Ok(v) => v, - CursorResult::IO => pager.io.run_once()?, + IOResult::Done(v) => v, + IOResult::IO => pager.io.run_once()?, }; loop { let Some(row) = (loop { match cursor.record()? { - CursorResult::Ok(v) => break v, - CursorResult::IO => pager.io.run_once()?, + IOResult::Done(v) => break v, + IOResult::IO => pager.io.run_once()?, } }) else { break; @@ -285,7 +285,7 @@ impl Schema { drop(record_cursor); drop(row); - if matches!(cursor.next()?, CursorResult::IO) { + if matches!(cursor.next()?, IOResult::IO) { pager.io.run_once()?; }; } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4b22c4cdd..f54279a77 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -21,8 +21,8 @@ use crate::{ }; use crate::{ - return_corrupt, - types::{compare_immutable, CursorResult, ImmutableRecord, RefValue, SeekKey, SeekOp, Value}, + return_corrupt, return_if_io, + types::{compare_immutable, IOResult, ImmutableRecord, RefValue, SeekKey, SeekOp, Value}, LimboError, Result, }; @@ -111,21 +111,11 @@ pub const MAX_SIBLING_PAGES_TO_BALANCE: usize = 3; /// We only need maximum 5 pages to balance 3 pages, because we can guarantee that cells from 3 pages will fit in 5 pages. pub const MAX_NEW_SIBLING_PAGES_AFTER_BALANCE: usize = 5; -/// Evaluate a Result>, if IO return IO. -macro_rules! return_if_io { - ($expr:expr) => { - match $expr? { - CursorResult::Ok(v) => v, - CursorResult::IO => return Ok(CursorResult::IO), - } - }; -} - /// Check if the page is unlocked, if not return IO. macro_rules! return_if_locked { ($expr:expr) => {{ if $expr.is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } }}; } @@ -143,12 +133,12 @@ macro_rules! debug_validate_cells { macro_rules! return_if_locked_maybe_load { ($pager:expr, $btree_page:expr) => {{ if $btree_page.get().is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } if !$btree_page.get().is_loaded() { let page = $pager.read_page($btree_page.get().get().id)?; $btree_page.page.replace(page); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } }}; } @@ -641,22 +631,22 @@ impl BTreeCursor { /// Check if the table is empty. /// This is done by checking if the root page has no cells. #[instrument(skip_all, level = Level::INFO)] - fn is_empty_table(&self) -> Result> { + fn is_empty_table(&self) -> Result> { if let Some(mv_cursor) = &self.mv_cursor { let mv_cursor = mv_cursor.borrow(); - return Ok(CursorResult::Ok(mv_cursor.is_empty())); + return Ok(IOResult::Done(mv_cursor.is_empty())); } let page = self.pager.read_page(self.root_page)?; return_if_locked!(page); let cell_count = page.get().contents.as_ref().unwrap().cell_count(); - Ok(CursorResult::Ok(cell_count == 0)) + Ok(IOResult::Done(cell_count == 0)) } /// Move the cursor to the previous record and return it. /// Used in backwards iteration. #[instrument(skip(self), level = Level::INFO, name = "prev")] - fn get_prev_record(&mut self) -> Result> { + fn get_prev_record(&mut self) -> Result> { loop { let page = self.stack.top(); @@ -686,7 +676,7 @@ impl BTreeCursor { let page_type = contents.page_type(); if should_visit_internal_node { self.going_upwards = false; - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } else if matches!( page_type, PageType::IndexLeaf | PageType::TableLeaf | PageType::TableInterior @@ -707,7 +697,7 @@ impl BTreeCursor { } else { // moved to begin of btree // dbg!(false); - return Ok(CursorResult::Ok(false)); + return Ok(IOResult::Done(false)); } } // continue to next loop to get record from the new page @@ -726,7 +716,7 @@ impl BTreeCursor { continue; } BTreeCell::TableLeafCell(TableLeafCell { .. }) => { - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } BTreeCell::IndexInteriorCell(IndexInteriorCell { left_child_page, .. @@ -749,10 +739,10 @@ impl BTreeCursor { // On the first pass we must take the record from the interior cell (since unlike table btrees, index interior cells have payloads) // We then mark going_upwards=false so that we go back down the tree on the next invocation. self.going_upwards = false; - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } BTreeCell::IndexLeafCell(IndexLeafCell { .. }) => { - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } } } @@ -766,7 +756,7 @@ impl BTreeCursor { payload: &'static [u8], start_next_page: u32, payload_size: u64, - ) -> Result> { + ) -> Result> { if self.read_overflow_state.borrow().is_none() { let page = self.read_page(start_next_page as usize)?; *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { @@ -775,7 +765,7 @@ impl BTreeCursor { remaining_to_read: payload_size as usize - payload.len(), page, }); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } let mut read_overflow_state = self.read_overflow_state.borrow_mut(); let ReadPayloadOverflow { @@ -786,7 +776,7 @@ impl BTreeCursor { } = read_overflow_state.as_mut().unwrap(); if page_btree.get().is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } tracing::debug!(next_page, remaining_to_read, "reading overflow page"); let page = page_btree.get(); @@ -807,7 +797,7 @@ impl BTreeCursor { })?; *page_btree = new_page; *next_page = next; - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } turso_assert!( *remaining_to_read == 0 && next == 0, @@ -826,7 +816,7 @@ impl BTreeCursor { self.record_cursor.borrow_mut().invalidate(); let _ = read_overflow_state.take(); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } /// Calculates how much of a cell's payload should be stored locally vs in overflow pages @@ -888,7 +878,7 @@ impl BTreeCursor { buffer: &mut Vec, mut amount: u32, is_write: bool, - ) -> Result> { + ) -> Result> { if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { .. }) @@ -980,9 +970,9 @@ impl BTreeCursor { is_write, }); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::INFO)] @@ -990,7 +980,7 @@ impl BTreeCursor { &mut self, buffer: &mut Vec, usable_space: usize, - ) -> Result> { + ) -> Result> { loop { let mut state = std::mem::replace(&mut self.state, CursorState::None); @@ -1044,7 +1034,7 @@ impl BTreeCursor { }, ); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { @@ -1066,7 +1056,7 @@ impl BTreeCursor { is_write: *is_write, }); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } let page = page_btree.get(); @@ -1102,7 +1092,7 @@ impl BTreeCursor { if *remaining_to_read == 0 { self.state = CursorState::None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } let next = contents.read_u32_no_offset(0); if next == 0 { @@ -1117,7 +1107,7 @@ impl BTreeCursor { *page_btree = self.read_page(next as usize)?; // Return IO to allow other operations - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } _ => { return Err(LimboError::InternalError( @@ -1163,16 +1153,16 @@ impl BTreeCursor { /// Move the cursor to the next record and return it. /// Used in forwards iteration, which is the default. #[instrument(skip(self), level = Level::INFO, name = "next")] - fn get_next_record(&mut self) -> Result> { + fn get_next_record(&mut self) -> Result> { if let Some(mv_cursor) = &self.mv_cursor { let mut mv_cursor = mv_cursor.borrow_mut(); let rowid = mv_cursor.current_row_id(); match rowid { Some(_rowid) => { mv_cursor.forward(); - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } - None => return Ok(CursorResult::Ok(false)), + None => return Ok(IOResult::Done(false)), } } loop { @@ -1203,7 +1193,7 @@ impl BTreeCursor { "skipping advance", ); self.going_upwards = false; - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } // Important to advance only after loading the page in order to not advance > 1 times self.stack.advance(); @@ -1227,7 +1217,7 @@ impl BTreeCursor { self.stack.pop(); continue; } else { - return Ok(CursorResult::Ok(false)); + return Ok(IOResult::Done(false)); } } } @@ -1242,7 +1232,7 @@ impl BTreeCursor { self.stack.pop(); continue; } else { - return Ok(CursorResult::Ok(false)); + return Ok(IOResult::Done(false)); } } turso_assert!(cell_idx < contents.cell_count(), "cell index out of bounds"); @@ -1257,14 +1247,14 @@ impl BTreeCursor { continue; } BTreeCell::TableLeafCell(TableLeafCell { .. }) => { - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } BTreeCell::IndexInteriorCell(IndexInteriorCell { left_child_page, .. }) => { if self.going_upwards { self.going_upwards = false; - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } else { let mem_page = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); @@ -1272,7 +1262,7 @@ impl BTreeCursor { } } BTreeCell::IndexLeafCell(IndexLeafCell { .. }) => { - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } } } @@ -1282,7 +1272,7 @@ impl BTreeCursor { /// This may be used to seek to a specific record in a point query (e.g. SELECT * FROM table WHERE col = 10) /// or e.g. find the first record greater than the seek key in a range query (e.g. SELECT * FROM table WHERE col > 10). /// We don't include the rowid in the comparison and that's why the last value from the record is not included. - fn do_seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { + fn do_seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { let ret = return_if_io!(match key { SeekKey::TableRowId(rowid) => { self.tablebtree_seek(rowid, op) @@ -1292,7 +1282,7 @@ impl BTreeCursor { } }); self.valid_state = CursorValidState::Valid; - Ok(CursorResult::Ok(ret)) + Ok(IOResult::Done(ret)) } /// Move the cursor to the root page of the btree. @@ -1309,7 +1299,7 @@ impl BTreeCursor { /// Move the cursor to the rightmost record in the btree. #[instrument(skip(self), level = Level::INFO)] - fn move_to_rightmost(&mut self) -> Result> { + fn move_to_rightmost(&mut self) -> Result> { self.move_to_root()?; loop { @@ -1322,9 +1312,9 @@ impl BTreeCursor { if contents.is_leaf() { if contents.cell_count() > 0 { self.stack.set_cell_index(contents.cell_count() as i32 - 1); - return Ok(CursorResult::Ok(true)); + return Ok(IOResult::Done(true)); } - return Ok(CursorResult::Ok(false)); + return Ok(IOResult::Done(false)); } match contents.rightmost_pointer() { @@ -1344,7 +1334,7 @@ impl BTreeCursor { /// Specialized version of move_to() for table btrees. #[instrument(skip(self), level = Level::INFO)] - fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { + fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { 'outer: loop { let page = self.stack.top(); return_if_locked_maybe_load!(self.pager, page); @@ -1354,7 +1344,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::FoundLeaf { eq_seen: Cell::new(false), }; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } let cell_count = contents.cell_count(); @@ -1466,7 +1456,7 @@ impl BTreeCursor { &mut self, index_key: &ImmutableRecord, cmp: SeekOp, - ) -> Result> { + ) -> Result> { let iter_dir = cmp.iteration_direction(); let key_values = index_key.get_values(); @@ -1489,7 +1479,7 @@ impl BTreeCursor { self.seek_state = CursorSeekState::FoundLeaf { eq_seen: Cell::new(eq_seen), }; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } if matches!( @@ -1674,7 +1664,7 @@ impl BTreeCursor { /// Specialized version of do_seek() for table btrees that uses binary search instead /// of iterating cells in order. #[instrument(skip_all, level = Level::INFO)] - fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { + fn tablebtree_seek(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { turso_assert!( self.mv_cursor.is_none(), "attempting to seek with MV cursor" @@ -1701,7 +1691,7 @@ impl BTreeCursor { let cell_count = contents.cell_count(); if cell_count == 0 { self.stack.set_cell_index(0); - return Ok(CursorResult::Ok(SeekResult::NotFound)); + return Ok(IOResult::Done(SeekResult::NotFound)); } let min_cell_idx = Cell::new(0); let max_cell_idx = Cell::new(cell_count as isize - 1); @@ -1739,13 +1729,13 @@ impl BTreeCursor { if min > max { if let Some(nearest_matching_cell) = nearest_matching_cell.get() { self.stack.set_cell_index(nearest_matching_cell as i32); - return Ok(CursorResult::Ok(SeekResult::Found)); + return Ok(IOResult::Done(SeekResult::Found)); } else { // if !eq_only - matching entry can exist in neighbour leaf page // this can happen if key in the interiour page was deleted - but divider kept untouched // in such case BTree can navigate to the leaf which no longer has matching key for seek_op // in this case, caller must advance cursor if necessary - return Ok(CursorResult::Ok(if seek_op.eq_only() { + return Ok(IOResult::Done(if seek_op.eq_only() { SeekResult::NotFound } else { let contents = page.get().contents.as_ref().unwrap(); @@ -1781,7 +1771,7 @@ impl BTreeCursor { // rowids are unique, so we can return the rowid immediately if found && seek_op.eq_only() { self.stack.set_cell_index(cur_cell_idx as i32); - return Ok(CursorResult::Ok(SeekResult::Found)); + return Ok(IOResult::Done(SeekResult::Found)); } if found { @@ -1816,7 +1806,7 @@ impl BTreeCursor { &mut self, key: &ImmutableRecord, seek_op: SeekOp, - ) -> Result> { + ) -> Result> { let key_values = key.get_values(); let index_info_default = IndexKeyInfo::default(); let index_info = *self.index_key_info.as_ref().unwrap_or(&index_info_default); @@ -1849,7 +1839,7 @@ impl BTreeCursor { let contents = page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); if cell_count == 0 { - return Ok(CursorResult::Ok(SeekResult::NotFound)); + return Ok(IOResult::Done(SeekResult::NotFound)); } let min = Cell::new(0); @@ -1894,20 +1884,20 @@ impl BTreeCursor { if min > max { if let Some(nearest_matching_cell) = nearest_matching_cell.get() { self.stack.set_cell_index(nearest_matching_cell as i32); - return Ok(CursorResult::Ok(SeekResult::Found)); + return Ok(IOResult::Done(SeekResult::Found)); } else { // Similar logic as in tablebtree_seek(), but for indexes. // The difference is that since index keys are not necessarily unique, we need to TryAdvance // even when eq_only=true and we have seen an EQ match up in the tree in an interior node. if seek_op.eq_only() && !eq_seen.get() { - return Ok(CursorResult::Ok(SeekResult::NotFound)); + return Ok(IOResult::Done(SeekResult::NotFound)); } // set cursor to the position where which would hold the op-boundary if it were present self.stack.set_cell_index(match &seek_op { SeekOp::GT | SeekOp::GE { .. } => cell_count as i32, SeekOp::LT | SeekOp::LE { .. } => 0, }); - return Ok(CursorResult::Ok(SeekResult::TryAdvance)); + return Ok(IOResult::Done(SeekResult::TryAdvance)); }; } @@ -2010,7 +2000,7 @@ impl BTreeCursor { payload: &'static [u8], next_page: Option, payload_size: u64, - ) -> Result> { + ) -> Result> { if let Some(next_page) = next_page { self.process_overflow_read(payload, next_page, payload_size) } else { @@ -2024,12 +2014,12 @@ impl BTreeCursor { .start_serialization(payload); self.record_cursor.borrow_mut().invalidate(); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } } #[instrument(skip_all, level = Level::INFO)] - pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { + pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { turso_assert!( self.mv_cursor.is_none(), "attempting to move with MV cursor" @@ -2076,13 +2066,13 @@ impl BTreeCursor { SeekKey::IndexKey(index_key) => self.indexbtree_move_to(index_key, cmp), }; return_if_io!(ret); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } /// Insert a record into the btree. /// If the insert operation overflows the page, it will be split and the btree will be balanced. #[instrument(skip_all, level = Level::INFO)] - fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result> { + fn insert_into_page(&mut self, bkey: &BTreeKey) -> Result> { let record = bkey .get_record() .expect("expected record present on insert"); @@ -2240,7 +2230,7 @@ impl BTreeCursor { return_if_io!(self.balance()); } WriteState::Finish => { - break Ok(CursorResult::Ok(())); + break Ok(IOResult::Done(())); } }; }; @@ -2262,7 +2252,7 @@ impl BTreeCursor { /// It will try to split the page in half by keys not by content. /// Sqlite tries to have a page at least 40% full. #[instrument(skip(self), level = Level::INFO)] - fn balance(&mut self) -> Result> { + fn balance(&mut self) -> Result> { turso_assert!( matches!(self.state, CursorState::Write(_)), "Cursor must be in balancing state" @@ -2300,7 +2290,7 @@ impl BTreeCursor { { let write_info = self.state.mut_write_info().unwrap(); write_info.state = WriteState::Finish; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } } @@ -2316,7 +2306,7 @@ impl BTreeCursor { WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { return_if_io!(self.balance_non_root()); } - WriteState::Finish => return Ok(CursorResult::Ok(())), + WriteState::Finish => return Ok(IOResult::Done(())), _ => panic!("unexpected state on balance {state:?}"), } } @@ -2324,7 +2314,7 @@ impl BTreeCursor { /// Balance a non root page by trying to balance cells between a maximum of 3 siblings that should be neighboring the page that overflowed/underflowed. #[instrument(skip_all, level = Level::INFO)] - fn balance_non_root(&mut self) -> Result> { + fn balance_non_root(&mut self) -> Result> { turso_assert!( matches!(self.state, CursorState::Write(_)), "Cursor must be in balancing state" @@ -2489,7 +2479,7 @@ impl BTreeCursor { sibling_count, first_divider_cell: first_cell_divider, })); - (WriteState::BalanceNonRootDoBalancing, Ok(CursorResult::IO)) + (WriteState::BalanceNonRootDoBalancing, Ok(IOResult::IO)) } WriteState::BalanceNonRootDoBalancing => { // Ensure all involved pages are in memory. @@ -3308,7 +3298,7 @@ impl BTreeCursor { self.pager .free_page(Some(page.get().clone()), page.get().get().id)?; } - (WriteState::BalanceStart, Ok(CursorResult::Ok(()))) + (WriteState::BalanceStart, Ok(IOResult::Done(()))) } WriteState::Finish => todo!(), }; @@ -3851,7 +3841,7 @@ impl BTreeCursor { /// Find the index of the cell in the page that contains the given rowid. #[instrument( skip_all, level = Level::INFO)] - fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result> { + fn find_cell(&mut self, page: &PageContent, key: &BTreeKey) -> Result> { let cell_count = page.cell_count(); let mut low = 0; let mut high = if cell_count > 0 { cell_count - 1 } else { 0 }; @@ -3922,11 +3912,11 @@ impl BTreeCursor { self.find_cell_state.reset(); assert!(result_index <= cell_count); - Ok(CursorResult::Ok(result_index)) + Ok(IOResult::Done(result_index)) } #[instrument(skip_all, level = Level::INFO)] - pub fn seek_end(&mut self) -> Result> { + pub fn seek_end(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); // unsure about this -_- self.move_to_root()?; loop { @@ -3940,7 +3930,7 @@ impl BTreeCursor { if contents.is_leaf() { // set cursor just past the last cell to append self.stack.set_cell_index(contents.cell_count() as i32); - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } match contents.rightmost_pointer() { @@ -3955,16 +3945,16 @@ impl BTreeCursor { } #[instrument(skip_all, level = Level::INFO)] - pub fn seek_to_last(&mut self) -> Result> { + pub fn seek_to_last(&mut self) -> Result> { let has_record = return_if_io!(self.move_to_rightmost()); self.invalidate_record(); self.has_record.replace(has_record); if !has_record { let is_empty = return_if_io!(self.is_empty_table()); assert!(is_empty); - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } pub fn is_empty(&self) -> bool { @@ -3976,7 +3966,7 @@ impl BTreeCursor { } #[instrument(skip_all, level = Level::INFO)] - pub fn rewind(&mut self) -> Result> { + pub fn rewind(&mut self) -> Result> { if self.mv_cursor.is_some() { let cursor_has_record = return_if_io!(self.get_next_record()); self.invalidate_record(); @@ -3988,25 +3978,25 @@ impl BTreeCursor { self.invalidate_record(); self.has_record.replace(cursor_has_record); } - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::INFO)] - pub fn last(&mut self) -> Result> { + pub fn last(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); let cursor_has_record = return_if_io!(self.move_to_rightmost()); self.has_record.replace(cursor_has_record); self.invalidate_record(); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::INFO)] - pub fn next(&mut self) -> Result> { + pub fn next(&mut self) -> Result> { return_if_io!(self.restore_context()); let cursor_has_record = return_if_io!(self.get_next_record()); self.has_record.replace(cursor_has_record); self.invalidate_record(); - Ok(CursorResult::Ok(cursor_has_record)) + Ok(IOResult::Done(cursor_has_record)) } fn invalidate_record(&mut self) { @@ -4018,20 +4008,20 @@ impl BTreeCursor { } #[instrument(skip_all, level = Level::INFO)] - pub fn prev(&mut self) -> Result> { + pub fn prev(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); return_if_io!(self.restore_context()); let cursor_has_record = return_if_io!(self.get_prev_record()); self.has_record.replace(cursor_has_record); self.invalidate_record(); - Ok(CursorResult::Ok(cursor_has_record)) + Ok(IOResult::Done(cursor_has_record)) } #[instrument(skip(self), level = Level::INFO)] - pub fn rowid(&mut self) -> Result>> { + pub fn rowid(&mut self) -> Result>> { if let Some(mv_cursor) = &self.mv_cursor { let mv_cursor = mv_cursor.borrow(); - return Ok(CursorResult::Ok( + return Ok(IOResult::Done( mv_cursor.current_row_id().map(|rowid| rowid.row_id), )); } @@ -4052,17 +4042,17 @@ impl BTreeCursor { page_type ); }; - Ok(CursorResult::Ok(Some(rowid))) + Ok(IOResult::Done(Some(rowid))) } else { - Ok(CursorResult::Ok(self.get_index_rowid_from_record())) + Ok(IOResult::Done(self.get_index_rowid_from_record())) } } else { - Ok(CursorResult::Ok(None)) + Ok(IOResult::Done(None)) } } #[instrument(skip(self), level = Level::INFO)] - pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { + pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result> { assert!(self.mv_cursor.is_none()); // Empty trace to capture the span information tracing::trace!(""); @@ -4077,16 +4067,16 @@ impl BTreeCursor { self.valid_state = CursorValidState::Valid; self.has_record .replace(matches!(seek_result, SeekResult::Found)); - Ok(CursorResult::Ok(seek_result)) + Ok(IOResult::Done(seek_result)) } /// Return a reference to the record the cursor is currently pointing to. /// If record was not parsed yet, then we have to parse it and in case of I/O we yield control /// back. #[instrument(skip(self), level = Level::INFO)] - pub fn record(&self) -> Result>>> { + pub fn record(&self) -> Result>>> { if !self.has_record.get() { - return Ok(CursorResult::Ok(None)); + return Ok(IOResult::Done(None)); } let invalidated = self .reusable_immutable_record @@ -4098,7 +4088,7 @@ impl BTreeCursor { let record_ref = Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref()) .unwrap(); - return Ok(CursorResult::Ok(Some(record_ref))); + return Ok(IOResult::Done(Some(record_ref))); } if *self.parse_record_state.borrow() == ParseRecordState::Init { *self.parse_record_state.borrow_mut() = ParseRecordState::Parsing { @@ -4148,7 +4138,7 @@ impl BTreeCursor { *self.parse_record_state.borrow_mut() = ParseRecordState::Init; let record_ref = Ref::filter_map(self.reusable_immutable_record.borrow(), |opt| opt.as_ref()).unwrap(); - Ok(CursorResult::Ok(Some(record_ref))) + Ok(IOResult::Done(Some(record_ref))) } #[instrument(skip(self), level = Level::INFO)] @@ -4156,7 +4146,7 @@ impl BTreeCursor { &mut self, key: &BTreeKey, mut moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */ - ) -> Result> { + ) -> Result> { tracing::debug!(valid_state = ?self.valid_state, cursor_state = ?self.state, is_write_in_progress = self.is_write_in_progress()); match &self.mv_cursor { Some(mv_cursor) => match key.maybe_rowid() { @@ -4207,7 +4197,7 @@ impl BTreeCursor { } } }; - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } /// Delete state machine flow: @@ -4222,7 +4212,7 @@ impl BTreeCursor { /// 8. SeekAfterBalancing -> adjust the cursor to a node that is closer to the deleted value. go to Finish /// 9. Finish -> Delete operation is done. Return CursorResult(Ok()) #[instrument(skip(self), level = Level::INFO)] - pub fn delete(&mut self) -> Result> { + pub fn delete(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); if let CursorState::None = &self.state { @@ -4250,11 +4240,11 @@ impl BTreeCursor { ) { if return_if_io!(self.rowid()).is_none() { self.state = CursorState::None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } } else if self.reusable_immutable_record.borrow().is_none() { self.state = CursorState::None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } let delete_info = self.state.mut_delete_info().unwrap(); @@ -4494,7 +4484,7 @@ impl BTreeCursor { // was taken in InteriorNodeReplacement. We must also check if the parent needs balancing!!! self.stack.retreat(); self.state = CursorState::None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } // Only reaches this function call if state = DeleteState::WaitForBalancingToComplete // self.save_context(); @@ -4510,7 +4500,7 @@ impl BTreeCursor { match self.balance()? { // TODO(Krishna): Add second balance in the case where deletion causes cursor to end up // a level deeper. - CursorResult::Ok(()) => { + IOResult::Done(()) => { let write_info = match &self.state { CursorState::Write(wi) => wi.clone(), _ => unreachable!("Balance operation changed cursor state"), @@ -4523,7 +4513,7 @@ impl BTreeCursor { }); } - CursorResult::IO => { + IOResult::IO => { // Move to seek state // Save balance progress and return IO let write_info = match &self.state { @@ -4535,7 +4525,7 @@ impl BTreeCursor { state: DeleteState::WaitForBalancingToComplete { target_key }, balance_write_info: Some(write_info), }); - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } } } @@ -4552,7 +4542,7 @@ impl BTreeCursor { return_if_io!(self.seek(key, SeekOp::LT)); self.state = CursorState::None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } } } @@ -4572,7 +4562,7 @@ impl BTreeCursor { } #[instrument(skip_all, level = Level::INFO)] - pub fn exists(&mut self, key: &Value) -> Result> { + pub fn exists(&mut self, key: &Value) -> Result> { assert!(self.mv_cursor.is_none()); let int_key = match key { Value::Integer(i) => i, @@ -4583,14 +4573,14 @@ impl BTreeCursor { let has_record = matches!(seek_result, SeekResult::Found); self.has_record.set(has_record); self.invalidate_record(); - Ok(CursorResult::Ok(has_record)) + Ok(IOResult::Done(has_record)) } /// Clear the overflow pages linked to a specific page provided by the leaf cell /// Uses a state machine to keep track of it's operations so that traversal can be /// resumed from last point after IO interruption #[instrument(skip_all, level = Level::INFO)] - fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result> { + fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result> { loop { let state = self.overflow_state.take().unwrap_or(OverflowState::Start); @@ -4602,7 +4592,7 @@ impl BTreeCursor { BTreeCell::IndexInteriorCell(interior_cell) => { interior_cell.first_overflow_page } - BTreeCell::TableInteriorCell(_) => return Ok(CursorResult::Ok(())), // No overflow pages + BTreeCell::TableInteriorCell(_) => return Ok(IOResult::Done(())), // No overflow pages }; if let Some(page) = first_overflow_page { @@ -4637,7 +4627,7 @@ impl BTreeCursor { } OverflowState::Done => { self.overflow_state = None; - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } }; } @@ -4659,7 +4649,7 @@ impl BTreeCursor { /// /// The destruction order would be: [4',4,5,2,6,7,3,1] #[instrument(skip(self), level = Level::INFO)] - pub fn btree_destroy(&mut self) -> Result>> { + pub fn btree_destroy(&mut self) -> Result>> { if let CursorState::None = &self.state { self.move_to_root()?; self.state = CursorState::Destroy(DestroyInfo { @@ -4786,7 +4776,7 @@ impl BTreeCursor { } DestroyState::ClearOverflowPages { cell } => { match self.clear_overflow_pages(&cell)? { - CursorResult::Ok(_) => match cell { + IOResult::Done(_) => match cell { // For an index interior cell, clear the left child page now that overflow pages have been cleared BTreeCell::IndexInteriorCell(index_int_cell) => { let child_page = @@ -4807,7 +4797,7 @@ impl BTreeCursor { } _ => panic!("unexpected cell type"), }, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::IO => return Ok(IOResult::IO), } } DestroyState::FreePage => { @@ -4827,7 +4817,7 @@ impl BTreeCursor { self.state = CursorState::None; // TODO: For now, no-op the result return None always. This will change once [AUTO_VACUUM](https://www.sqlite.org/lang_vacuum.html) is introduced // At that point, the last root page(call this x) will be moved into the position of the root page of this table and the value returned will be x - return Ok(CursorResult::Ok(None)); + return Ok(IOResult::Done(None)); } } } @@ -4843,7 +4833,7 @@ impl BTreeCursor { page_ref: BTreePage, cell_idx: usize, record: &ImmutableRecord, - ) -> Result> { + ) -> Result> { // build the new payload let page_type = page_ref.get().get().contents.as_ref().unwrap().page_type(); let serial_types_len = self.record_cursor.borrow_mut().len(record); @@ -4868,7 +4858,7 @@ impl BTreeCursor { // if it all fits in local space and old_local_size is enough, do an in-place overwrite if new_payload.len() == old_local_size { self.overwrite_content(page_ref.clone(), old_offset, &new_payload)?; - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } else { // doesn't fit, drop it and insert a new one drop_cell( @@ -4882,7 +4872,7 @@ impl BTreeCursor { cell_idx, self.usable_space() as u16, )?; - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } } @@ -4891,13 +4881,13 @@ impl BTreeCursor { page_ref: BTreePage, dest_offset: usize, new_payload: &[u8], - ) -> Result> { + ) -> Result> { return_if_locked!(page_ref.get()); let page_ref = page_ref.get(); let buf = page_ref.get().contents.as_mut().unwrap().as_ptr(); buf[dest_offset..dest_offset + new_payload.len()].copy_from_slice(new_payload); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } fn get_immutable_record_or_create(&self) -> std::cell::RefMut<'_, Option> { @@ -4920,7 +4910,7 @@ impl BTreeCursor { /// /// Only supposed to be used in the context of a simple Count Select Statement #[instrument(skip(self), level = Level::INFO)] - pub fn count(&mut self) -> Result> { + pub fn count(&mut self) -> Result> { if self.count == 0 { self.move_to_root()?; } @@ -4957,7 +4947,7 @@ impl BTreeCursor { // All pages of the b-tree have been visited. Return successfully self.move_to_root()?; - return Ok(CursorResult::Ok(self.count)); + return Ok(IOResult::Done(self.count)); } // Move to parent @@ -5017,9 +5007,9 @@ impl BTreeCursor { /// If context is defined, restore it and set it None on success #[instrument(skip_all, level = Level::INFO)] - fn restore_context(&mut self) -> Result> { + fn restore_context(&mut self) -> Result> { if self.context.is_none() || !matches!(self.valid_state, CursorValidState::RequireSeek) { - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); } let ctx = self.context.take().unwrap(); let seek_key = match ctx { @@ -5028,13 +5018,13 @@ impl BTreeCursor { }; let res = self.seek(seek_key, SeekOp::GE { eq_only: true })?; match res { - CursorResult::Ok(_) => { + IOResult::Done(_) => { self.valid_state = CursorValidState::Valid; - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } - CursorResult::IO => { + IOResult::IO => { self.context = Some(ctx); - Ok(CursorResult::IO) + Ok(IOResult::IO) } } } @@ -5157,14 +5147,14 @@ pub fn integrity_check( state: &mut IntegrityCheckState, errors: &mut Vec, pager: &Rc, -) -> Result> { +) -> Result> { let Some(IntegrityCheckPageEntry { page_idx, level, max_intkey, }) = state.page_stack.last().cloned() else { - return Ok(CursorResult::Ok(())); + return Ok(IOResult::Done(())); }; let page = btree_read_page(pager, page_idx)?; return_if_locked_maybe_load!(pager, page); @@ -5310,7 +5300,7 @@ pub fn integrity_check( contents.num_frag_free_bytes() as usize, ); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } pub fn btree_read_page(pager: &Rc, page_idx: usize) -> Result { @@ -6822,7 +6812,7 @@ mod tests { assert!( matches!( cursor.seek(seek_key, SeekOp::GE { eq_only: true }).unwrap(), - CursorResult::Ok(SeekResult::Found) + IOResult::Done(SeekResult::Found) ), "key {key} is not found" ); @@ -6911,8 +6901,8 @@ mod tests { .unwrap(); loop { match pager.end_tx(false, false, &conn, false).unwrap() { - crate::PagerCacheflushStatus::Done(_) => break, - crate::PagerCacheflushStatus::IO => { + IOResult::Done(_) => break, + IOResult::IO => { pager.io.run_once().unwrap(); } } @@ -6990,8 +6980,8 @@ mod tests { let index_root_page_result = pager.btree_create(&CreateBTreeFlags::new_index()).unwrap(); let index_root_page = match index_root_page_result { - crate::types::CursorResult::Ok(id) => id as usize, - crate::types::CursorResult::IO => { + crate::types::IOResult::Done(id) => id as usize, + crate::types::IOResult::IO => { panic!("btree_create returned IO in test, unexpected") } }; @@ -7038,8 +7028,8 @@ mod tests { cursor.move_to_root().unwrap(); loop { match pager.end_tx(false, false, &conn, false).unwrap() { - crate::PagerCacheflushStatus::Done(_) => break, - crate::PagerCacheflushStatus::IO => { + IOResult::Done(_) => break, + IOResult::IO => { pager.io.run_once().unwrap(); } } @@ -7341,7 +7331,7 @@ mod tests { // Clear overflow pages let clear_result = cursor.clear_overflow_pages(&leaf_cell)?; match clear_result { - CursorResult::Ok(_) => { + IOResult::Done(_) => { // Verify proper number of pages were added to freelist assert_eq!( header_accessor::get_freelist_pages(&pager)?, @@ -7369,7 +7359,7 @@ mod tests { } } } - CursorResult::IO => { + IOResult::IO => { cursor.pager.io.run_once()?; } } @@ -7399,7 +7389,7 @@ mod tests { // Try to clear non-existent overflow pages let clear_result = cursor.clear_overflow_pages(&leaf_cell)?; match clear_result { - CursorResult::Ok(_) => { + IOResult::Done(_) => { // Verify freelist was not modified assert_eq!( header_accessor::get_freelist_pages(&pager)?, @@ -7414,7 +7404,7 @@ mod tests { "No trunk page should be created when no overflow pages exist" ); } - CursorResult::IO => { + IOResult::IO => { cursor.pager.io.run_once()?; } } @@ -8445,15 +8435,15 @@ mod tests { } fn run_until_done( - mut action: impl FnMut() -> Result>, + mut action: impl FnMut() -> Result>, pager: &Pager, ) -> Result { loop { match action()? { - CursorResult::Ok(res) => { + IOResult::Done(res) => { return Ok(res); } - CursorResult::IO => pager.io.run_once().unwrap(), + IOResult::IO => pager.io.run_once().unwrap(), } } } diff --git a/core/storage/header_accessor.rs b/core/storage/header_accessor.rs index a2aba8013..b29b047d6 100644 --- a/core/storage/header_accessor.rs +++ b/core/storage/header_accessor.rs @@ -5,7 +5,7 @@ use crate::{ pager::{PageRef, Pager}, sqlite3_ondisk::DATABASE_HEADER_PAGE_ID, }, - types::CursorResult, + types::IOResult, LimboError, Result, }; use std::sync::atomic::Ordering; @@ -35,7 +35,7 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92; const HEADER_OFFSET_VERSION_NUMBER: usize = 96; // Helper to get a read-only reference to the header page. -fn get_header_page(pager: &Pager) -> Result> { +fn get_header_page(pager: &Pager) -> Result> { if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError( "Database is empty, header does not exist - page 1 should've been allocated before this".to_string(), @@ -43,13 +43,13 @@ fn get_header_page(pager: &Pager) -> Result> { } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; if page.is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } - Ok(CursorResult::Ok(page)) + Ok(IOResult::Done(page)) } // Helper to get a writable reference to the header page and mark it dirty. -fn get_header_page_for_write(pager: &Pager) -> Result> { +fn get_header_page_for_write(pager: &Pager) -> Result> { if pager.db_state.load(Ordering::SeqCst) < 2 { // This should not be called on an empty DB for writing, as page 1 is allocated on first transaction. return Err(LimboError::InternalError( @@ -58,22 +58,22 @@ fn get_header_page_for_write(pager: &Pager) -> Result> { } let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?; if page.is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } page.set_dirty(); pager.add_dirty(DATABASE_HEADER_PAGE_ID); - Ok(CursorResult::Ok(page)) + Ok(IOResult::Done(page)) } /// Helper function to run async header accessors until completion fn run_header_accessor_until_done(pager: &Pager, mut accessor: F) -> Result where - F: FnMut() -> Result>, + F: FnMut() -> Result>, { loop { match accessor()? { - CursorResult::Ok(value) => return Ok(value), - CursorResult::IO => { + IOResult::Done(value) => return Ok(value), + IOResult::IO => { pager.io.run_once()?; } } @@ -103,13 +103,13 @@ macro_rules! impl_header_field_accessor { paste::paste! { // Async version #[allow(dead_code)] - pub fn [](pager: &Pager) -> Result> { + pub fn [](pager: &Pager) -> Result> { if pager.db_state.load(Ordering::SeqCst) < 2 { return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this"))); } let page = match get_header_page(pager)? { - CursorResult::Ok(page) => page, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(page) => page, + IOResult::IO => return Ok(IOResult::IO), }; let page_inner = page.get(); let page_content = page_inner.contents.as_ref().unwrap(); @@ -120,10 +120,10 @@ macro_rules! impl_header_field_accessor { let value = <$type>::from_be_bytes(bytes); $( if value == 0 { - return Ok(CursorResult::Ok($ifzero)); + return Ok(IOResult::Done($ifzero)); } )? - Ok(CursorResult::Ok(value)) + Ok(IOResult::Done(value)) } // Sync version @@ -134,10 +134,10 @@ macro_rules! impl_header_field_accessor { // Async setter #[allow(dead_code)] - pub fn [](pager: &Pager, value: $type) -> Result> { + pub fn [](pager: &Pager, value: $type) -> Result> { let page = match get_header_page_for_write(pager)? { - CursorResult::Ok(page) => page, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(page) => page, + IOResult::IO => return Ok(IOResult::IO), }; let page_inner = page.get(); let page_content = page_inner.contents.as_ref().unwrap(); @@ -146,7 +146,7 @@ macro_rules! impl_header_field_accessor { buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes()); page.set_dirty(); pager.add_dirty(1); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } // Sync setter @@ -214,14 +214,14 @@ pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> { } #[allow(dead_code)] -pub fn get_page_size_async(pager: &Pager) -> Result> { +pub fn get_page_size_async(pager: &Pager) -> Result> { match get_page_size_u16_async(pager)? { - CursorResult::Ok(size) => { + IOResult::Done(size) => { if size == 1 { - return Ok(CursorResult::Ok(MAX_PAGE_SIZE)); + return Ok(IOResult::Done(MAX_PAGE_SIZE)); } - Ok(CursorResult::Ok(size as u32)) + Ok(IOResult::Done(size as u32)) } - CursorResult::IO => Ok(CursorResult::IO), + IOResult::IO => Ok(IOResult::IO), } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 9d9a927d5..3e227a011 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -4,9 +4,9 @@ use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::header_accessor; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; -use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; -use crate::types::CursorResult; -use crate::Completion; +use crate::storage::wal::{CheckpointResult, Wal}; +use crate::types::IOResult; +use crate::{return_if_io, Completion}; use crate::{Buffer, Connection, LimboError, Result}; use parking_lot::RwLock; use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; @@ -19,7 +19,7 @@ use tracing::{instrument, trace, Level}; use super::btree::{btree_init_page, BTreePage}; use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey}; use super::sqlite3_ondisk::{begin_write_btree_page, DATABASE_HEADER_SIZE}; -use super::wal::{CheckpointMode, CheckpointStatus}; +use super::wal::CheckpointMode; #[cfg(not(feature = "omit_autovacuum"))] use {crate::io::Buffer as IoBuffer, ptrmap::*}; @@ -231,14 +231,6 @@ pub struct Pager { #[derive(Debug, Copy, Clone)] /// The status of the current cache flush. -/// A Done state means that the WAL was committed to disk and fsynced, -/// plus potentially checkpointed to the DB (and the DB then fsynced). -pub enum PagerCacheflushStatus { - Done(PagerCacheflushResult), - IO, -} - -#[derive(Debug, Copy, Clone)] pub enum PagerCacheflushResult { /// The WAL was written to disk and fsynced. WalWritten, @@ -312,17 +304,17 @@ impl Pager { /// `target_page_num` (1-indexed) is the page whose entry is sought. /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself). #[cfg(not(feature = "omit_autovacuum"))] - pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { + pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); let configured_page_size = match header_accessor::get_page_size_async(self)? { - CursorResult::Ok(size) => size as usize, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(size) => size as usize, + IOResult::IO => return Ok(IOResult::IO), }; if target_page_num < FIRST_PTRMAP_PAGE_NO || is_ptrmap_page(target_page_num, configured_page_size) { - return Ok(CursorResult::Ok(None)); + return Ok(IOResult::Done(None)); } let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); @@ -336,10 +328,10 @@ impl Pager { let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?; if ptrmap_page.is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } if !ptrmap_page.is_loaded() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } let ptrmap_page_inner = ptrmap_page.get(); @@ -376,7 +368,7 @@ impl Pager { let entry_slice = &ptrmap_page_data_slice [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; match PtrmapEntry::deserialize(entry_slice) { - Some(entry) => Ok(CursorResult::Ok(Some(entry))), + Some(entry) => Ok(IOResult::Done(Some(entry))), None => Err(LimboError::Corrupt(format!( "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" ))), @@ -392,7 +384,7 @@ impl Pager { db_page_no_to_update: u32, entry_type: PtrmapType, parent_page_no: u32, - ) -> Result> { + ) -> Result> { tracing::trace!( "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {})", db_page_no_to_update, @@ -401,8 +393,8 @@ impl Pager { ); let page_size = match header_accessor::get_page_size_async(self)? { - CursorResult::Ok(size) => size as usize, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(size) => size as usize, + IOResult::IO => return Ok(IOResult::IO), }; if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO @@ -427,10 +419,10 @@ impl Pager { let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?; if ptrmap_page.is_locked() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } if !ptrmap_page.is_loaded() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } let ptrmap_page_inner = ptrmap_page.get(); @@ -467,13 +459,13 @@ impl Pager { ptrmap_page.set_dirty(); self.add_dirty(ptrmap_pg_no as usize); - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } /// This method is used to allocate a new root page for a btree, both for tables and indexes /// FIXME: handle no room in page cache #[instrument(skip_all, level = Level::INFO)] - pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result> { + pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result> { let page_type = match flags { _ if flags.is_table() => PageType::TableLeaf, _ if flags.is_index() => PageType::IndexLeaf, @@ -483,7 +475,7 @@ impl Pager { { let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?; let page_id = page.get().get().id; - Ok(CursorResult::Ok(page_id as u32)) + Ok(IOResult::Done(page_id as u32)) } // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number @@ -494,21 +486,21 @@ impl Pager { AutoVacuumMode::None => { let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?; let page_id = page.get().get().id; - Ok(CursorResult::Ok(page_id as u32)) + Ok(IOResult::Done(page_id as u32)) } AutoVacuumMode::Full => { let mut root_page_num = match header_accessor::get_vacuum_mode_largest_root_page_async(self)? { - CursorResult::Ok(value) => value, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(value) => value, + IOResult::IO => return Ok(IOResult::IO), }; assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled root_page_num += 1; assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented let page_size = match header_accessor::get_page_size_async(self)? { - CursorResult::Ok(size) => size as usize, - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(size) => size as usize, + IOResult::IO => return Ok(IOResult::IO), }; while is_ptrmap_page(root_page_num, page_size) { @@ -531,8 +523,8 @@ impl Pager { // For now map allocated_page_id since we are not swapping it with root_page_num match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? { - CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id)), - CursorResult::IO => Ok(CursorResult::IO), + IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)), + IOResult::IO => Ok(IOResult::IO), } } AutoVacuumMode::Incremental => { @@ -604,17 +596,17 @@ impl Pager { #[inline(always)] #[instrument(skip_all, level = Level::INFO)] - pub fn begin_read_tx(&self) -> Result> { + pub fn begin_read_tx(&self) -> Result> { // We allocate the first page lazily in the first transaction match self.maybe_allocate_page1()? { - CursorResult::Ok(_) => {} - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(_) => {} + IOResult::IO => return Ok(IOResult::IO), } - Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?)) + Ok(IOResult::Done(self.wal.borrow_mut().begin_read_tx()?)) } #[instrument(skip_all, level = Level::INFO)] - fn maybe_allocate_page1(&self) -> Result> { + fn maybe_allocate_page1(&self) -> Result> { if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { if let Ok(_lock) = self.init_lock.try_lock() { match ( @@ -623,29 +615,29 @@ impl Pager { ) { // In case of being empty or (allocating and this connection is performing allocation) then allocate the first page (0, false) | (1, true) => match self.allocate_page1()? { - CursorResult::Ok(_) => Ok(CursorResult::Ok(())), - CursorResult::IO => Ok(CursorResult::IO), + IOResult::Done(_) => Ok(IOResult::Done(())), + IOResult::IO => Ok(IOResult::IO), }, - _ => Ok(CursorResult::IO), + _ => Ok(IOResult::IO), } } else { - Ok(CursorResult::IO) + Ok(IOResult::IO) } } else { - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } } #[inline(always)] #[instrument(skip_all, level = Level::INFO)] - pub fn begin_write_tx(&self) -> Result> { + pub fn begin_write_tx(&self) -> Result> { // TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction // we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans match self.maybe_allocate_page1()? { - CursorResult::Ok(_) => {} - CursorResult::IO => return Ok(CursorResult::IO), + IOResult::Done(_) => {} + IOResult::IO => return Ok(IOResult::IO), } - Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?)) + Ok(IOResult::Done(self.wal.borrow_mut().begin_write_tx()?)) } #[instrument(skip_all, level = Level::INFO)] @@ -655,17 +647,17 @@ impl Pager { schema_did_change: bool, connection: &Connection, wal_checkpoint_disabled: bool, - ) -> Result { + ) -> Result> { tracing::trace!("end_tx(rollback={})", rollback); if rollback { self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; - return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback)); + return Ok(IOResult::Done(PagerCacheflushResult::Rollback)); } let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?; match cacheflush_status { - PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO), - PagerCacheflushStatus::Done(_) => { + IOResult::IO => Ok(IOResult::IO), + IOResult::Done(_) => { let maybe_schema_pair = if schema_did_change { let schema = connection.schema.borrow().clone(); // Lock first before writing to the database schema in case someone tries to read the schema before it's updated @@ -777,7 +769,10 @@ impl Pager { /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. #[instrument(skip_all, level = Level::INFO)] - pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result { + pub fn cacheflush( + &self, + wal_checkpoint_disabled: bool, + ) -> Result> { let mut checkpoint_result = CheckpointResult::default(); let res = loop { let state = self.flush_info.borrow().state; @@ -807,20 +802,18 @@ impl Pager { } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; - return Ok(PagerCacheflushStatus::IO); + return Ok(IOResult::IO); } FlushState::WaitAppendFrames => { let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); if in_flight == 0 { self.flush_info.borrow_mut().state = FlushState::SyncWal; } else { - return Ok(PagerCacheflushStatus::IO); + return Ok(IOResult::IO); } } FlushState::SyncWal => { - if WalFsyncStatus::IO == self.wal.borrow_mut().sync()? { - return Ok(PagerCacheflushStatus::IO); - } + return_if_io!(self.wal.borrow_mut().sync()); if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() { self.flush_info.borrow_mut().state = FlushState::Start; @@ -830,11 +823,11 @@ impl Pager { } FlushState::Checkpoint => { match self.checkpoint()? { - CheckpointStatus::Done(res) => { + IOResult::Done(res) => { checkpoint_result = res; self.flush_info.borrow_mut().state = FlushState::SyncDbFile; } - CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO), + IOResult::IO => return Ok(IOResult::IO), }; } FlushState::SyncDbFile => { @@ -843,7 +836,7 @@ impl Pager { } FlushState::WaitSyncDbFile => { if *self.syncing.borrow() { - return Ok(PagerCacheflushStatus::IO); + return Ok(IOResult::IO); } else { self.flush_info.borrow_mut().state = FlushState::Start; break PagerCacheflushResult::Checkpointed(checkpoint_result); @@ -853,7 +846,7 @@ impl Pager { }; // We should only signal that we finished appenind frames after wal sync to avoid inconsistencies when sync fails self.wal.borrow_mut().finish_append_frames_commit()?; - Ok(PagerCacheflushStatus::Done(res)) + Ok(IOResult::Done(res)) } #[instrument(skip_all, level = Level::INFO)] @@ -873,7 +866,7 @@ impl Pager { } #[instrument(skip_all, level = Level::INFO, target = "pager_checkpoint",)] - pub fn checkpoint(&self) -> Result { + pub fn checkpoint(&self) -> Result> { let mut checkpoint_result = CheckpointResult::default(); loop { let state = *self.checkpoint_state.borrow(); @@ -886,8 +879,8 @@ impl Pager { in_flight, CheckpointMode::Passive, )? { - CheckpointStatus::IO => return Ok(CheckpointStatus::IO), - CheckpointStatus::Done(res) => { + IOResult::IO => return Ok(IOResult::IO), + IOResult::Done(res) => { checkpoint_result = res; self.checkpoint_state.replace(CheckpointState::SyncDbFile); } @@ -900,7 +893,7 @@ impl Pager { } CheckpointState::WaitSyncDbFile => { if *self.syncing.borrow() { - return Ok(CheckpointStatus::IO); + return Ok(IOResult::IO); } else { self.checkpoint_state .replace(CheckpointState::CheckpointDone); @@ -908,10 +901,10 @@ impl Pager { } CheckpointState::CheckpointDone => { return if *self.checkpoint_inflight.borrow() > 0 { - Ok(CheckpointStatus::IO) + Ok(IOResult::IO) } else { self.checkpoint_state.replace(CheckpointState::Checkpoint); - Ok(CheckpointStatus::Done(checkpoint_result)) + Ok(IOResult::Done(checkpoint_result)) }; } } @@ -935,7 +928,7 @@ impl Pager { { let mut wal = self.wal.borrow_mut(); // fsync the wal syncronously before beginning checkpoint - while let Ok(WalFsyncStatus::IO) = wal.sync() { + while let Ok(IOResult::IO) = wal.sync() { if attempts >= 10 { return Err(LimboError::InternalError( "Failed to fsync WAL before final checkpoint, fd likely closed".into(), @@ -964,10 +957,10 @@ impl Pager { Rc::new(RefCell::new(0)), CheckpointMode::Passive, ) { - Ok(CheckpointStatus::IO) => { + Ok(IOResult::IO) => { self.io.run_once()?; } - Ok(CheckpointStatus::Done(res)) => { + Ok(IOResult::Done(res)) => { checkpoint_result = res; break; } @@ -1056,7 +1049,7 @@ impl Pager { } #[instrument(skip_all, level = Level::INFO)] - pub fn allocate_page1(&self) -> Result> { + pub fn allocate_page1(&self) -> Result> { let state = self.allocate_page1_state.borrow().clone(); match state { AllocatePage1State::Start => { @@ -1093,7 +1086,7 @@ impl Pager { write_counter, page: page1, }); - Ok(CursorResult::IO) + Ok(IOResult::IO) } AllocatePage1State::Writing { write_counter, @@ -1101,7 +1094,7 @@ impl Pager { } => { tracing::trace!("allocate_page1(Writing)"); if *write_counter.borrow() > 0 { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } tracing::trace!("allocate_page1(Writing done)"); let page1_ref = page.get(); @@ -1112,7 +1105,7 @@ impl Pager { })?; self.db_state.store(DB_STATE_INITIALIZED, Ordering::SeqCst); self.allocate_page1_state.replace(AllocatePage1State::Done); - Ok(CursorResult::Ok(page1_ref.clone())) + Ok(IOResult::Done(page1_ref.clone())) } AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"), } @@ -1500,15 +1493,15 @@ mod ptrmap_tests { use crate::storage::wal::{WalFile, WalFileShared}; pub fn run_until_done( - mut action: impl FnMut() -> Result>, + mut action: impl FnMut() -> Result>, pager: &Pager, ) -> Result { loop { match action()? { - CursorResult::Ok(res) => { + IOResult::Done(res) => { return Ok(res); } - CursorResult::IO => pager.io.run_once().unwrap(), + IOResult::IO => pager.io.run_once().unwrap(), } } } @@ -1554,9 +1547,9 @@ mod ptrmap_tests { // Allocate all the pages as btree root pages for _ in 0..initial_db_pages { match pager.btree_create(&CreateBTreeFlags::new_table()) { - Ok(CursorResult::Ok(_root_page_id)) => (), - Ok(CursorResult::IO) => { - panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly"); + Ok(IOResult::Done(_root_page_id)) => (), + Ok(IOResult::IO) => { + panic!("test_pager_setup: btree_create returned IOResult::IO unexpectedly"); } Err(e) => { panic!("test_pager_setup: btree_create failed: {e:?}"); @@ -1591,8 +1584,8 @@ mod ptrmap_tests { // Read the entry from the ptrmap page and verify it let entry = pager.ptrmap_get(db_page_to_update).unwrap(); - assert!(matches!(entry, CursorResult::Ok(Some(_)))); - let CursorResult::Ok(Some(entry)) = entry else { + assert!(matches!(entry, IOResult::Done(Some(_)))); + let IOResult::Done(Some(entry)) = entry else { panic!("entry is not Some"); }; assert_eq!(entry.entry_type, PtrmapType::RootPage); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 45114fed4..f9929dc8e 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -23,6 +23,7 @@ use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; +use crate::types::IOResult; use crate::{turso_assert, Buffer, LimboError, Result}; use crate::{Completion, Page}; @@ -244,8 +245,8 @@ pub trait Wal { pager: &Pager, write_counter: Rc>, mode: CheckpointMode, - ) -> Result; - fn sync(&mut self) -> Result; + ) -> Result>; + fn sync(&mut self) -> Result>; fn get_max_frame_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; @@ -316,14 +317,12 @@ impl Wal for DummyWAL { _pager: &Pager, _write_counter: Rc>, _mode: crate::CheckpointMode, - ) -> Result { - Ok(crate::CheckpointStatus::Done( - crate::CheckpointResult::default(), - )) + ) -> Result> { + Ok(IOResult::Done(CheckpointResult::default())) } - fn sync(&mut self) -> Result { - Ok(crate::storage::wal::WalFsyncStatus::Done) + fn sync(&mut self) -> Result> { + Ok(IOResult::Done(())) } fn get_max_frame_in_wal(&self) -> u64 { @@ -366,18 +365,6 @@ pub enum CheckpointState { Done, } -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum WalFsyncStatus { - Done, - IO, -} - -#[derive(Debug, Copy, Clone)] -pub enum CheckpointStatus { - Done(CheckpointResult), - IO, -} - // Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save // in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do // page operations like reading a frame to a page, and writing a page to disk. This page should not @@ -726,7 +713,7 @@ impl Wal for WalFile { pager: &Pager, write_counter: Rc>, mode: CheckpointMode, - ) -> Result { + ) -> Result> { assert!( matches!(mode, CheckpointMode::Passive), "only passive mode supported for now" @@ -812,7 +799,7 @@ impl Wal for WalFile { } CheckpointState::WaitReadFrame => { if self.ongoing_checkpoint.page.is_locked() { - return Ok(CheckpointStatus::IO); + return Ok(IOResult::IO); } else { self.ongoing_checkpoint.state = CheckpointState::WritePage; } @@ -828,7 +815,7 @@ impl Wal for WalFile { } CheckpointState::WaitWritePage => { if *write_counter.borrow() > 0 { - return Ok(CheckpointStatus::IO); + return Ok(IOResult::IO); } // If page was in cache clear it. if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) { @@ -847,7 +834,7 @@ impl Wal for WalFile { } CheckpointState::Done => { if *write_counter.borrow() > 0 { - return Ok(CheckpointStatus::IO); + return Ok(IOResult::IO); } let shared = self.get_shared(); shared.checkpoint_lock.unlock(); @@ -883,14 +870,14 @@ impl Wal for WalFile { .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); } self.ongoing_checkpoint.state = CheckpointState::Start; - return Ok(CheckpointStatus::Done(checkpoint_result)); + return Ok(IOResult::Done(checkpoint_result)); } } } } #[instrument(err, skip_all, level = Level::INFO)] - fn sync(&mut self) -> Result { + fn sync(&mut self) -> Result> { match self.sync_state.get() { SyncState::NotSyncing => { tracing::debug!("wal_sync"); @@ -905,15 +892,15 @@ impl Wal for WalFile { let shared = self.get_shared(); shared.file.sync(completion)?; self.sync_state.set(SyncState::Syncing); - Ok(WalFsyncStatus::IO) + Ok(IOResult::IO) } SyncState::Syncing => { if self.syncing.get() { tracing::debug!("wal_sync is already syncing"); - Ok(WalFsyncStatus::IO) + Ok(IOResult::IO) } else { self.sync_state.set(SyncState::NotSyncing); - Ok(WalFsyncStatus::Done) + Ok(IOResult::Done(())) } } } diff --git a/core/types.rs b/core/types.rs index f03d27a8a..fd5d76721 100644 --- a/core/types.rs +++ b/core/types.rs @@ -1724,7 +1724,7 @@ fn compare_records_int( /// This function is an optimized version of `compare_records_generic()` for the /// common case where: /// - (a) The first field of the unpacked record is a string -/// - (b) The serialized record's first field is also a string +/// - (b) The serialized record's first field is also a string /// - (c) The header size varint fits in a single byte (most records) /// /// This optimization avoids the overhead of generic field parsing by directly @@ -1754,7 +1754,7 @@ fn compare_records_int( /// The function follows SQLite's string comparison semantics: /// /// 1. **Type checking**: Ensures both sides are strings, otherwise falls back -/// 2. **String comparison**: Uses collation if provided, binary otherwise +/// 2. **String comparison**: Uses collation if provided, binary otherwise /// 3. **Sort order**: Applies ascending/descending order to comparison result /// 4. **Length comparison**: If strings are equal, compares lengths /// 5. **Remaining fields**: If first field is equal and more fields exist, @@ -1886,7 +1886,7 @@ fn compare_records_string( /// # Arguments /// /// * `serialized` - The left-hand side record in serialized format -/// * `unpacked` - The right-hand side record as an array of parsed values +/// * `unpacked` - The right-hand side record as an array of parsed values /// * `index_info` - Contains sort order information for each field /// * `collations` - Array of collation sequences for string comparisons /// * `skip` - Number of initial fields to skip (assumes caller verified equality) @@ -2308,11 +2308,22 @@ impl Cursor { } #[derive(Debug)] -pub enum CursorResult { - Ok(T), +pub enum IOResult { + Done(T), IO, } +/// Evaluate a Result>, if IO return IO. +#[macro_export] +macro_rules! return_if_io { + ($expr:expr) => { + match $expr? { + IOResult::Done(v) => v, + IOResult::IO => return Ok(IOResult::IO), + } + }; +} + #[derive(Debug)] pub enum SeekResult { /// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index b72353a98..c1b1a2af5 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -42,9 +42,7 @@ use crate::{ use crate::{ storage::wal::CheckpointResult, - types::{ - AggContext, Cursor, CursorResult, ExternalAggState, SeekKey, SeekOp, Value, ValueType, - }, + types::{AggContext, Cursor, ExternalAggState, IOResult, SeekKey, SeekOp, Value, ValueType}, util::{ cast_real_to_integer, cast_text_to_integer, cast_text_to_numeric, cast_text_to_real, checked_cast_text_to_numeric, parse_schema_rows, RoundToPrecision, @@ -95,8 +93,8 @@ use crate::{ macro_rules! return_if_io { ($expr:expr) => { match $expr? { - CursorResult::Ok(v) => v, - CursorResult::IO => return Ok(InsnFunctionStepResult::IO), + IOResult::Done(v) => v, + IOResult::IO => return Ok(InsnFunctionStepResult::IO), } }; } @@ -1293,7 +1291,7 @@ pub fn op_last( /// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately /// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline /// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation -/// +/// /// This function is similar to `sqlite3GetVarint32` #[inline(always)] fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> { @@ -1396,10 +1394,10 @@ pub fn op_column( let mut index_cursor = state.get_cursor(index_cursor_id); let index_cursor = index_cursor.as_btree_mut(); match index_cursor.rowid()? { - CursorResult::IO => { + IOResult::IO => { break 'd Some((index_cursor_id, table_cursor_id)); } - CursorResult::Ok(rowid) => rowid, + IOResult::Done(rowid) => rowid, } }; let mut table_cursor = state.get_cursor(table_cursor_id); @@ -1408,8 +1406,8 @@ pub fn op_column( SeekKey::TableRowId(rowid.unwrap()), SeekOp::GE { eq_only: true }, )? { - CursorResult::Ok(_) => None, - CursorResult::IO => Some((index_cursor_id, table_cursor_id)), + IOResult::Done(_) => None, + IOResult::IO => Some((index_cursor_id, table_cursor_id)), } }; if let Some(deferred_seek) = deferred_seek { @@ -2245,10 +2243,10 @@ pub fn op_row_id( let mut index_cursor = state.get_cursor(index_cursor_id); let index_cursor = index_cursor.as_btree_mut(); let record = match index_cursor.record()? { - CursorResult::IO => { + IOResult::IO => { break 'd Some((index_cursor_id, table_cursor_id)); } - CursorResult::Ok(record) => record, + IOResult::Done(record) => record, }; let record = record.as_ref().unwrap(); let mut record_cursor_ref = index_cursor.record_cursor.borrow_mut(); @@ -2262,8 +2260,8 @@ pub fn op_row_id( let mut table_cursor = state.get_cursor(table_cursor_id); let table_cursor = table_cursor.as_btree_mut(); match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })? { - CursorResult::Ok(_) => None, - CursorResult::IO => Some((index_cursor_id, table_cursor_id)), + IOResult::Done(_) => None, + IOResult::IO => Some((index_cursor_id, table_cursor_id)), } }; if let Some(deferred_seek) = deferred_seek { @@ -2668,8 +2666,8 @@ pub fn seek_internal( } }; match cursor.seek(seek_key, *op)? { - CursorResult::Ok(seek_result) => seek_result, - CursorResult::IO => return Ok(SeekInternalResult::IO), + IOResult::Done(seek_result) => seek_result, + IOResult::IO => return Ok(SeekInternalResult::IO), } }; let found = match seek_result { @@ -2714,8 +2712,8 @@ pub fn seek_internal( SeekOp::LT { .. } | SeekOp::LE { .. } => cursor.prev()?, }; match result { - CursorResult::Ok(found) => found, - CursorResult::IO => return Ok(SeekInternalResult::IO), + IOResult::Done(found) => found, + IOResult::IO => return Ok(SeekInternalResult::IO), } }; return Ok(if found { @@ -2728,8 +2726,8 @@ pub fn seek_internal( let mut cursor = state.get_cursor(cursor_id); let cursor = cursor.as_btree_mut(); match cursor.last()? { - CursorResult::Ok(()) => {} - CursorResult::IO => return Ok(SeekInternalResult::IO), + IOResult::Done(()) => {} + IOResult::IO => return Ok(SeekInternalResult::IO), } // the MoveLast variant is only used for SeekOp::LT and SeekOp::LE when the seek condition is always true, // so we have always found what we were looking for. @@ -2770,7 +2768,7 @@ pub fn seek_internal( /// - `IdxLE`: "less than or equal" - equality should be treated as "less" /// - `IdxGT`: "greater than" - equality should be treated as "less" (so condition fails) /// -/// - **`IdxGE` and `IdxLT`**: Return `Ordering::Equal` (equivalent to `default_rc = 0`) +/// - **`IdxGE` and `IdxLT`**: Return `Ordering::Equal` (equivalent to `default_rc = 0`) /// - When keys are equal, these operations should treat it as true equality /// - `IdxGE`: "greater than or equal" - equality should be treated as "equal" /// - `IdxLT`: "less than" - equality should be treated as "equal" (so condition fails) @@ -5700,7 +5698,7 @@ pub fn op_destroy( // TODO not sure if should be BTreeCursor::new_table or BTreeCursor::new_index here or neither and just pass an emtpy vec let mut cursor = BTreeCursor::new(None, pager.clone(), *root, Vec::new(), 0); let former_root_page_result = cursor.btree_destroy()?; - if let CursorResult::Ok(former_root_page) = former_root_page_result { + if let IOResult::Done(former_root_page) = former_root_page_result { state.registers[*former_root_reg] = Register::Value(Value::Integer(former_root_page.unwrap_or(0) as i64)); } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 415da0b39..c6f98e002 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -27,13 +27,10 @@ pub mod sorter; use crate::{ error::LimboError, function::{AggFunc, FuncCtx}, - storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec}, + storage::sqlite3_ondisk::SmallVec, translate::plan::TableReferences, - types::{RawSlice, TextRef}, - vdbe::execute::OpIdxInsertState, - vdbe::execute::OpInsertState, - vdbe::execute::OpNewRowidState, - vdbe::execute::OpSeekState, + types::{IOResult, RawSlice, TextRef}, + vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState}, RefValue, }; @@ -159,13 +156,13 @@ pub enum StepResult { } /// If there is I/O, the instruction is restarted. -/// Evaluate a Result>, if IO return Ok(StepResult::IO). +/// Evaluate a Result>, if IO return Ok(StepResult::IO). #[macro_export] -macro_rules! return_if_io { +macro_rules! return_step_if_io { ($expr:expr) => { match $expr? { - CursorResult::Ok(v) => v, - CursorResult::IO => return Ok(StepResult::IO), + IOResult::Ok(v) => v, + IOResult::IO => return Ok(StepResult::IO), } }; } @@ -509,7 +506,7 @@ impl Program { connection.wal_checkpoint_disabled.get(), )?; match cacheflush_status { - PagerCacheflushStatus::Done(status) => { + IOResult::Done(status) => { if self.change_cnt_on { self.connection.set_changes(self.n_change.get()); } @@ -522,7 +519,7 @@ impl Program { connection.transaction_state.replace(TransactionState::None); *commit_state = CommitState::Ready; } - PagerCacheflushStatus::IO => { + IOResult::IO => { tracing::trace!("Cacheflush IO"); *commit_state = CommitState::Committing; return Ok(StepResult::IO); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 958a0399a..e0c3feea3 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -6,7 +6,8 @@ use tempfile::TempDir; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use turso_core::{Connection, Database, PagerCacheflushStatus, IO}; +use turso_core::types::IOResult; +use turso_core::{Connection, Database, IO}; #[allow(dead_code)] pub struct TempDatabase { @@ -115,10 +116,10 @@ impl TempDatabase { pub(crate) fn do_flush(conn: &Arc, tmp_db: &TempDatabase) -> anyhow::Result<()> { loop { match conn.cacheflush()? { - PagerCacheflushStatus::Done(_) => { + IOResult::Done(_) => { break; } - PagerCacheflushStatus::IO => { + IOResult::IO => { tmp_db.io.run_once()?; } }