Separate user-callable cacheflush from internal cacheflush logic

Cacheflush should only spill pages to the WAL as non-commit frames, without checkpointing or syncing. Check SQLite's sqlite3PagerFlush.
This commit is contained in:
Diego Reis
2025-07-10 22:41:49 -03:00
parent 93634d56ba
commit 817ad8d50f
4 changed files with 108 additions and 58 deletions

View File

@@ -77,7 +77,7 @@ use std::{
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
use storage::page_cache::DumbLruPageCache;
use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
@@ -752,16 +752,11 @@ impl Connection {
}
/// Flush dirty pages to disk.
/// This will write the dirty pages to the WAL as non-commit frames, without checkpointing or syncing.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<IOResult<PagerCacheflushResult>> {
pub fn cacheflush(&self) -> Result<IOResult<()>> {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
self.pager
.borrow()
.cacheflush(self.wal_checkpoint_disabled.get())
self.pager.borrow().cacheflush()
}
pub fn clear_page_cache(&self) -> Result<()> {

View File

@@ -140,10 +140,18 @@ impl Page {
}
}
}
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache flush.
enum FlushState {
enum CacheFlushState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
WaitAppendFrames,
}
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache commit.
enum CommitState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
@@ -176,10 +184,17 @@ pub enum BtreePageAllocMode {
Le(u32),
}
/// This will keep track of the state of current cache commit in order to not repeat work
struct CommitInfo {
state: CommitState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
in_flight_writes: Rc<RefCell<usize>>,
}
/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
state: FlushState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
state: CacheFlushState,
/// Number of writes taking place.
in_flight_writes: Rc<RefCell<usize>>,
}
@@ -210,6 +225,7 @@ pub struct Pager {
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<HashSet<usize>>>,
commit_info: RefCell<CommitInfo>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
checkpoint_inflight: Rc<RefCell<usize>>,
@@ -231,7 +247,7 @@ pub struct Pager {
#[derive(Debug, Copy, Clone)]
/// The status of the current cache flush.
pub enum PagerCacheflushResult {
pub enum PagerCommitResult {
/// The WAL was written to disk and fsynced.
WalWritten,
/// The WAL was written, fsynced, and a checkpoint was performed.
@@ -271,8 +287,8 @@ impl Pager {
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
flush_info: RefCell::new(FlushInfo {
state: FlushState::Start,
commit_info: RefCell::new(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
syncing: Rc::new(RefCell::new(false)),
@@ -285,6 +301,10 @@ impl Pager {
allocate_page1_state,
page_size: Cell::new(None),
reserved_space: OnceCell::new(),
flush_info: RefCell::new(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
})
}
@@ -647,15 +667,15 @@ impl Pager {
schema_did_change: bool,
connection: &Connection,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
) -> Result<IOResult<PagerCommitResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
return Ok(IOResult::Done(PagerCacheflushResult::Rollback));
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?;
match cacheflush_status {
let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?;
match commit_status {
IOResult::IO => Ok(IOResult::IO),
IOResult::Done(_) => {
let maybe_schema_pair = if schema_did_change {
@@ -671,7 +691,7 @@ impl Pager {
if let Some((schema, mut db_schema)) = maybe_schema_pair {
*db_schema = schema;
}
Ok(cacheflush_status)
Ok(commit_status)
}
}
}
@@ -764,21 +784,63 @@ impl Pager {
Ok(self.wal.borrow().get_max_frame_in_wal())
}
/// Flush dirty pages to disk.
/// Flush all dirty pages to disk.
/// Unlike commit_dirty_pages, this function does not commit, checkpoint, nor sync the WAL/Database.
#[instrument(skip_all, level = Level::INFO)]
// Spill every dirty page to the WAL without committing. Two-state machine:
// Start (queue the frame appends) -> WaitAppendFrames (poll until the WAL
// acknowledges all in-flight writes). Returns IOResult::IO while work is
// still pending; callers must re-drive until IOResult::Done(()).
pub fn cacheflush(&self) -> Result<IOResult<()>> {
let state = self.flush_info.borrow().state;
trace!(?state);
match state {
CacheFlushState::Start => {
// Append each dirty page to the WAL as a frame.
for page_id in self.dirty_pages.borrow().iter() {
// Lock re-acquired per iteration; the page must still be cached,
// since dirty pages are pinned by the invariant in the expect below.
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?})", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
// db_size argument is 0 for every frame — presumably this marks
// the frames as non-commit (the commit path passes the real db
// size only on the last frame) — TODO confirm against WAL format.
0,
// Shared counter decremented by the WAL when each write completes;
// WaitAppendFrames polls it for 0.
self.flush_info.borrow().in_flight_writes.clone(),
)?;
// Mark clean now that the frame is queued; the append owns a clone.
page.clear_dirty();
}
{
// NOTE(review): this drops EVERY cached page, not just the spilled
// ones — looks inherited from the commit path; confirm a pure spill
// should evict clean pages too.
let mut cache = self.page_cache.write();
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames;
// Appends are asynchronous; report in-progress.
return Ok(IOResult::IO);
}
CacheFlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
// All WAL writes landed: reset for the next flush cycle.
self.flush_info.borrow_mut().state = CacheFlushState::Start;
// NOTE(review): finish_append_frames_commit() is invoked even though
// this path never commits — confirm it is safe/required for
// non-commit frames.
self.wal.borrow_mut().finish_append_frames_commit()?;
return Ok(IOResult::Done(()));
} else {
// Writes still outstanding; caller must poll again.
return Ok(IOResult::IO);
}
}
}
}
/// Flush all dirty pages to disk.
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(
pub fn commit_dirty_pages(
&self,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
) -> Result<IOResult<PagerCommitResult>> {
let mut checkpoint_result = CheckpointResult::default();
let res = loop {
let state = self.flush_info.borrow().state;
let state = self.commit_info.borrow().state;
trace!(?state);
match state {
FlushState::Start => {
CommitState::Start => {
let db_size = header_accessor::get_database_size(self)?;
for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() {
let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1;
@@ -786,12 +848,16 @@ impl Pager {
let page_key = PageCacheKey::new(*page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
let db_size = if is_last_frame { db_size } else { 0 };
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.flush_info.borrow().in_flight_writes.clone(),
self.commit_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
@@ -801,45 +867,40 @@ impl Pager {
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames;
return Ok(IOResult::IO);
}
FlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
CommitState::WaitAppendFrames => {
let in_flight = *self.commit_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
return Ok(IOResult::IO);
}
}
FlushState::SyncWal => {
CommitState::SyncWal => {
return_if_io!(self.wal.borrow_mut().sync());
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::WalWritten;
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::WalWritten;
}
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
self.commit_info.borrow_mut().state = CommitState::Checkpoint;
}
FlushState::Checkpoint => {
match self.checkpoint()? {
IOResult::Done(res) => {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
IOResult::IO => return Ok(IOResult::IO),
};
CommitState::Checkpoint => {
checkpoint_result = return_if_io!(self.checkpoint());
self.commit_info.borrow_mut().state = CommitState::SyncDbFile;
}
FlushState::SyncDbFile => {
CommitState::SyncDbFile => {
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile;
}
FlushState::WaitSyncDbFile => {
CommitState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(IOResult::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::Checkpointed(checkpoint_result);
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::Checkpointed(checkpoint_result);
}
}
}

View File

@@ -27,7 +27,7 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
storage::sqlite3_ondisk::SmallVec,
storage::{pager, sqlite3_ondisk::SmallVec},
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState},
@@ -510,10 +510,7 @@ impl Program {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
if matches!(
status,
crate::storage::pager::PagerCacheflushResult::Rollback
) {
if matches!(status, pager::PagerCommitResult::Rollback) {
pager.rollback(schema_did_change, connection)?;
}
connection.transaction_state.replace(TransactionState::None);

View File

@@ -3,11 +3,8 @@ use rusqlite::params;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso_core::types::IOResult;
use turso_core::{Connection, Database, IO};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use turso_core::{types::IOResult, Connection, Database, IO};
#[allow(dead_code)]
pub struct TempDatabase {