diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index a3ef69b96..ee0e0d838 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -37,11 +37,12 @@ pub mod transaction; pub mod value; use transaction::TransactionBehavior; +use turso_core::types::WalInsertInfo; pub use value::Value; pub use params::params_from_iter; +pub use params::IntoParams; -use crate::params::*; use std::fmt::Debug; use std::num::NonZero; use std::sync::{Arc, Mutex}; @@ -54,6 +55,8 @@ pub enum Error { MutexError(String), #[error("SQL execution failure: `{0}`")] SqlExecutionFailure(String), + #[error("WAL operation error: `{0}`")] + WalOperationError(String), } impl From for Error { @@ -170,6 +173,51 @@ impl Connection { stmt.execute(params).await } + pub fn wal_frame_count(&self) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_frame_count() + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}"))) + } + + pub fn wal_insert_begin(&self) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_begin() + .map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}"))) + } + + pub fn wal_insert_end(&self) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_end() + .map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {e}"))) + } + + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_insert_frame(frame_no, frame) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}"))) + } + + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + let conn = self + .inner + .lock() + .map_err(|e| Error::MutexError(e.to_string()))?; + conn.wal_get_frame(frame_no, frame) + .map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}"))) + } + /// Prepare a SQL statement for later execution. pub async fn prepare(&self, sql: &str) -> Result { let conn = self @@ -351,6 +399,12 @@ impl Statement { cols } + + /// Reset internal statement state after previous execution so it can be reused again + pub fn reset(&self) { + let mut stmt = self.inner.lock().unwrap(); + stmt.reset(); + } } /// Column information. diff --git a/bindings/rust/src/params.rs b/bindings/rust/src/params.rs index 6cab3e1a5..b28e34635 100644 --- a/bindings/rust/src/params.rs +++ b/bindings/rust/src/params.rs @@ -217,6 +217,7 @@ macro_rules! named_tuple_into_params { } } +named_tuple_into_params!(1: (0 A)); named_tuple_into_params!(2: (0 A), (1 B)); named_tuple_into_params!(3: (0 A), (1 B), (2 C)); named_tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D)); @@ -233,6 +234,7 @@ named_tuple_into_params!(14: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 named_tuple_into_params!(15: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O)); named_tuple_into_params!(16: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O), (15 P)); +tuple_into_params!(1: (0 A)); tuple_into_params!(2: (0 A), (1 B)); tuple_into_params!(3: (0 A), (1 B), (2 C)); tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D)); diff --git a/core/lib.rs b/core/lib.rs index d1e17b2f3..37bdf257f 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,6 +47,8 @@ use crate::storage::{header_accessor, wal::DummyWAL}; use crate::translate::optimizer::optimize_plan; use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; #[cfg(feature = "fs")] +use crate::types::WalInsertInfo; +#[cfg(feature = "fs")] use crate::util::{IOExt, OpenMode, OpenOptions}; use crate::vtab::VirtualTable; use core::str; @@ -1037,15 +1039,16 @@ impl Connection { } #[cfg(feature = "fs")] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result> { - self.pager.borrow().wal_get_frame(frame_no, frame) + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> { + let c = self.pager.borrow().wal_get_frame(frame_no, frame)?; + self._db.io.wait_for_completion(c) } /// Insert `frame` (header included) at the position `frame_no` in the WAL /// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK /// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error #[cfg(feature = "fs")] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { self.pager.borrow().wal_insert_frame(frame_no, frame) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 17f8e7d61..fb39932a9 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -7,7 +7,7 @@ use crate::storage::sqlite3_ondisk::{ self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType, }; use crate::storage::wal::{CheckpointResult, Wal}; -use crate::types::IOResult; +use crate::types::{IOResult, WalInsertInfo}; use crate::util::IOExt as _; use crate::{return_if_io, Completion}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; @@ -1173,7 +1173,7 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> { + pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result { let mut wal = self.wal.borrow_mut(); let (header, raw_page) = parse_wal_frame_header(frame); wal.write_frame_raw( @@ -1201,7 +1201,10 @@ impl Pager { } self.dirty_pages.borrow_mut().clear(); } - Ok(()) + Ok(WalInsertInfo { + page_no: header.page_number as usize, + is_commit: header.is_commit_frame(), + }) } #[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)] diff --git a/core/types.rs b/core/types.rs index 49267cf38..96537bdfa 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2435,6 +2435,12 @@ impl RawSlice { } } +#[derive(Debug)] +pub struct WalInsertInfo { + pub page_no: usize, + pub is_commit: bool, +} + #[cfg(test)] mod tests { use super::*; diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 1a8309288..d9e562554 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -39,7 +39,7 @@ pub struct sqlite3 { } struct sqlite3Inner { - pub(crate) io: Arc, + pub(crate) _io: Arc, pub(crate) _db: Arc, pub(crate) conn: Arc, pub(crate) err_code: ffi::c_int, @@ -56,7 +56,7 @@ impl sqlite3 { conn: Arc, ) -> Self { let inner = sqlite3Inner { - io, + _io: io, _db: db, conn, err_code: SQLITE_OK, @@ -1193,10 +1193,7 @@ pub unsafe extern "C" fn libsql_wal_get_frame( let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize); match db.conn.wal_get_frame(frame_no, frame) { - Ok(c) => match db.io.wait_for_completion(c) { - Ok(_) => SQLITE_OK, - Err(_) => SQLITE_ERROR, - }, + Ok(()) => SQLITE_OK, Err(_) => SQLITE_ERROR, } } @@ -1233,7 +1230,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame( let db = db.inner.lock().unwrap(); let frame = std::slice::from_raw_parts(p_frame, frame_len as usize); match db.conn.wal_insert_frame(frame_no, frame) { - Ok(()) => SQLITE_OK, + Ok(_) => SQLITE_OK, Err(LimboError::Conflict(..)) => { if !p_conflict.is_null() { *p_conflict = 1; diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index f945183a2..9b7b710a3 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -41,11 +41,12 @@ fn test_wal_frame_transfer_no_schema_changes() { assert_eq!(conn1.wal_frame_count().unwrap(), 15); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + let frames_count = conn1.wal_frame_count().unwrap() as u32; + for frame_id in 1..=frames_count { + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } + conn2.wal_insert_end().unwrap(); assert_eq!(conn2.wal_frame_count().unwrap(), 15); assert_eq!( @@ -74,8 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() { let last_frame = conn1.wal_frame_count().unwrap() as u32; conn2.wal_insert_begin().unwrap(); for frame_id in (synced_frame + 1)..=last_frame { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -135,13 +135,17 @@ fn test_wal_frame_transfer_schema_changes() { .unwrap(); assert_eq!(conn1.wal_frame_count().unwrap(), 15); let mut frame = [0u8; 24 + 4096]; + let mut commits = 0; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); - conn2.wal_insert_frame(frame_id, &frame).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); + let info = conn2.wal_insert_frame(frame_id, &frame).unwrap(); + if info.is_commit { + commits += 1; + } } conn2.wal_insert_end().unwrap(); + assert_eq!(commits, 3); assert_eq!(conn2.wal_frame_count().unwrap(), 15); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), @@ -172,8 +176,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -208,8 +211,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) { - let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); @@ -243,8 +245,7 @@ fn test_wal_frame_conflict() { assert_eq!(conn1.wal_frame_count().unwrap(), 2); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let c = conn1.wal_get_frame(1, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(1, &mut frame).unwrap(); assert!(conn2.wal_insert_frame(1, &frame).is_err()); } @@ -267,12 +268,10 @@ fn test_wal_frame_far_away_write() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let c = conn1.wal_get_frame(3, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(3, &mut frame).unwrap(); conn2.wal_insert_frame(3, &frame).unwrap(); - let c = conn1.wal_get_frame(5, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(5, &mut frame).unwrap(); assert!(conn2.wal_insert_frame(5, &frame).is_err()); } @@ -311,8 +310,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); for frame_no in (synced_frame + 1)..=next_frame { - let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); - db1.io.wait_for_completion(c).unwrap(); + conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap(); conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap(); } conn2.wal_insert_end().unwrap();