mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-22 17:34:27 +01:00
Merge 'Stop checkpointing the entire WAL after every write when wal frame size > threshold' from Preston Thorpe
I know I have #2179 and now #2278 open, both working on checkpointing, but I discovered during async IO benchmarks that once we hit `should_checkpoint` (there are greater than 1000 frames in the WAL), we will trigger a checkpoint on cache flush for every single write, and since we don't update `nbackfills` in the case that everything was backfilled, every checkpoint will backfill every single frame to the db file 💀 At least this can be reviewed+merged faster and easier. This also fixes the return value of `pragma wal_checkpoint;` to match SQLite. SQLite returns the # of possible frames (`shared.max_frame - shared.nbackfills`) and then the number of frames we successfully checkpointed (`ongoing_checkpoint.max_frame - ongoing_checkpoint.min_frame + 1`). If `pragma wal_checkpoint;` is called again when there are no pages to backfill, then it returns the value from the previous checkpoint. <img width="781" height="293" alt="image" src="https://github.com/user- attachments/assets/9aa78a6c-aa18-444d-82bd-398d684fed75" /> Closes #2280
This commit is contained in:
@@ -39,7 +39,7 @@ pub const NO_LOCK: u32 = 0;
|
||||
pub const SHARED_LOCK: u32 = 1;
|
||||
pub const WRITE_LOCK: u32 = 2;
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Debug, Copy, Clone, Default)]
|
||||
pub struct CheckpointResult {
|
||||
/// number of frames in WAL
|
||||
pub num_wal_frames: u64,
|
||||
@@ -47,17 +47,11 @@ pub struct CheckpointResult {
|
||||
pub num_checkpointed_frames: u64,
|
||||
}
|
||||
|
||||
impl Default for CheckpointResult {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl CheckpointResult {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(n_frames: u64, n_ckpt: u64) -> Self {
|
||||
Self {
|
||||
num_wal_frames: 0,
|
||||
num_checkpointed_frames: 0,
|
||||
num_wal_frames: n_frames,
|
||||
num_checkpointed_frames: n_ckpt,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -423,6 +417,9 @@ pub struct WalFile {
|
||||
/// Hack for now in case of rollback, will not be needed once we remove this bullshit frame cache.
|
||||
start_pages_in_frames: usize,
|
||||
|
||||
/// Count of possible pages to checkpoint, and number of backfilled
|
||||
prev_checkpoint: CheckpointResult,
|
||||
|
||||
/// Private copy of WalHeader
|
||||
pub header: WalHeader,
|
||||
}
|
||||
@@ -802,7 +799,8 @@ impl Wal for WalFile {
|
||||
fn should_checkpoint(&self) -> bool {
|
||||
let shared = self.get_shared();
|
||||
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
|
||||
frame_id >= self.checkpoint_threshold
|
||||
let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
|
||||
frame_id > self.checkpoint_threshold + nbackfills
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
@@ -822,7 +820,19 @@ impl Wal for WalFile {
|
||||
match state {
|
||||
CheckpointState::Start => {
|
||||
// TODO(pere): check what frames are safe to checkpoint between many readers!
|
||||
self.ongoing_checkpoint.min_frame = self.min_frame;
|
||||
let (shared_max, nbackfills) = {
|
||||
let shared = self.get_shared();
|
||||
(
|
||||
shared.max_frame.load(Ordering::SeqCst),
|
||||
shared.nbackfills.load(Ordering::SeqCst),
|
||||
)
|
||||
};
|
||||
if shared_max <= nbackfills {
|
||||
// if there's nothing to do and we are fully back-filled, to match sqlite
|
||||
// we return the previous number of backfilled pages from last checkpoint.
|
||||
return Ok(IOResult::Done(self.prev_checkpoint));
|
||||
}
|
||||
self.ongoing_checkpoint.min_frame = nbackfills + 1;
|
||||
let shared = self.get_shared();
|
||||
let busy = !shared.checkpoint_lock.write();
|
||||
if busy {
|
||||
@@ -936,15 +946,23 @@ impl Wal for WalFile {
|
||||
}
|
||||
let shared = self.get_shared();
|
||||
shared.checkpoint_lock.unlock();
|
||||
let max_frame = shared.max_frame.load(Ordering::SeqCst);
|
||||
let nbackfills = shared.nbackfills.load(Ordering::SeqCst);
|
||||
|
||||
// Record two num pages fields to return as checkpoint result to caller.
|
||||
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
|
||||
let checkpoint_result = CheckpointResult {
|
||||
num_wal_frames: shared.max_frame.load(Ordering::SeqCst),
|
||||
num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
|
||||
};
|
||||
let frames_in_wal = max_frame.saturating_sub(nbackfills);
|
||||
let frames_checkpointed = self
|
||||
.ongoing_checkpoint
|
||||
.max_frame
|
||||
.saturating_sub(self.ongoing_checkpoint.min_frame - 1);
|
||||
let checkpoint_result =
|
||||
CheckpointResult::new(frames_in_wal, frames_checkpointed);
|
||||
let everything_backfilled = shared.max_frame.load(Ordering::SeqCst)
|
||||
== self.ongoing_checkpoint.max_frame;
|
||||
shared
|
||||
.nbackfills
|
||||
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
|
||||
if everything_backfilled {
|
||||
// TODO: Even in Passive mode, if everything was backfilled we should
|
||||
// truncate and fsync the *db file*
|
||||
@@ -962,11 +980,8 @@ impl Wal for WalFile {
|
||||
// TODO: if all frames were backfilled into the db file, calls fsync
|
||||
// TODO(pere): truncate wal file here.
|
||||
}
|
||||
} else {
|
||||
shared
|
||||
.nbackfills
|
||||
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
|
||||
}
|
||||
self.prev_checkpoint = checkpoint_result;
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
return Ok(IOResult::Done(checkpoint_result));
|
||||
}
|
||||
@@ -1094,6 +1109,7 @@ impl WalFile {
|
||||
min_frame: 0,
|
||||
max_frame_read_lock_index: 0,
|
||||
last_checksum,
|
||||
prev_checkpoint: CheckpointResult::default(),
|
||||
start_pages_in_frames: 0,
|
||||
header: *header,
|
||||
}
|
||||
|
||||
@@ -206,6 +206,7 @@ mod tests {
|
||||
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
mod libsql_ext {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -471,7 +472,7 @@ mod tests {
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Insert at least 1000 rows to go over checkpoint threshold.
|
||||
let mut stmt = ptr::null_mut();
|
||||
for i in 1..=2000 {
|
||||
for i in 1..2000 {
|
||||
let sql =
|
||||
std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({i})"))
|
||||
.unwrap();
|
||||
@@ -506,7 +507,11 @@ mod tests {
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_ROW);
|
||||
let count = sqlite3_column_int64(stmt, 0);
|
||||
assert_eq!(count, 2000);
|
||||
// with a sane `should_checkpoint` method we have no guarantee that all 2000 rows are present, as the checkpoint was
|
||||
// triggered by cacheflush on insertions. the pattern will trigger a checkpoint when the wal has > 1000 frames,
|
||||
// so it will be triggered but will no longer be triggered on each consecutive
|
||||
// write. here we can assert that we have > 1500 rows.
|
||||
assert!(count > 1500);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user