diff --git a/core/schema.rs b/core/schema.rs index d1b7d511a..3085d4bb3 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -2,8 +2,7 @@ use crate::result::LimboResult; use crate::storage::btree::BTreeCursor; use crate::translate::collate::CollationSeq; use crate::translate::plan::SelectPlan; -use crate::types::IOResult; -use crate::util::{module_args_from_sql, module_name_from_sql, UnparsedFromSqlIndex}; +use crate::util::{module_args_from_sql, module_name_from_sql, IOExt, UnparsedFromSqlIndex}; use crate::{util::normalize_ident, Result}; use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable}; use core::fmt; @@ -157,29 +156,14 @@ impl Schema { let mut automatic_indices: HashMap> = HashMap::with_capacity(10); - loop { - match pager.begin_read_tx()? { - IOResult::Done(v) => { - if matches!(v, LimboResult::Busy) { - return Err(LimboError::Busy); - } - break; - } - IOResult::IO => pager.io.run_once()?, - } + if matches!(pager.io.block(|| pager.begin_read_tx())?, LimboResult::Busy) { + return Err(LimboError::Busy); } - while let IOResult::IO = cursor.rewind()? { - pager.io.run_once()? - } + pager.io.block(|| cursor.rewind())?; loop { - let Some(row) = (loop { - match cursor.record()? { - IOResult::Done(v) => break v, - IOResult::IO => pager.io.run_once()?, - } - }) else { + let Some(row) = pager.io.block(|| cursor.record())? else { break; }; @@ -287,9 +271,7 @@ impl Schema { drop(record_cursor); drop(row); - while let IOResult::IO = cursor.next()? { - pager.io.run_once()?; - } + pager.io.block(|| cursor.next())?; } pager.end_read_tx()?; diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 1e83592a2..395232384 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6570,6 +6570,7 @@ mod tests { schema::IndexColumn, storage::{database::DatabaseFile, page_cache::DumbLruPageCache}, types::Text, + util::IOExt as _, vdbe::Register, BufferPool, Completion, Connection, StepResult, WalFile, WalFileShared, }; @@ -8640,18 +8641,8 @@ mod tests { ); } - fn run_until_done( - mut action: impl FnMut() -> Result>, - pager: &Pager, - ) -> Result { - loop { - match action()? { - IOResult::Done(res) => { - return Ok(res); - } - IOResult::IO => pager.io.run_once().unwrap(), - } - } + fn run_until_done(action: impl FnMut() -> Result>, pager: &Pager) -> Result { + pager.io.block(action) } #[test] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 1cabb79ce..a4c6265f8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -6,6 +6,7 @@ use crate::storage::header_accessor; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; use crate::storage::wal::{CheckpointResult, Wal}; use crate::types::IOResult; +use crate::util::IOExt as _; use crate::{return_if_io, Completion}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; use parking_lot::RwLock; @@ -1059,23 +1060,14 @@ impl Pager { num_checkpointed_frames: 0, }); } - let checkpoint_result: CheckpointResult; - loop { - match self.wal.borrow_mut().checkpoint( - self, - Rc::new(RefCell::new(0)), - CheckpointMode::Passive, - ) { - Ok(IOResult::IO) => { - self.io.run_once()?; - } - Ok(IOResult::Done(res)) => { - checkpoint_result = res; - break; - } - Err(err) => panic!("error while clearing cache {err}"), - } - } + + let checkpoint_result = self.io.block(|| { + self.wal + .borrow_mut() + .checkpoint(self, Rc::new(RefCell::new(0)), CheckpointMode::Passive) + .map_err(|err| panic!("error while clearing cache {err}")) + })?; + // TODO: only clear cache of things that are really invalidated self.page_cache .write() diff --git a/core/util.rs b/core/util.rs index 341959ff0..4e485d7d6 100644 --- a/core/util.rs +++ b/core/util.rs @@ -1,4 +1,6 @@ use crate::translate::expr::WalkControl; +use crate::types::IOResult; +use crate::IO; use crate::{ schema::{self, Column, Schema, Type}, translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember}, @@ -11,6 +13,21 @@ use turso_sqlite3_parser::ast::{ self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator, }; +pub trait IOExt { + fn block(&self, f: impl FnMut() -> Result>) -> Result; +} + +impl IOExt for I { + fn block(&self, mut f: impl FnMut() -> Result>) -> Result { + Ok(loop { + match f()? { + IOResult::Done(v) => break v, + IOResult::IO => self.run_once()?, + } + }) + } +} + pub trait RoundToPrecision { fn round_to_precision(self, precision: i32) -> f64; }