From b214c3dfc87f26612a238edb2fe2e53d15efe636 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 19 Jul 2025 19:02:06 -0400 Subject: [PATCH] Add diff chkpt modes to sqlite3 api, finish checkpoint logic and add tests --- core/storage/wal.rs | 361 +++++++++++++++++++++++++++++++++++++++----- sqlite3/src/lib.rs | 26 +++- 2 files changed, 344 insertions(+), 43 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index aaad7182e..e152ad375 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -483,9 +483,6 @@ pub struct WalFileShared { /// value. There is a limited amount because and unbounded amount of connections could be /// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max /// frames that is equal to 5. - /// read_locks[0] is the exclusive read lock that is always 0 except during a checkpoint where - /// the log is restarted. read_locks[1] is the 'default reader' slot that always contains the - /// current max_frame. pub read_locks: [LimboRwLock; 5], /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only /// one used. @@ -510,6 +507,8 @@ impl fmt::Debug for WalFileShared { } #[derive(Clone, Debug)] +/// An RAII guard to ensure that no locks are leaked during checkpointing in +/// the case of errors. enum CheckpointGuard { Writer { ptr: Arc> }, Read0 { ptr: Arc> }, @@ -520,7 +519,7 @@ impl CheckpointGuard { let shared = &mut unsafe { &mut *ptr.get() }; if !shared.checkpoint_lock.write() { tracing::trace!("CheckpointGuard::new: checkpoint lock failed, returning Busy"); - // exclusive lock on checkpoint lock + // we hold the exclusive checkpoint lock no matter which mode for the duration return Err(LimboError::Busy); } match mode { @@ -529,12 +528,13 @@ impl CheckpointGuard { if !read0.write() { shared.checkpoint_lock.unlock(); tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy"); - // exclusive lock on slot‑0 + // for passive and full we need to hold the read0 lock return Err(LimboError::Busy); } Ok(Self::Read0 { ptr }) } CheckpointMode::Restart | CheckpointMode::Truncate => { + // if we are resetting the log we must hold the write lock for the duration. if !shared.write_lock.write() { shared.checkpoint_lock.unlock(); tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy"); @@ -545,14 +545,13 @@ impl CheckpointGuard { } } } - -/// Small macro to remove the writer lock in the event that a checkpoint errors out and would +/// macro to remove the writer lock in the event that a checkpoint errors out and would /// otherwise leak the lock. Only used during checkpointing. macro_rules! ensure_unlock { ($self:ident, $fun:expr) => { $fun.inspect_err(|e| { - tracing::trace!( - "CheckpointGuard::ensure_unlock: error occurred, releaseing held locks: {e}" + tracing::error!( + "CheckpointGuard::ensure_unlock: error occurred, releasing held locks: {e}" ); let _ = $self.checkpoint_guard.take(); })?; @@ -579,9 +578,23 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> { let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst); + let nbackfills = self.get_shared().nbackfills.load(Ordering::SeqCst); let db_has_changed = max_frame_in_wal > self.max_frame; + // Check if we can use slot 0 (bypass WAL) + if max_frame_in_wal == nbackfills { + let shared = self.get_shared(); + let lock = &mut shared.read_locks[0]; + // if wal is fully checkpointed, we can use read mark 0 to read directly + // from the db file + if lock.read() { + self.max_frame_read_lock_index = 0; + self.max_frame = max_frame_in_wal; + self.min_frame = max_frame_in_wal + 1; // Ignore WAL; + return Ok((LimboResult::Ok, db_has_changed)); + } + } let mut max_read_mark = 0; let mut max_read_mark_index = -1; // Find the largest mark we can find, ignore frames that are impossible to be in range and @@ -654,9 +667,8 @@ impl Wal for WalFile { /// Begin a write transaction #[instrument(skip_all, level = Level::DEBUG)] fn begin_write_tx(&mut self) -> Result { - let busy = !self.get_shared().write_lock.write(); - tracing::debug!("begin_write_transaction(busy={})", busy); - if busy { + if !self.get_shared().write_lock.write() { + tracing::debug!("begin_write_transaction(busy=true)"); return Ok(LimboResult::Busy); } // If the max frame is not the same as the one in the shared state, it means another @@ -917,22 +929,17 @@ impl Wal for WalFile { shared.nbackfills.load(Ordering::SeqCst), ) }; - if shared_max <= nbackfills { - // if there's nothing to do and we are fully back-filled, to match sqlite - // we return the previous number of backfilled pages from last checkpoint. + let needs_backfill = shared_max > nbackfills; + if !needs_backfill + && matches!(mode, CheckpointMode::Passive | CheckpointMode::Full) + { + // there are no frames to copy and we don't need to reset the log so we can + // return early success. self.prev_checkpoint.release_guard(); // just in case we are still holding return Ok(IOResult::Done(self.prev_checkpoint.clone())); } - let shared = self.get_shared(); - let busy = !shared.checkpoint_lock.write(); - if busy { - return Err(LimboError::Busy); - } - // acquire either the read0 or write lock depending on the checkpoint mode - if self.checkpoint_guard.is_none() { - let guard = CheckpointGuard::new(self.shared.clone(), mode)?; - self.checkpoint_guard = Some(guard); - } + // acquire the appropriate locks depending on the checkpoint mode + self.acquire_proper_checkpoint_guard(mode)?; self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); self.ongoing_checkpoint.min_frame = nbackfills + 1; self.ongoing_checkpoint.current_page = 0; @@ -1058,7 +1065,7 @@ impl Wal for WalFile { { ensure_unlock!(self, self.restart_log(mode)); } - self.prev_checkpoint = checkpoint_result; + self.prev_checkpoint = checkpoint_result.clone(); // we cannot truncate the db file here, because we are currently inside a // mut borrow of pager.wal, and accessing the header will attempt a borrow, // so the caller will determine if: @@ -1066,13 +1073,10 @@ impl Wal for WalFile { // b. the db file size != num of db pages * page_size // and truncate the db file if necessary. if checkpoint_result.everything_backfilled() { - // temporarily hold the locks in the case that the db file must be truncated checkpoint_result.maybe_guard = self.checkpoint_guard.take(); } else { - // we can drop them now let _ = self.checkpoint_guard.take(); } - self.prev_checkpoint = checkpoint_result.clone(); self.ongoing_checkpoint.state = CheckpointState::Start; return Ok(IOResult::Done(checkpoint_result)); } @@ -1285,7 +1289,8 @@ impl WalFile { ); turso_assert!( matches!(self.checkpoint_guard, Some(CheckpointGuard::Writer { .. })), - "We must hold writer and checkpoint locks to restart the log" + "We must hold writer and checkpoint locks to restart the log, found: {:?}", + self.checkpoint_guard ); tracing::info!("restart_log(mode={mode:?})"); { @@ -1350,10 +1355,33 @@ impl WalFile { } } + self.last_checksum = self.get_shared().last_checksum; self.max_frame = 0; self.min_frame = 0; Ok(()) } + + fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> { + let needs_new_guard = !matches!( + (&self.checkpoint_guard, mode), + ( + Some(CheckpointGuard::Read0 { .. }), + CheckpointMode::Passive | CheckpointMode::Full, + ) | ( + Some(CheckpointGuard::Writer { .. }), + CheckpointMode::Restart | CheckpointMode::Truncate, + ), + ); + if needs_new_guard { + // Drop any existing guard + if self.checkpoint_guard.is_some() { + let _ = self.checkpoint_guard.take(); + } + let guard = CheckpointGuard::new(self.shared.clone(), mode)?; + self.checkpoint_guard = Some(guard); + } + Ok(()) + } } impl WalFileShared { @@ -1523,7 +1551,9 @@ impl WalFileShared { #[cfg(test)] pub mod test { use crate::{ - result::LimboResult, storage::sqlite3_ondisk::WAL_HEADER_SIZE, types::IOResult, + result::LimboResult, + storage::{sqlite3_ondisk::WAL_HEADER_SIZE, wal::READMARK_NOT_USED}, + types::IOResult, CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, Wal, WalFileShared, IO, }; @@ -1928,12 +1958,269 @@ pub mod test { .begin_read_tx() .unwrap(); - // checkpoint should fail - let result = { - let p = conn1.pager.borrow(); - let mut w = p.wal.borrow_mut(); - w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) + // checkpoint should succeed here because the wal is fully checkpointed (empty) + // so the reader is using readmark0 to read directly from the db file. + let p = conn1.pager.borrow(); + let mut w = p.wal.borrow_mut(); + loop { + match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) { + Ok(IOResult::IO) => { + conn1.run_once().unwrap(); + } + Ok(IOResult::Done(result)) => { + assert_eq!( + result.num_checkpointed_frames, 0, + "Should not checkpoint any frames (empty)" + ); + break; + } + Err(e) => { + panic!("Checkpoint failed: {e}"); + } + } + } + drop(w); + conn2.pager.borrow_mut().end_read_tx().unwrap(); + + conn1 + .execute("create table test(id integer primary key, value text)") + .unwrap(); + for i in 0..10 { + conn1 + .execute(format!("insert into test(value) values ('value{i}')")) + .unwrap(); + } + // now that we have some frames to checkpoint, try again + conn2.pager.borrow_mut().begin_read_tx().unwrap(); + let p = conn1.pager.borrow(); + let mut w = p.wal.borrow_mut(); + loop { + match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) { + Ok(IOResult::IO) => { + conn1.run_once().unwrap(); + } + Ok(IOResult::Done(_)) => { + panic!("Checkpoint should not have succeeded"); + } + Err(e) => { + assert!(matches!(e, LimboError::Busy), "should block readers"); + break; + } + } + } + } + + #[test] + fn test_wal_read_lock_slot_0_optimization() { + let (db, _path) = get_database(); + let conn = db.connect().unwrap(); + + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + conn.execute("insert into test(value) values ('initial')") + .unwrap(); + + // Checkpoint everything + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full); + assert!(result.everything_backfilled()); + } + + // Now start a read transaction - should use slot 0 + let conn2 = db.connect().unwrap(); + { + let pager = conn2.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); + + // Verify it's using slot 0 and max_frame is 0 + assert_eq!( + wal.get_max_frame(), + 0, + "Should ignore WAL when using slot 0" + ); + assert_eq!(wal.get_min_frame(), 0); + + // Verify the read lock index + let wal_file = &*wal as *const dyn Wal as *const crate::WalFile; + let wal_file = unsafe { &*wal_file }; + assert_eq!( + wal_file.max_frame_read_lock_index, 0, + "Should be using slot 0" + ); + } + + // Another reader should also be able to use slot 0 (shared lock) + let conn3 = db.connect().unwrap(); + { + let pager = conn3.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); + } + } + + #[test] + fn test_wal_read_marks_after_restart() { + let (db, _path) = get_database(); + let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); + + let conn = db.connect().unwrap(); + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn, 10, 5); + // Checkpoint with restart + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart); + assert!(result.everything_backfilled()); + } + + // Verify read marks after restart + let read_marks_after: Vec<_> = unsafe { + let s = &*wal_shared.get(); + (0..5) + .map(|i| s.read_locks[i].value.load(Ordering::SeqCst)) + .collect() }; - assert!(matches!(result, Err(LimboError::Busy))); + + assert_eq!(read_marks_after[0], 0, "Slot 0 should remain 0"); + assert_eq!( + read_marks_after[1], 0, + "Slot 1 (default reader) should be reset to 0" + ); + for i in 2..5 { + assert_eq!( + read_marks_after[i], READMARK_NOT_USED, + "Slot {i} should be READMARK_NOT_USED after restart", + ); + } + } + + #[test] + fn test_wal_concurrent_readers_during_checkpoint() { + let (db, _path) = get_database(); + let conn_writer = db.connect().unwrap(); + + conn_writer + .execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn_writer, 5, 10); + + // Start multiple readers at different points + let conn_r1 = db.connect().unwrap(); + let conn_r2 = db.connect().unwrap(); + + // R1 starts reading + { + let pager = conn_r1.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); + } + bulk_inserts(&conn_writer, 5, 10); + + // R2 starts reading, sees more frames than R1 + let r2_max_frame = { + let pager = conn_r2.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); + wal.get_max_frame() + }; + + // try passive checkpoint, should only checkpoint up to R1's position + let checkpoint_result = { + let pager = conn_writer.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive) + }; + + assert!( + checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames, + "Should not checkpoint all frames when readers are active" + ); + + // Verify R2 still sees its frames + assert_eq!( + conn_r2.pager.borrow().wal.borrow().get_max_frame(), + r2_max_frame, + "Reader should maintain its snapshot" + ); + } + + #[test] + fn test_wal_checkpoint_updates_read_marks() { + let (db, _path) = get_database(); + let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); + + let conn = db.connect().unwrap(); + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn, 10, 5); + + // get max frame before checkpoint + let max_frame_before = unsafe { (*wal_shared.get()).max_frame.load(Ordering::SeqCst) }; + + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + let _result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive); + } + + // check that read mark 1 (default reader) was updated to max_frame + let read_mark_1 = unsafe { + (*wal_shared.get()).read_locks[1] + .value + .load(Ordering::SeqCst) + }; + + assert_eq!( + read_mark_1 as u64, max_frame_before, + "Read mark 1 should be updated to max frame during checkpoint" + ); + } + + #[test] + fn test_wal_writer_blocks_restart_checkpoint() { + let (db, _path) = get_database(); + let conn1 = db.connect().unwrap(); + let conn2 = db.connect().unwrap(); + + conn1 + .execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn1, 5, 5); + + // start a write transaction + { + let pager = conn2.pager.borrow_mut(); + let mut wal = pager.wal.borrow_mut(); + assert!(matches!(wal.begin_write_tx().unwrap(), LimboResult::Ok)); + } + + // should fail because writer lock is held + let result = { + let pager = conn1.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart) + }; + + assert!( + matches!(result, Err(LimboError::Busy)), + "Restart checkpoint should fail when write lock is held" + ); + + // release write lock + conn2.pager.borrow().wal.borrow().end_write_tx().unwrap(); + + // now restart should succeed + let result = { + let pager = conn1.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart) + }; + + assert!(result.everything_backfilled()); } } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index e216ccb5c..2fa5419b6 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1109,15 +1109,14 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2( db: *mut sqlite3, _db_name: *const ffi::c_char, mode: ffi::c_int, - _log_size: *mut ffi::c_int, - _checkpoint_count: *mut ffi::c_int, + log_size: *mut ffi::c_int, + checkpoint_count: *mut ffi::c_int, ) -> ffi::c_int { if db.is_null() { return SQLITE_MISUSE; } let db: &mut sqlite3 = &mut *db; let db = db.inner.lock().unwrap(); - // TODO: Checkpointing modes and reporting back log size and checkpoint count to caller. let chkptmode = match mode { SQLITE_CHECKPOINT_PASSIVE => CheckpointMode::Passive, SQLITE_CHECKPOINT_RESTART => CheckpointMode::Restart, @@ -1125,10 +1124,25 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2( SQLITE_CHECKPOINT_FULL => CheckpointMode::Full, _ => return SQLITE_MISUSE, // Unsupported mode }; - if db.conn.checkpoint(chkptmode).is_err() { - return SQLITE_ERROR; + match db.conn.checkpoint(chkptmode) { + Ok(res) => { + if !log_size.is_null() { + (*log_size) = res.num_wal_frames as ffi::c_int; + } + if !checkpoint_count.is_null() { + (*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int; + } + SQLITE_OK + } + Err(e) => { + println!("Checkpoint error: {e}"); + if matches!(e, turso_core::LimboError::Busy) { + SQLITE_BUSY + } else { + SQLITE_ERROR + } + } } - SQLITE_OK } /// Get the number of frames in the WAL.