From 817ad8d50f056c2075d7f8b4f23c9e854485cd4e Mon Sep 17 00:00:00 2001 From: Diego Reis Date: Thu, 10 Jul 2025 22:41:49 -0300 Subject: [PATCH] Separate user-callable cacheflush from internal cacheflush logic Cacheflush should only spill pages to WAL as non-commit frames, without checkpointing or syncing. See SQLite's sqlite3PagerFlush. --- core/lib.rs | 11 +-- core/storage/pager.rs | 141 ++++++++++++++++++++++++++---------- core/vdbe/mod.rs | 7 +- tests/integration/common.rs | 7 +- 4 files changed, 108 insertions(+), 58 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index dcd83574f..747ff7a49 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -77,7 +77,7 @@ use std::{ #[cfg(feature = "fs")] use storage::database::DatabaseFile; use storage::page_cache::DumbLruPageCache; -use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; +use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED}; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, @@ -752,16 +752,11 @@ impl Connection { } /// Flush dirty pages to disk. - /// This will write the dirty pages to the WAL and then fsync the WAL. - /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to - /// the database file and then fsync the database file. - pub fn cacheflush(&self) -> Result<IOResult<PagerCacheflushResult>> { + pub fn cacheflush(&self) -> Result<IOResult<()>> { if self.closed.get() { return Err(LimboError::InternalError("Connection closed".to_string())); } - self.pager - .borrow() - .cacheflush(self.wal_checkpoint_disabled.get()) + self.pager.borrow().cacheflush() } pub fn clear_page_cache(&self) -> Result<()> { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 3e227a011..13a817e25 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -140,10 +140,18 @@ impl Page { } } } - #[derive(Clone, Copy, Debug)] /// The state of the current pager cache flush. -enum FlushState { +enum CacheFlushState { + /// Idle. 
+ Start, + /// Waiting for all in-flight writes to the on-disk WAL to complete. + WaitAppendFrames, +} + +#[derive(Clone, Copy, Debug)] +/// The state of the current pager cache commit. +enum CommitState { /// Idle. Start, /// Waiting for all in-flight writes to the on-disk WAL to complete. @@ -176,10 +184,17 @@ pub enum BtreePageAllocMode { Le(u32), } +/// This will keep track of the state of current cache commit in order to not repeat work +struct CommitInfo { + state: CommitState, + /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. + in_flight_writes: Rc>, +} + /// This will keep track of the state of current cache flush in order to not repeat work struct FlushInfo { - state: FlushState, - /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. + state: CacheFlushState, + /// Number of writes taking place. in_flight_writes: Rc>, } @@ -210,6 +225,7 @@ pub struct Pager { pub io: Arc, dirty_pages: Rc>>, + commit_info: RefCell, flush_info: RefCell, checkpoint_state: RefCell, checkpoint_inflight: Rc>, @@ -231,7 +247,7 @@ pub struct Pager { #[derive(Debug, Copy, Clone)] /// The status of the current cache flush. -pub enum PagerCacheflushResult { +pub enum PagerCommitResult { /// The WAL was written to disk and fsynced. WalWritten, /// The WAL was written, fsynced, and a checkpoint was performed. 
@@ -271,8 +287,8 @@ impl Pager { page_cache, io, dirty_pages: Rc::new(RefCell::new(HashSet::new())), - flush_info: RefCell::new(FlushInfo { - state: FlushState::Start, + commit_info: RefCell::new(CommitInfo { + state: CommitState::Start, in_flight_writes: Rc::new(RefCell::new(0)), }), syncing: Rc::new(RefCell::new(false)), @@ -285,6 +301,10 @@ impl Pager { allocate_page1_state, page_size: Cell::new(None), reserved_space: OnceCell::new(), + flush_info: RefCell::new(FlushInfo { + state: CacheFlushState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + }), }) } @@ -647,15 +667,15 @@ impl Pager { schema_did_change: bool, connection: &Connection, wal_checkpoint_disabled: bool, - ) -> Result<IOResult<PagerCacheflushResult>> { + ) -> Result<IOResult<PagerCommitResult>> { tracing::trace!("end_tx(rollback={})", rollback); if rollback { self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; - return Ok(IOResult::Done(PagerCacheflushResult::Rollback)); + return Ok(IOResult::Done(PagerCommitResult::Rollback)); } - let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?; - match cacheflush_status { + let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?; + match commit_status { IOResult::IO => Ok(IOResult::IO), IOResult::Done(_) => { let maybe_schema_pair = if schema_did_change { @@ -671,7 +691,7 @@ impl Pager { if let Some((schema, mut db_schema)) = maybe_schema_pair { *db_schema = schema; } - Ok(cacheflush_status) + Ok(commit_status) } } } @@ -764,21 +784,63 @@ impl Pager { Ok(self.wal.borrow().get_max_frame_in_wal()) } - /// Flush dirty pages to disk. + /// Flush all dirty pages to disk. + /// Unlike commit_dirty_pages, this function does not commit, checkpoint, or sync the WAL/Database. 
+ #[instrument(skip_all, level = Level::INFO)] + pub fn cacheflush(&self) -> Result<IOResult<()>> { + let state = self.flush_info.borrow().state; + trace!(?state); + match state { + CacheFlushState::Start => { + for page_id in self.dirty_pages.borrow().iter() { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(*page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!("cacheflush(page={}, page_type={:?})", page_id, page_type); + self.wal.borrow_mut().append_frame( + page.clone(), + 0, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + page.clear_dirty(); + } + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); + } + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames; + return Ok(IOResult::IO); + } + CacheFlushState::WaitAppendFrames => { + let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); + if in_flight == 0 { + self.flush_info.borrow_mut().state = CacheFlushState::Start; + self.wal.borrow_mut().finish_append_frames_commit()?; + return Ok(IOResult::Done(())); + } else { + return Ok(IOResult::IO); + } + } + } + } + + /// Flush all dirty pages to disk. /// In the base case, it will write the dirty pages to the WAL and then fsync the WAL. /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. 
#[instrument(skip_all, level = Level::INFO)] - pub fn cacheflush( + pub fn commit_dirty_pages( &self, wal_checkpoint_disabled: bool, - ) -> Result<IOResult<PagerCacheflushResult>> { + ) -> Result<IOResult<PagerCommitResult>> { let mut checkpoint_result = CheckpointResult::default(); let res = loop { - let state = self.flush_info.borrow().state; + let state = self.commit_info.borrow().state; trace!(?state); match state { - FlushState::Start => { + CommitState::Start => { let db_size = header_accessor::get_database_size(self)?; for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() { let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1; @@ -786,12 +848,16 @@ impl Pager { let page_key = PageCacheKey::new(*page_id); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); let db_size = if is_last_frame { db_size } else { 0 }; self.wal.borrow_mut().append_frame( page.clone(), db_size, - self.flush_info.borrow().in_flight_writes.clone(), + self.commit_info.borrow().in_flight_writes.clone(), )?; page.clear_dirty(); } @@ -801,45 +867,40 @@ impl Pager { cache.clear().unwrap(); } self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; + self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames; return Ok(IOResult::IO); } - FlushState::WaitAppendFrames => { - let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); + CommitState::WaitAppendFrames => { + let in_flight = *self.commit_info.borrow().in_flight_writes.borrow(); if in_flight == 0 { - self.flush_info.borrow_mut().state = FlushState::SyncWal; + self.commit_info.borrow_mut().state = CommitState::SyncWal; } else { return Ok(IOResult::IO); } } - 
FlushState::SyncWal => { + CommitState::SyncWal => { return_if_io!(self.wal.borrow_mut().sync()); if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() { - self.flush_info.borrow_mut().state = FlushState::Start; - break PagerCacheflushResult::WalWritten; + self.commit_info.borrow_mut().state = CommitState::Start; + break PagerCommitResult::WalWritten; } - self.flush_info.borrow_mut().state = FlushState::Checkpoint; + self.commit_info.borrow_mut().state = CommitState::Checkpoint; } - FlushState::Checkpoint => { - match self.checkpoint()? { - IOResult::Done(res) => { - checkpoint_result = res; - self.flush_info.borrow_mut().state = FlushState::SyncDbFile; - } - IOResult::IO => return Ok(IOResult::IO), - }; + CommitState::Checkpoint => { + checkpoint_result = return_if_io!(self.checkpoint()); + self.commit_info.borrow_mut().state = CommitState::SyncDbFile; } - FlushState::SyncDbFile => { + CommitState::SyncDbFile => { sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; - self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile; + self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile; } - FlushState::WaitSyncDbFile => { + CommitState::WaitSyncDbFile => { if *self.syncing.borrow() { return Ok(IOResult::IO); } else { - self.flush_info.borrow_mut().state = FlushState::Start; - break PagerCacheflushResult::Checkpointed(checkpoint_result); + self.commit_info.borrow_mut().state = CommitState::Start; + break PagerCommitResult::Checkpointed(checkpoint_result); } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index c6f98e002..c9a234469 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -27,7 +27,7 @@ pub mod sorter; use crate::{ error::LimboError, function::{AggFunc, FuncCtx}, - storage::sqlite3_ondisk::SmallVec, + storage::{pager, sqlite3_ondisk::SmallVec}, translate::plan::TableReferences, types::{IOResult, RawSlice, TextRef}, vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState}, @@ 
-510,10 +510,7 @@ impl Program { if self.change_cnt_on { self.connection.set_changes(self.n_change.get()); } - if matches!( - status, - crate::storage::pager::PagerCacheflushResult::Rollback - ) { + if matches!(status, pager::PagerCommitResult::Rollback) { pager.rollback(schema_did_change, connection)?; } connection.transaction_state.replace(TransactionState::None); diff --git a/tests/integration/common.rs b/tests/integration/common.rs index e0c3feea3..51943888f 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -3,11 +3,8 @@ use rusqlite::params; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::TempDir; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::EnvFilter; -use turso_core::types::IOResult; -use turso_core::{Connection, Database, IO}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use turso_core::{types::IOResult, Connection, Database, IO}; #[allow(dead_code)] pub struct TempDatabase {