Merge 'refactor: Changes CursorResult to IOResult' from Diego Reis

This PR unify the concept of a result that either have something done or
yields to IO, into a single type.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2103
This commit is contained in:
Jussi Saurio
2025-07-16 06:50:13 +03:00
10 changed files with 319 additions and 342 deletions

View File

@@ -77,18 +77,18 @@ use std::{
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
use storage::page_cache::DumbLruPageCache;
pub use storage::pager::PagerCacheflushStatus;
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
pager::PageRef,
pager::{Page, Pager},
wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared},
wal::{CheckpointMode, CheckpointResult, Wal, WalFile, WalFileShared},
};
use tracing::{instrument, Level};
use translate::select::prepare_select_plan;
use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
use types::IOResult;
pub use types::RefValue;
pub use types::Value;
use util::parse_schema_rows;
@@ -755,7 +755,7 @@ impl Connection {
/// This will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
pub fn cacheflush(&self) -> Result<IOResult<PagerCacheflushResult>> {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}

View File

@@ -2,7 +2,7 @@ use crate::result::LimboResult;
use crate::storage::btree::BTreeCursor;
use crate::translate::collate::CollationSeq;
use crate::translate::plan::SelectPlan;
use crate::types::CursorResult;
use crate::types::IOResult;
use crate::util::{module_args_from_sql, module_name_from_sql, UnparsedFromSqlIndex};
use crate::{util::normalize_ident, Result};
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
@@ -158,24 +158,24 @@ impl Schema {
HashMap::with_capacity(10);
match pager.begin_read_tx()? {
CursorResult::Ok(v) => {
IOResult::Done(v) => {
if matches!(v, LimboResult::Busy) {
return Err(LimboError::Busy);
}
}
CursorResult::IO => pager.io.run_once()?,
IOResult::IO => pager.io.run_once()?,
}
match cursor.rewind()? {
CursorResult::Ok(v) => v,
CursorResult::IO => pager.io.run_once()?,
IOResult::Done(v) => v,
IOResult::IO => pager.io.run_once()?,
};
loop {
let Some(row) = (loop {
match cursor.record()? {
CursorResult::Ok(v) => break v,
CursorResult::IO => pager.io.run_once()?,
IOResult::Done(v) => break v,
IOResult::IO => pager.io.run_once()?,
}
}) else {
break;
@@ -285,7 +285,7 @@ impl Schema {
drop(record_cursor);
drop(row);
if matches!(cursor.next()?, CursorResult::IO) {
if matches!(cursor.next()?, IOResult::IO) {
pager.io.run_once()?;
};
}

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ use crate::{
pager::{PageRef, Pager},
sqlite3_ondisk::DATABASE_HEADER_PAGE_ID,
},
types::CursorResult,
types::IOResult,
LimboError, Result,
};
use std::sync::atomic::Ordering;
@@ -35,7 +35,7 @@ const HEADER_OFFSET_VERSION_VALID_FOR: usize = 92;
const HEADER_OFFSET_VERSION_NUMBER: usize = 96;
// Helper to get a read-only reference to the header page.
fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
fn get_header_page(pager: &Pager) -> Result<IOResult<PageRef>> {
if pager.db_state.load(Ordering::SeqCst) < 2 {
return Err(LimboError::InternalError(
"Database is empty, header does not exist - page 1 should've been allocated before this".to_string(),
@@ -43,13 +43,13 @@ fn get_header_page(pager: &Pager) -> Result<CursorResult<PageRef>> {
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
Ok(CursorResult::Ok(page))
Ok(IOResult::Done(page))
}
// Helper to get a writable reference to the header page and mark it dirty.
fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
fn get_header_page_for_write(pager: &Pager) -> Result<IOResult<PageRef>> {
if pager.db_state.load(Ordering::SeqCst) < 2 {
// This should not be called on an empty DB for writing, as page 1 is allocated on first transaction.
return Err(LimboError::InternalError(
@@ -58,22 +58,22 @@ fn get_header_page_for_write(pager: &Pager) -> Result<CursorResult<PageRef>> {
}
let page = pager.read_page(DATABASE_HEADER_PAGE_ID)?;
if page.is_locked() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
page.set_dirty();
pager.add_dirty(DATABASE_HEADER_PAGE_ID);
Ok(CursorResult::Ok(page))
Ok(IOResult::Done(page))
}
/// Helper function to run async header accessors until completion
fn run_header_accessor_until_done<T, F>(pager: &Pager, mut accessor: F) -> Result<T>
where
F: FnMut() -> Result<CursorResult<T>>,
F: FnMut() -> Result<IOResult<T>>,
{
loop {
match accessor()? {
CursorResult::Ok(value) => return Ok(value),
CursorResult::IO => {
IOResult::Done(value) => return Ok(value),
IOResult::IO => {
pager.io.run_once()?;
}
}
@@ -103,13 +103,13 @@ macro_rules! impl_header_field_accessor {
paste::paste! {
// Async version
#[allow(dead_code)]
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<CursorResult<$type>> {
pub fn [<get_ $field_name _async>](pager: &Pager) -> Result<IOResult<$type>> {
if pager.db_state.load(Ordering::SeqCst) < 2 {
return Err(LimboError::InternalError(format!("Database is empty, header does not exist - page 1 should've been allocated before this")));
}
let page = match get_header_page(pager)? {
CursorResult::Ok(page) => page,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(page) => page,
IOResult::IO => return Ok(IOResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
@@ -120,10 +120,10 @@ macro_rules! impl_header_field_accessor {
let value = <$type>::from_be_bytes(bytes);
$(
if value == 0 {
return Ok(CursorResult::Ok($ifzero));
return Ok(IOResult::Done($ifzero));
}
)?
Ok(CursorResult::Ok(value))
Ok(IOResult::Done(value))
}
// Sync version
@@ -134,10 +134,10 @@ macro_rules! impl_header_field_accessor {
// Async setter
#[allow(dead_code)]
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<CursorResult<()>> {
pub fn [<set_ $field_name _async>](pager: &Pager, value: $type) -> Result<IOResult<()>> {
let page = match get_header_page_for_write(pager)? {
CursorResult::Ok(page) => page,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(page) => page,
IOResult::IO => return Ok(IOResult::IO),
};
let page_inner = page.get();
let page_content = page_inner.contents.as_ref().unwrap();
@@ -146,7 +146,7 @@ macro_rules! impl_header_field_accessor {
buf_slice[$offset..$offset + std::mem::size_of::<$type>()].copy_from_slice(&value.to_be_bytes());
page.set_dirty();
pager.add_dirty(1);
Ok(CursorResult::Ok(()))
Ok(IOResult::Done(()))
}
// Sync setter
@@ -214,14 +214,14 @@ pub fn set_page_size(pager: &Pager, value: u32) -> Result<()> {
}
#[allow(dead_code)]
pub fn get_page_size_async(pager: &Pager) -> Result<CursorResult<u32>> {
pub fn get_page_size_async(pager: &Pager) -> Result<IOResult<u32>> {
match get_page_size_u16_async(pager)? {
CursorResult::Ok(size) => {
IOResult::Done(size) => {
if size == 1 {
return Ok(CursorResult::Ok(MAX_PAGE_SIZE));
return Ok(IOResult::Done(MAX_PAGE_SIZE));
}
Ok(CursorResult::Ok(size as u32))
Ok(IOResult::Done(size as u32))
}
CursorResult::IO => Ok(CursorResult::IO),
IOResult::IO => Ok(IOResult::IO),
}
}

View File

@@ -4,9 +4,9 @@ use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::header_accessor;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
use crate::types::CursorResult;
use crate::Completion;
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::IOResult;
use crate::{return_if_io, Completion};
use crate::{Buffer, Connection, LimboError, Result};
use parking_lot::RwLock;
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
@@ -19,7 +19,7 @@ use tracing::{instrument, trace, Level};
use super::btree::{btree_init_page, BTreePage};
use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey};
use super::sqlite3_ondisk::{begin_write_btree_page, DATABASE_HEADER_SIZE};
use super::wal::{CheckpointMode, CheckpointStatus};
use super::wal::CheckpointMode;
#[cfg(not(feature = "omit_autovacuum"))]
use {crate::io::Buffer as IoBuffer, ptrmap::*};
@@ -231,14 +231,6 @@ pub struct Pager {
#[derive(Debug, Copy, Clone)]
/// The status of the current cache flush.
/// A Done state means that the WAL was committed to disk and fsynced,
/// plus potentially checkpointed to the DB (and the DB then fsynced).
pub enum PagerCacheflushStatus {
Done(PagerCacheflushResult),
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum PagerCacheflushResult {
/// The WAL was written to disk and fsynced.
WalWritten,
@@ -312,17 +304,17 @@ impl Pager {
/// `target_page_num` (1-indexed) is the page whose entry is sought.
/// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself).
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<CursorResult<Option<PtrmapEntry>>> {
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
{
return Ok(CursorResult::Ok(None));
return Ok(IOResult::Done(None));
}
let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
@@ -336,10 +328,10 @@ impl Pager {
let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
if ptrmap_page.is_locked() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
if !ptrmap_page.is_loaded() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
let ptrmap_page_inner = ptrmap_page.get();
@@ -376,7 +368,7 @@ impl Pager {
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(CursorResult::Ok(Some(entry))),
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
"Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
))),
@@ -392,7 +384,7 @@ impl Pager {
db_page_no_to_update: u32,
entry_type: PtrmapType,
parent_page_no: u32,
) -> Result<CursorResult<()>> {
) -> Result<IOResult<()>> {
tracing::trace!(
"ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {})",
db_page_no_to_update,
@@ -401,8 +393,8 @@ impl Pager {
);
let page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
@@ -427,10 +419,10 @@ impl Pager {
let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?;
if ptrmap_page.is_locked() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
if !ptrmap_page.is_loaded() {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
let ptrmap_page_inner = ptrmap_page.get();
@@ -467,13 +459,13 @@ impl Pager {
ptrmap_page.set_dirty();
self.add_dirty(ptrmap_pg_no as usize);
Ok(CursorResult::Ok(()))
Ok(IOResult::Done(()))
}
/// This method is used to allocate a new root page for a btree, both for tables and indexes
/// FIXME: handle no room in page cache
#[instrument(skip_all, level = Level::INFO)]
pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<CursorResult<u32>> {
pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result<IOResult<u32>> {
let page_type = match flags {
_ if flags.is_table() => PageType::TableLeaf,
_ if flags.is_index() => PageType::IndexLeaf,
@@ -483,7 +475,7 @@ impl Pager {
{
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(CursorResult::Ok(page_id as u32))
Ok(IOResult::Done(page_id as u32))
}
// If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
@@ -494,21 +486,21 @@ impl Pager {
AutoVacuumMode::None => {
let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
let page_id = page.get().get().id;
Ok(CursorResult::Ok(page_id as u32))
Ok(IOResult::Done(page_id as u32))
}
AutoVacuumMode::Full => {
let mut root_page_num =
match header_accessor::get_vacuum_mode_largest_root_page_async(self)? {
CursorResult::Ok(value) => value,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(value) => value,
IOResult::IO => return Ok(IOResult::IO),
};
assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
root_page_num += 1;
assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
let page_size = match header_accessor::get_page_size_async(self)? {
CursorResult::Ok(size) => size as usize,
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(size) => size as usize,
IOResult::IO => return Ok(IOResult::IO),
};
while is_ptrmap_page(root_page_num, page_size) {
@@ -531,8 +523,8 @@ impl Pager {
// For now map allocated_page_id since we are not swapping it with root_page_num
match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id)),
CursorResult::IO => Ok(CursorResult::IO),
IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)),
IOResult::IO => Ok(IOResult::IO),
}
}
AutoVacuumMode::Incremental => {
@@ -604,17 +596,17 @@ impl Pager {
#[inline(always)]
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_read_tx(&self) -> Result<CursorResult<LimboResult>> {
pub fn begin_read_tx(&self) -> Result<IOResult<LimboResult>> {
// We allocate the first page lazily in the first transaction
match self.maybe_allocate_page1()? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(_) => {}
IOResult::IO => return Ok(IOResult::IO),
}
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_read_tx()?))
Ok(IOResult::Done(self.wal.borrow_mut().begin_read_tx()?))
}
#[instrument(skip_all, level = Level::INFO)]
fn maybe_allocate_page1(&self) -> Result<CursorResult<()>> {
fn maybe_allocate_page1(&self) -> Result<IOResult<()>> {
if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED {
if let Ok(_lock) = self.init_lock.try_lock() {
match (
@@ -623,29 +615,29 @@ impl Pager {
) {
// In case of being empty or (allocating and this connection is performing allocation) then allocate the first page
(0, false) | (1, true) => match self.allocate_page1()? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(())),
CursorResult::IO => Ok(CursorResult::IO),
IOResult::Done(_) => Ok(IOResult::Done(())),
IOResult::IO => Ok(IOResult::IO),
},
_ => Ok(CursorResult::IO),
_ => Ok(IOResult::IO),
}
} else {
Ok(CursorResult::IO)
Ok(IOResult::IO)
}
} else {
Ok(CursorResult::Ok(()))
Ok(IOResult::Done(()))
}
}
#[inline(always)]
#[instrument(skip_all, level = Level::INFO)]
pub fn begin_write_tx(&self) -> Result<CursorResult<LimboResult>> {
pub fn begin_write_tx(&self) -> Result<IOResult<LimboResult>> {
// TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction
// we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
match self.maybe_allocate_page1()? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
IOResult::Done(_) => {}
IOResult::IO => return Ok(IOResult::IO),
}
Ok(CursorResult::Ok(self.wal.borrow_mut().begin_write_tx()?))
Ok(IOResult::Done(self.wal.borrow_mut().begin_write_tx()?))
}
#[instrument(skip_all, level = Level::INFO)]
@@ -655,17 +647,17 @@ impl Pager {
schema_did_change: bool,
connection: &Connection,
wal_checkpoint_disabled: bool,
) -> Result<PagerCacheflushStatus> {
) -> Result<IOResult<PagerCacheflushResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback));
return Ok(IOResult::Done(PagerCacheflushResult::Rollback));
}
let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?;
match cacheflush_status {
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
PagerCacheflushStatus::Done(_) => {
IOResult::IO => Ok(IOResult::IO),
IOResult::Done(_) => {
let maybe_schema_pair = if schema_did_change {
let schema = connection.schema.borrow().clone();
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
@@ -777,7 +769,10 @@ impl Pager {
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
pub fn cacheflush(
&self,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
let mut checkpoint_result = CheckpointResult::default();
let res = loop {
let state = self.flush_info.borrow().state;
@@ -807,20 +802,18 @@ impl Pager {
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
}
FlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
} else {
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
}
}
FlushState::SyncWal => {
if WalFsyncStatus::IO == self.wal.borrow_mut().sync()? {
return Ok(PagerCacheflushStatus::IO);
}
return_if_io!(self.wal.borrow_mut().sync());
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
@@ -830,11 +823,11 @@ impl Pager {
}
FlushState::Checkpoint => {
match self.checkpoint()? {
CheckpointStatus::Done(res) => {
IOResult::Done(res) => {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO),
IOResult::IO => return Ok(IOResult::IO),
};
}
FlushState::SyncDbFile => {
@@ -843,7 +836,7 @@ impl Pager {
}
FlushState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::Checkpointed(checkpoint_result);
@@ -853,7 +846,7 @@ impl Pager {
};
// We should only signal that we finished appenind frames after wal sync to avoid inconsistencies when sync fails
self.wal.borrow_mut().finish_append_frames_commit()?;
Ok(PagerCacheflushStatus::Done(res))
Ok(IOResult::Done(res))
}
#[instrument(skip_all, level = Level::INFO)]
@@ -873,7 +866,7 @@ impl Pager {
}
#[instrument(skip_all, level = Level::INFO, target = "pager_checkpoint",)]
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
pub fn checkpoint(&self) -> Result<IOResult<CheckpointResult>> {
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = *self.checkpoint_state.borrow();
@@ -886,8 +879,8 @@ impl Pager {
in_flight,
CheckpointMode::Passive,
)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done(res) => {
IOResult::IO => return Ok(IOResult::IO),
IOResult::Done(res) => {
checkpoint_result = res;
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
}
@@ -900,7 +893,7 @@ impl Pager {
}
CheckpointState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
} else {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
@@ -908,10 +901,10 @@ impl Pager {
}
CheckpointState::CheckpointDone => {
return if *self.checkpoint_inflight.borrow() > 0 {
Ok(CheckpointStatus::IO)
Ok(IOResult::IO)
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
Ok(CheckpointStatus::Done(checkpoint_result))
Ok(IOResult::Done(checkpoint_result))
};
}
}
@@ -935,7 +928,7 @@ impl Pager {
{
let mut wal = self.wal.borrow_mut();
// fsync the wal syncronously before beginning checkpoint
while let Ok(WalFsyncStatus::IO) = wal.sync() {
while let Ok(IOResult::IO) = wal.sync() {
if attempts >= 10 {
return Err(LimboError::InternalError(
"Failed to fsync WAL before final checkpoint, fd likely closed".into(),
@@ -964,10 +957,10 @@ impl Pager {
Rc::new(RefCell::new(0)),
CheckpointMode::Passive,
) {
Ok(CheckpointStatus::IO) => {
Ok(IOResult::IO) => {
self.io.run_once()?;
}
Ok(CheckpointStatus::Done(res)) => {
Ok(IOResult::Done(res)) => {
checkpoint_result = res;
break;
}
@@ -1056,7 +1049,7 @@ impl Pager {
}
#[instrument(skip_all, level = Level::INFO)]
pub fn allocate_page1(&self) -> Result<CursorResult<PageRef>> {
pub fn allocate_page1(&self) -> Result<IOResult<PageRef>> {
let state = self.allocate_page1_state.borrow().clone();
match state {
AllocatePage1State::Start => {
@@ -1093,7 +1086,7 @@ impl Pager {
write_counter,
page: page1,
});
Ok(CursorResult::IO)
Ok(IOResult::IO)
}
AllocatePage1State::Writing {
write_counter,
@@ -1101,7 +1094,7 @@ impl Pager {
} => {
tracing::trace!("allocate_page1(Writing)");
if *write_counter.borrow() > 0 {
return Ok(CursorResult::IO);
return Ok(IOResult::IO);
}
tracing::trace!("allocate_page1(Writing done)");
let page1_ref = page.get();
@@ -1112,7 +1105,7 @@ impl Pager {
})?;
self.db_state.store(DB_STATE_INITIALIZED, Ordering::SeqCst);
self.allocate_page1_state.replace(AllocatePage1State::Done);
Ok(CursorResult::Ok(page1_ref.clone()))
Ok(IOResult::Done(page1_ref.clone()))
}
AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"),
}
@@ -1500,15 +1493,15 @@ mod ptrmap_tests {
use crate::storage::wal::{WalFile, WalFileShared};
pub fn run_until_done<T>(
mut action: impl FnMut() -> Result<CursorResult<T>>,
mut action: impl FnMut() -> Result<IOResult<T>>,
pager: &Pager,
) -> Result<T> {
loop {
match action()? {
CursorResult::Ok(res) => {
IOResult::Done(res) => {
return Ok(res);
}
CursorResult::IO => pager.io.run_once().unwrap(),
IOResult::IO => pager.io.run_once().unwrap(),
}
}
}
@@ -1554,9 +1547,9 @@ mod ptrmap_tests {
// Allocate all the pages as btree root pages
for _ in 0..initial_db_pages {
match pager.btree_create(&CreateBTreeFlags::new_table()) {
Ok(CursorResult::Ok(_root_page_id)) => (),
Ok(CursorResult::IO) => {
panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly");
Ok(IOResult::Done(_root_page_id)) => (),
Ok(IOResult::IO) => {
panic!("test_pager_setup: btree_create returned IOResult::IO unexpectedly");
}
Err(e) => {
panic!("test_pager_setup: btree_create failed: {e:?}");
@@ -1591,8 +1584,8 @@ mod ptrmap_tests {
// Read the entry from the ptrmap page and verify it
let entry = pager.ptrmap_get(db_page_to_update).unwrap();
assert!(matches!(entry, CursorResult::Ok(Some(_))));
let CursorResult::Ok(Some(entry)) = entry else {
assert!(matches!(entry, IOResult::Done(Some(_))));
let IOResult::Done(Some(entry)) = entry else {
panic!("entry is not Some");
};
assert_eq!(entry.entry_type, PtrmapType::RootPage);

View File

@@ -23,6 +23,7 @@ use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE,
WAL_HEADER_SIZE,
};
use crate::types::IOResult;
use crate::{turso_assert, Buffer, LimboError, Result};
use crate::{Completion, Page};
@@ -244,8 +245,8 @@ pub trait Wal {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<WalFsyncStatus>;
) -> Result<IOResult<CheckpointResult>>;
fn sync(&mut self) -> Result<IOResult<()>>;
fn get_max_frame_in_wal(&self) -> u64;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
@@ -316,14 +317,12 @@ impl Wal for DummyWAL {
_pager: &Pager,
_write_counter: Rc<RefCell<usize>>,
_mode: crate::CheckpointMode,
) -> Result<crate::CheckpointStatus> {
Ok(crate::CheckpointStatus::Done(
crate::CheckpointResult::default(),
))
) -> Result<IOResult<CheckpointResult>> {
Ok(IOResult::Done(CheckpointResult::default()))
}
fn sync(&mut self) -> Result<crate::storage::wal::WalFsyncStatus> {
Ok(crate::storage::wal::WalFsyncStatus::Done)
fn sync(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn get_max_frame_in_wal(&self) -> u64 {
@@ -366,18 +365,6 @@ pub enum CheckpointState {
Done,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum WalFsyncStatus {
Done,
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum CheckpointStatus {
Done(CheckpointResult),
IO,
}
// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
// page operations like reading a frame to a page, and writing a page to disk. This page should not
@@ -726,7 +713,7 @@ impl Wal for WalFile {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<CheckpointStatus> {
) -> Result<IOResult<CheckpointResult>> {
assert!(
matches!(mode, CheckpointMode::Passive),
"only passive mode supported for now"
@@ -812,7 +799,7 @@ impl Wal for WalFile {
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.page.is_locked() {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
} else {
self.ongoing_checkpoint.state = CheckpointState::WritePage;
}
@@ -828,7 +815,7 @@ impl Wal for WalFile {
}
CheckpointState::WaitWritePage => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
}
// If page was in cache clear it.
if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) {
@@ -847,7 +834,7 @@ impl Wal for WalFile {
}
CheckpointState::Done => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
}
let shared = self.get_shared();
shared.checkpoint_lock.unlock();
@@ -883,14 +870,14 @@ impl Wal for WalFile {
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
}
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(CheckpointStatus::Done(checkpoint_result));
return Ok(IOResult::Done(checkpoint_result));
}
}
}
}
#[instrument(err, skip_all, level = Level::INFO)]
fn sync(&mut self) -> Result<WalFsyncStatus> {
fn sync(&mut self) -> Result<IOResult<()>> {
match self.sync_state.get() {
SyncState::NotSyncing => {
tracing::debug!("wal_sync");
@@ -905,15 +892,15 @@ impl Wal for WalFile {
let shared = self.get_shared();
shared.file.sync(completion)?;
self.sync_state.set(SyncState::Syncing);
Ok(WalFsyncStatus::IO)
Ok(IOResult::IO)
}
SyncState::Syncing => {
if self.syncing.get() {
tracing::debug!("wal_sync is already syncing");
Ok(WalFsyncStatus::IO)
Ok(IOResult::IO)
} else {
self.sync_state.set(SyncState::NotSyncing);
Ok(WalFsyncStatus::Done)
Ok(IOResult::Done(()))
}
}
}

View File

@@ -1724,7 +1724,7 @@ fn compare_records_int(
/// This function is an optimized version of `compare_records_generic()` for the
/// common case where:
/// - (a) The first field of the unpacked record is a string
/// - (b) The serialized record's first field is also a string
/// - (b) The serialized record's first field is also a string
/// - (c) The header size varint fits in a single byte (most records)
///
/// This optimization avoids the overhead of generic field parsing by directly
@@ -1754,7 +1754,7 @@ fn compare_records_int(
/// The function follows SQLite's string comparison semantics:
///
/// 1. **Type checking**: Ensures both sides are strings, otherwise falls back
/// 2. **String comparison**: Uses collation if provided, binary otherwise
/// 2. **String comparison**: Uses collation if provided, binary otherwise
/// 3. **Sort order**: Applies ascending/descending order to comparison result
/// 4. **Length comparison**: If strings are equal, compares lengths
/// 5. **Remaining fields**: If first field is equal and more fields exist,
@@ -1886,7 +1886,7 @@ fn compare_records_string(
/// # Arguments
///
/// * `serialized` - The left-hand side record in serialized format
/// * `unpacked` - The right-hand side record as an array of parsed values
/// * `unpacked` - The right-hand side record as an array of parsed values
/// * `index_info` - Contains sort order information for each field
/// * `collations` - Array of collation sequences for string comparisons
/// * `skip` - Number of initial fields to skip (assumes caller verified equality)
@@ -2308,11 +2308,22 @@ impl Cursor {
}
#[derive(Debug)]
pub enum CursorResult<T> {
Ok(T),
pub enum IOResult<T> {
Done(T),
IO,
}
/// Evaluate a Result<IOResult<T>>, if IO return IO.
#[macro_export]
macro_rules! return_if_io {
($expr:expr) => {
match $expr? {
IOResult::Done(v) => v,
IOResult::IO => return Ok(IOResult::IO),
}
};
}
#[derive(Debug)]
pub enum SeekResult {
/// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record

View File

@@ -42,9 +42,7 @@ use crate::{
use crate::{
storage::wal::CheckpointResult,
types::{
AggContext, Cursor, CursorResult, ExternalAggState, SeekKey, SeekOp, Value, ValueType,
},
types::{AggContext, Cursor, ExternalAggState, IOResult, SeekKey, SeekOp, Value, ValueType},
util::{
cast_real_to_integer, cast_text_to_integer, cast_text_to_numeric, cast_text_to_real,
checked_cast_text_to_numeric, parse_schema_rows, RoundToPrecision,
@@ -95,8 +93,8 @@ use crate::{
macro_rules! return_if_io {
($expr:expr) => {
match $expr? {
CursorResult::Ok(v) => v,
CursorResult::IO => return Ok(InsnFunctionStepResult::IO),
IOResult::Done(v) => v,
IOResult::IO => return Ok(InsnFunctionStepResult::IO),
}
};
}
@@ -1293,7 +1291,7 @@ pub fn op_last(
/// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately
/// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline
/// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation
///
///
/// This function is similar to `sqlite3GetVarint32`
#[inline(always)]
fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> {
@@ -1396,10 +1394,10 @@ pub fn op_column(
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
match index_cursor.rowid()? {
CursorResult::IO => {
IOResult::IO => {
break 'd Some((index_cursor_id, table_cursor_id));
}
CursorResult::Ok(rowid) => rowid,
IOResult::Done(rowid) => rowid,
}
};
let mut table_cursor = state.get_cursor(table_cursor_id);
@@ -1408,8 +1406,8 @@ pub fn op_column(
SeekKey::TableRowId(rowid.unwrap()),
SeekOp::GE { eq_only: true },
)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
IOResult::Done(_) => None,
IOResult::IO => Some((index_cursor_id, table_cursor_id)),
}
};
if let Some(deferred_seek) = deferred_seek {
@@ -2245,10 +2243,10 @@ pub fn op_row_id(
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let record = match index_cursor.record()? {
CursorResult::IO => {
IOResult::IO => {
break 'd Some((index_cursor_id, table_cursor_id));
}
CursorResult::Ok(record) => record,
IOResult::Done(record) => record,
};
let record = record.as_ref().unwrap();
let mut record_cursor_ref = index_cursor.record_cursor.borrow_mut();
@@ -2262,8 +2260,8 @@ pub fn op_row_id(
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
IOResult::Done(_) => None,
IOResult::IO => Some((index_cursor_id, table_cursor_id)),
}
};
if let Some(deferred_seek) = deferred_seek {
@@ -2668,8 +2666,8 @@ pub fn seek_internal(
}
};
match cursor.seek(seek_key, *op)? {
CursorResult::Ok(seek_result) => seek_result,
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(seek_result) => seek_result,
IOResult::IO => return Ok(SeekInternalResult::IO),
}
};
let found = match seek_result {
@@ -2714,8 +2712,8 @@ pub fn seek_internal(
SeekOp::LT { .. } | SeekOp::LE { .. } => cursor.prev()?,
};
match result {
CursorResult::Ok(found) => found,
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(found) => found,
IOResult::IO => return Ok(SeekInternalResult::IO),
}
};
return Ok(if found {
@@ -2728,8 +2726,8 @@ pub fn seek_internal(
let mut cursor = state.get_cursor(cursor_id);
let cursor = cursor.as_btree_mut();
match cursor.last()? {
CursorResult::Ok(()) => {}
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(()) => {}
IOResult::IO => return Ok(SeekInternalResult::IO),
}
// the MoveLast variant is only used for SeekOp::LT and SeekOp::LE when the seek condition is always true,
// so we have always found what we were looking for.
@@ -2770,7 +2768,7 @@ pub fn seek_internal(
/// - `IdxLE`: "less than or equal" - equality should be treated as "less"
/// - `IdxGT`: "greater than" - equality should be treated as "less" (so condition fails)
///
/// - **`IdxGE` and `IdxLT`**: Return `Ordering::Equal` (equivalent to `default_rc = 0`)
/// - **`IdxGE` and `IdxLT`**: Return `Ordering::Equal` (equivalent to `default_rc = 0`)
/// - When keys are equal, these operations should treat it as true equality
/// - `IdxGE`: "greater than or equal" - equality should be treated as "equal"
/// - `IdxLT`: "less than" - equality should be treated as "equal" (so condition fails)
@@ -5700,7 +5698,7 @@ pub fn op_destroy(
// TODO not sure if should be BTreeCursor::new_table or BTreeCursor::new_index here or neither and just pass an emtpy vec
let mut cursor = BTreeCursor::new(None, pager.clone(), *root, Vec::new(), 0);
let former_root_page_result = cursor.btree_destroy()?;
if let CursorResult::Ok(former_root_page) = former_root_page_result {
if let IOResult::Done(former_root_page) = former_root_page_result {
state.registers[*former_root_reg] =
Register::Value(Value::Integer(former_root_page.unwrap_or(0) as i64));
}

View File

@@ -27,13 +27,10 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
storage::sqlite3_ondisk::SmallVec,
translate::plan::TableReferences,
types::{RawSlice, TextRef},
vdbe::execute::OpIdxInsertState,
vdbe::execute::OpInsertState,
vdbe::execute::OpNewRowidState,
vdbe::execute::OpSeekState,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState},
RefValue,
};
@@ -159,13 +156,13 @@ pub enum StepResult {
}
/// If there is I/O, the instruction is restarted.
/// Evaluate a Result<CursorResult<T>>, if IO return Ok(StepResult::IO).
/// Evaluate a Result<IOResult<T>>, if IO return Ok(StepResult::IO).
#[macro_export]
macro_rules! return_if_io {
macro_rules! return_step_if_io {
($expr:expr) => {
match $expr? {
CursorResult::Ok(v) => v,
CursorResult::IO => return Ok(StepResult::IO),
IOResult::Ok(v) => v,
IOResult::IO => return Ok(StepResult::IO),
}
};
}
@@ -509,7 +506,7 @@ impl Program {
connection.wal_checkpoint_disabled.get(),
)?;
match cacheflush_status {
PagerCacheflushStatus::Done(status) => {
IOResult::Done(status) => {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
@@ -522,7 +519,7 @@ impl Program {
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;
}
PagerCacheflushStatus::IO => {
IOResult::IO => {
tracing::trace!("Cacheflush IO");
*commit_state = CommitState::Committing;
return Ok(StepResult::IO);

View File

@@ -6,7 +6,8 @@ use tempfile::TempDir;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso_core::{Connection, Database, PagerCacheflushStatus, IO};
use turso_core::types::IOResult;
use turso_core::{Connection, Database, IO};
#[allow(dead_code)]
pub struct TempDatabase {
@@ -115,10 +116,10 @@ impl TempDatabase {
pub(crate) fn do_flush(conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
PagerCacheflushStatus::Done(_) => {
IOResult::Done(_) => {
break;
}
PagerCacheflushStatus::IO => {
IOResult::IO => {
tmp_db.io.run_once()?;
}
}