Add the different checkpoint modes to the sqlite3 API, finish checkpoint logic and add tests

PThorpe92
2025-07-19 19:02:06 -04:00
committed by Jussi Saurio
parent eaa6f99fa8
commit b214c3dfc8
2 changed files with 344 additions and 43 deletions
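A minimal sketch (not part of the commit) of the core API that the sqlite3 layer below forwards to: Connection::checkpoint takes a CheckpointMode and returns the CheckpointResult whose frame counts the C entry point reports back. It assumes turso_core exposes Connection, CheckpointMode, and LimboError at the crate root, as the imports elsewhere in this commit suggest.

use turso_core::{CheckpointMode, Connection, LimboError};

fn checkpoint_and_report(conn: &Connection) -> Result<(), LimboError> {
    match conn.checkpoint(CheckpointMode::Truncate) {
        Ok(res) => {
            // These are the values the C API below writes into *log_size and
            // *checkpoint_count.
            println!(
                "wal frames: {}, backfilled: {}",
                res.num_wal_frames, res.num_checkpointed_frames
            );
            Ok(())
        }
        Err(LimboError::Busy) => {
            // Restart/Truncate take the writer lock, so they report Busy rather
            // than blocking while another reader or writer is active.
            println!("checkpoint deferred: wal is busy");
            Ok(())
        }
        Err(e) => Err(e),
    }
}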


@@ -483,9 +483,6 @@ pub struct WalFileShared {
/// value. There is a limited number of slots because an unbounded number of connections could be
/// fatal. Therefore, for now we copy SQLite's behavior and cap the number of read-mark
/// slots at 5.
/// read_locks[0] is the exclusive read lock that is always 0 except during a checkpoint where
/// the log is restarted. read_locks[1] is the 'default reader' slot that always contains the
/// current max_frame.
pub read_locks: [LimboRwLock; 5],
/// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only
/// one used.
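For orientation, here is the read-mark slot convention that the comment above and the new tests at the bottom of this file rely on, restated as an illustrative sketch. The constant names are invented for this summary; only READMARK_NOT_USED is a real identifier (the tests below import it).

// Invented names; only READMARK_NOT_USED exists in the codebase.
const NUM_READ_LOCK_SLOTS: usize = 5;
// Slot 0 always holds the value 0. begin_read_tx takes it only when
// max_frame == nbackfills, i.e. the WAL is fully backfilled, so the reader can
// skip the WAL and read straight from the db file.
const SLOT_BYPASS_WAL: usize = 0;
// Slot 1 is the "default reader" mark: checkpoints keep it at the current
// max_frame, and a log restart resets it to 0.
const SLOT_DEFAULT_READER: usize = 1;
// Slots 2..=4 hold per-reader snapshot frames and are set back to
// READMARK_NOT_USED when the log is restarted.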
@@ -510,6 +507,8 @@ impl fmt::Debug for WalFileShared {
}
#[derive(Clone, Debug)]
/// An RAII guard to ensure that no locks are leaked during checkpointing in
/// the case of errors.
enum CheckpointGuard {
Writer { ptr: Arc<UnsafeCell<WalFileShared>> },
Read0 { ptr: Arc<UnsafeCell<WalFileShared>> },
@@ -520,7 +519,7 @@ impl CheckpointGuard {
let shared = &mut unsafe { &mut *ptr.get() };
if !shared.checkpoint_lock.write() {
tracing::trace!("CheckpointGuard::new: checkpoint lock failed, returning Busy");
// exclusive lock on checkpoint lock
// we hold the exclusive checkpoint lock for the duration of the checkpoint, regardless of mode
return Err(LimboError::Busy);
}
match mode {
@@ -529,12 +528,13 @@ impl CheckpointGuard {
if !read0.write() {
shared.checkpoint_lock.unlock();
tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy");
// exclusive lock on slot0
// for passive and full we need to hold the read0 lock
return Err(LimboError::Busy);
}
Ok(Self::Read0 { ptr })
}
CheckpointMode::Restart | CheckpointMode::Truncate => {
// if we are resetting the log we must hold the write lock for the duration.
if !shared.write_lock.write() {
shared.checkpoint_lock.unlock();
tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy");
@@ -545,14 +545,13 @@ impl CheckpointGuard {
}
}
}
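The guard's release path is not part of this diff; the ensure_unlock! macro below simply drops it via checkpoint_guard.take(). One plausible shape for that release, written as a sketch rather than the actual implementation: it assumes the guard unlocks in Drop (the real code may instead release through an explicit method) and that write_lock, read_locks[0], and checkpoint_lock expose the same unlock() used in new() above.

impl Drop for CheckpointGuard {
    fn drop(&mut self) {
        // Hypothetical: release in the reverse order of acquisition in new().
        match self {
            CheckpointGuard::Read0 { ptr } => {
                let shared = unsafe { &mut *ptr.get() };
                shared.read_locks[0].unlock();
                shared.checkpoint_lock.unlock();
            }
            CheckpointGuard::Writer { ptr } => {
                let shared = unsafe { &mut *ptr.get() };
                shared.write_lock.unlock();
                shared.checkpoint_lock.unlock();
            }
        }
    }
}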
/// Small macro to remove the writer lock in the event that a checkpoint errors out and would
/// Macro to remove the writer lock in the event that a checkpoint errors out and would
/// otherwise leak the lock. Only used during checkpointing.
macro_rules! ensure_unlock {
($self:ident, $fun:expr) => {
$fun.inspect_err(|e| {
tracing::trace!(
"CheckpointGuard::ensure_unlock: error occurred, releaseing held locks: {e}"
tracing::error!(
"CheckpointGuard::ensure_unlock: error occurred, releasing held locks: {e}"
);
let _ = $self.checkpoint_guard.take();
})?;
@@ -579,9 +578,23 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> {
let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
let nbackfills = self.get_shared().nbackfills.load(Ordering::SeqCst);
let db_has_changed = max_frame_in_wal > self.max_frame;
// Check if we can use slot 0 (bypass WAL)
if max_frame_in_wal == nbackfills {
let shared = self.get_shared();
let lock = &mut shared.read_locks[0];
// if wal is fully checkpointed, we can use read mark 0 to read directly
// from the db file
if lock.read() {
self.max_frame_read_lock_index = 0;
self.max_frame = max_frame_in_wal;
self.min_frame = max_frame_in_wal + 1; // ignore the WAL
return Ok((LimboResult::Ok, db_has_changed));
}
}
let mut max_read_mark = 0;
let mut max_read_mark_index = -1;
// Find the largest mark we can find, ignore frames that are impossible to be in range and
@@ -654,9 +667,8 @@ impl Wal for WalFile {
/// Begin a write transaction
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_write_tx(&mut self) -> Result<LimboResult> {
let busy = !self.get_shared().write_lock.write();
tracing::debug!("begin_write_transaction(busy={})", busy);
if busy {
if !self.get_shared().write_lock.write() {
tracing::debug!("begin_write_transaction(busy=true)");
return Ok(LimboResult::Busy);
}
// If the max frame is not the same as the one in the shared state, it means another
@@ -917,22 +929,17 @@ impl Wal for WalFile {
shared.nbackfills.load(Ordering::SeqCst),
)
};
if shared_max <= nbackfills {
// if there's nothing to do and we are fully back-filled, to match sqlite
// we return the previous number of backfilled pages from last checkpoint.
let needs_backfill = shared_max > nbackfills;
if !needs_backfill
&& matches!(mode, CheckpointMode::Passive | CheckpointMode::Full)
{
// there are no frames to copy and we don't need to reset the log so we can
// return early success.
self.prev_checkpoint.release_guard(); // just in case we are still holding
return Ok(IOResult::Done(self.prev_checkpoint.clone()));
}
let shared = self.get_shared();
let busy = !shared.checkpoint_lock.write();
if busy {
return Err(LimboError::Busy);
}
// acquire either the read0 or write lock depending on the checkpoint mode
if self.checkpoint_guard.is_none() {
let guard = CheckpointGuard::new(self.shared.clone(), mode)?;
self.checkpoint_guard = Some(guard);
}
// acquire the appropriate locks depending on the checkpoint mode
self.acquire_proper_checkpoint_guard(mode)?;
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
self.ongoing_checkpoint.min_frame = nbackfills + 1;
self.ongoing_checkpoint.current_page = 0;
@@ -1058,7 +1065,7 @@ impl Wal for WalFile {
{
ensure_unlock!(self, self.restart_log(mode));
}
self.prev_checkpoint = checkpoint_result;
self.prev_checkpoint = checkpoint_result.clone();
// we cannot truncate the db file here, because we are currently inside a
// mut borrow of pager.wal, and accessing the header will attempt a borrow,
// so the caller will determine if:
@@ -1066,13 +1073,10 @@ impl Wal for WalFile {
// b. the db file size != num of db pages * page_size
// and truncate the db file if necessary.
if checkpoint_result.everything_backfilled() {
// temporarily hold the locks in the case that the db file must be truncated
checkpoint_result.maybe_guard = self.checkpoint_guard.take();
} else {
// we can drop them now
let _ = self.checkpoint_guard.take();
}
self.prev_checkpoint = checkpoint_result.clone();
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(IOResult::Done(checkpoint_result));
}
@@ -1285,7 +1289,8 @@ impl WalFile {
);
turso_assert!(
matches!(self.checkpoint_guard, Some(CheckpointGuard::Writer { .. })),
"We must hold writer and checkpoint locks to restart the log"
"We must hold writer and checkpoint locks to restart the log, found: {:?}",
self.checkpoint_guard
);
tracing::info!("restart_log(mode={mode:?})");
{
@@ -1350,10 +1355,33 @@ impl WalFile {
}
}
self.last_checksum = self.get_shared().last_checksum;
self.max_frame = 0;
self.min_frame = 0;
Ok(())
}
fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> {
let needs_new_guard = !matches!(
(&self.checkpoint_guard, mode),
(
Some(CheckpointGuard::Read0 { .. }),
CheckpointMode::Passive | CheckpointMode::Full,
) | (
Some(CheckpointGuard::Writer { .. }),
CheckpointMode::Restart | CheckpointMode::Truncate,
),
);
if needs_new_guard {
// Drop any existing guard
if self.checkpoint_guard.is_some() {
let _ = self.checkpoint_guard.take();
}
let guard = CheckpointGuard::new(self.shared.clone(), mode)?;
self.checkpoint_guard = Some(guard);
}
Ok(())
}
}
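The reuse rule above, restated compactly for illustration. GuardKind, required_guard, and can_reuse are not names from the codebase, and the sketch assumes CheckpointMode has exactly the four variants exercised in this commit.

#[derive(Clone, Copy, PartialEq, Debug)]
enum GuardKind {
    Read0,  // checkpoint_lock + read_locks[0]: Passive and Full
    Writer, // checkpoint_lock + write_lock: Restart and Truncate
}

fn required_guard(mode: CheckpointMode) -> GuardKind {
    match mode {
        CheckpointMode::Passive | CheckpointMode::Full => GuardKind::Read0,
        CheckpointMode::Restart | CheckpointMode::Truncate => GuardKind::Writer,
    }
}

// A held guard is kept only when it is exactly the kind the requested mode
// needs; otherwise acquire_proper_checkpoint_guard drops it and takes a fresh
// CheckpointGuard::new(shared, mode).
fn can_reuse(current: Option<GuardKind>, mode: CheckpointMode) -> bool {
    current == Some(required_guard(mode))
}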
impl WalFileShared {
@@ -1523,7 +1551,9 @@ impl WalFileShared {
#[cfg(test)]
pub mod test {
use crate::{
result::LimboResult, storage::sqlite3_ondisk::WAL_HEADER_SIZE, types::IOResult,
result::LimboResult,
storage::{sqlite3_ondisk::WAL_HEADER_SIZE, wal::READMARK_NOT_USED},
types::IOResult,
CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO,
Wal, WalFileShared, IO,
};
@@ -1928,12 +1958,269 @@ pub mod test {
.begin_read_tx()
.unwrap();
// checkpoint should fail
let result = {
let p = conn1.pager.borrow();
let mut w = p.wal.borrow_mut();
w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart)
// checkpoint should succeed here because the wal is fully checkpointed (empty)
// so the reader is using readmark0 to read directly from the db file.
let p = conn1.pager.borrow();
let mut w = p.wal.borrow_mut();
loop {
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
Ok(IOResult::IO) => {
conn1.run_once().unwrap();
}
Ok(IOResult::Done(result)) => {
assert_eq!(
result.num_checkpointed_frames, 0,
"Should not checkpoint any frames (empty)"
);
break;
}
Err(e) => {
panic!("Checkpoint failed: {e}");
}
}
}
drop(w);
conn2.pager.borrow_mut().end_read_tx().unwrap();
conn1
.execute("create table test(id integer primary key, value text)")
.unwrap();
for i in 0..10 {
conn1
.execute(format!("insert into test(value) values ('value{i}')"))
.unwrap();
}
// now that we have some frames to checkpoint, try again
conn2.pager.borrow_mut().begin_read_tx().unwrap();
let p = conn1.pager.borrow();
let mut w = p.wal.borrow_mut();
loop {
match w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) {
Ok(IOResult::IO) => {
conn1.run_once().unwrap();
}
Ok(IOResult::Done(_)) => {
panic!("Checkpoint should not have succeeded");
}
Err(e) => {
assert!(matches!(e, LimboError::Busy), "active reader should block a Restart checkpoint");
break;
}
}
}
}
#[test]
fn test_wal_read_lock_slot_0_optimization() {
let (db, _path) = get_database();
let conn = db.connect().unwrap();
conn.execute("create table test(id integer primary key, value text)")
.unwrap();
conn.execute("insert into test(value) values ('initial')")
.unwrap();
// Checkpoint everything
{
let pager = conn.pager.borrow();
let mut wal = pager.wal.borrow_mut();
let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Full);
assert!(result.everything_backfilled());
}
// Now start a read transaction - should use slot 0
let conn2 = db.connect().unwrap();
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok));
// Verify it's using slot 0 and max_frame is 0
assert_eq!(
wal.get_max_frame(),
0,
"Should ignore WAL when using slot 0"
);
assert_eq!(wal.get_min_frame(), 0);
// Verify the read lock index
let wal_file = &*wal as *const dyn Wal as *const crate::WalFile;
let wal_file = unsafe { &*wal_file };
assert_eq!(
wal_file.max_frame_read_lock_index, 0,
"Should be using slot 0"
);
}
// Another reader should also be able to use slot 0 (shared lock)
let conn3 = db.connect().unwrap();
{
let pager = conn3.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok));
}
}
#[test]
fn test_wal_read_marks_after_restart() {
let (db, _path) = get_database();
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
let conn = db.connect().unwrap();
conn.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn, 10, 5);
// Checkpoint with restart
{
let pager = conn.pager.borrow();
let mut wal = pager.wal.borrow_mut();
let result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart);
assert!(result.everything_backfilled());
}
// Verify read marks after restart
let read_marks_after: Vec<_> = unsafe {
let s = &*wal_shared.get();
(0..5)
.map(|i| s.read_locks[i].value.load(Ordering::SeqCst))
.collect()
};
assert_eq!(read_marks_after[0], 0, "Slot 0 should remain 0");
assert_eq!(
read_marks_after[1], 0,
"Slot 1 (default reader) should be reset to 0"
);
for i in 2..5 {
assert_eq!(
read_marks_after[i], READMARK_NOT_USED,
"Slot {i} should be READMARK_NOT_USED after restart",
);
}
}
#[test]
fn test_wal_concurrent_readers_during_checkpoint() {
let (db, _path) = get_database();
let conn_writer = db.connect().unwrap();
conn_writer
.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn_writer, 5, 10);
// Start multiple readers at different points
let conn_r1 = db.connect().unwrap();
let conn_r2 = db.connect().unwrap();
// R1 starts reading
{
let pager = conn_r1.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok));
}
bulk_inserts(&conn_writer, 5, 10);
// R2 starts reading, sees more frames than R1
let r2_max_frame = {
let pager = conn_r2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok));
wal.get_max_frame()
};
// try passive checkpoint, should only checkpoint up to R1's position
let checkpoint_result = {
let pager = conn_writer.pager.borrow();
let mut wal = pager.wal.borrow_mut();
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive)
};
assert!(
checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames,
"Should not checkpoint all frames when readers are active"
);
// Verify R2 still sees its frames
assert_eq!(
conn_r2.pager.borrow().wal.borrow().get_max_frame(),
r2_max_frame,
"Reader should maintain its snapshot"
);
}
#[test]
fn test_wal_checkpoint_updates_read_marks() {
let (db, _path) = get_database();
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
let conn = db.connect().unwrap();
conn.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn, 10, 5);
// get max frame before checkpoint
let max_frame_before = unsafe { (*wal_shared.get()).max_frame.load(Ordering::SeqCst) };
{
let pager = conn.pager.borrow();
let mut wal = pager.wal.borrow_mut();
let _result = run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive);
}
// check that read mark 1 (default reader) was updated to max_frame
let read_mark_1 = unsafe {
(*wal_shared.get()).read_locks[1]
.value
.load(Ordering::SeqCst)
};
assert_eq!(
read_mark_1 as u64, max_frame_before,
"Read mark 1 should be updated to max frame during checkpoint"
);
}
#[test]
fn test_wal_writer_blocks_restart_checkpoint() {
let (db, _path) = get_database();
let conn1 = db.connect().unwrap();
let conn2 = db.connect().unwrap();
conn1
.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn1, 5, 5);
// start a write transaction
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_write_tx().unwrap(), LimboResult::Ok));
}
// should fail because writer lock is held
let result = {
let pager = conn1.pager.borrow();
let mut wal = pager.wal.borrow_mut();
wal.checkpoint(&pager, Rc::new(RefCell::new(0)), CheckpointMode::Restart)
};
assert!(
matches!(result, Err(LimboError::Busy)),
"Restart checkpoint should fail when write lock is held"
);
// release write lock
conn2.pager.borrow().wal.borrow().end_write_tx().unwrap();
// now restart should succeed
let result = {
let pager = conn1.pager.borrow();
let mut wal = pager.wal.borrow_mut();
run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Restart)
};
assert!(result.everything_backfilled());
}
}


@@ -1109,15 +1109,14 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2(
db: *mut sqlite3,
_db_name: *const ffi::c_char,
mode: ffi::c_int,
_log_size: *mut ffi::c_int,
_checkpoint_count: *mut ffi::c_int,
log_size: *mut ffi::c_int,
checkpoint_count: *mut ffi::c_int,
) -> ffi::c_int {
if db.is_null() {
return SQLITE_MISUSE;
}
let db: &mut sqlite3 = &mut *db;
let db = db.inner.lock().unwrap();
// TODO: Checkpointing modes and reporting back log size and checkpoint count to caller.
let chkptmode = match mode {
SQLITE_CHECKPOINT_PASSIVE => CheckpointMode::Passive,
SQLITE_CHECKPOINT_RESTART => CheckpointMode::Restart,
@@ -1125,10 +1124,25 @@ pub unsafe extern "C" fn sqlite3_wal_checkpoint_v2(
SQLITE_CHECKPOINT_FULL => CheckpointMode::Full,
_ => return SQLITE_MISUSE, // Unsupported mode
};
if db.conn.checkpoint(chkptmode).is_err() {
return SQLITE_ERROR;
match db.conn.checkpoint(chkptmode) {
Ok(res) => {
if !log_size.is_null() {
(*log_size) = res.num_wal_frames as ffi::c_int;
}
if !checkpoint_count.is_null() {
(*checkpoint_count) = res.num_checkpointed_frames as ffi::c_int;
}
SQLITE_OK
}
Err(e) => {
println!("Checkpoint error: {e}");
if matches!(e, turso_core::LimboError::Busy) {
SQLITE_BUSY
} else {
SQLITE_ERROR
}
}
}
SQLITE_OK
}
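With the out-parameters now populated, a caller of the compat layer can see how much of the WAL was backfilled. A hedged usage sketch; db is assumed to be a previously opened *mut sqlite3, and only sqlite3_wal_checkpoint_v2 and the SQLITE_* constants it uses are actually shown in this diff.

use std::ffi::{c_char, c_int};

unsafe fn checkpoint_and_report(db: *mut sqlite3) -> c_int {
    let mut log_size: c_int = 0;
    let mut checkpointed: c_int = 0;
    let rc = sqlite3_wal_checkpoint_v2(
        db,
        std::ptr::null::<c_char>(), // default database
        SQLITE_CHECKPOINT_PASSIVE,
        &mut log_size,
        &mut checkpointed,
    );
    if rc == SQLITE_OK {
        println!("wal frames: {log_size}, backfilled: {checkpointed}");
    } else if rc == SQLITE_BUSY {
        println!("checkpoint could not take its locks; retry later");
    }
    rc
}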
/// Get the number of frames in the WAL.