From db005c81a067b564ca924ecf716879a149f270ab Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 1 Jul 2025 18:07:11 -0300 Subject: [PATCH 1/4] add option to disable wal checkpoint --- core/lib.rs | 16 ++++++++++++---- core/storage/btree.rs | 4 ++-- core/storage/pager.rs | 19 +++++++++++++------ core/vdbe/mod.rs | 3 ++- sqlite3/README.md | 8 ++++---- sqlite3/src/lib.rs | 11 +++++++++++ 6 files changed, 44 insertions(+), 17 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 615c1f0ce..d1e867cbf 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -118,8 +118,8 @@ pub struct Database { maybe_shared_wal: RwLock>>>, is_empty: Arc, init_lock: Arc>, - open_flags: OpenFlags, + wal_checkpoint_disabled: Cell, } unsafe impl Send for Database {} @@ -211,6 +211,7 @@ impl Database { open_flags: flags, is_empty: Arc::new(AtomicUsize::new(is_empty)), init_lock: Arc::new(Mutex::new(())), + wal_checkpoint_disabled: Cell::new(false), }; let db = Arc::new(db); @@ -674,7 +675,8 @@ impl Connection { /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. pub fn cacheflush(&self) -> Result { - self.pager.cacheflush() + self.pager + .cacheflush(self._db.wal_checkpoint_disabled.get()) } pub fn clear_page_cache(&self) -> Result<()> { @@ -683,12 +685,18 @@ impl Connection { } pub fn checkpoint(&self) -> Result { - self.pager.wal_checkpoint() + self.pager + .wal_checkpoint(self._db.wal_checkpoint_disabled.get()) } /// Close a connection and checkpoint. pub fn close(&self) -> Result<()> { - self.pager.checkpoint_shutdown() + self.pager + .checkpoint_shutdown(self._db.wal_checkpoint_disabled.get()) + } + + pub fn wal_disable_checkpoint(&self) { + self._db.wal_checkpoint_disabled.set(true); } pub fn last_insert_rowid(&self) -> i64 { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index daffe8182..c650d791d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7057,7 +7057,7 @@ mod tests { ) .unwrap(); loop { - match pager.end_tx(false, false, &conn).unwrap() { + match pager.end_tx(false, false, &conn, false).unwrap() { crate::PagerCacheflushStatus::Done(_) => break, crate::PagerCacheflushStatus::IO => { pager.io.run_once().unwrap(); @@ -7183,7 +7183,7 @@ mod tests { .unwrap(); cursor.move_to_root(); loop { - match pager.end_tx(false, false, &conn).unwrap() { + match pager.end_tx(false, false, &conn, false).unwrap() { crate::PagerCacheflushStatus::Done(_) => break, crate::PagerCacheflushStatus::IO => { pager.io.run_once().unwrap(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 802d11f51..008c154e9 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -636,6 +636,7 @@ impl Pager { rollback: bool, change_schema: bool, connection: &Connection, + wal_checkpoint_disabled: bool, ) -> Result { tracing::trace!("end_tx(rollback={})", rollback); if rollback { @@ -643,7 +644,7 @@ impl Pager { self.wal.borrow().end_read_tx()?; return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback)); } - let cacheflush_status = self.cacheflush()?; + let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?; match cacheflush_status { PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO), PagerCacheflushStatus::Done(_) => { @@ -758,7 +759,7 @@ impl Pager { /// In the base case, it will write the dirty pages to the WAL and then fsync the WAL. /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. - pub fn cacheflush(&self) -> Result { + pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result { let mut checkpoint_result = CheckpointResult::default(); loop { let state = self.flush_info.borrow().state; @@ -804,7 +805,7 @@ impl Pager { return Ok(PagerCacheflushStatus::IO); } - if !self.wal.borrow().should_checkpoint() { + if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() { self.flush_info.borrow_mut().state = FlushState::Start; return Ok(PagerCacheflushStatus::Done( PagerCacheflushResult::WalWritten, @@ -912,7 +913,7 @@ impl Pager { .expect("Failed to clear page cache"); } - pub fn checkpoint_shutdown(&self) -> Result<()> { + pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> { let mut attempts = 0; { let mut wal = self.wal.borrow_mut(); @@ -927,11 +928,17 @@ impl Pager { attempts += 1; } } - self.wal_checkpoint()?; + self.wal_checkpoint(wal_checkpoint_disabled)?; Ok(()) } - pub fn wal_checkpoint(&self) -> Result { + pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result { + if wal_checkpoint_disabled { + return Ok(CheckpointResult { + num_wal_frames: 0, + num_checkpointed_frames: 0, + }); + } let checkpoint_result: CheckpointResult; loop { match self.wal.borrow_mut().checkpoint( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 502164cbf..383c4a82f 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -469,7 +469,8 @@ impl Program { rollback: bool, change_schema: bool, ) -> Result { - let cacheflush_status = pager.end_tx(rollback, change_schema, connection)?; + let cacheflush_status = + pager.end_tx(rollback, change_schema, connection, connection._db.wal_checkpoint_disabled.get())?; match cacheflush_status { PagerCacheflushStatus::Done(_) => { if self.change_cnt_on { diff --git a/sqlite3/README.md b/sqlite3/README.md index e58ea8c4a..b5b044b02 100644 --- a/sqlite3/README.md +++ b/sqlite3/README.md @@ -1,14 +1,14 @@ -# SQLite3 Implementation for Limbo +# SQLite3 Implementation for Turso -This directory contains a Rust implementation of the SQLite3 C API. The implementation serves as a compatibility layer between SQLite's C API and Limbo's native Rust database implementation. +This directory contains a Rust implementation of the SQLite3 C API. The implementation serves as a compatibility layer between SQLite's C API and Turso's native Rust database implementation. ## Purpose -This implementation provides SQLite3 API compatibility for Limbo, allowing existing applications that use SQLite to work with Limbo without modification. The code: +This implementation provides SQLite3 API compatibility for Turso, allowing existing applications that use SQLite to work with Turso without modification. The code: 1. Implements the SQLite3 C API functions in Rust 2. Translates between C and Rust data structures -3. Maps SQLite operations to equivalent Limbo operations +3. Maps SQLite operations to equivalent Turso operations 4. Maintains API compatibility with SQLite version 3.42.0 ## Testing Strategy diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 331eaa5cd..5fcbfbd43 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1188,6 +1188,17 @@ pub unsafe extern "C" fn libsql_wal_get_frame( } } +#[no_mangle] +pub unsafe extern "C" fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> ffi::c_int { + if db.is_null() { + return SQLITE_MISUSE; + } + let db: &mut sqlite3 = &mut *db; + let db = db.inner.lock().unwrap(); + db.conn.wal_disable_checkpoint(); + SQLITE_OK +} + fn sqlite3_safety_check_sick_or_ok(db: &sqlite3Inner) -> bool { match db.e_open_state { SQLITE_STATE_SICK | SQLITE_STATE_OPEN | SQLITE_STATE_BUSY => true, From 44b8275b2693f500dd883beb3e9a348f2e70b389 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 1 Jul 2025 23:05:32 -0300 Subject: [PATCH 2/4] add test + implement sqlite3_column_int64 for test --- sqlite3/src/lib.rs | 18 +++++- sqlite3/tests/compat/mod.rs | 106 ++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 2 deletions(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 5fcbfbd43..0d6e4d36b 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -561,8 +561,15 @@ pub unsafe extern "C" fn sqlite3_column_name( } #[no_mangle] -pub unsafe extern "C" fn sqlite3_column_int64(_stmt: *mut sqlite3_stmt, _idx: ffi::c_int) -> i64 { - stub!(); +pub unsafe extern "C" fn sqlite3_column_int64(stmt: *mut sqlite3_stmt, idx: ffi::c_int) -> i64 { + // Attempt to convert idx to usize + let idx = idx.try_into().unwrap(); + let stmt = &mut *stmt; + let row = stmt + .stmt + .row() + .expect("Function should only be called after `SQLITE_ROW`"); + row.get(idx).unwrap() } #[no_mangle] @@ -1188,6 +1195,13 @@ pub unsafe extern "C" fn libsql_wal_get_frame( } } +/// Disable WAL checkpointing. +/// +/// Note: This function disables WAL checkpointing entirely, including when +/// the last database connection is closed. This is different from +/// sqlite3_wal_autocheckpoint() which only disables automatic checkpoints +/// for the current connection, but still allows checkpointing when the +/// connection is closed. #[no_mangle] pub unsafe extern "C" fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> ffi::c_int { if db.is_null() { diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index e97bf7dad..10691071f 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -37,6 +37,7 @@ extern "C" { log_size: *mut i32, checkpoint_count: *mut i32, ) -> i32; + fn sqlite3_column_int64(stmt: *mut sqlite3_stmt, idx: i32) -> i64; fn libsql_wal_frame_count(db: *mut sqlite3, p_frame_count: *mut u32) -> i32; fn libsql_wal_get_frame( db: *mut sqlite3, @@ -44,10 +45,12 @@ extern "C" { p_frame: *mut u8, frame_len: u32, ) -> i32; + fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> i32; } const SQLITE_OK: i32 = 0; const SQLITE_CANTOPEN: i32 = 14; +const SQLITE_ROW: i32 = 100; const SQLITE_DONE: i32 = 101; const SQLITE_CHECKPOINT_PASSIVE: i32 = 0; @@ -301,5 +304,108 @@ mod tests { assert_eq!(sqlite3_close(db), SQLITE_OK); } } + + #[test] + fn test_disable_wal_checkpoint() { + let temp_file = tempfile::NamedTempFile::with_suffix(".db").unwrap(); + unsafe { + let mut db = ptr::null_mut(); + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + // Create a table and insert a row. + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"CREATE TABLE test (id INTEGER PRIMARY KEY)".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"INSERT INTO test (id) VALUES (0)".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + + let mut log_size = 0; + let mut checkpoint_count = 0; + + assert_eq!( + sqlite3_wal_checkpoint_v2( + db, + ptr::null(), + SQLITE_CHECKPOINT_PASSIVE, + &mut log_size, + &mut checkpoint_count + ), + SQLITE_OK + ); + } + let mut wal_path = temp_file.path().to_path_buf(); + assert!(wal_path.set_extension("db-wal")); + std::fs::remove_file(wal_path.clone()).unwrap(); + + { + let mut db = ptr::null_mut(); + unsafe { + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + assert_eq!(libsql_wal_disable_checkpoint(db), SQLITE_OK); + // Insert at least 1000 rows to go over checkpoint threshold. + let mut stmt = ptr::null_mut(); + for i in 1..2000 { + let sql = + std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({})", i)) + .unwrap(); + assert_eq!( + sqlite3_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, ptr::null_mut()), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + } + } + } + + std::fs::remove_file(wal_path).unwrap(); + let mut db = ptr::null_mut(); + unsafe { + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + // Insert at least 1000 rows to go over checkpoint threshold. + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"SELECT count() FROM test".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_ROW); + let count = sqlite3_column_int64(stmt, 0); + assert_eq!(count, 1); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + } + } } } From 3861584edc84beaf3714229b7691cfbcc99742fc Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 2 Jul 2025 12:24:41 -0300 Subject: [PATCH 3/4] add test for wal checkpointing correctly --- sqlite3/tests/compat/mod.rs | 104 ++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index 10691071f..47b4011f0 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -382,6 +382,7 @@ mod tests { } } + // Delete WAL to ensure that we don't load anything from it std::fs::remove_file(wal_path).unwrap(); let mut db = ptr::null_mut(); unsafe { @@ -407,5 +408,108 @@ mod tests { assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); } } + + #[test] + fn test_wal_checkpoint() { + let temp_file = tempfile::NamedTempFile::with_suffix(".db").unwrap(); + unsafe { + let mut db = ptr::null_mut(); + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + // Create a table and insert a row. + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"CREATE TABLE test (id INTEGER PRIMARY KEY)".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"INSERT INTO test (id) VALUES (0)".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + + let mut log_size = 0; + let mut checkpoint_count = 0; + + assert_eq!( + sqlite3_wal_checkpoint_v2( + db, + ptr::null(), + SQLITE_CHECKPOINT_PASSIVE, + &mut log_size, + &mut checkpoint_count + ), + SQLITE_OK + ); + } + let mut wal_path = temp_file.path().to_path_buf(); + assert!(wal_path.set_extension("db-wal")); + std::fs::remove_file(wal_path.clone()).unwrap(); + + { + let mut db = ptr::null_mut(); + unsafe { + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + // Insert at least 1000 rows to go over checkpoint threshold. + let mut stmt = ptr::null_mut(); + for i in 1..=2000 { + let sql = + std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({})", i)) + .unwrap(); + assert_eq!( + sqlite3_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, ptr::null_mut()), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + } + } + } + + // Delete WAL to ensure that we don't load anything from it + std::fs::remove_file(wal_path).unwrap(); + let mut db = ptr::null_mut(); + unsafe { + let path = temp_file.path(); + let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); + assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK); + // Insert at least 1000 rows to go over checkpoint threshold. + let mut stmt = ptr::null_mut(); + assert_eq!( + sqlite3_prepare_v2( + db, + c"SELECT count() FROM test".as_ptr(), + -1, + &mut stmt, + ptr::null_mut() + ), + SQLITE_OK + ); + assert_eq!(sqlite3_step(stmt), SQLITE_ROW); + let count = sqlite3_column_int64(stmt, 0); + assert_eq!(count, 2000); + assert_eq!(sqlite3_step(stmt), SQLITE_DONE); + assert_eq!(sqlite3_finalize(stmt), SQLITE_OK); + } + } } } From 56d87cb916d483dc9c2e2ce722ce471c7f730260 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 2 Jul 2025 14:11:31 -0300 Subject: [PATCH 4/4] move disable behavior to connection instead of checkpoint --- core/lib.rs | 14 +++++++------- core/vdbe/mod.rs | 8 ++++++-- sqlite3/src/lib.rs | 3 +-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index d1e867cbf..4067aac15 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -119,7 +119,6 @@ pub struct Database { is_empty: Arc, init_lock: Arc>, open_flags: OpenFlags, - wal_checkpoint_disabled: Cell, } unsafe impl Send for Database {} @@ -211,7 +210,6 @@ impl Database { open_flags: flags, is_empty: Arc::new(AtomicUsize::new(is_empty)), init_lock: Arc::new(Mutex::new(())), - wal_checkpoint_disabled: Cell::new(false), }; let db = Arc::new(db); @@ -279,6 +277,7 @@ impl Database { _shared_cache: false, cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), + wal_checkpoint_disabled: Cell::new(false), }); if let Err(e) = conn.register_builtins() { return Err(LimboError::ExtensionError(e)); @@ -330,6 +329,7 @@ impl Database { _shared_cache: false, cache_size: Cell::new(default_cache_size), readonly: Cell::new(false), + wal_checkpoint_disabled: Cell::new(false), }); if let Err(e) = conn.register_builtins() { @@ -449,6 +449,7 @@ pub struct Connection { _shared_cache: bool, cache_size: Cell, readonly: Cell, + wal_checkpoint_disabled: Cell, } impl Connection { @@ -675,8 +676,7 @@ impl Connection { /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to /// the database file and then fsync the database file. pub fn cacheflush(&self) -> Result { - self.pager - .cacheflush(self._db.wal_checkpoint_disabled.get()) + self.pager.cacheflush(self.wal_checkpoint_disabled.get()) } pub fn clear_page_cache(&self) -> Result<()> { @@ -686,17 +686,17 @@ impl Connection { pub fn checkpoint(&self) -> Result { self.pager - .wal_checkpoint(self._db.wal_checkpoint_disabled.get()) + .wal_checkpoint(self.wal_checkpoint_disabled.get()) } /// Close a connection and checkpoint. pub fn close(&self) -> Result<()> { self.pager - .checkpoint_shutdown(self._db.wal_checkpoint_disabled.get()) + .checkpoint_shutdown(self.wal_checkpoint_disabled.get()) } pub fn wal_disable_checkpoint(&self) { - self._db.wal_checkpoint_disabled.set(true); + self.wal_checkpoint_disabled.set(true); } pub fn last_insert_rowid(&self) -> i64 { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 383c4a82f..3663afa6f 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -469,8 +469,12 @@ impl Program { rollback: bool, change_schema: bool, ) -> Result { - let cacheflush_status = - pager.end_tx(rollback, change_schema, connection, connection._db.wal_checkpoint_disabled.get())?; + let cacheflush_status = pager.end_tx( + rollback, + change_schema, + connection, + connection.wal_checkpoint_disabled.get(), + )?; match cacheflush_status { PagerCacheflushStatus::Done(_) => { if self.change_cnt_on { diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 0d6e4d36b..240e3acbd 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1197,8 +1197,7 @@ pub unsafe extern "C" fn libsql_wal_get_frame( /// Disable WAL checkpointing. /// -/// Note: This function disables WAL checkpointing entirely, including when -/// the last database connection is closed. This is different from +/// Note: This function disables WAL checkpointing entirely for the connection. This is different from /// sqlite3_wal_autocheckpoint() which only disables automatic checkpoints /// for the current connection, but still allows checkpointing when the /// connection is closed.