mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-13 20:24:18 +01:00
Polish checkpointing and fix tests, add documentation
This commit is contained in:
@@ -1274,7 +1274,8 @@ impl WalFile {
|
||||
}
|
||||
// acquire the appropriate exclusive locks depending on the checkpoint mode
|
||||
self.acquire_proper_checkpoint_guard(mode)?;
|
||||
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
|
||||
self.ongoing_checkpoint.max_frame =
|
||||
self.determine_max_safe_checkpoint_frame(mode);
|
||||
self.ongoing_checkpoint.min_frame = nbackfills + 1;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
@@ -1379,8 +1380,10 @@ impl WalFile {
|
||||
let frames_possible = current_mx.saturating_sub(nbackfills);
|
||||
|
||||
// the total # of frames we actually backfilled
|
||||
let frames_checkpointed =
|
||||
current_mx.saturating_sub(self.ongoing_checkpoint.min_frame - 1);
|
||||
let frames_checkpointed = self
|
||||
.ongoing_checkpoint
|
||||
.max_frame
|
||||
.saturating_sub(self.ongoing_checkpoint.min_frame - 1);
|
||||
|
||||
if matches!(mode, CheckpointMode::Truncate) {
|
||||
// sqlite always returns zeros for truncate mode
|
||||
@@ -1436,32 +1439,61 @@ impl WalFile {
|
||||
/// Coordinate what the maximum safe frame is for us to backfill when checkpointing.
|
||||
/// We can never backfill a frame with a higher number than any reader's max frame,
|
||||
/// because we might overwrite content the reader is reading from the database file.
|
||||
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
|
||||
///
|
||||
/// A checkpoint must never overwrite a page in the main DB file if some
|
||||
/// active reader might still need to read that page from the WAL.
|
||||
/// Concretely: the checkpoint may only copy frames `<= aReadMark[k]` for
|
||||
/// every in‑use reader slot `k > 0`.
|
||||
///
|
||||
/// `read_locks[0]` is special: readers holding slot 0 ignore the WAL entirely
|
||||
/// (they read only the DB file). Its value is a placeholder and does not
|
||||
/// constrain `mxSafeFrame`.
|
||||
///
|
||||
/// Slot 1 is the “default” reader slot. If it is free (we can take its
|
||||
/// write-lock) we raise it to the global max so new readers see the most
|
||||
/// recent snapshot. We do not clear it to `READMARK_NOT_USED` in ordinary
|
||||
/// checkpoints (SQLite only clears nonzero slots during a log reset).
|
||||
///
|
||||
/// Slots 2..N: If a reader is stuck at an older frame, that frame becomes the
|
||||
/// limit. If we can’t atomically bump that slot (write-lock fails), we must
|
||||
/// clamp `mxSafeFrame` down to that mark. In PASSIVE mode we stop trying
|
||||
/// immediately (we are not allowed to block or spin). In the blocking modes
|
||||
/// (FULL/RESTART/TRUNCATE) we can loop and retry, but for now we can
|
||||
/// just respect the first busy slot and move on.
|
||||
///
|
||||
/// Locking rules:
|
||||
/// This routine **tries** to take an exclusive (write) lock on each slot to
|
||||
/// update/clean it. If the try-lock fails:
|
||||
/// PASSIVE: do not wait; just lower `mxSafeFrame` and break.
|
||||
/// Others: lower `mxSafeFrame` and continue scanning.
|
||||
///
|
||||
/// We never modify slot values while a reader holds that slot.
|
||||
fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> u64 {
|
||||
let shared = self.get_shared();
|
||||
let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
|
||||
// If a reader is positioned before max_frame we either:
|
||||
// bump its mark up if we can take the slot write-lock OR
|
||||
// lower max_frame if the reader is busy and we cannot overtake
|
||||
|
||||
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let this_mark = read_lock.value.load(Ordering::SeqCst);
|
||||
if this_mark == READMARK_NOT_USED {
|
||||
// this read lock is not used, skip it
|
||||
continue;
|
||||
}
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
// readmark 1 is the default reader, and should always contain the max safe
|
||||
// frame for new readers so we bump it up to the current max frame instead of clearing it.
|
||||
let new_mark = if read_lock_idx == 1 {
|
||||
max_safe_frame as u32
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
read_lock.value.store(new_mark, Ordering::SeqCst);
|
||||
// Only adjust, never clear, in ordinary checkpoints
|
||||
if read_lock_idx == 1 {
|
||||
// store the shared max_frame for the default read slot 1
|
||||
read_lock
|
||||
.value
|
||||
.store(max_safe_frame as u32, Ordering::SeqCst);
|
||||
}
|
||||
read_lock.unlock();
|
||||
} else {
|
||||
max_safe_frame = this_mark as u64;
|
||||
if matches!(mode, CheckpointMode::Passive) {
|
||||
// Don't keep poking, PASSIVE can't block or spin
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1703,10 +1735,8 @@ impl WalFileShared {
|
||||
);
|
||||
{
|
||||
let mut hdr = self.wal_header.lock();
|
||||
// bump checkpoint sequence
|
||||
hdr.checkpoint_seq = hdr.checkpoint_seq.wrapping_add(1);
|
||||
// keep hdr.magic, hdr.file_format, hdr.page_size as-is
|
||||
hdr.checkpoint_seq = hdr.checkpoint_seq.wrapping_add(1);
|
||||
hdr.salt_1 = hdr.salt_1.wrapping_add(1);
|
||||
hdr.salt_2 = io.generate_random_number() as u32;
|
||||
|
||||
@@ -1744,6 +1774,8 @@ pub mod test {
|
||||
CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO,
|
||||
Wal, WalFileShared, IO,
|
||||
};
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
use std::{
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
rc::Rc,
|
||||
@@ -1821,7 +1853,7 @@ pub mod test {
|
||||
"WAL file should not have been empty before checkpoint"
|
||||
);
|
||||
assert_eq!(
|
||||
bytes_after, 32,
|
||||
bytes_after, 0,
|
||||
"WAL file should be truncated to 0 bytes, but is {bytes_after} bytes",
|
||||
);
|
||||
std::fs::remove_dir_all(path).unwrap();
|
||||
@@ -1863,58 +1895,85 @@ pub mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wal_restart_checkpoint_resets_sequence() {
|
||||
fn restart_checkpoint_resets_wal_state_and_increments_ckpt_seq() {
|
||||
let (db, path) = get_database();
|
||||
|
||||
let mut walpath = path.clone().into_os_string().into_string().unwrap();
|
||||
walpath.push_str("/test.db-wal");
|
||||
let walpath = std::path::PathBuf::from(walpath);
|
||||
let walpath = {
|
||||
let mut p = path.clone().into_os_string().into_string().unwrap();
|
||||
p.push_str("/test.db-wal");
|
||||
std::path::PathBuf::from(p)
|
||||
};
|
||||
|
||||
let conn = db.connect().unwrap();
|
||||
conn.execute("create table test(id integer primary key, value text)")
|
||||
.unwrap();
|
||||
bulk_inserts(&conn.clone(), 20, 3);
|
||||
bulk_inserts(&conn, 20, 3);
|
||||
conn.pager.borrow_mut().cacheflush().unwrap();
|
||||
|
||||
// Snapshot header & counters before the RESTART checkpoint.
|
||||
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
|
||||
let (seq0, salt10, salt20, _ps) = wal_header_snapshot(&wal_shared);
|
||||
let (before_max, before_backfills) = unsafe {
|
||||
let (seq_before, salt1_before, salt2_before, _ps_before) = wal_header_snapshot(&wal_shared);
|
||||
let (mx_before, backfill_before) = unsafe {
|
||||
let s = &*wal_shared.get();
|
||||
(
|
||||
s.max_frame.load(Ordering::SeqCst),
|
||||
s.nbackfills.load(Ordering::SeqCst),
|
||||
)
|
||||
};
|
||||
assert!(before_max > 0);
|
||||
assert_eq!(before_backfills, 0);
|
||||
assert!(mx_before > 0);
|
||||
assert_eq!(backfill_before, 0);
|
||||
|
||||
let meta_before = std::fs::metadata(&walpath).unwrap();
|
||||
|
||||
let bytes_before = meta_before.len();
|
||||
#[cfg(unix)]
|
||||
let size_before = meta_before.blocks();
|
||||
#[cfg(not(unix))]
|
||||
let size_before = meta_before.len();
|
||||
// Run a RESTART checkpoint, should backfill everything and reset WAL counters,
|
||||
// but NOT truncate the file.
|
||||
{
|
||||
let pager_ref = conn.pager.borrow();
|
||||
let mut wal = pager_ref.wal.borrow_mut();
|
||||
let r = run_checkpoint_until_done(&mut *wal, &pager_ref, CheckpointMode::Restart);
|
||||
assert_eq!(r.num_wal_frames, before_max);
|
||||
assert_eq!(r.num_checkpointed_frames, before_max);
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.borrow_mut();
|
||||
let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert_eq!(res.num_wal_frames, mx_before);
|
||||
assert_eq!(res.num_checkpointed_frames, mx_before);
|
||||
}
|
||||
|
||||
// After restart: max_frame == 0, nbackfills == 0, salts changed, seq incremented.
|
||||
let (seq1, salt11, salt21, _ps2) = wal_header_snapshot(&wal_shared);
|
||||
assert_eq!(seq1, seq0.wrapping_add(1), "checkpoint_seq increments");
|
||||
assert_ne!(salt21, salt20);
|
||||
assert_eq!(salt11, salt10.wrapping_add(1), "salt_1 should increment");
|
||||
let (after_max, after_backfills) = unsafe {
|
||||
// Validate post‑RESTART header & counters.
|
||||
let (seq_after, salt1_after, salt2_after, _ps_after) = wal_header_snapshot(&wal_shared);
|
||||
assert_eq!(
|
||||
seq_after,
|
||||
seq_before.wrapping_add(1),
|
||||
"checkpoint_seq must increment on RESTART"
|
||||
);
|
||||
assert_eq!(
|
||||
salt1_after,
|
||||
salt1_before.wrapping_add(1),
|
||||
"salt_1 is incremented"
|
||||
);
|
||||
assert_ne!(salt2_after, salt2_before, "salt_2 is randomized");
|
||||
|
||||
let (mx_after, backfill_after) = unsafe {
|
||||
let s = &*wal_shared.get();
|
||||
(
|
||||
s.max_frame.load(Ordering::SeqCst),
|
||||
s.nbackfills.load(Ordering::SeqCst),
|
||||
)
|
||||
};
|
||||
assert_eq!(after_max, 0);
|
||||
assert_eq!(after_backfills, 0);
|
||||
assert_eq!(mx_after, 0, "mxFrame reset to 0 after RESTART");
|
||||
assert_eq!(backfill_after, 0, "nBackfill reset to 0 after RESTART");
|
||||
|
||||
// Next write should create frame_id = 1 again.
|
||||
// File size should be unchanged for RESTART (no truncate).
|
||||
let meta_after = std::fs::metadata(&walpath).unwrap();
|
||||
#[cfg(unix)]
|
||||
let size_after = meta_after.blocks();
|
||||
#[cfg(not(unix))]
|
||||
let size_after = meta_after.len();
|
||||
assert_eq!(
|
||||
size_before, size_after,
|
||||
"RESTART must not change WAL file size"
|
||||
);
|
||||
|
||||
// Next write should start a new sequence at frame 1.
|
||||
conn.execute("insert into test(value) values ('post_restart')")
|
||||
.unwrap();
|
||||
conn.pager
|
||||
@@ -1924,14 +1983,8 @@ pub mod test {
|
||||
.finish_append_frames_commit()
|
||||
.unwrap();
|
||||
let new_max = unsafe { (&*wal_shared.get()).max_frame.load(Ordering::SeqCst) };
|
||||
assert_eq!(new_max, 1, "Sequence restarted at 1");
|
||||
assert_eq!(new_max, 1, "first append after RESTART starts at frame 1");
|
||||
|
||||
let meta_after = std::fs::metadata(&walpath).unwrap();
|
||||
let bytes_after = meta_after.len();
|
||||
assert_eq!(
|
||||
bytes_before, bytes_after,
|
||||
"WAL file should not change size after full checkpoint"
|
||||
);
|
||||
std::fs::remove_dir_all(path).unwrap();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user