mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Merge 'Fix max_frame determination and comments in WAL checkpointing' from Preston Thorpe
in #2521, I messed up and introduced improper calculation of the current checkpoint's max safe frame (mostly due to incorrect comments that I had left on the method). The confusion partially stems from our lack of Busy handling at the moment, but essentially when determining the max safe frame for all readers, for passive mode we cannot simply `break` out of the loop when we find a reader with a lower read mark than we have, because _another_ reader might have an even _lower_ read mark, and we could proceed with the first mark < shared_max. And for !passive modes, we still attempt to backfill with the same lower frame, we just return `Busy` at the end, after backfilling what we can (we just don't reset the log for restart/truncate). Most of the changes in this PR is just the renaming the fields of Checkpoint Result, because the names were confusing Closes #2560
This commit is contained in:
@@ -1589,9 +1589,7 @@ impl Pager {
|
||||
.checkpoint(self, write_counter.clone(), mode)
|
||||
})?;
|
||||
|
||||
if checkpoint_result.everything_backfilled()
|
||||
&& checkpoint_result.num_checkpointed_frames != 0
|
||||
{
|
||||
if checkpoint_result.everything_backfilled() && checkpoint_result.num_backfilled != 0 {
|
||||
let db_size = self
|
||||
.io
|
||||
.block(|| self.with_header(|header| header.database_size))?
|
||||
|
||||
@@ -35,10 +35,10 @@ use super::sqlite3_ondisk::{self, WalHeader};
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct CheckpointResult {
|
||||
/// number of frames in WAL
|
||||
pub num_wal_frames: u64,
|
||||
/// number of frames in WAL that could have been backfilled
|
||||
pub num_attempted: u64,
|
||||
/// number of frames moved successfully from WAL to db file after checkpoint
|
||||
pub num_checkpointed_frames: u64,
|
||||
pub num_backfilled: u64,
|
||||
/// In the case of everything backfilled, we need to hold the locks until the db
|
||||
/// file is truncated.
|
||||
maybe_guard: Option<CheckpointLocks>,
|
||||
@@ -53,14 +53,14 @@ impl Drop for CheckpointResult {
|
||||
impl CheckpointResult {
|
||||
pub fn new(n_frames: u64, n_ckpt: u64) -> Self {
|
||||
Self {
|
||||
num_wal_frames: n_frames,
|
||||
num_checkpointed_frames: n_ckpt,
|
||||
num_attempted: n_frames,
|
||||
num_backfilled: n_ckpt,
|
||||
maybe_guard: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn everything_backfilled(&self) -> bool {
|
||||
self.num_wal_frames == self.num_checkpointed_frames
|
||||
self.num_attempted == self.num_backfilled
|
||||
}
|
||||
pub fn release_guard(&mut self) {
|
||||
let _ = self.maybe_guard.take();
|
||||
@@ -81,6 +81,15 @@ pub enum CheckpointMode {
|
||||
Truncate,
|
||||
}
|
||||
|
||||
impl CheckpointMode {
|
||||
fn should_restart_log(&self) -> bool {
|
||||
matches!(self, CheckpointMode::Truncate | CheckpointMode::Restart)
|
||||
}
|
||||
fn require_all_backfilled(&self) -> bool {
|
||||
!matches!(self, CheckpointMode::Passive)
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Debug, Default)]
|
||||
/// A 64-bit read-write lock with embedded 32-bit value storage.
|
||||
@@ -1382,11 +1391,7 @@ impl WalFile {
|
||||
}
|
||||
// acquire the appropriate exclusive locks depending on the checkpoint mode
|
||||
self.acquire_proper_checkpoint_guard(mode)?;
|
||||
self.ongoing_checkpoint.max_frame =
|
||||
match self.determine_max_safe_checkpoint_frame(mode) {
|
||||
Err(_) => return Err(LimboError::Busy),
|
||||
Ok(res) => res,
|
||||
};
|
||||
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
|
||||
self.ongoing_checkpoint.min_frame = nbackfills + 1;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
@@ -1547,12 +1552,11 @@ impl WalFile {
|
||||
.nbackfills
|
||||
.store(self.ongoing_checkpoint.max_frame, Ordering::Release);
|
||||
|
||||
if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
|
||||
if checkpoint_result.everything_backfilled() {
|
||||
self.restart_log(mode)?;
|
||||
} else {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
if mode.should_restart_log() {
|
||||
self.restart_log(mode)?;
|
||||
}
|
||||
|
||||
// store a copy of the checkpoint result to return in the future if pragma
|
||||
@@ -1566,7 +1570,7 @@ impl WalFile {
|
||||
// b. the physical db file size differs from the expected pages * page_size
|
||||
// and truncate + sync the db file if necessary.
|
||||
if checkpoint_result.everything_backfilled()
|
||||
&& checkpoint_result.num_checkpointed_frames > 0
|
||||
&& checkpoint_result.num_backfilled > 0
|
||||
{
|
||||
checkpoint_result.maybe_guard = self.checkpoint_guard.take();
|
||||
} else {
|
||||
@@ -1584,67 +1588,58 @@ impl WalFile {
|
||||
}
|
||||
|
||||
/// Coordinate what the maximum safe frame is for us to backfill when checkpointing.
|
||||
/// We can never backfill a frame with a higher number than any reader's max frame,
|
||||
/// We can never backfill a frame with a higher number than any reader's read mark,
|
||||
/// because we might overwrite content the reader is reading from the database file.
|
||||
///
|
||||
/// A checkpoint must never overwrite a page in the main DB file if some
|
||||
/// active reader might still need to read that page from the WAL.
|
||||
/// Concretely: the checkpoint may only copy frames `<= aReadMark[k]` for
|
||||
/// every in‑use reader slot `k > 0`.
|
||||
/// every in-use reader slot `k > 0`.
|
||||
///
|
||||
/// `read_locks[0]` is special: readers holding slot 0 ignore the WAL entirely
|
||||
/// (they read only the DB file). Its value is a placeholder and does not
|
||||
/// constrain `mxSafeFrame`.
|
||||
///
|
||||
/// Slot 1 is the “default” reader slot. If it is free (we can take its
|
||||
/// write-lock) we raise it to the global max so new readers see the most
|
||||
/// recent snapshot. We do not clear it to `READMARK_NOT_USED` in ordinary
|
||||
/// checkpoints (SQLite only clears nonzero slots during a log reset).
|
||||
/// For each slot 1..N:
|
||||
/// - If we can acquire the write lock (slot is free):
|
||||
/// - Slot 1: Set to mxSafeFrame (allowing new readers to see up to this point)
|
||||
/// - Slots 2+: Set to READMARK_NOT_USED (freeing the slot)
|
||||
/// - If we cannot acquire the lock (SQLITE_BUSY):
|
||||
/// - Lower mxSafeFrame to that reader's mark
|
||||
/// - In PASSIVE mode: Already have no busy handler, continue scanning
|
||||
/// - In FULL/RESTART/TRUNCATE: Disable busy handler for remaining slots
|
||||
///
|
||||
/// Slots 2..N: If a reader is stuck at an older frame, that frame becomes the
|
||||
/// limit. If we can’t atomically bump that slot (write-lock fails), we must
|
||||
/// clamp `mxSafeFrame` down to that mark. In PASSIVE mode we stop trying
|
||||
/// immediately (we are not allowed to block or spin). In the blocking modes
|
||||
/// (FULL/RESTART/TRUNCATE) we can loop and retry, but for now we can
|
||||
/// just respect the first busy slot and move on.
|
||||
/// Locking behavior:
|
||||
/// - PASSIVE: Never waits, no busy handler (xBusy==NULL)
|
||||
/// - FULL/RESTART/TRUNCATE: May wait via busy handler, but after first BUSY,
|
||||
/// switches to non-blocking for remaining slots
|
||||
///
|
||||
/// Locking rules:
|
||||
/// This routine tries to take an exclusive (write) lock on each slot to
|
||||
/// update/clean it. If the try-lock fails:
|
||||
/// PASSIVE: do not wait; just lower `mxSafeFrame` and break.
|
||||
/// Others: lower `mxSafeFrame` and continue scanning.
|
||||
///
|
||||
/// We never modify slot values while a reader holds that slot.
|
||||
fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> Result<u64> {
|
||||
/// We never modify slot values while a reader holds that slot's lock.
|
||||
/// TOOD: implement proper BUSY handling behavior
|
||||
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
|
||||
let shared = self.get_shared();
|
||||
let shared_max = shared.max_frame.load(Ordering::Acquire);
|
||||
let mut max_safe_frame = shared_max;
|
||||
|
||||
// Start optimistic: we want to advance everyone to shared_max
|
||||
for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let mark = lock.get_value();
|
||||
if mark == READMARK_NOT_USED || mark >= shared_max as u32 {
|
||||
continue;
|
||||
}
|
||||
// Try to bump this slot to shared_max (requires exclusive on the slot)
|
||||
if lock.write() {
|
||||
// Slot is free to edit, bump to shared_max (slot 1 keeps a real mark, others can be cleaned)
|
||||
let val = if idx == 1 {
|
||||
shared_max as u32
|
||||
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let this_mark = read_lock.get_value();
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
let val = if read_lock_idx == 1 {
|
||||
// store the max_frame for the default read slot 1
|
||||
max_safe_frame as u32
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
read_lock.set_value_exclusive(val);
|
||||
read_lock.unlock();
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
lock.set_value_exclusive(val);
|
||||
lock.unlock();
|
||||
} else {
|
||||
// Reader is using this slot.
|
||||
return match mode {
|
||||
// clamp down for passive
|
||||
CheckpointMode::Passive => Ok(mark as u64),
|
||||
_ => Err(LimboError::Busy), // all others must wait
|
||||
};
|
||||
max_safe_frame = this_mark as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(shared_max)
|
||||
max_safe_frame
|
||||
}
|
||||
|
||||
/// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode.
|
||||
@@ -2088,8 +2083,8 @@ pub mod test {
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert_eq!(res.num_wal_frames, mx_before);
|
||||
assert_eq!(res.num_checkpointed_frames, mx_before);
|
||||
assert_eq!(res.num_attempted, mx_before);
|
||||
assert_eq!(res.num_backfilled, mx_before);
|
||||
}
|
||||
|
||||
// Validate post‑RESTART header & counters.
|
||||
@@ -2184,15 +2179,15 @@ pub mod test {
|
||||
};
|
||||
(res, maxf)
|
||||
};
|
||||
assert_eq!(res1.num_wal_frames, max_before);
|
||||
assert_eq!(res1.num_attempted, max_before);
|
||||
assert!(
|
||||
res1.num_checkpointed_frames < res1.num_wal_frames,
|
||||
res1.num_backfilled < res1.num_attempted,
|
||||
"Partial backfill expected, {} : {}",
|
||||
res1.num_checkpointed_frames,
|
||||
res1.num_wal_frames
|
||||
res1.num_backfilled,
|
||||
res1.num_attempted
|
||||
);
|
||||
assert_eq!(
|
||||
res1.num_checkpointed_frames, readmark,
|
||||
res1.num_backfilled, readmark,
|
||||
"Checkpointed frames should match read mark"
|
||||
);
|
||||
// Release reader
|
||||
@@ -2207,7 +2202,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res2 = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
assert_eq!(
|
||||
res2.num_checkpointed_frames, res2.num_wal_frames,
|
||||
res2.num_backfilled, res2.num_attempted,
|
||||
"Second checkpoint completes remaining frames"
|
||||
);
|
||||
}
|
||||
@@ -2356,11 +2351,11 @@ pub mod test {
|
||||
};
|
||||
|
||||
assert!(
|
||||
checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames,
|
||||
checkpoint_result.num_backfilled < checkpoint_result.num_attempted,
|
||||
"Should not checkpoint all frames when readers are active"
|
||||
);
|
||||
assert_eq!(
|
||||
checkpoint_result.num_checkpointed_frames, r1_max_frame,
|
||||
checkpoint_result.num_backfilled, r1_max_frame,
|
||||
"Should have checkpointed up to R1's max frame"
|
||||
);
|
||||
|
||||
@@ -2543,7 +2538,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(result1.num_checkpointed_frames, r1_frame);
|
||||
assert_eq!(result1.num_backfilled, r1_frame);
|
||||
|
||||
// finish reader‑1
|
||||
conn1.execute("COMMIT").unwrap();
|
||||
@@ -2554,10 +2549,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(
|
||||
result1.num_checkpointed_frames + result2.num_checkpointed_frames,
|
||||
r2_frame
|
||||
);
|
||||
assert_eq!(result1.num_backfilled + result2.num_backfilled, r2_frame);
|
||||
|
||||
// verify visible rows
|
||||
let mut stmt = conn_r2.query("SELECT COUNT(*) FROM test").unwrap().unwrap();
|
||||
@@ -2848,8 +2840,8 @@ pub mod test {
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
|
||||
};
|
||||
|
||||
assert_eq!(result.num_wal_frames, mx_before);
|
||||
assert_eq!(result.num_checkpointed_frames, mx_before);
|
||||
assert_eq!(result.num_attempted, mx_before);
|
||||
assert_eq!(result.num_backfilled, mx_before);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2925,7 +2917,7 @@ pub mod test {
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
|
||||
};
|
||||
|
||||
assert_eq!(result.num_wal_frames, mx_now);
|
||||
assert_eq!(result.num_checkpointed_frames, mx_now);
|
||||
assert_eq!(result.num_attempted, mx_now - r_snapshot);
|
||||
assert!(result.everything_backfilled());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,18 +357,17 @@ pub fn op_checkpoint(
|
||||
let result = program.connection.checkpoint(*checkpoint_mode);
|
||||
match result {
|
||||
Ok(CheckpointResult {
|
||||
num_wal_frames: num_wal_pages,
|
||||
num_checkpointed_frames: num_checkpointed_pages,
|
||||
num_attempted,
|
||||
num_backfilled,
|
||||
..
|
||||
}) => {
|
||||
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
|
||||
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
|
||||
state.registers[*dest] = Register::Value(Value::Integer(0));
|
||||
// 2nd col: # modified pages written to wal file
|
||||
state.registers[*dest + 1] = Register::Value(Value::Integer(num_wal_pages as i64));
|
||||
state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64));
|
||||
// 3rd col: # pages moved to db after checkpoint
|
||||
state.registers[*dest + 2] =
|
||||
Register::Value(Value::Integer(num_checkpointed_pages as i64));
|
||||
state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64));
|
||||
}
|
||||
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
|
||||
}
|
||||
|
||||
@@ -1328,10 +1328,10 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2(
|
||||
match db.conn.checkpoint(chkptmode) {
|
||||
Ok(res) => {
|
||||
if !log_size.is_null() {
|
||||
(*log_size) = res.num_wal_frames as ffi::c_int;
|
||||
(*log_size) = res.num_attempted as ffi::c_int;
|
||||
}
|
||||
if !checkpoint_count.is_null() {
|
||||
(*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int;
|
||||
(*checkpoint_count) = res.num_backfilled as ffi::c_int;
|
||||
}
|
||||
SQLITE_OK
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user