Merge 'Separate user-callable cacheflush from internal cacheflush logic' from Diego Reis

Cacheflush should only spill pages to the WAL as non-commit frames, without
checkpointing or syncing.
- [docs](https://sqlite.org/c3ref/db_cacheflush.html)
- [sqlite3PagerFlush](https://github.com/sqlite/sqlite/blob/625d0b70febecb0864a81b2a047a961a59e8c17e/src/pager.c#L4669)

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #2044
This commit is contained in:
Jussi Saurio
2025-07-16 19:44:12 +03:00
6 changed files with 175 additions and 58 deletions

View File

@@ -205,6 +205,17 @@ impl Connection {
})?;
Ok(())
}
/// Flush dirty pages to disk.
/// This will write the dirty pages to the WAL.
pub fn cacheflush(&self) -> Result<()> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.cacheflush()?;
Ok(())
}
}
impl Debug for Connection {

View File

@@ -1,3 +1,4 @@
use tokio::fs;
use turso::{Builder, Value};
#[tokio::test]
@@ -55,3 +56,62 @@ async fn test_rows_next() {
);
assert!(res.next().await.unwrap().is_none());
}
#[tokio::test]
async fn test_cacheflush() {
let builder = Builder::new_local("test.db");
let db = builder.build().await.unwrap();
let conn = db.connect().unwrap();
conn.execute("CREATE TABLE IF NOT EXISTS asdf (x INTEGER)", ())
.await
.unwrap();
// Tests if cache flush breaks transaction isolation
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (1)", ())
.await
.unwrap();
conn.cacheflush().unwrap();
conn.execute("ROLLBACK", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (2)", ())
.await
.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (3)", ())
.await
.unwrap();
let mut res = conn.query("SELECT * FROM asdf", ()).await.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
2.into()
);
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
3.into()
);
// Tests if cache flush doesn't break a committed transaction
conn.execute("BEGIN", ()).await.unwrap();
conn.execute("INSERT INTO asdf (x) VALUES (1)", ())
.await
.unwrap();
conn.cacheflush().unwrap();
conn.execute("COMMIT", ()).await.unwrap();
let mut res = conn
.query("SELECT * FROM asdf WHERE x = 1", ())
.await
.unwrap();
assert_eq!(
res.next().await.unwrap().unwrap().get_value(0).unwrap(),
1.into()
);
fs::remove_file("test.db").await.unwrap();
fs::remove_file("test.db-wal").await.unwrap();
}

View File

@@ -77,7 +77,7 @@ use std::{
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
use storage::page_cache::DumbLruPageCache;
use storage::pager::{PagerCacheflushResult, DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
use storage::pager::{DB_STATE_INITIALIZED, DB_STATE_UNINITIALIZED};
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
@@ -752,16 +752,11 @@ impl Connection {
}
/// Flush dirty pages to disk.
/// This will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<IOResult<PagerCacheflushResult>> {
pub fn cacheflush(&self) -> Result<IOResult<()>> {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
self.pager
.borrow()
.cacheflush(self.wal_checkpoint_disabled.get())
self.pager.borrow().cacheflush()
}
pub fn clear_page_cache(&self) -> Result<()> {

View File

@@ -140,10 +140,18 @@ impl Page {
}
}
}
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache flush.
enum FlushState {
enum CacheFlushState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
WaitAppendFrames,
}
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache commit.
enum CommitState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
@@ -176,10 +184,17 @@ pub enum BtreePageAllocMode {
Le(u32),
}
/// This will keep track of the state of current cache commit in order to not repeat work
struct CommitInfo {
state: CommitState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
in_flight_writes: Rc<RefCell<usize>>,
}
/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
state: FlushState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
state: CacheFlushState,
/// Number of writes taking place.
in_flight_writes: Rc<RefCell<usize>>,
}
@@ -210,6 +225,7 @@ pub struct Pager {
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<HashSet<usize>>>,
commit_info: RefCell<CommitInfo>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
checkpoint_inflight: Rc<RefCell<usize>>,
@@ -231,7 +247,7 @@ pub struct Pager {
#[derive(Debug, Copy, Clone)]
/// The status of the current cache flush.
pub enum PagerCacheflushResult {
pub enum PagerCommitResult {
/// The WAL was written to disk and fsynced.
WalWritten,
/// The WAL was written, fsynced, and a checkpoint was performed.
@@ -271,8 +287,8 @@ impl Pager {
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
flush_info: RefCell::new(FlushInfo {
state: FlushState::Start,
commit_info: RefCell::new(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
syncing: Rc::new(RefCell::new(false)),
@@ -285,6 +301,10 @@ impl Pager {
allocate_page1_state,
page_size: Cell::new(None),
reserved_space: OnceCell::new(),
flush_info: RefCell::new(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
})
}
@@ -647,15 +667,15 @@ impl Pager {
schema_did_change: bool,
connection: &Connection,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
) -> Result<IOResult<PagerCommitResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
return Ok(IOResult::Done(PagerCacheflushResult::Rollback));
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?;
match cacheflush_status {
let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?;
match commit_status {
IOResult::IO => Ok(IOResult::IO),
IOResult::Done(_) => {
let maybe_schema_pair = if schema_did_change {
@@ -671,7 +691,7 @@ impl Pager {
if let Some((schema, mut db_schema)) = maybe_schema_pair {
*db_schema = schema;
}
Ok(cacheflush_status)
Ok(commit_status)
}
}
}
@@ -764,21 +784,59 @@ impl Pager {
Ok(self.wal.borrow().get_max_frame_in_wal())
}
/// Flush dirty pages to disk.
/// Flush all dirty pages to disk.
/// Unlike commit_dirty_pages, this function does not commit, checkpoint now sync the WAL/Database.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(&self) -> Result<IOResult<()>> {
let state = self.flush_info.borrow().state;
trace!(?state);
match state {
CacheFlushState::Start => {
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?})", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
0,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrames;
return Ok(IOResult::IO);
}
CacheFlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = CacheFlushState::Start;
self.wal.borrow_mut().finish_append_frames_commit()?;
return Ok(IOResult::Done(()));
} else {
return Ok(IOResult::IO);
}
}
}
}
/// Flush all dirty pages to disk.
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(
pub fn commit_dirty_pages(
&self,
wal_checkpoint_disabled: bool,
) -> Result<IOResult<PagerCacheflushResult>> {
) -> Result<IOResult<PagerCommitResult>> {
let mut checkpoint_result = CheckpointResult::default();
let res = loop {
let state = self.flush_info.borrow().state;
let state = self.commit_info.borrow().state;
trace!(?state);
match state {
FlushState::Start => {
CommitState::Start => {
let db_size = header_accessor::get_database_size(self)?;
for (dirty_page_idx, page_id) in self.dirty_pages.borrow().iter().enumerate() {
let is_last_frame = dirty_page_idx == self.dirty_pages.borrow().len() - 1;
@@ -786,12 +844,16 @@ impl Pager {
let page_key = PageCacheKey::new(*page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
let db_size = if is_last_frame { db_size } else { 0 };
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.flush_info.borrow().in_flight_writes.clone(),
self.commit_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
@@ -801,45 +863,40 @@ impl Pager {
cache.clear().unwrap();
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
self.commit_info.borrow_mut().state = CommitState::WaitAppendFrames;
return Ok(IOResult::IO);
}
FlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
CommitState::WaitAppendFrames => {
let in_flight = *self.commit_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
return Ok(IOResult::IO);
}
}
FlushState::SyncWal => {
CommitState::SyncWal => {
return_if_io!(self.wal.borrow_mut().sync());
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::WalWritten;
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::WalWritten;
}
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
self.commit_info.borrow_mut().state = CommitState::Checkpoint;
}
FlushState::Checkpoint => {
match self.checkpoint()? {
IOResult::Done(res) => {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
IOResult::IO => return Ok(IOResult::IO),
};
CommitState::Checkpoint => {
checkpoint_result = return_if_io!(self.checkpoint());
self.commit_info.borrow_mut().state = CommitState::SyncDbFile;
}
FlushState::SyncDbFile => {
CommitState::SyncDbFile => {
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile;
self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile;
}
FlushState::WaitSyncDbFile => {
CommitState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(IOResult::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break PagerCacheflushResult::Checkpointed(checkpoint_result);
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::Checkpointed(checkpoint_result);
}
}
}

View File

@@ -27,7 +27,7 @@ pub mod sorter;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
storage::sqlite3_ondisk::SmallVec,
storage::{pager, sqlite3_ondisk::SmallVec},
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{OpIdxInsertState, OpInsertState, OpNewRowidState, OpSeekState},
@@ -510,10 +510,7 @@ impl Program {
if self.change_cnt_on {
self.connection.set_changes(self.n_change.get());
}
if matches!(
status,
crate::storage::pager::PagerCacheflushResult::Rollback
) {
if matches!(status, pager::PagerCommitResult::Rollback) {
pager.rollback(schema_did_change, connection)?;
}
connection.transaction_state.replace(TransactionState::None);

View File

@@ -3,11 +3,8 @@ use rusqlite::params;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use turso_core::types::IOResult;
use turso_core::{Connection, Database, IO};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use turso_core::{types::IOResult, Connection, Database, IO};
#[allow(dead_code)]
pub struct TempDatabase {