diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 575626be5..74d96f170 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1589,9 +1589,7 @@ impl Pager { .checkpoint(self, write_counter.clone(), mode) })?; - if checkpoint_result.everything_backfilled() - && checkpoint_result.num_checkpointed_frames != 0 - { + if checkpoint_result.everything_backfilled() && checkpoint_result.num_backfilled != 0 { let db_size = self .io .block(|| self.with_header(|header| header.database_size))? diff --git a/core/storage/wal.rs b/core/storage/wal.rs index cd6b30e30..71dd7d510 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -35,10 +35,10 @@ use super::sqlite3_ondisk::{self, WalHeader}; #[derive(Debug, Clone, Default)] pub struct CheckpointResult { - /// number of frames in WAL - pub num_wal_frames: u64, + /// number of frames in WAL that could have been backfilled + pub num_attempted: u64, /// number of frames moved successfully from WAL to db file after checkpoint - pub num_checkpointed_frames: u64, + pub num_backfilled: u64, /// In the case of everything backfilled, we need to hold the locks until the db /// file is truncated. 
maybe_guard: Option, @@ -53,14 +53,14 @@ impl Drop for CheckpointResult { impl CheckpointResult { pub fn new(n_frames: u64, n_ckpt: u64) -> Self { Self { - num_wal_frames: n_frames, - num_checkpointed_frames: n_ckpt, + num_attempted: n_frames, + num_backfilled: n_ckpt, maybe_guard: None, } } pub const fn everything_backfilled(&self) -> bool { - self.num_wal_frames == self.num_checkpointed_frames + self.num_attempted == self.num_backfilled } pub fn release_guard(&mut self) { let _ = self.maybe_guard.take(); @@ -81,6 +81,15 @@ pub enum CheckpointMode { Truncate, } +impl CheckpointMode { + fn should_restart_log(&self) -> bool { + matches!(self, CheckpointMode::Truncate | CheckpointMode::Restart) + } + fn require_all_backfilled(&self) -> bool { + !matches!(self, CheckpointMode::Passive) + } +} + #[repr(transparent)] #[derive(Debug, Default)] /// A 64-bit read-write lock with embedded 32-bit value storage. @@ -1381,11 +1390,7 @@ impl WalFile { } // acquire the appropriate exclusive locks depending on the checkpoint mode self.acquire_proper_checkpoint_guard(mode)?; - self.ongoing_checkpoint.max_frame = - match self.determine_max_safe_checkpoint_frame(mode) { - Err(_) => return Err(LimboError::Busy), - Ok(res) => res, - }; + self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); self.ongoing_checkpoint.min_frame = nbackfills + 1; self.ongoing_checkpoint.current_page = 0; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; @@ -1546,12 +1551,11 @@ impl WalFile { .nbackfills .store(self.ongoing_checkpoint.max_frame, Ordering::Release); - if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) { - if checkpoint_result.everything_backfilled() { - self.restart_log(mode)?; - } else { - return Err(LimboError::Busy); - } + if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() { + return Err(LimboError::Busy); + } + if mode.should_restart_log() { + self.restart_log(mode)?; } // store a copy of the checkpoint 
result to return in the future if pragma @@ -1565,7 +1569,7 @@ impl WalFile { // b. the physical db file size differs from the expected pages * page_size // and truncate + sync the db file if necessary. if checkpoint_result.everything_backfilled() - && checkpoint_result.num_checkpointed_frames > 0 + && checkpoint_result.num_backfilled > 0 { checkpoint_result.maybe_guard = self.checkpoint_guard.take(); } else { @@ -1583,67 +1587,58 @@ impl WalFile { } /// Coordinate what the maximum safe frame is for us to backfill when checkpointing. - /// We can never backfill a frame with a higher number than any reader's max frame, + /// We can never backfill a frame with a higher number than any reader's read mark, /// because we might overwrite content the reader is reading from the database file. /// /// A checkpoint must never overwrite a page in the main DB file if some /// active reader might still need to read that page from the WAL. /// Concretely: the checkpoint may only copy frames `<= aReadMark[k]` for - /// every in‑use reader slot `k > 0`. + /// every in-use reader slot `k > 0`. /// /// `read_locks[0]` is special: readers holding slot 0 ignore the WAL entirely /// (they read only the DB file). Its value is a placeholder and does not /// constrain `mxSafeFrame`. /// - /// Slot 1 is the “default” reader slot. If it is free (we can take its - /// write-lock) we raise it to the global max so new readers see the most - /// recent snapshot. We do not clear it to `READMARK_NOT_USED` in ordinary - /// checkpoints (SQLite only clears nonzero slots during a log reset). 
+ /// For each slot 1..N: + /// - If we can acquire the write lock (slot is free): + /// - Slot 1: Set to mxSafeFrame (allowing new readers to see up to this point) + /// - Slots 2+: Set to READMARK_NOT_USED (freeing the slot) + /// - If we cannot acquire the lock (SQLITE_BUSY): + /// - Lower mxSafeFrame to that reader's mark + /// - In PASSIVE mode: Already have no busy handler, continue scanning + /// - In FULL/RESTART/TRUNCATE: Disable busy handler for remaining slots /// - /// Slots 2..N: If a reader is stuck at an older frame, that frame becomes the - /// limit. If we can’t atomically bump that slot (write-lock fails), we must - /// clamp `mxSafeFrame` down to that mark. In PASSIVE mode we stop trying - /// immediately (we are not allowed to block or spin). In the blocking modes - /// (FULL/RESTART/TRUNCATE) we can loop and retry, but for now we can - /// just respect the first busy slot and move on. + /// Locking behavior: + /// - PASSIVE: Never waits, no busy handler (xBusy==NULL) + /// - FULL/RESTART/TRUNCATE: May wait via busy handler, but after first BUSY, + /// switches to non-blocking for remaining slots /// - /// Locking rules: - /// This routine tries to take an exclusive (write) lock on each slot to - /// update/clean it. If the try-lock fails: - /// PASSIVE: do not wait; just lower `mxSafeFrame` and break. - /// Others: lower `mxSafeFrame` and continue scanning. - /// - /// We never modify slot values while a reader holds that slot. - fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> Result { + /// We never modify slot values while a reader holds that slot's lock. 
+ /// TODO: implement proper BUSY handling behavior + fn determine_max_safe_checkpoint_frame(&self) -> u64 { let shared = self.get_shared(); let shared_max = shared.max_frame.load(Ordering::Acquire); + let mut max_safe_frame = shared_max; - // Start optimistic: we want to advance everyone to shared_max - for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) { - let mark = lock.get_value(); - if mark == READMARK_NOT_USED || mark >= shared_max as u32 { - continue; - } - // Try to bump this slot to shared_max (requires exclusive on the slot) - if lock.write() { - // Slot is free to edit, bump to shared_max (slot 1 keeps a real mark, others can be cleaned) - let val = if idx == 1 { - shared_max as u32 + for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { + let this_mark = read_lock.get_value(); + if this_mark < max_safe_frame as u32 { + let busy = !read_lock.write(); + if !busy { + let val = if read_lock_idx == 1 { + // store the max_frame for the default read slot 1 + max_safe_frame as u32 + } else { + READMARK_NOT_USED + }; + read_lock.set_value_exclusive(val); + read_lock.unlock(); } else { - READMARK_NOT_USED - }; - lock.set_value_exclusive(val); - lock.unlock(); - } else { - // Reader is using this slot. - return match mode { - // clamp down for passive - CheckpointMode::Passive => Ok(mark as u64), - _ => Err(LimboError::Busy), // all others must wait - }; + max_safe_frame = this_mark as u64; + } } } - Ok(shared_max) + max_safe_frame } /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode. 
@@ -2087,8 +2082,8 @@ pub mod test { let pager = conn.pager.borrow(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart); - assert_eq!(res.num_wal_frames, mx_before); - assert_eq!(res.num_checkpointed_frames, mx_before); + assert_eq!(res.num_attempted, mx_before); + assert_eq!(res.num_backfilled, mx_before); } // Validate post‑RESTART header & counters. @@ -2183,15 +2178,15 @@ pub mod test { }; (res, maxf) }; - assert_eq!(res1.num_wal_frames, max_before); + assert_eq!(res1.num_attempted, max_before); assert!( - res1.num_checkpointed_frames < res1.num_wal_frames, + res1.num_backfilled < res1.num_attempted, "Partial backfill expected, {} : {}", - res1.num_checkpointed_frames, - res1.num_wal_frames + res1.num_backfilled, + res1.num_attempted ); assert_eq!( - res1.num_checkpointed_frames, readmark, + res1.num_backfilled, readmark, "Checkpointed frames should match read mark" ); // Release reader @@ -2206,7 +2201,7 @@ pub mod test { let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); let res2 = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive); assert_eq!( - res2.num_checkpointed_frames, res2.num_wal_frames, + res2.num_backfilled, res2.num_attempted, "Second checkpoint completes remaining frames" ); } @@ -2355,11 +2350,11 @@ pub mod test { }; assert!( - checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames, + checkpoint_result.num_backfilled < checkpoint_result.num_attempted, "Should not checkpoint all frames when readers are active" ); assert_eq!( - checkpoint_result.num_checkpointed_frames, r1_max_frame, + checkpoint_result.num_backfilled, r1_max_frame, "Should have checkpointed up to R1's max frame" ); @@ -2542,7 +2537,7 @@ pub mod test { let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive) }; - assert_eq!(result1.num_checkpointed_frames, r1_frame); + 
assert_eq!(result1.num_backfilled, r1_frame); // finish reader‑1 conn1.execute("COMMIT").unwrap(); @@ -2553,10 +2548,7 @@ pub mod test { let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive) }; - assert_eq!( - result1.num_checkpointed_frames + result2.num_checkpointed_frames, - r2_frame - ); + assert_eq!(result1.num_backfilled + result2.num_backfilled, r2_frame); // verify visible rows let mut stmt = conn_r2.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); @@ -2847,8 +2839,8 @@ pub mod test { run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full) }; - assert_eq!(result.num_wal_frames, mx_before); - assert_eq!(result.num_checkpointed_frames, mx_before); + assert_eq!(result.num_attempted, mx_before); + assert_eq!(result.num_backfilled, mx_before); } #[test] @@ -2924,7 +2916,7 @@ pub mod test { run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full) }; - assert_eq!(result.num_wal_frames, mx_now); - assert_eq!(result.num_checkpointed_frames, mx_now); + assert_eq!(result.num_attempted, mx_now - r_snapshot); + assert!(result.everything_backfilled()); } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 0d80ff2ad..dc1c770db 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -357,18 +357,17 @@ pub fn op_checkpoint( let result = program.connection.checkpoint(*checkpoint_mode); match result { Ok(CheckpointResult { - num_wal_frames: num_wal_pages, - num_checkpointed_frames: num_checkpointed_pages, + num_attempted, + num_backfilled, .. }) => { // https://sqlite.org/pragma.html#pragma_wal_checkpoint // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). 
state.registers[*dest] = Register::Value(Value::Integer(0)); // 2nd col: # modified pages written to wal file - state.registers[*dest + 1] = Register::Value(Value::Integer(num_wal_pages as i64)); + state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64)); // 3rd col: # pages moved to db after checkpoint - state.registers[*dest + 2] = - Register::Value(Value::Integer(num_checkpointed_pages as i64)); + state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64)); } Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 2446c3778..723a3f178 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1328,10 +1328,10 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2( match db.conn.checkpoint(chkptmode) { Ok(res) => { if !log_size.is_null() { - (*log_size) = res.num_wal_frames as ffi::c_int; + (*log_size) = res.num_attempted as ffi::c_int; } if !checkpoint_count.is_null() { - (*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int; + (*checkpoint_count) = res.num_backfilled as ffi::c_int; } SQLITE_OK }