mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
Fix max_frame determination and comments in WAL checkpointing
This commit is contained in:
@@ -1589,9 +1589,7 @@ impl Pager {
|
||||
.checkpoint(self, write_counter.clone(), mode)
|
||||
})?;
|
||||
|
||||
if checkpoint_result.everything_backfilled()
|
||||
&& checkpoint_result.num_checkpointed_frames != 0
|
||||
{
|
||||
if checkpoint_result.everything_backfilled() && checkpoint_result.num_backfilled != 0 {
|
||||
let db_size = self
|
||||
.io
|
||||
.block(|| self.with_header(|header| header.database_size))?
|
||||
|
||||
@@ -35,10 +35,10 @@ use super::sqlite3_ondisk::{self, WalHeader};
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct CheckpointResult {
|
||||
/// number of frames in WAL
|
||||
pub num_wal_frames: u64,
|
||||
/// number of frames in WAL that could have been backfilled
|
||||
pub num_attempted: u64,
|
||||
/// number of frames moved successfully from WAL to db file after checkpoint
|
||||
pub num_checkpointed_frames: u64,
|
||||
pub num_backfilled: u64,
|
||||
/// In the case of everything backfilled, we need to hold the locks until the db
|
||||
/// file is truncated.
|
||||
maybe_guard: Option<CheckpointLocks>,
|
||||
@@ -53,14 +53,14 @@ impl Drop for CheckpointResult {
|
||||
impl CheckpointResult {
|
||||
pub fn new(n_frames: u64, n_ckpt: u64) -> Self {
|
||||
Self {
|
||||
num_wal_frames: n_frames,
|
||||
num_checkpointed_frames: n_ckpt,
|
||||
num_attempted: n_frames,
|
||||
num_backfilled: n_ckpt,
|
||||
maybe_guard: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn everything_backfilled(&self) -> bool {
|
||||
self.num_wal_frames == self.num_checkpointed_frames
|
||||
self.num_attempted == self.num_backfilled
|
||||
}
|
||||
pub fn release_guard(&mut self) {
|
||||
let _ = self.maybe_guard.take();
|
||||
@@ -81,6 +81,15 @@ pub enum CheckpointMode {
|
||||
Truncate,
|
||||
}
|
||||
|
||||
impl CheckpointMode {
|
||||
fn should_restart_log(&self) -> bool {
|
||||
matches!(self, CheckpointMode::Truncate | CheckpointMode::Restart)
|
||||
}
|
||||
fn require_all_backfilled(&self) -> bool {
|
||||
!matches!(self, CheckpointMode::Passive)
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Debug, Default)]
|
||||
/// A 64-bit read-write lock with embedded 32-bit value storage.
|
||||
@@ -1381,11 +1390,7 @@ impl WalFile {
|
||||
}
|
||||
// acquire the appropriate exclusive locks depending on the checkpoint mode
|
||||
self.acquire_proper_checkpoint_guard(mode)?;
|
||||
self.ongoing_checkpoint.max_frame =
|
||||
match self.determine_max_safe_checkpoint_frame(mode) {
|
||||
Err(_) => return Err(LimboError::Busy),
|
||||
Ok(res) => res,
|
||||
};
|
||||
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
|
||||
self.ongoing_checkpoint.min_frame = nbackfills + 1;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
@@ -1546,12 +1551,11 @@ impl WalFile {
|
||||
.nbackfills
|
||||
.store(self.ongoing_checkpoint.max_frame, Ordering::Release);
|
||||
|
||||
if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
|
||||
if checkpoint_result.everything_backfilled() {
|
||||
self.restart_log(mode)?;
|
||||
} else {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
if mode.should_restart_log() {
|
||||
self.restart_log(mode)?;
|
||||
}
|
||||
|
||||
// store a copy of the checkpoint result to return in the future if pragma
|
||||
@@ -1565,7 +1569,7 @@ impl WalFile {
|
||||
// b. the physical db file size differs from the expected pages * page_size
|
||||
// and truncate + sync the db file if necessary.
|
||||
if checkpoint_result.everything_backfilled()
|
||||
&& checkpoint_result.num_checkpointed_frames > 0
|
||||
&& checkpoint_result.num_backfilled > 0
|
||||
{
|
||||
checkpoint_result.maybe_guard = self.checkpoint_guard.take();
|
||||
} else {
|
||||
@@ -1583,67 +1587,58 @@ impl WalFile {
|
||||
}
|
||||
|
||||
/// Coordinate what the maximum safe frame is for us to backfill when checkpointing.
|
||||
/// We can never backfill a frame with a higher number than any reader's max frame,
|
||||
/// We can never backfill a frame with a higher number than any reader's read mark,
|
||||
/// because we might overwrite content the reader is reading from the database file.
|
||||
///
|
||||
/// A checkpoint must never overwrite a page in the main DB file if some
|
||||
/// active reader might still need to read that page from the WAL.
|
||||
/// Concretely: the checkpoint may only copy frames `<= aReadMark[k]` for
|
||||
/// every in‑use reader slot `k > 0`.
|
||||
/// every in-use reader slot `k > 0`.
|
||||
///
|
||||
/// `read_locks[0]` is special: readers holding slot 0 ignore the WAL entirely
|
||||
/// (they read only the DB file). Its value is a placeholder and does not
|
||||
/// constrain `mxSafeFrame`.
|
||||
///
|
||||
/// Slot 1 is the “default” reader slot. If it is free (we can take its
|
||||
/// write-lock) we raise it to the global max so new readers see the most
|
||||
/// recent snapshot. We do not clear it to `READMARK_NOT_USED` in ordinary
|
||||
/// checkpoints (SQLite only clears nonzero slots during a log reset).
|
||||
/// For each slot 1..N:
|
||||
/// - If we can acquire the write lock (slot is free):
|
||||
/// - Slot 1: Set to mxSafeFrame (allowing new readers to see up to this point)
|
||||
/// - Slots 2+: Set to READMARK_NOT_USED (freeing the slot)
|
||||
/// - If we cannot acquire the lock (SQLITE_BUSY):
|
||||
/// - Lower mxSafeFrame to that reader's mark
|
||||
/// - In PASSIVE mode: Already have no busy handler, continue scanning
|
||||
/// - In FULL/RESTART/TRUNCATE: Disable busy handler for remaining slots
|
||||
///
|
||||
/// Slots 2..N: If a reader is stuck at an older frame, that frame becomes the
|
||||
/// limit. If we can’t atomically bump that slot (write-lock fails), we must
|
||||
/// clamp `mxSafeFrame` down to that mark. In PASSIVE mode we stop trying
|
||||
/// immediately (we are not allowed to block or spin). In the blocking modes
|
||||
/// (FULL/RESTART/TRUNCATE) we can loop and retry, but for now we can
|
||||
/// just respect the first busy slot and move on.
|
||||
/// Locking behavior:
|
||||
/// - PASSIVE: Never waits, no busy handler (xBusy==NULL)
|
||||
/// - FULL/RESTART/TRUNCATE: May wait via busy handler, but after first BUSY,
|
||||
/// switches to non-blocking for remaining slots
|
||||
///
|
||||
/// Locking rules:
|
||||
/// This routine tries to take an exclusive (write) lock on each slot to
|
||||
/// update/clean it. If the try-lock fails:
|
||||
/// PASSIVE: do not wait; just lower `mxSafeFrame` and break.
|
||||
/// Others: lower `mxSafeFrame` and continue scanning.
|
||||
///
|
||||
/// We never modify slot values while a reader holds that slot.
|
||||
fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> Result<u64> {
|
||||
/// We never modify slot values while a reader holds that slot's lock.
|
||||
/// TODO: implement proper BUSY handling behavior
|
||||
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
|
||||
let shared = self.get_shared();
|
||||
let shared_max = shared.max_frame.load(Ordering::Acquire);
|
||||
let mut max_safe_frame = shared_max;
|
||||
|
||||
// Start optimistic: we want to advance everyone to shared_max
|
||||
for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let mark = lock.get_value();
|
||||
if mark == READMARK_NOT_USED || mark >= shared_max as u32 {
|
||||
continue;
|
||||
}
|
||||
// Try to bump this slot to shared_max (requires exclusive on the slot)
|
||||
if lock.write() {
|
||||
// Slot is free to edit, bump to shared_max (slot 1 keeps a real mark, others can be cleaned)
|
||||
let val = if idx == 1 {
|
||||
shared_max as u32
|
||||
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let this_mark = read_lock.get_value();
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
let val = if read_lock_idx == 1 {
|
||||
// store the max_frame for the default read slot 1
|
||||
max_safe_frame as u32
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
read_lock.set_value_exclusive(val);
|
||||
read_lock.unlock();
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
lock.set_value_exclusive(val);
|
||||
lock.unlock();
|
||||
} else {
|
||||
// Reader is using this slot.
|
||||
return match mode {
|
||||
// clamp down for passive
|
||||
CheckpointMode::Passive => Ok(mark as u64),
|
||||
_ => Err(LimboError::Busy), // all others must wait
|
||||
};
|
||||
max_safe_frame = this_mark as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(shared_max)
|
||||
max_safe_frame
|
||||
}
|
||||
|
||||
/// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode.
|
||||
@@ -2087,8 +2082,8 @@ pub mod test {
|
||||
let pager = conn.pager.borrow();
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
|
||||
assert_eq!(res.num_wal_frames, mx_before);
|
||||
assert_eq!(res.num_checkpointed_frames, mx_before);
|
||||
assert_eq!(res.num_attempted, mx_before);
|
||||
assert_eq!(res.num_backfilled, mx_before);
|
||||
}
|
||||
|
||||
// Validate post‑RESTART header & counters.
|
||||
@@ -2183,15 +2178,15 @@ pub mod test {
|
||||
};
|
||||
(res, maxf)
|
||||
};
|
||||
assert_eq!(res1.num_wal_frames, max_before);
|
||||
assert_eq!(res1.num_attempted, max_before);
|
||||
assert!(
|
||||
res1.num_checkpointed_frames < res1.num_wal_frames,
|
||||
res1.num_backfilled < res1.num_attempted,
|
||||
"Partial backfill expected, {} : {}",
|
||||
res1.num_checkpointed_frames,
|
||||
res1.num_wal_frames
|
||||
res1.num_backfilled,
|
||||
res1.num_attempted
|
||||
);
|
||||
assert_eq!(
|
||||
res1.num_checkpointed_frames, readmark,
|
||||
res1.num_backfilled, readmark,
|
||||
"Checkpointed frames should match read mark"
|
||||
);
|
||||
// Release reader
|
||||
@@ -2206,7 +2201,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
let res2 = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
|
||||
assert_eq!(
|
||||
res2.num_checkpointed_frames, res2.num_wal_frames,
|
||||
res2.num_backfilled, res2.num_attempted,
|
||||
"Second checkpoint completes remaining frames"
|
||||
);
|
||||
}
|
||||
@@ -2355,11 +2350,11 @@ pub mod test {
|
||||
};
|
||||
|
||||
assert!(
|
||||
checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames,
|
||||
checkpoint_result.num_backfilled < checkpoint_result.num_attempted,
|
||||
"Should not checkpoint all frames when readers are active"
|
||||
);
|
||||
assert_eq!(
|
||||
checkpoint_result.num_checkpointed_frames, r1_max_frame,
|
||||
checkpoint_result.num_backfilled, r1_max_frame,
|
||||
"Should have checkpointed up to R1's max frame"
|
||||
);
|
||||
|
||||
@@ -2542,7 +2537,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(result1.num_checkpointed_frames, r1_frame);
|
||||
assert_eq!(result1.num_backfilled, r1_frame);
|
||||
|
||||
// finish reader‑1
|
||||
conn1.execute("COMMIT").unwrap();
|
||||
@@ -2553,10 +2548,7 @@ pub mod test {
|
||||
let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
|
||||
};
|
||||
assert_eq!(
|
||||
result1.num_checkpointed_frames + result2.num_checkpointed_frames,
|
||||
r2_frame
|
||||
);
|
||||
assert_eq!(result1.num_backfilled + result2.num_backfilled, r2_frame);
|
||||
|
||||
// verify visible rows
|
||||
let mut stmt = conn_r2.query("SELECT COUNT(*) FROM test").unwrap().unwrap();
|
||||
@@ -2847,8 +2839,8 @@ pub mod test {
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
|
||||
};
|
||||
|
||||
assert_eq!(result.num_wal_frames, mx_before);
|
||||
assert_eq!(result.num_checkpointed_frames, mx_before);
|
||||
assert_eq!(result.num_attempted, mx_before);
|
||||
assert_eq!(result.num_backfilled, mx_before);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -2924,7 +2916,7 @@ pub mod test {
|
||||
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full)
|
||||
};
|
||||
|
||||
assert_eq!(result.num_wal_frames, mx_now);
|
||||
assert_eq!(result.num_checkpointed_frames, mx_now);
|
||||
assert_eq!(result.num_attempted, mx_now - r_snapshot);
|
||||
assert!(result.everything_backfilled());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,18 +357,17 @@ pub fn op_checkpoint(
|
||||
let result = program.connection.checkpoint(*checkpoint_mode);
|
||||
match result {
|
||||
Ok(CheckpointResult {
|
||||
num_wal_frames: num_wal_pages,
|
||||
num_checkpointed_frames: num_checkpointed_pages,
|
||||
num_attempted,
|
||||
num_backfilled,
|
||||
..
|
||||
}) => {
|
||||
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
|
||||
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
|
||||
state.registers[*dest] = Register::Value(Value::Integer(0));
|
||||
// 2nd col: # modified pages written to wal file
|
||||
state.registers[*dest + 1] = Register::Value(Value::Integer(num_wal_pages as i64));
|
||||
state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64));
|
||||
// 3rd col: # pages moved to db after checkpoint
|
||||
state.registers[*dest + 2] =
|
||||
Register::Value(Value::Integer(num_checkpointed_pages as i64));
|
||||
state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64));
|
||||
}
|
||||
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
|
||||
}
|
||||
|
||||
@@ -1328,10 +1328,10 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2(
|
||||
match db.conn.checkpoint(chkptmode) {
|
||||
Ok(res) => {
|
||||
if !log_size.is_null() {
|
||||
(*log_size) = res.num_wal_frames as ffi::c_int;
|
||||
(*log_size) = res.num_attempted as ffi::c_int;
|
||||
}
|
||||
if !checkpoint_count.is_null() {
|
||||
(*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int;
|
||||
(*checkpoint_count) = res.num_backfilled as ffi::c_int;
|
||||
}
|
||||
SQLITE_OK
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user