mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 18:24:20 +01:00
Merge 'Simplify blocking operations – add io.block(fn) for IO trait implementors' from Levy A.
This PR make it simpler to block async operations instead of creating sync equivalent functions. eg.: `get_page_size` and `get_page_size_async`. All functions should be async by default, and making them block should more explicit. Also makes it easier to grep for top-level uses of blocking code without needing to "go to definition". Closes #2173
This commit is contained in:
@@ -2,8 +2,7 @@ use crate::result::LimboResult;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::translate::collate::CollationSeq;
|
||||
use crate::translate::plan::SelectPlan;
|
||||
use crate::types::IOResult;
|
||||
use crate::util::{module_args_from_sql, module_name_from_sql, UnparsedFromSqlIndex};
|
||||
use crate::util::{module_args_from_sql, module_name_from_sql, IOExt, UnparsedFromSqlIndex};
|
||||
use crate::{util::normalize_ident, Result};
|
||||
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
|
||||
use core::fmt;
|
||||
@@ -157,29 +156,14 @@ impl Schema {
|
||||
let mut automatic_indices: HashMap<String, Vec<(String, usize)>> =
|
||||
HashMap::with_capacity(10);
|
||||
|
||||
loop {
|
||||
match pager.begin_read_tx()? {
|
||||
IOResult::Done(v) => {
|
||||
if matches!(v, LimboResult::Busy) {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
break;
|
||||
}
|
||||
IOResult::IO => pager.io.run_once()?,
|
||||
}
|
||||
if matches!(pager.io.block(|| pager.begin_read_tx())?, LimboResult::Busy) {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
|
||||
while let IOResult::IO = cursor.rewind()? {
|
||||
pager.io.run_once()?
|
||||
}
|
||||
pager.io.block(|| cursor.rewind())?;
|
||||
|
||||
loop {
|
||||
let Some(row) = (loop {
|
||||
match cursor.record()? {
|
||||
IOResult::Done(v) => break v,
|
||||
IOResult::IO => pager.io.run_once()?,
|
||||
}
|
||||
}) else {
|
||||
let Some(row) = pager.io.block(|| cursor.record())? else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -287,9 +271,7 @@ impl Schema {
|
||||
drop(record_cursor);
|
||||
drop(row);
|
||||
|
||||
while let IOResult::IO = cursor.next()? {
|
||||
pager.io.run_once()?;
|
||||
}
|
||||
pager.io.block(|| cursor.next())?;
|
||||
}
|
||||
|
||||
pager.end_read_tx()?;
|
||||
|
||||
@@ -6570,6 +6570,7 @@ mod tests {
|
||||
schema::IndexColumn,
|
||||
storage::{database::DatabaseFile, page_cache::DumbLruPageCache},
|
||||
types::Text,
|
||||
util::IOExt as _,
|
||||
vdbe::Register,
|
||||
BufferPool, Completion, Connection, StepResult, WalFile, WalFileShared,
|
||||
};
|
||||
@@ -8640,18 +8641,8 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
fn run_until_done<T>(
|
||||
mut action: impl FnMut() -> Result<IOResult<T>>,
|
||||
pager: &Pager,
|
||||
) -> Result<T> {
|
||||
loop {
|
||||
match action()? {
|
||||
IOResult::Done(res) => {
|
||||
return Ok(res);
|
||||
}
|
||||
IOResult::IO => pager.io.run_once().unwrap(),
|
||||
}
|
||||
}
|
||||
fn run_until_done<T>(action: impl FnMut() -> Result<IOResult<T>>, pager: &Pager) -> Result<T> {
|
||||
pager.io.block(action)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::storage::header_accessor;
|
||||
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
|
||||
use crate::storage::wal::{CheckpointResult, Wal};
|
||||
use crate::types::IOResult;
|
||||
use crate::util::IOExt as _;
|
||||
use crate::{return_if_io, Completion};
|
||||
use crate::{turso_assert, Buffer, Connection, LimboError, Result};
|
||||
use parking_lot::RwLock;
|
||||
@@ -1059,23 +1060,14 @@ impl Pager {
|
||||
num_checkpointed_frames: 0,
|
||||
});
|
||||
}
|
||||
let checkpoint_result: CheckpointResult;
|
||||
loop {
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
self,
|
||||
Rc::new(RefCell::new(0)),
|
||||
CheckpointMode::Passive,
|
||||
) {
|
||||
Ok(IOResult::IO) => {
|
||||
self.io.run_once()?;
|
||||
}
|
||||
Ok(IOResult::Done(res)) => {
|
||||
checkpoint_result = res;
|
||||
break;
|
||||
}
|
||||
Err(err) => panic!("error while clearing cache {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
let checkpoint_result = self.io.block(|| {
|
||||
self.wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, Rc::new(RefCell::new(0)), CheckpointMode::Passive)
|
||||
.map_err(|err| panic!("error while clearing cache {err}"))
|
||||
})?;
|
||||
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
self.page_cache
|
||||
.write()
|
||||
|
||||
17
core/util.rs
17
core/util.rs
@@ -1,4 +1,6 @@
|
||||
use crate::translate::expr::WalkControl;
|
||||
use crate::types::IOResult;
|
||||
use crate::IO;
|
||||
use crate::{
|
||||
schema::{self, Column, Schema, Type},
|
||||
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
|
||||
@@ -11,6 +13,21 @@ use turso_sqlite3_parser::ast::{
|
||||
self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator,
|
||||
};
|
||||
|
||||
pub trait IOExt {
|
||||
fn block<T>(&self, f: impl FnMut() -> Result<IOResult<T>>) -> Result<T>;
|
||||
}
|
||||
|
||||
impl<I: ?Sized + IO> IOExt for I {
|
||||
fn block<T>(&self, mut f: impl FnMut() -> Result<IOResult<T>>) -> Result<T> {
|
||||
Ok(loop {
|
||||
match f()? {
|
||||
IOResult::Done(v) => break v,
|
||||
IOResult::IO => self.run_once()?,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RoundToPrecision {
|
||||
fn round_to_precision(self, precision: i32) -> f64;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user