mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Merge 'add libsql_disable_wal_checkpoint' from Pedro Muniz
Closes #1894 Closes #1920
This commit is contained in:
16
core/lib.rs
16
core/lib.rs
@@ -118,7 +118,6 @@ pub struct Database {
|
||||
maybe_shared_wal: RwLock<Option<Arc<UnsafeCell<WalFileShared>>>>,
|
||||
is_empty: Arc<AtomicUsize>,
|
||||
init_lock: Arc<Mutex<()>>,
|
||||
|
||||
open_flags: OpenFlags,
|
||||
}
|
||||
|
||||
@@ -278,6 +277,7 @@ impl Database {
|
||||
_shared_cache: false,
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
});
|
||||
if let Err(e) = conn.register_builtins() {
|
||||
return Err(LimboError::ExtensionError(e));
|
||||
@@ -329,6 +329,7 @@ impl Database {
|
||||
_shared_cache: false,
|
||||
cache_size: Cell::new(default_cache_size),
|
||||
readonly: Cell::new(false),
|
||||
wal_checkpoint_disabled: Cell::new(false),
|
||||
});
|
||||
|
||||
if let Err(e) = conn.register_builtins() {
|
||||
@@ -448,6 +449,7 @@ pub struct Connection {
|
||||
_shared_cache: bool,
|
||||
cache_size: Cell<i32>,
|
||||
readonly: Cell<bool>,
|
||||
wal_checkpoint_disabled: Cell<bool>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
@@ -674,7 +676,7 @@ impl Connection {
|
||||
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
|
||||
/// the database file and then fsync the database file.
|
||||
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
|
||||
self.pager.cacheflush()
|
||||
self.pager.cacheflush(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
pub fn clear_page_cache(&self) -> Result<()> {
|
||||
@@ -683,12 +685,18 @@ impl Connection {
|
||||
}
|
||||
|
||||
pub fn checkpoint(&self) -> Result<CheckpointResult> {
|
||||
self.pager.wal_checkpoint()
|
||||
self.pager
|
||||
.wal_checkpoint(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
/// Close a connection and checkpoint.
|
||||
pub fn close(&self) -> Result<()> {
|
||||
self.pager.checkpoint_shutdown()
|
||||
self.pager
|
||||
.checkpoint_shutdown(self.wal_checkpoint_disabled.get())
|
||||
}
|
||||
|
||||
pub fn wal_disable_checkpoint(&self) {
|
||||
self.wal_checkpoint_disabled.set(true);
|
||||
}
|
||||
|
||||
pub fn last_insert_rowid(&self) -> i64 {
|
||||
|
||||
@@ -7057,7 +7057,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn).unwrap() {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
crate::PagerCacheflushStatus::Done(_) => break,
|
||||
crate::PagerCacheflushStatus::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
@@ -7183,7 +7183,7 @@ mod tests {
|
||||
.unwrap();
|
||||
cursor.move_to_root();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn).unwrap() {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
crate::PagerCacheflushStatus::Done(_) => break,
|
||||
crate::PagerCacheflushStatus::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
|
||||
@@ -636,6 +636,7 @@ impl Pager {
|
||||
rollback: bool,
|
||||
change_schema: bool,
|
||||
connection: &Connection,
|
||||
wal_checkpoint_disabled: bool,
|
||||
) -> Result<PagerCacheflushStatus> {
|
||||
tracing::trace!("end_tx(rollback={})", rollback);
|
||||
if rollback {
|
||||
@@ -643,7 +644,7 @@ impl Pager {
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback));
|
||||
}
|
||||
let cacheflush_status = self.cacheflush()?;
|
||||
let cacheflush_status = self.cacheflush(wal_checkpoint_disabled)?;
|
||||
match cacheflush_status {
|
||||
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
|
||||
PagerCacheflushStatus::Done(_) => {
|
||||
@@ -758,7 +759,7 @@ impl Pager {
|
||||
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
|
||||
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
|
||||
/// the database file and then fsync the database file.
|
||||
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
|
||||
pub fn cacheflush(&self, wal_checkpoint_disabled: bool) -> Result<PagerCacheflushStatus> {
|
||||
let mut checkpoint_result = CheckpointResult::default();
|
||||
loop {
|
||||
let state = self.flush_info.borrow().state;
|
||||
@@ -804,7 +805,7 @@ impl Pager {
|
||||
return Ok(PagerCacheflushStatus::IO);
|
||||
}
|
||||
|
||||
if !self.wal.borrow().should_checkpoint() {
|
||||
if wal_checkpoint_disabled || !self.wal.borrow().should_checkpoint() {
|
||||
self.flush_info.borrow_mut().state = FlushState::Start;
|
||||
return Ok(PagerCacheflushStatus::Done(
|
||||
PagerCacheflushResult::WalWritten,
|
||||
@@ -912,7 +913,7 @@ impl Pager {
|
||||
.expect("Failed to clear page cache");
|
||||
}
|
||||
|
||||
pub fn checkpoint_shutdown(&self) -> Result<()> {
|
||||
pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> {
|
||||
let mut attempts = 0;
|
||||
{
|
||||
let mut wal = self.wal.borrow_mut();
|
||||
@@ -927,11 +928,17 @@ impl Pager {
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
self.wal_checkpoint()?;
|
||||
self.wal_checkpoint(wal_checkpoint_disabled)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn wal_checkpoint(&self) -> Result<CheckpointResult> {
|
||||
pub fn wal_checkpoint(&self, wal_checkpoint_disabled: bool) -> Result<CheckpointResult> {
|
||||
if wal_checkpoint_disabled {
|
||||
return Ok(CheckpointResult {
|
||||
num_wal_frames: 0,
|
||||
num_checkpointed_frames: 0,
|
||||
});
|
||||
}
|
||||
let checkpoint_result: CheckpointResult;
|
||||
loop {
|
||||
match self.wal.borrow_mut().checkpoint(
|
||||
|
||||
@@ -469,7 +469,12 @@ impl Program {
|
||||
rollback: bool,
|
||||
change_schema: bool,
|
||||
) -> Result<StepResult> {
|
||||
let cacheflush_status = pager.end_tx(rollback, change_schema, connection)?;
|
||||
let cacheflush_status = pager.end_tx(
|
||||
rollback,
|
||||
change_schema,
|
||||
connection,
|
||||
connection.wal_checkpoint_disabled.get(),
|
||||
)?;
|
||||
match cacheflush_status {
|
||||
PagerCacheflushStatus::Done(_) => {
|
||||
if self.change_cnt_on {
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
# SQLite3 Implementation for Limbo
|
||||
# SQLite3 Implementation for Turso
|
||||
|
||||
This directory contains a Rust implementation of the SQLite3 C API. The implementation serves as a compatibility layer between SQLite's C API and Limbo's native Rust database implementation.
|
||||
This directory contains a Rust implementation of the SQLite3 C API. The implementation serves as a compatibility layer between SQLite's C API and Turso's native Rust database implementation.
|
||||
|
||||
## Purpose
|
||||
|
||||
This implementation provides SQLite3 API compatibility for Limbo, allowing existing applications that use SQLite to work with Limbo without modification. The code:
|
||||
This implementation provides SQLite3 API compatibility for Turso, allowing existing applications that use SQLite to work with Turso without modification. The code:
|
||||
|
||||
1. Implements the SQLite3 C API functions in Rust
|
||||
2. Translates between C and Rust data structures
|
||||
3. Maps SQLite operations to equivalent Limbo operations
|
||||
3. Maps SQLite operations to equivalent Turso operations
|
||||
4. Maintains API compatibility with SQLite version 3.42.0
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
@@ -561,8 +561,15 @@ pub unsafe extern "C" fn sqlite3_column_name(
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn sqlite3_column_int64(_stmt: *mut sqlite3_stmt, _idx: ffi::c_int) -> i64 {
|
||||
stub!();
|
||||
pub unsafe extern "C" fn sqlite3_column_int64(stmt: *mut sqlite3_stmt, idx: ffi::c_int) -> i64 {
|
||||
// Attempt to convert idx to usize
|
||||
let idx = idx.try_into().unwrap();
|
||||
let stmt = &mut *stmt;
|
||||
let row = stmt
|
||||
.stmt
|
||||
.row()
|
||||
.expect("Function should only be called after `SQLITE_ROW`");
|
||||
row.get(idx).unwrap()
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
@@ -1188,6 +1195,23 @@ pub unsafe extern "C" fn libsql_wal_get_frame(
|
||||
}
|
||||
}
|
||||
|
||||
/// Disable WAL checkpointing.
|
||||
///
|
||||
/// Note: This function disables WAL checkpointing entirely for the connection. This is different from
|
||||
/// sqlite3_wal_autocheckpoint() which only disables automatic checkpoints
|
||||
/// for the current connection, but still allows checkpointing when the
|
||||
/// connection is closed.
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> ffi::c_int {
|
||||
if db.is_null() {
|
||||
return SQLITE_MISUSE;
|
||||
}
|
||||
let db: &mut sqlite3 = &mut *db;
|
||||
let db = db.inner.lock().unwrap();
|
||||
db.conn.wal_disable_checkpoint();
|
||||
SQLITE_OK
|
||||
}
|
||||
|
||||
fn sqlite3_safety_check_sick_or_ok(db: &sqlite3Inner) -> bool {
|
||||
match db.e_open_state {
|
||||
SQLITE_STATE_SICK | SQLITE_STATE_OPEN | SQLITE_STATE_BUSY => true,
|
||||
|
||||
@@ -37,6 +37,7 @@ extern "C" {
|
||||
log_size: *mut i32,
|
||||
checkpoint_count: *mut i32,
|
||||
) -> i32;
|
||||
fn sqlite3_column_int64(stmt: *mut sqlite3_stmt, idx: i32) -> i64;
|
||||
fn libsql_wal_frame_count(db: *mut sqlite3, p_frame_count: *mut u32) -> i32;
|
||||
fn libsql_wal_get_frame(
|
||||
db: *mut sqlite3,
|
||||
@@ -44,10 +45,12 @@ extern "C" {
|
||||
p_frame: *mut u8,
|
||||
frame_len: u32,
|
||||
) -> i32;
|
||||
fn libsql_wal_disable_checkpoint(db: *mut sqlite3) -> i32;
|
||||
}
|
||||
|
||||
const SQLITE_OK: i32 = 0;
|
||||
const SQLITE_CANTOPEN: i32 = 14;
|
||||
const SQLITE_ROW: i32 = 100;
|
||||
const SQLITE_DONE: i32 = 101;
|
||||
|
||||
const SQLITE_CHECKPOINT_PASSIVE: i32 = 0;
|
||||
@@ -301,5 +304,212 @@ mod tests {
|
||||
assert_eq!(sqlite3_close(db), SQLITE_OK);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_disable_wal_checkpoint() {
|
||||
let temp_file = tempfile::NamedTempFile::with_suffix(".db").unwrap();
|
||||
unsafe {
|
||||
let mut db = ptr::null_mut();
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Create a table and insert a row.
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"CREATE TABLE test (id INTEGER PRIMARY KEY)".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"INSERT INTO test (id) VALUES (0)".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
|
||||
let mut log_size = 0;
|
||||
let mut checkpoint_count = 0;
|
||||
|
||||
assert_eq!(
|
||||
sqlite3_wal_checkpoint_v2(
|
||||
db,
|
||||
ptr::null(),
|
||||
SQLITE_CHECKPOINT_PASSIVE,
|
||||
&mut log_size,
|
||||
&mut checkpoint_count
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
}
|
||||
let mut wal_path = temp_file.path().to_path_buf();
|
||||
assert!(wal_path.set_extension("db-wal"));
|
||||
std::fs::remove_file(wal_path.clone()).unwrap();
|
||||
|
||||
{
|
||||
let mut db = ptr::null_mut();
|
||||
unsafe {
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
assert_eq!(libsql_wal_disable_checkpoint(db), SQLITE_OK);
|
||||
// Insert at least 1000 rows to go over checkpoint threshold.
|
||||
let mut stmt = ptr::null_mut();
|
||||
for i in 1..2000 {
|
||||
let sql =
|
||||
std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({})", i))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, ptr::null_mut()),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete WAL to ensure that we don't load anything from it
|
||||
std::fs::remove_file(wal_path).unwrap();
|
||||
let mut db = ptr::null_mut();
|
||||
unsafe {
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Insert at least 1000 rows to go over checkpoint threshold.
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"SELECT count() FROM test".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_ROW);
|
||||
let count = sqlite3_column_int64(stmt, 0);
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wal_checkpoint() {
|
||||
let temp_file = tempfile::NamedTempFile::with_suffix(".db").unwrap();
|
||||
unsafe {
|
||||
let mut db = ptr::null_mut();
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Create a table and insert a row.
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"CREATE TABLE test (id INTEGER PRIMARY KEY)".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"INSERT INTO test (id) VALUES (0)".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
|
||||
let mut log_size = 0;
|
||||
let mut checkpoint_count = 0;
|
||||
|
||||
assert_eq!(
|
||||
sqlite3_wal_checkpoint_v2(
|
||||
db,
|
||||
ptr::null(),
|
||||
SQLITE_CHECKPOINT_PASSIVE,
|
||||
&mut log_size,
|
||||
&mut checkpoint_count
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
}
|
||||
let mut wal_path = temp_file.path().to_path_buf();
|
||||
assert!(wal_path.set_extension("db-wal"));
|
||||
std::fs::remove_file(wal_path.clone()).unwrap();
|
||||
|
||||
{
|
||||
let mut db = ptr::null_mut();
|
||||
unsafe {
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Insert at least 1000 rows to go over checkpoint threshold.
|
||||
let mut stmt = ptr::null_mut();
|
||||
for i in 1..=2000 {
|
||||
let sql =
|
||||
std::ffi::CString::new(format!("INSERT INTO test (id) VALUES ({})", i))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, ptr::null_mut()),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete WAL to ensure that we don't load anything from it
|
||||
std::fs::remove_file(wal_path).unwrap();
|
||||
let mut db = ptr::null_mut();
|
||||
unsafe {
|
||||
let path = temp_file.path();
|
||||
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
|
||||
assert_eq!(sqlite3_open(c_path.as_ptr(), &mut db), SQLITE_OK);
|
||||
// Insert at least 1000 rows to go over checkpoint threshold.
|
||||
let mut stmt = ptr::null_mut();
|
||||
assert_eq!(
|
||||
sqlite3_prepare_v2(
|
||||
db,
|
||||
c"SELECT count() FROM test".as_ptr(),
|
||||
-1,
|
||||
&mut stmt,
|
||||
ptr::null_mut()
|
||||
),
|
||||
SQLITE_OK
|
||||
);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_ROW);
|
||||
let count = sqlite3_column_int64(stmt, 0);
|
||||
assert_eq!(count, 2000);
|
||||
assert_eq!(sqlite3_step(stmt), SQLITE_DONE);
|
||||
assert_eq!(sqlite3_finalize(stmt), SQLITE_OK);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user