Merge 'bindings/rust: Add WAL API support' from Nikita Sivukhin

Extend rust `turso-db` bindings with WAL API methods and also extend
result of `wal_insert_frame` method to return `WalInsertInfo` struct.

Closes #2305
This commit is contained in:
Pekka Enberg
2025-07-28 19:34:16 +03:00
7 changed files with 97 additions and 34 deletions

View File

@@ -37,11 +37,12 @@ pub mod transaction;
pub mod value;
use transaction::TransactionBehavior;
use turso_core::types::WalInsertInfo;
pub use value::Value;
pub use params::params_from_iter;
pub use params::IntoParams;
use crate::params::*;
use std::fmt::Debug;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
@@ -54,6 +55,8 @@ pub enum Error {
MutexError(String),
#[error("SQL execution failure: `{0}`")]
SqlExecutionFailure(String),
#[error("WAL operation error: `{0}`")]
WalOperationError(String),
}
impl From<turso_core::LimboError> for Error {
@@ -170,6 +173,51 @@ impl Connection {
stmt.execute(params).await
}
pub fn wal_frame_count(&self) -> Result<u64> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_frame_count()
.map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
}
pub fn wal_insert_begin(&self) -> Result<()> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_insert_begin()
.map_err(|e| Error::WalOperationError(format!("wal_insert_begin failed: {e}")))
}
pub fn wal_insert_end(&self) -> Result<()> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_insert_end()
.map_err(|e| Error::WalOperationError(format!("wal_insert_end failed: {e}")))
}
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalInsertInfo> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_insert_frame(frame_no, frame)
.map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
}
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
let conn = self
.inner
.lock()
.map_err(|e| Error::MutexError(e.to_string()))?;
conn.wal_get_frame(frame_no, frame)
.map_err(|e| Error::WalOperationError(format!("wal_insert_frame failed: {e}")))
}
/// Prepare a SQL statement for later execution.
pub async fn prepare(&self, sql: &str) -> Result<Statement> {
let conn = self
@@ -351,6 +399,12 @@ impl Statement {
cols
}
/// Reset internal statement state after previous execution so it can be reused again
pub fn reset(&self) {
let mut stmt = self.inner.lock().unwrap();
stmt.reset();
}
}
/// Column information.

View File

@@ -217,6 +217,7 @@ macro_rules! named_tuple_into_params {
}
}
named_tuple_into_params!(1: (0 A));
named_tuple_into_params!(2: (0 A), (1 B));
named_tuple_into_params!(3: (0 A), (1 B), (2 C));
named_tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D));
@@ -233,6 +234,7 @@ named_tuple_into_params!(14: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7
named_tuple_into_params!(15: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O));
named_tuple_into_params!(16: (0 A), (1 B), (2 C), (3 D), (4 E), (5 F), (6 G), (7 H), (8 I), (9 J), (10 K), (11 L), (12 M), (13 N), (14 O), (15 P));
tuple_into_params!(1: (0 A));
tuple_into_params!(2: (0 A), (1 B));
tuple_into_params!(3: (0 A), (1 B), (2 C));
tuple_into_params!(4: (0 A), (1 B), (2 C), (3 D));

View File

@@ -47,6 +47,8 @@ use crate::storage::{header_accessor, wal::DummyWAL};
use crate::translate::optimizer::optimize_plan;
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
#[cfg(feature = "fs")]
use crate::types::WalInsertInfo;
#[cfg(feature = "fs")]
use crate::util::{IOExt, OpenMode, OpenOptions};
use crate::vtab::VirtualTable;
use core::str;
@@ -1037,15 +1039,16 @@ impl Connection {
}
#[cfg(feature = "fs")]
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Arc<Completion>> {
self.pager.borrow().wal_get_frame(frame_no, frame)
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<()> {
let c = self.pager.borrow().wal_get_frame(frame_no, frame)?;
self._db.io.wait_for_completion(c)
}
/// Insert `frame` (header included) at the position `frame_no` in the WAL
/// If WAL already has frame at that position - turso-db will compare content of the page and either report conflict or return OK
/// If attempt to write frame at the position `frame_no` will create gap in the WAL - method will return error
#[cfg(feature = "fs")]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> {
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalInsertInfo> {
self.pager.borrow().wal_insert_frame(frame_no, frame)
}

View File

@@ -7,7 +7,7 @@ use crate::storage::sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageType,
};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::IOResult;
use crate::types::{IOResult, WalInsertInfo};
use crate::util::IOExt as _;
use crate::{return_if_io, Completion};
use crate::{turso_assert, Buffer, Connection, LimboError, Result};
@@ -1173,7 +1173,7 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<()> {
pub fn wal_insert_frame(&self, frame_no: u32, frame: &[u8]) -> Result<WalInsertInfo> {
let mut wal = self.wal.borrow_mut();
let (header, raw_page) = parse_wal_frame_header(frame);
wal.write_frame_raw(
@@ -1201,7 +1201,10 @@ impl Pager {
}
self.dirty_pages.borrow_mut().clear();
}
Ok(())
Ok(WalInsertInfo {
page_no: header.page_number as usize,
is_commit: header.is_commit_frame(),
})
}
#[instrument(skip_all, level = Level::DEBUG, name = "pager_checkpoint",)]

View File

@@ -2435,6 +2435,12 @@ impl RawSlice {
}
}
#[derive(Debug)]
pub struct WalInsertInfo {
pub page_no: usize,
pub is_commit: bool,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -39,7 +39,7 @@ pub struct sqlite3 {
}
struct sqlite3Inner {
pub(crate) io: Arc<dyn turso_core::IO>,
pub(crate) _io: Arc<dyn turso_core::IO>,
pub(crate) _db: Arc<turso_core::Database>,
pub(crate) conn: Arc<turso_core::Connection>,
pub(crate) err_code: ffi::c_int,
@@ -56,7 +56,7 @@ impl sqlite3 {
conn: Arc<turso_core::Connection>,
) -> Self {
let inner = sqlite3Inner {
io,
_io: io,
_db: db,
conn,
err_code: SQLITE_OK,
@@ -1193,10 +1193,7 @@ pub unsafe extern "C" fn libsql_wal_get_frame(
let db = db.inner.lock().unwrap();
let frame = std::slice::from_raw_parts_mut(p_frame, frame_len as usize);
match db.conn.wal_get_frame(frame_no, frame) {
Ok(c) => match db.io.wait_for_completion(c) {
Ok(_) => SQLITE_OK,
Err(_) => SQLITE_ERROR,
},
Ok(()) => SQLITE_OK,
Err(_) => SQLITE_ERROR,
}
}
@@ -1233,7 +1230,7 @@ pub unsafe extern "C" fn libsql_wal_insert_frame(
let db = db.inner.lock().unwrap();
let frame = std::slice::from_raw_parts(p_frame, frame_len as usize);
match db.conn.wal_insert_frame(frame_no, frame) {
Ok(()) => SQLITE_OK,
Ok(_) => SQLITE_OK,
Err(LimboError::Conflict(..)) => {
if !p_conflict.is_null() {
*p_conflict = 1;

View File

@@ -41,11 +41,12 @@ fn test_wal_frame_transfer_no_schema_changes() {
assert_eq!(conn1.wal_frame_count().unwrap(), 15);
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 {
let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
let frames_count = conn1.wal_frame_count().unwrap() as u32;
for frame_id in 1..=frames_count {
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
conn2.wal_insert_end().unwrap();
assert_eq!(conn2.wal_frame_count().unwrap(), 15);
assert_eq!(
@@ -74,8 +75,7 @@ fn test_wal_frame_transfer_various_schema_changes() {
let last_frame = conn1.wal_frame_count().unwrap() as u32;
conn2.wal_insert_begin().unwrap();
for frame_id in (synced_frame + 1)..=last_frame {
let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
conn2.wal_insert_end().unwrap();
@@ -135,13 +135,17 @@ fn test_wal_frame_transfer_schema_changes() {
.unwrap();
assert_eq!(conn1.wal_frame_count().unwrap(), 15);
let mut frame = [0u8; 24 + 4096];
let mut commits = 0;
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=conn1.wal_frame_count().unwrap() as u32 {
let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
let info = conn2.wal_insert_frame(frame_id, &frame).unwrap();
if info.is_commit {
commits += 1;
}
}
conn2.wal_insert_end().unwrap();
assert_eq!(commits, 3);
assert_eq!(conn2.wal_frame_count().unwrap(), 15);
assert_eq!(
limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"),
@@ -172,8 +176,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
conn2.wal_insert_end().unwrap();
@@ -208,8 +211,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_id in 1..=(conn1.wal_frame_count().unwrap() as u32 - 1) {
let c = conn1.wal_get_frame(frame_id, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(frame_id, &mut frame).unwrap();
conn2.wal_insert_frame(frame_id, &frame).unwrap();
}
conn2.wal_insert_end().unwrap();
@@ -243,8 +245,7 @@ fn test_wal_frame_conflict() {
assert_eq!(conn1.wal_frame_count().unwrap(), 2);
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
let c = conn1.wal_get_frame(1, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(1, &mut frame).unwrap();
assert!(conn2.wal_insert_frame(1, &frame).is_err());
}
@@ -267,12 +268,10 @@ fn test_wal_frame_far_away_write() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
let c = conn1.wal_get_frame(3, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(3, &mut frame).unwrap();
conn2.wal_insert_frame(3, &frame).unwrap();
let c = conn1.wal_get_frame(5, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(5, &mut frame).unwrap();
assert!(conn2.wal_insert_frame(5, &frame).is_err());
}
@@ -311,8 +310,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() {
let mut frame = [0u8; 24 + 4096];
conn2.wal_insert_begin().unwrap();
for frame_no in (synced_frame + 1)..=next_frame {
let c = conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap();
db1.io.wait_for_completion(c).unwrap();
conn1.wal_get_frame(frame_no as u32, &mut frame).unwrap();
conn2.wal_insert_frame(frame_no as u32, &frame[..]).unwrap();
}
conn2.wal_insert_end().unwrap();