From 09b18f6b6efcbbe4388dd0c4cce9de9c116f0187 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:06:10 +0400 Subject: [PATCH 1/8] add WAL API methods to the rust bindings and extend result of wal_insert_frame method --- bindings/rust/src/lib.rs | 56 ++++++++++++++++++++++++++++++++++++- bindings/rust/src/params.rs | 2 ++ core/lib.rs | 9 ++++-- core/storage/pager.rs | 9 ++++-- 4 files changed, 69 insertions(+), 7 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index a3ef69b96..19b84ea3d 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -37,11 +37,12 @@ pub mod transaction; pub mod value; use transaction::TransactionBehavior; +use turso_core::types::WalInsertInfo; pub use value::Value; pub use params::params_from_iter; +pub use params::IntoParams; -use crate::params::*; use std::fmt::Debug; use std::num::NonZero; use std::sync::{Arc, Mutex}; @@ -54,6 +55,8 @@ pub enum Error { MutexError(String), #[error("SQL execution failure: `{0}`")] SqlExecutionFailure(String), + #[error("WAL operation error: `{0}`")] + WalOperationError(String), } impl From for Error { @@ -170,6 +173,51 @@ impl Connection { stmt.execute(params).await } + pub fn wal_frame_count(&self) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_frame_count() + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {}", e))) + } + + pub fn wal_insert_begin(&self) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_begin() + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {}", e))) + } + + pub fn wal_insert_end(&self) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_end() + .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {}", e))) + } + + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_frame(frame_no, frame) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {}", e))) + } + + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_get_frame(frame_no, frame) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {}", e))) + } + /// Prepare a SQL statement for later execution. pub async fn prepare(&self, sql: &str) -> Result { let conn = self @@ -351,6 +399,12 @@ impl Statement { cols } + + /// Reset internal statement state after previous execution so it can be reused again + pub fn reset(&self) { + let mut stmt = self.inner.lock().unwrap(); + stmt.reset(); + } } /// Column information. diff --git a/bindings/rust/src/params.rs b/bindings/rust/src/params.rs index 6cab3e1a5..b28e34635 100644 --- a/bindings/rust/src/params.rs +++ b/bindings/rust/src/params.rs @@ -217,6 +217,7 @@ macro_rules! named_tuple_into_params { } } +named_tuple_into_params!(1: (0 A)); named_tuple_into_params!(2: (0 A), (1 B)); named_tuple_into_params!(3: (0 A), (1 B), (2 C)); named_tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D)); @@ -233,6 +234,7 @@ named_tuple_into_params!(14: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 named_tuple_into_params!(15: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O)); named_tuple_into_params!(16: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O), (15 P)); +tuple_into_params!(1: (0 A)); tuple_into_params!(2: (0 A), (1 B)); tuple_into_params!(3: (0 A), (1 B), (2 C)); tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D)); diff --git a/core/lib.rs b/core/lib.rs index d1e17b2f3..37bdf257f 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,6 +47,8 @@ use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; #[cfg(feature = "fs")] +use crate::types::WalInsertInfo; +#[cfg(feature = "fs")] use crate::util::{IOExt, OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -1037,15 +1039,16 @@ impl Connection { } #[cfg(feature = "fs")] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result> { - self.pager.borrow().wal_get_frame(frame_no, frame) + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + let c = self.pager.borrow().wal_get_frame(frame_no, frame)?; + self._db.io.wait_for_completion(c) } /// Insert `frame` (header included) at the position `frame_no` in the WAL /// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK /// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error #[cfg(feature = "fs")] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { self.pager.borrow().wal_insert_frame(frame_no, frame) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 17f8e7d61..fb39932a9 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -7,7 +7,7 @@ use crate::storage::sqlite3_ondisk::{ self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, }; use crate::storage::wal::{CheckpointResult, Wal}; -use crate::types::IOResult; +use crate::types::{IOResult, WalInsertInfo}; use crate::util::IOExt as _; use crate::{return_if_io, Completion}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; @@ -1173,7 +1173,7 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { let mut wal = self.wal.borrow_mut(); let (header, raw_page) = parse_wal_frame_header(frame); wal.write_frame_raw( @@ -1201,7 +1201,10 @@ impl Pager { } self.dirty_pages.borrow_mut().clear(); } - Ok(()) + Ok(WalInsertInfo { + page_no: header.page_number as usize, + is_commit: header.is_commit_frame(), + }) } #[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)] From 4695719d2ba7e26dd2e82f88a4bfc3153a1898f0 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:06:38 +0400 Subject: [PATCH 2/8] fix C bindings --- sqlite3/src/lib.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 1a8309288..9fc0c9f15 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -1193,10 +1193,7 @@ pub unsafe extern "C" fn libsql_wal_get_frame( let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize); match db.conn.wal_get_frame(frame_no, frame) { - Ok(c) => match db.io.wait_for_completion(c) { - Ok(_) => SQLITE_OK, - Err(_) => SQLITE_ERROR, - }, + Ok(()) => SQLITE_OK, Err(_) => SQLITE_ERROR, } } @@ -1233,7 +1230,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame( let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts(p_frame, frame_len as usize); match db.conn.wal_insert_frame(frame_no, frame) { - Ok(()) => SQLITE_OK, + Ok(_) => SQLITE_OK, Err(LimboError::Conflict(..)) => { if !p_conflict.is_null() { *p_conflict = 1; From eb32ea49e67c4413736d1926b40f4683b12ee6a8 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:06:48 +0400 Subject: [PATCH 3/8] fix tests --- tests/integration/functions/test_wal_api.rs | 24 +++++++-------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index f945183a2..e096e6729 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -42,8 +42,7 @@ fn test_wal_frame_transfer_no_schema_changes() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -137,8 +136,7 @@ fn test_wal_frame_transfer_schema_changes() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -172,8 +170,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -208,8 +205,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -243,8 +239,7 @@ fn test_wal_frame_conflict() { assert_eq!(conn1.wal_frame_count().unwrap(), 2); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let c = conn1.wal_get_frame(1, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(1, &mut frame).unwrap(); assert!(conn2.wal_insert_frame(1, &frame).is_err()); } @@ -267,12 +262,10 @@ fn test_wal_frame_far_away_write() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let c = conn1.wal_get_frame(3, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(3, &mut frame).unwrap(); conn2.wal_insert_frame(3, &frame).unwrap(); - let c = conn1.wal_get_frame(5, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(5, &mut frame).unwrap(); assert!(conn2.wal_insert_frame(5, &frame).is_err()); } @@ -311,8 +304,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_no in (synced_frame + 1)..=next_frame { - let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap(); } conn2.wal_insert_end().unwrap(); From 3614b022ab7b695fd1f3a5ef86d073d62c7b8e0d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:08:23 +0400 Subject: [PATCH 4/8] add WalInsertInfo type --- core/types.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/types.rs b/core/types.rs index 49267cf38..96537bdfa 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2435,6 +2435,12 @@ impl RawSlice { } } +#[derive(Debug)] +pub struct WalInsertInfo { + pub page_no: usize, + pub is_commit: bool, +} + #[cfg(test)] mod tests { use super::*; From 4d25cda1e2861a76585222e9c9249b48e38fa3cb Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:13:15 +0400 Subject: [PATCH 5/8] slightly adjust one test --- tests/integration/functions/test_wal_api.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index e096e6729..198e9d9a1 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -41,10 +41,12 @@ fn test_wal_frame_transfer_no_schema_changes() { assert_eq!(conn1.wal_frame_count().unwrap(), 15); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { + let frames_count = conn1.wal_frame_count().unwrap() as u32; + for frame_id in 1..=frames_count { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } + conn2.wal_insert_end().unwrap(); assert_eq!(conn2.wal_frame_count().unwrap(), 15); assert_eq!( @@ -134,12 +136,17 @@ fn test_wal_frame_transfer_schema_changes() { .unwrap(); assert_eq!(conn1.wal_frame_count().unwrap(), 15); let mut frame = [0u8; 24 + 4096]; + let mut commits = 0; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - conn2.wal_insert_frame(frame_id, &frame).unwrap(); + let info = conn2.wal_insert_frame(frame_id, &frame).unwrap(); + if info.is_commit { + commits += 1; + } } conn2.wal_insert_end().unwrap(); + assert_eq!(commits, 3); assert_eq!(conn2.wal_frame_count().unwrap(), 15); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), From d8be1cbef11d120ca086874811a0c1926aa655ba Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:20:57 +0400 Subject: [PATCH 6/8] fix after rebase --- tests/integration/functions/test_wal_api.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index 198e9d9a1..9b7b710a3 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -75,8 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() { let last_frame = conn1.wal_frame_count().unwrap() as u32; conn2.wal_insert_begin().unwrap(); for frame_id in (synced_frame + 1)..=last_frame { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); From 976b67a408f2dc6eaeb0c1c10ba5d717c1bb8cfd Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:27:52 +0400 Subject: [PATCH 7/8] fix clippy --- bindings/rust/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 19b84ea3d..ee0e0d838 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -179,7 +179,7 @@ impl Connection { .lock() .map_err(|e| Error::MutexError(e.to_string()))?; conn.wal_frame_count() - .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {}", e))) + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}"))) } pub fn wal_insert_begin(&self) -> Result<()> { @@ -188,7 +188,7 @@ impl Connection { .lock() .map_err(|e| Error::MutexError(e.to_string()))?; conn.wal_insert_begin() - .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {}", e))) + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}"))) } pub fn wal_insert_end(&self) -> Result<()> { @@ -197,7 +197,7 @@ impl Connection { .lock() .map_err(|e| Error::MutexError(e.to_string()))?; conn.wal_insert_end() - .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {}", e))) + .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {e}"))) } pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { @@ -206,7 +206,7 @@ impl Connection { .lock() .map_err(|e| Error::MutexError(e.to_string()))?; conn.wal_insert_frame(frame_no, frame) - .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {}", e))) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}"))) } pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { @@ -215,7 +215,7 @@ impl Connection { .lock() .map_err(|e| Error::MutexError(e.to_string()))?; conn.wal_get_frame(frame_no, frame) - .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {}", e))) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}"))) } /// Prepare a SQL statement for later execution. From ff0410a7d31c2a8484b146fca1ad67a5629b432c Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 28 Jul 2025 17:36:41 +0400 Subject: [PATCH 8/8] fix clippy (2) --- sqlite3/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 9fc0c9f15..d9e562554 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -39,7 +39,7 @@ pub struct sqlite3 { } struct sqlite3Inner { - pub(crate) io: Arc, + pub(crate) _io: Arc, pub(crate) _db: Arc, pub(crate) conn: Arc, pub(crate) err_code: ffi::c_int, @@ -56,7 +56,7 @@ impl sqlite3 { conn: Arc, ) -> Self { let inner = sqlite3Inner { - io, + _io: io, _db: db, conn, err_code: SQLITE_OK,