diff --git a/core/storage/wal.rs b/core/storage/wal.rs index a7ab813d4..eb9a5307a 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1274,7 +1274,8 @@ impl WalFile { } // acquire the appropriate exclusive locks depending on the checkpoint mode self.acquire_proper_checkpoint_guard(mode)?; - self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); + self.ongoing_checkpoint.max_frame = + self.determine_max_safe_checkpoint_frame(mode); self.ongoing_checkpoint.min_frame = nbackfills + 1; self.ongoing_checkpoint.current_page = 0; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; @@ -1379,8 +1380,10 @@ impl WalFile { let frames_possible = current_mx.saturating_sub(nbackfills); // the total # of frames we actually backfilled - let frames_checkpointed = - current_mx.saturating_sub(self.ongoing_checkpoint.min_frame - 1); + let frames_checkpointed = self + .ongoing_checkpoint + .max_frame + .saturating_sub(self.ongoing_checkpoint.min_frame - 1); if matches!(mode, CheckpointMode::Truncate) { // sqlite always returns zeros for truncate mode @@ -1436,32 +1439,61 @@ impl WalFile { /// Coordinate what the maximum safe frame is for us to backfill when checkpointing. /// We can never backfill a frame with a higher number than any reader's max frame, /// because we might overwrite content the reader is reading from the database file. - fn determine_max_safe_checkpoint_frame(&self) -> u64 { + /// + /// A checkpoint must never overwrite a page in the main DB file if some + /// active reader might still need to read that page from the WAL. + /// Concretely: the checkpoint may only copy frames `<= aReadMark[k]` for + /// every in‑use reader slot `k > 0`. + /// + /// `read_locks[0]` is special: readers holding slot 0 ignore the WAL entirely + /// (they read only the DB file). Its value is a placeholder and does not + /// constrain `mxSafeFrame`. + /// + /// Slot 1 is the “default” reader slot. If it is free (we can take its + /// write-lock) we raise it to the global max so new readers see the most + /// recent snapshot. We do not clear it to `READMARK_NOT_USED` in ordinary + /// checkpoints (SQLite only clears nonzero slots during a log reset). + /// + /// Slots 2..N: If a reader is stuck at an older frame, that frame becomes the + /// limit. If we can’t atomically bump that slot (write-lock fails), we must + /// clamp `mxSafeFrame` down to that mark. In PASSIVE mode we stop trying + /// immediately (we are not allowed to block or spin). In the blocking modes + /// (FULL/RESTART/TRUNCATE) we can loop and retry, but for now we can + /// just respect the first busy slot and move on. + /// + /// Locking rules: + /// This routine **tries** to take an exclusive (write) lock on each slot to + /// update/clean it. If the try-lock fails: + /// PASSIVE: do not wait; just lower `mxSafeFrame` and break. + /// Others: lower `mxSafeFrame` and continue scanning. + /// + /// We never modify slot values while a reader holds that slot. + fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> u64 { let shared = self.get_shared(); let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst); - // If a reader is positioned before max_frame we either: - // bump its mark up if we can take the slot write-lock OR - // lower max_frame if the reader is busy and we cannot overtake + for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { let this_mark = read_lock.value.load(Ordering::SeqCst); if this_mark == READMARK_NOT_USED { - // this read lock is not used, skip it continue; } if this_mark < max_safe_frame as u32 { let busy = !read_lock.write(); if !busy { - // readmark 1 is the default reader, and should always contain the max safe - // frame for new readers so we bump it up to the current max frame instead of clearing it. - let new_mark = if read_lock_idx == 1 { - max_safe_frame as u32 - } else { - READMARK_NOT_USED - }; - read_lock.value.store(new_mark, Ordering::SeqCst); + // Only adjust, never clear, in ordinary checkpoints + if read_lock_idx == 1 { + // store the shared max_frame for the default read slot 1 + read_lock + .value + .store(max_safe_frame as u32, Ordering::SeqCst); + } read_lock.unlock(); } else { max_safe_frame = this_mark as u64; + if matches!(mode, CheckpointMode::Passive) { + // Don't keep poking, PASSIVE can't block or spin + break; + } } } } @@ -1703,10 +1735,8 @@ impl WalFileShared { ); { let mut hdr = self.wal_header.lock(); - // bump checkpoint sequence hdr.checkpoint_seq = hdr.checkpoint_seq.wrapping_add(1); // keep hdr.magic, hdr.file_format, hdr.page_size as-is - hdr.checkpoint_seq = hdr.checkpoint_seq.wrapping_add(1); hdr.salt_1 = hdr.salt_1.wrapping_add(1); hdr.salt_2 = io.generate_random_number() as u32; @@ -1744,6 +1774,8 @@ pub mod test { CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, Wal, WalFileShared, IO, }; + #[cfg(unix)] + use std::os::unix::fs::MetadataExt; use std::{ cell::{Cell, RefCell, UnsafeCell}, rc::Rc, @@ -1821,7 +1853,7 @@ pub mod test { "WAL file should not have been empty before checkpoint" ); assert_eq!( - bytes_after, 32, + bytes_after, 0, "WAL file should be truncated to 0 bytes, but is {bytes_after} bytes", ); std::fs::remove_dir_all(path).unwrap(); @@ -1863,58 +1895,85 @@ pub mod test { } #[test] - fn test_wal_restart_checkpoint_resets_sequence() { + fn restart_checkpoint_resets_wal_state_and_increments_ckpt_seq() { let (db, path) = get_database(); - let mut walpath = path.clone().into_os_string().into_string().unwrap(); - walpath.push_str("/test.db-wal"); - let walpath = std::path::PathBuf::from(walpath); + let walpath = { + let mut p = path.clone().into_os_string().into_string().unwrap(); + p.push_str("/test.db-wal"); + std::path::PathBuf::from(p) + }; let conn = db.connect().unwrap(); conn.execute("create table test(id integer primary key, value text)") .unwrap(); - bulk_inserts(&conn.clone(), 20, 3); + bulk_inserts(&conn, 20, 3); conn.pager.borrow_mut().cacheflush().unwrap(); + // Snapshot header & counters before the RESTART checkpoint. let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); - let (seq0, salt10, salt20, _ps) = wal_header_snapshot(&wal_shared); - let (before_max, before_backfills) = unsafe { + let (seq_before, salt1_before, salt2_before, _ps_before) = wal_header_snapshot(&wal_shared); + let (mx_before, backfill_before) = unsafe { let s = &*wal_shared.get(); ( s.max_frame.load(Ordering::SeqCst), s.nbackfills.load(Ordering::SeqCst), ) }; - assert!(before_max > 0); - assert_eq!(before_backfills, 0); + assert!(mx_before > 0); + assert_eq!(backfill_before, 0); let meta_before = std::fs::metadata(&walpath).unwrap(); - - let bytes_before = meta_before.len(); + #[cfg(unix)] + let size_before = meta_before.blocks(); + #[cfg(not(unix))] + let size_before = meta_before.len(); + // Run a RESTART checkpoint, should backfill everything and reset WAL counters, + // but NOT truncate the file. { - let pager_ref = conn.pager.borrow(); - let mut wal = pager_ref.wal.borrow_mut(); - let r = run_checkpoint_until_done(&mut *wal, &pager_ref, CheckpointMode::Restart); - assert_eq!(r.num_wal_frames, before_max); - assert_eq!(r.num_checkpointed_frames, before_max); + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart); + assert_eq!(res.num_wal_frames, mx_before); + assert_eq!(res.num_checkpointed_frames, mx_before); } - // After restart: max_frame == 0, nbackfills == 0, salts changed, seq incremented. - let (seq1, salt11, salt21, _ps2) = wal_header_snapshot(&wal_shared); - assert_eq!(seq1, seq0.wrapping_add(1), "checkpoint_seq increments"); - assert_ne!(salt21, salt20); - assert_eq!(salt11, salt10.wrapping_add(1), "salt_1 should increment"); - let (after_max, after_backfills) = unsafe { + // Validate post‑RESTART header & counters. + let (seq_after, salt1_after, salt2_after, _ps_after) = wal_header_snapshot(&wal_shared); + assert_eq!( + seq_after, + seq_before.wrapping_add(1), + "checkpoint_seq must increment on RESTART" + ); + assert_eq!( + salt1_after, + salt1_before.wrapping_add(1), + "salt_1 is incremented" + ); + assert_ne!(salt2_after, salt2_before, "salt_2 is randomized"); + + let (mx_after, backfill_after) = unsafe { let s = &*wal_shared.get(); ( s.max_frame.load(Ordering::SeqCst), s.nbackfills.load(Ordering::SeqCst), ) }; - assert_eq!(after_max, 0); - assert_eq!(after_backfills, 0); + assert_eq!(mx_after, 0, "mxFrame reset to 0 after RESTART"); + assert_eq!(backfill_after, 0, "nBackfill reset to 0 after RESTART"); - // Next write should create frame_id = 1 again. + // File size should be unchanged for RESTART (no truncate). + let meta_after = std::fs::metadata(&walpath).unwrap(); + #[cfg(unix)] + let size_after = meta_after.blocks(); + #[cfg(not(unix))] + let size_after = meta_after.len(); + assert_eq!( + size_before, size_after, + "RESTART must not change WAL file size" + ); + + // Next write should start a new sequence at frame 1. conn.execute("insert into test(value) values ('post_restart')") .unwrap(); conn.pager @@ -1924,14 +1983,8 @@ pub mod test { .finish_append_frames_commit() .unwrap(); let new_max = unsafe { (&*wal_shared.get()).max_frame.load(Ordering::SeqCst) }; - assert_eq!(new_max, 1, "Sequence restarted at 1"); + assert_eq!(new_max, 1, "first append after RESTART starts at frame 1"); - let meta_after = std::fs::metadata(&walpath).unwrap(); - let bytes_after = meta_after.len(); - assert_eq!( - bytes_before, bytes_after, - "WAL file should not change size after full checkpoint" - ); std::fs::remove_dir_all(path).unwrap(); }