diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 599cd6826..c02af788d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -39,7 +39,7 @@ pub const NO_LOCK: u32 = 0; pub const SHARED_LOCK: u32 = 1; pub const WRITE_LOCK: u32 = 2; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] pub struct CheckpointResult { /// number of frames in WAL pub num_wal_frames: u64, @@ -47,17 +47,11 @@ pub struct CheckpointResult { pub num_checkpointed_frames: u64, } -impl Default for CheckpointResult { - fn default() -> Self { - Self::new() - } -} - impl CheckpointResult { - pub fn new() -> Self { + pub fn new(n_frames: u64, n_ckpt: u64) -> Self { Self { - num_wal_frames: 0, - num_checkpointed_frames: 0, + num_wal_frames: n_frames, + num_checkpointed_frames: n_ckpt, } } } @@ -423,6 +417,9 @@ pub struct WalFile { /// Hack for now in case of rollback, will not be needed once we remove this bullshit frame cache. start_pages_in_frames: usize, + /// Count of possible pages to checkpoint, and number of backfilled + prev_checkpoint: CheckpointResult, + /// Private copy of WalHeader pub header: WalHeader, } @@ -802,7 +799,8 @@ impl Wal for WalFile { fn should_checkpoint(&self) -> bool { let shared = self.get_shared(); let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize; - frame_id >= self.checkpoint_threshold + let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize; + frame_id > self.checkpoint_threshold + nbackfills } #[instrument(skip_all, level = Level::DEBUG)] @@ -822,7 +820,19 @@ impl Wal for WalFile { match state { CheckpointState::Start => { // TODO(pere): check what frames are safe to checkpoint between many readers! - self.ongoing_checkpoint.min_frame = self.min_frame; + let (shared_max, nbackfills) = { + let shared = self.get_shared(); + ( + shared.max_frame.load(Ordering::SeqCst), + shared.nbackfills.load(Ordering::SeqCst), + ) + }; + if shared_max <= nbackfills { + // if there's nothing to do and we are fully back-filled, to match sqlite + // we return the previous number of backfilled pages from last checkpoint. + return Ok(IOResult::Done(self.prev_checkpoint)); + } + self.ongoing_checkpoint.min_frame = nbackfills + 1; let shared = self.get_shared(); let busy = !shared.checkpoint_lock.write(); if busy { @@ -936,15 +946,23 @@ impl Wal for WalFile { } let shared = self.get_shared(); shared.checkpoint_lock.unlock(); + let max_frame = shared.max_frame.load(Ordering::SeqCst); + let nbackfills = shared.nbackfills.load(Ordering::SeqCst); // Record two num pages fields to return as checkpoint result to caller. // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html - let checkpoint_result = CheckpointResult { - num_wal_frames: shared.max_frame.load(Ordering::SeqCst), - num_checkpointed_frames: self.ongoing_checkpoint.max_frame, - }; + let frames_in_wal = max_frame.saturating_sub(nbackfills); + let frames_checkpointed = self + .ongoing_checkpoint + .max_frame + .saturating_sub(self.ongoing_checkpoint.min_frame - 1); + let checkpoint_result = + CheckpointResult::new(frames_in_wal, frames_checkpointed); let everything_backfilled = shared.max_frame.load(Ordering::SeqCst) == self.ongoing_checkpoint.max_frame; + shared + .nbackfills + .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); if everything_backfilled { // TODO: Even in Passive mode, if everything was backfilled we should // truncate and fsync the *db file* @@ -962,11 +980,8 @@ impl Wal for WalFile { // TODO: if all frames were backfilled into the db file, calls fsync // TODO(pere): truncate wal file here. } - } else { - shared - .nbackfills - .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); } + self.prev_checkpoint = checkpoint_result; self.ongoing_checkpoint.state = CheckpointState::Start; return Ok(IOResult::Done(checkpoint_result)); } @@ -1094,6 +1109,7 @@ impl WalFile { min_frame: 0, max_frame_read_lock_index: 0, last_checksum, + prev_checkpoint: CheckpointResult::default(), start_pages_in_frames: 0, header: *header, } diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index 3bb3b0841..a6aba67c5 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -206,6 +206,7 @@ mod tests { #[cfg(not(feature = "sqlite3"))] mod libsql_ext { + use super::*; #[test] @@ -471,7 +472,7 @@ mod tests { assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); // Insert at least 1000 rows to go over checkpoint threshold. let mut stmt = ptr::null_mut(); - for i in 1..=2000 { + for i in 1..2000 { let sql = std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({i})")) .unwrap(); @@ -506,7 +507,11 @@ mod tests { ); assert_eq!(sqlite3_step(stmt), SQLITE_ROW); let count = sqlite3_column_int64(stmt, 0); - assert_eq!(count, 2000); + // with a sane `should_checkpoint` method we have no garuantee that all 2000 rows are present, as the checkpoint was + // triggered by cacheflush on insertions. the pattern will trigger a checkpoint when the wal has > 1000 frames, + // so it will be triggered but will no longer be triggered on each consecutive + // write. here we can assert that we have > 1500 rows. + assert!(count > 1500); assert_eq!(sqlite3_step(stmt), SQLITE_DONE); assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); }