refactor: Change redundant "Status" enums to IOResult

Let's unify the semantics of "something done" or yields I/O into a
single type
This commit is contained in:
Diego Reis
2025-07-15 20:42:48 -03:00
parent d0af54ae77
commit 0e9771ac07
6 changed files with 58 additions and 78 deletions

View File

@@ -77,18 +77,18 @@ use std::{
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
use storage::page_cache::DumbLruPageCache;
pub use storage::pager::PagerCacheflushStatus;
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
pager::PageRef,
pager::{Page, Pager},
wal::{CheckpointMode, CheckpointResult, CheckpointStatus, Wal, WalFile, WalFileShared},
wal::{CheckpointMode, CheckpointResult, Wal, WalFile, WalFileShared},
};
use tracing::{instrument, Level};
use translate::select::prepare_select_plan;
use turso_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
use types::IOResult;
pub use types::RefValue;
pub use types::Value;
use util::parse_schema_rows;
@@ -755,7 +755,7 @@ impl Connection {
/// This will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
pub fn cacheflush(&self) -> Result<IOResult<PagerCacheflushResult>> {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}

View File

@@ -4,10 +4,10 @@ use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::header_accessor;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::IOResult;
use crate::{return_if_io, Completion};
use crate::{Buffer, Connection, LimboError, Result};
use crate::{CheckpointStatus, Completion};
use parking_lot::RwLock;
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
use std::collections::HashSet;
@@ -231,14 +231,6 @@ pub struct Pager {
#[derive(Debug, Copy, Clone)]
/// The status of the current cache flush.
/// A Done state means that the WAL was committed to disk and fsynced,
/// plus potentially checkpointed to the DB (and the DB then fsynced).
pub enum PagerCacheflushStatus {
Done(PagerCacheflushResult),
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum PagerCacheflushResult {
/// The WAL was written to disk and fsynced.
WalWritten,
@@ -655,17 +647,17 @@ impl Pager {
schema_did_change: bool,
connection: &Connection,
wal_checkpoint_disabled: bool,
) -> Result<PagerCacheflushStatus> {
) -> Result<IOResult<PagerCacheflushResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback));
return Ok(IOResult::Done(PagerCacheflushResult::Rollback));
}
let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?;
match cacheflush_status {
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
PagerCacheflushStatus::Done(_) => {
IOResult::IO => Ok(IOResult::IO),
IOResult::Done(_) => {
let maybe_schema_pair = if schema_did_change {
let schema = connection.schema.borrow().clone();
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
@@ -777,7 +769,10 @@ impl Pager {
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
pub fn cacheflush(
&self,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
let mut checkpoint_result = CheckpointResult::default();
let res = loop {
let state = self.flush_info.borrow().state;
@@ -807,20 +802,18 @@ impl Pager {
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
}
FlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
} else {
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
}
}
FlushState::SyncWal => {
if WalFsyncStatus::IO == self.wal.borrow_mut().sync()? {
return Ok(PagerCacheflushStatus::IO);
}
return_if_io!(self.wal.borrow_mut().sync());
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
@@ -830,11 +823,11 @@ impl Pager {
}
FlushState::Checkpoint => {
match self.checkpoint()? {
CheckpointStatus::Done(res) => {
IOResult::Done(res) => {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO),
IOResult::IO => return Ok(IOResult::IO),
};
}
FlushState::SyncDbFile => {
@@ -843,7 +836,7 @@ impl Pager {
}
FlushState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(PagerCacheflushStatus::IO);
return Ok(IOResult::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::Checkpointed(checkpoint_result);
@@ -853,7 +846,7 @@ impl Pager {
};
// We should only signal that we finished appenind frames after wal sync to avoid inconsistencies when sync fails
self.wal.borrow_mut().finish_append_frames_commit()?;
Ok(PagerCacheflushStatus::Done(res))
Ok(IOResult::Done(res))
}
#[instrument(skip_all, level = Level::INFO)]
@@ -873,7 +866,7 @@ impl Pager {
}
#[instrument(skip_all, level = Level::INFO, target = "pager_checkpoint",)]
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
pub fn checkpoint(&self) -> Result<IOResult<CheckpointResult>> {
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = *self.checkpoint_state.borrow();
@@ -886,8 +879,8 @@ impl Pager {
in_flight,
CheckpointMode::Passive,
)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done(res) => {
IOResult::IO => return Ok(IOResult::IO),
IOResult::Done(res) => {
checkpoint_result = res;
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
}
@@ -900,7 +893,7 @@ impl Pager {
}
CheckpointState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
} else {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
@@ -908,10 +901,10 @@ impl Pager {
}
CheckpointState::CheckpointDone => {
return if *self.checkpoint_inflight.borrow() > 0 {
Ok(CheckpointStatus::IO)
Ok(IOResult::IO)
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
Ok(CheckpointStatus::Done(checkpoint_result))
Ok(IOResult::Done(checkpoint_result))
};
}
}
@@ -935,7 +928,7 @@ impl Pager {
{
let mut wal = self.wal.borrow_mut();
// fsync the wal syncronously before beginning checkpoint
while let Ok(WalFsyncStatus::IO) = wal.sync() {
while let Ok(IOResult::IO) = wal.sync() {
if attempts >= 10 {
return Err(LimboError::InternalError(
"Failed to fsync WAL before final checkpoint, fd likely closed".into(),
@@ -964,10 +957,10 @@ impl Pager {
Rc::new(RefCell::new(0)),
CheckpointMode::Passive,
) {
Ok(CheckpointStatus::IO) => {
Ok(IOResult::IO) => {
self.io.run_once()?;
}
Ok(CheckpointStatus::Done(res)) => {
Ok(IOResult::Done(res)) => {
checkpoint_result = res;
break;
}
@@ -1556,7 +1549,7 @@ mod ptrmap_tests {
match pager.btree_create(&CreateBTreeFlags::new_table()) {
Ok(IOResult::Done(_root_page_id)) => (),
Ok(IOResult::IO) => {
panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly");
panic!("test_pager_setup: btree_create returned IOResult::IO unexpectedly");
}
Err(e) => {
panic!("test_pager_setup: btree_create failed: {e:?}");

View File

@@ -23,6 +23,7 @@ use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, finish_read_page, WAL_FRAME_HEADER_SIZE,
WAL_HEADER_SIZE,
};
use crate::types::IOResult;
use crate::{turso_assert, Buffer, LimboError, Result};
use crate::{Completion, Page};
@@ -244,8 +245,8 @@ pub trait Wal {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<WalFsyncStatus>;
) -> Result<IOResult<CheckpointResult>>;
fn sync(&mut self) -> Result<IOResult<()>>;
fn get_max_frame_in_wal(&self) -> u64;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
@@ -316,14 +317,12 @@ impl Wal for DummyWAL {
_pager: &Pager,
_write_counter: Rc<RefCell<usize>>,
_mode: crate::CheckpointMode,
) -> Result<crate::CheckpointStatus> {
Ok(crate::CheckpointStatus::Done(
crate::CheckpointResult::default(),
))
) -> Result<IOResult<CheckpointResult>> {
Ok(IOResult::Done(CheckpointResult::default()))
}
fn sync(&mut self) -> Result<crate::storage::wal::WalFsyncStatus> {
Ok(crate::storage::wal::WalFsyncStatus::Done)
fn sync(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn get_max_frame_in_wal(&self) -> u64 {
@@ -366,18 +365,6 @@ pub enum CheckpointState {
Done,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum WalFsyncStatus {
Done,
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum CheckpointStatus {
Done(CheckpointResult),
IO,
}
// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
// page operations like reading a frame to a page, and writing a page to disk. This page should not
@@ -726,7 +713,7 @@ impl Wal for WalFile {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<CheckpointStatus> {
) -> Result<IOResult<CheckpointResult>> {
assert!(
matches!(mode, CheckpointMode::Passive),
"only passive mode supported for now"
@@ -812,7 +799,7 @@ impl Wal for WalFile {
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.page.is_locked() {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
} else {
self.ongoing_checkpoint.state = CheckpointState::WritePage;
}
@@ -828,7 +815,7 @@ impl Wal for WalFile {
}
CheckpointState::WaitWritePage => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
}
// If page was in cache clear it.
if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) {
@@ -847,7 +834,7 @@ impl Wal for WalFile {
}
CheckpointState::Done => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
return Ok(IOResult::IO);
}
let shared = self.get_shared();
shared.checkpoint_lock.unlock();
@@ -883,14 +870,14 @@ impl Wal for WalFile {
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
}
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(CheckpointStatus::Done(checkpoint_result));
return Ok(IOResult::Done(checkpoint_result));
}
}
}
}
#[instrument(err, skip_all, level = Level::INFO)]
fn sync(&mut self) -> Result<WalFsyncStatus> {
fn sync(&mut self) -> Result<IOResult<()>> {
match self.sync_state.get() {
SyncState::NotSyncing => {
tracing::debug!("wal_sync");
@@ -905,15 +892,15 @@ impl Wal for WalFile {
let shared = self.get_shared();
shared.file.sync(completion)?;
self.sync_state.set(SyncState::Syncing);
Ok(WalFsyncStatus::IO)
Ok(IOResult::IO)
}
SyncState::Syncing => {
if self.syncing.get() {
tracing::debug!("wal_sync is already syncing");
Ok(WalFsyncStatus::IO)
Ok(IOResult::IO)
} else {
self.sync_state.set(SyncState::NotSyncing);
Ok(WalFsyncStatus::Done)
Ok(IOResult::Done(()))
}
}
}

View File

@@ -2666,8 +2666,8 @@ pub fn seek_internal(
}
};
match cursor.seek(seek_key, *op)? {
CursorResult::Ok(seek_result) => seek_result,
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(seek_result) => seek_result,
IOResult::IO => return Ok(SeekInternalResult::IO),
}
};
let found = match seek_result {
@@ -2712,8 +2712,8 @@ pub fn seek_internal(
SeekOp::LT { .. } | SeekOp::LE { .. } => cursor.prev()?,
};
match result {
CursorResult::Ok(found) => found,
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(found) => found,
IOResult::IO => return Ok(SeekInternalResult::IO),
}
};
return Ok(if found {
@@ -2726,8 +2726,8 @@ pub fn seek_internal(
let mut cursor = state.get_cursor(cursor_id);
let cursor = cursor.as_btree_mut();
match cursor.last()? {
CursorResult::Ok(()) => {}
CursorResult::IO => return Ok(SeekInternalResult::IO),
IOResult::Done(()) => {}
IOResult::IO => return Ok(SeekInternalResult::IO),
}
// the MoveLast variant is only used for SeekOp::LT and SeekOp::LE when the seek condition is always true,
// so we have always found what we were looking for.

View File

@@ -31,7 +31,7 @@ use crate::{
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState},
PagerCacheflushStatus, RefValue,
RefValue,
};
use crate::{
@@ -506,7 +506,7 @@ impl Program {
connection.wal_checkpoint_disabled.get(),
)?;
match cacheflush_status {
PagerCacheflushStatus::Done(status) => {
IOResult::Done(status) => {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
@@ -519,7 +519,7 @@ impl Program {
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;
}
PagerCacheflushStatus::IO => {
IOResult::IO => {
tracing::trace!("Cacheflush IO");
*commit_state = CommitState::Committing;
return Ok(StepResult::IO);

View File

@@ -116,10 +116,10 @@ impl TempDatabase {
pub(crate) fn do_flush(conn: &Arc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
PagerCacheflushStatus::Done(_) => {
IOResult::Done(_) => {
break;
}
PagerCacheflushStatus::IO => {
IOResult::IO => {
tmp_db.io.run_once()?;
}
}