diff --git a/Cargo.lock b/Cargo.lock index 9d521cd30..c9aa58c00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -628,6 +628,7 @@ version = "0.1.4" dependencies = [ "anyhow", "assert_cmd", + "ctor 0.5.0", "env_logger 0.10.2", "log", "rand 0.9.2", @@ -817,8 +818,18 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4735f265ba6a1188052ca32d461028a7d1125868be18e287e756019da7607b5" dependencies = [ - "ctor-proc-macro", - "dtor", + "ctor-proc-macro 0.0.5", + "dtor 0.0.6", +] + +[[package]] +name = "ctor" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67773048316103656a637612c4a62477603b777d91d9c62ff2290f9cde178fdb" +dependencies = [ + "ctor-proc-macro 0.0.6", + "dtor 0.1.0", ] [[package]] @@ -827,6 +838,12 @@ version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f211af61d8efdd104f96e57adf5e426ba1bc3ed7a4ead616e15e5881fd79c4d" +[[package]] +name = "ctor-proc-macro" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2931af7e13dc045d8e9d26afccc6fa115d64e115c9c84b1166288b46f6782c2" + [[package]] name = "ctr" version = "0.9.2" @@ -1016,7 +1033,16 @@ version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97cbdf2ad6846025e8e25df05171abfb30e3ababa12ee0a0e44b9bbe570633a8" dependencies = [ - "dtor-proc-macro", + "dtor-proc-macro 0.0.5", +] + +[[package]] +name = "dtor" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e58a0764cddb55ab28955347b45be00ade43d4d6f3ba4bf3dc354e4ec9432934" +dependencies = [ + "dtor-proc-macro 0.0.6", ] [[package]] @@ -1025,6 +1051,12 @@ version = "0.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7454e41ff9012c00d53cf7f475c5e3afa3b91b7c90568495495e8d9bf47a1055" +[[package]] +name = "dtor-proc-macro" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5" + [[package]] name = "dyn-clone" version = "1.0.19" @@ -2308,7 +2340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96671d5c84cee3ae4cab96386b9f953b22569ece9677b9fdd1492550a165eca5" dependencies = [ "bitflags 2.9.0", - "ctor", + "ctor 0.4.2", "napi-build", "napi-sys", "nohash-hasher", @@ -2328,7 +2360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43e61844e0c0bb81e711f2084abe7cff187b03ca21ff8b000cb59bbda61e15a9" dependencies = [ "convert_case", - "ctor", + "ctor 0.4.2", "napi-derive-backend", "proc-macro2", "quote", @@ -4126,7 +4158,7 @@ version = "0.1.4" dependencies = [ "base64", "bytes", - "ctor", + "ctor 0.4.2", "futures", "genawaiter", "http", diff --git a/core/types.rs b/core/types.rs index 8affdd800..25e13ef5c 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2646,7 +2646,7 @@ pub struct WalFrameInfo { pub db_size: u32, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct WalState { pub checkpoint_seq_no: u32, pub max_frame: u64, diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index fa984d646..f43881480 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -1074,15 +1074,15 @@ pub mod tests { let conn1 = db1.connect(&coro).await?; conn1.execute("CREATE TABLE t(x, y)")?; conn1.execute("INSERT INTO t VALUES (1, 2)")?; - let conn1_match_watermark = conn1.wal_frame_count().unwrap(); + let conn1_match_watermark = conn1.wal_state().unwrap().max_frame; conn1.execute("INSERT INTO t VALUES (3, 4)")?; - let conn1_sync_watermark = conn1.wal_frame_count().unwrap(); + let conn1_sync_watermark = conn1.wal_state().unwrap().max_frame; conn1.execute("INSERT INTO t VALUES (5, 6)")?; let conn2 = db2.connect(&coro).await?; conn2.execute("CREATE TABLE t(x, y)")?; conn2.execute("INSERT INTO t VALUES (1, 2)")?; - let conn2_match_watermark = conn2.wal_frame_count().unwrap(); + let conn2_match_watermark = conn2.wal_state().unwrap().max_frame; conn2.execute("INSERT INTO t VALUES (5, 6)")?; // db1 WAL frames: [A1 A2] [A3] [A4] (sync_watermark) [A5] diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b26ca9b5f..163b6a7a3 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -27,6 +27,7 @@ assert_cmd = "^2" rand_chacha = "0.9.0" rand = "0.9.0" zerocopy = "0.8.26" +ctor = "0.5.0" [dev-dependencies] test-log = { version = "0.2.17", features = ["trace"] } @@ -34,4 +35,4 @@ tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } tracing = "0.1.41" [features] -encryption = ["turso_core/encryption"] \ No newline at end of file +encryption = ["turso_core/encryption"] diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs index b31ed8f53..8ee95c32d 100644 --- a/tests/integration/functions/mod.rs +++ b/tests/integration/functions/mod.rs @@ -1,3 +1,16 @@ mod test_cdc; mod test_function_rowid; mod test_wal_api; + +#[cfg(test)] +mod tests { + use tracing_subscriber::EnvFilter; + + #[ctor::ctor] + fn init() { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_ansi(false) + .init(); + } +} diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index 2552fa45c..10185745d 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -3,7 +3,10 @@ use std::{collections::HashSet, path::PathBuf, sync::Arc}; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use rusqlite::types::Value; -use turso_core::{types::WalFrameInfo, CheckpointMode, LimboError, StepResult}; +use turso_core::{ + types::{WalFrameInfo, WalState}, + CheckpointMode, LimboError, StepResult, +}; use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase}; @@ -11,16 +14,16 @@ use crate::common::{limbo_exec_rows, rng_from_time, TempDatabase}; fn test_wal_frame_count() { let db = TempDatabase::new_empty(false); let conn = db.connect_limbo(); - assert_eq!(conn.wal_frame_count().unwrap(), 0); + assert_eq!(conn.wal_state().unwrap().max_frame, 0); conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") .unwrap(); - assert_eq!(conn.wal_frame_count().unwrap(), 2); + assert_eq!(conn.wal_state().unwrap().max_frame, 2); conn.execute("INSERT INTO t VALUES (10, 10), (5, 1)") .unwrap(); - assert_eq!(conn.wal_frame_count().unwrap(), 3); + assert_eq!(conn.wal_state().unwrap().max_frame, 3); conn.execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn.wal_frame_count().unwrap(), 15); + assert_eq!(conn.wal_state().unwrap().max_frame, 15); } #[test] @@ -41,17 +44,17 @@ fn test_wal_frame_transfer_no_schema_changes() { conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 15); + assert_eq!(conn1.wal_state().unwrap().max_frame, 15); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - let frames_count = conn1.wal_frame_count().unwrap(); + let frames_count = conn1.wal_state().unwrap().max_frame; for frame_id in 1..=frames_count { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); - assert_eq!(conn2.wal_frame_count().unwrap(), 15); + assert_eq!(conn2.wal_state().unwrap().max_frame, 15); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), vec![ @@ -75,7 +78,7 @@ fn test_wal_frame_transfer_various_schema_changes() { let mut frame = [0u8; 24 + 4096]; let mut synced_frame = 0; let mut sync = || { - let last_frame = conn1.wal_frame_count().unwrap(); + let last_frame = conn1.wal_state().unwrap().max_frame; conn2.wal_insert_begin().unwrap(); for frame_id in (synced_frame + 1)..=last_frame { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); @@ -136,11 +139,11 @@ fn test_wal_frame_transfer_schema_changes() { conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 15); + assert_eq!(conn1.wal_state().unwrap().max_frame, 15); let mut frame = [0u8; 24 + 4096]; let mut commits = 0; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=conn1.wal_frame_count().unwrap() { + for frame_id in 1..=conn1.wal_state().unwrap().max_frame { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); let info = conn2.wal_insert_frame(frame_id, &frame).unwrap(); if info.is_commit_frame() { @@ -149,7 +152,7 @@ fn test_wal_frame_transfer_schema_changes() { } conn2.wal_insert_end().unwrap(); assert_eq!(commits, 3); - assert_eq!(conn2.wal_frame_count().unwrap(), 15); + assert_eq!(conn2.wal_state().unwrap().max_frame, 15); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), vec![ @@ -175,16 +178,16 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() { conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 14); + assert_eq!(conn1.wal_state().unwrap().max_frame, 14); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); // Intentionally leave out the final commit frame, so the big randomblob is not committed and should not be visible to transactions. - for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) { + for frame_id in 1..=(conn1.wal_state().unwrap().max_frame - 1) { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); - assert_eq!(conn2.wal_frame_count().unwrap(), 2); + assert_eq!(conn2.wal_state().unwrap().max_frame, 2); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), vec![] as Vec> @@ -211,15 +214,15 @@ fn test_wal_frame_transfer_schema_changes_rollback() { conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 14); + assert_eq!(conn1.wal_state().unwrap().max_frame, 14); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); - for frame_id in 1..=(conn1.wal_frame_count().unwrap() - 1) { + for frame_id in 1..=(conn1.wal_state().unwrap().max_frame - 1) { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } conn2.wal_insert_end().unwrap(); - assert_eq!(conn2.wal_frame_count().unwrap(), 2); + assert_eq!(conn2.wal_state().unwrap().max_frame, 2); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), vec![] as Vec> @@ -246,7 +249,7 @@ fn test_wal_frame_conflict() { conn2 .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y)") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 2); + assert_eq!(conn1.wal_state().unwrap().max_frame, 2); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); conn1.wal_get_frame(1, &mut frame).unwrap(); @@ -268,7 +271,7 @@ fn test_wal_frame_far_away_write() { conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 10))") .unwrap(); - assert_eq!(conn1.wal_frame_count().unwrap(), 14); + assert_eq!(conn1.wal_state().unwrap().max_frame, 14); let mut frame = [0u8; 24 + 4096]; conn2.wal_insert_begin().unwrap(); @@ -298,17 +301,17 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { let mut rng = ChaCha8Rng::seed_from_u64(seed); println!("SEED: {seed}"); - let (mut size, mut synced_frame) = (0, conn2.wal_frame_count().unwrap()); - let mut commit_frames = vec![conn1.wal_frame_count().unwrap()]; + let (mut size, mut synced_frame) = (0, conn2.wal_state().unwrap().max_frame); + let mut commit_frames = vec![conn1.wal_state().unwrap().max_frame]; for _ in 0..256 { if rng.next_u32() % 10 != 0 { let key = rng.next_u32(); let length = rng.next_u32() % (4 * 4096); let query = format!("INSERT INTO t VALUES ({key}, randomblob({length}))"); conn1.execute(&query).unwrap(); - commit_frames.push(conn1.wal_frame_count().unwrap()); + commit_frames.push(conn1.wal_state().unwrap().max_frame); } else { - let last_frame = conn1.wal_frame_count().unwrap(); + let last_frame = conn1.wal_state().unwrap().max_frame; let next_frame = synced_frame + (rng.next_u32() as u64 % (last_frame - synced_frame + 1)); let mut frame = [0u8; 24 + 4096]; @@ -354,7 +357,7 @@ fn test_wal_api_changed_pages() { .collect::>(), HashSet::from([1, 2, 3]) ); - let frames = conn1.wal_frame_count().unwrap(); + let frames = conn1.wal_state().unwrap().max_frame; conn1.execute("INSERT INTO t VALUES (1, 2)").unwrap(); conn1.execute("INSERT INTO t VALUES (3, 4)").unwrap(); assert_eq!( @@ -365,7 +368,7 @@ fn test_wal_api_changed_pages() { .collect::>(), HashSet::from([2]) ); - let frames = conn1.wal_frame_count().unwrap(); + let frames = conn1.wal_state().unwrap().max_frame; conn1 .execute("INSERT INTO t VALUES (1024, randomblob(4096 * 2))") .unwrap(); @@ -396,7 +399,7 @@ fn revert_to(conn: &Arc, frame_watermark: u64) -> turso_ frames.push((page_id, frame)); } - let mut frame_no = conn.wal_frame_count().unwrap(); + let mut frame_no = conn.wal_state().unwrap().max_frame; for (i, (page_id, mut frame)) in frames.iter().enumerate() { let info = WalFrameInfo { db_size: if i == frames.len() - 1 { @@ -422,11 +425,11 @@ fn test_wal_api_revert_pages() { conn1 .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") .unwrap(); - let watermark1 = conn1.wal_frame_count().unwrap(); + let watermark1 = conn1.wal_state().unwrap().max_frame; conn1 .execute("INSERT INTO t VALUES (1, randomblob(10))") .unwrap(); - let watermark2 = conn1.wal_frame_count().unwrap(); + let watermark2 = conn1.wal_state().unwrap().max_frame; conn1 .execute("INSERT INTO t VALUES (3, randomblob(20))") @@ -467,15 +470,15 @@ fn test_wal_upper_bound_passive() { writer .execute("create table test(id integer primary key, value text)") .unwrap(); - let watermark0 = writer.wal_frame_count().unwrap(); + let watermark0 = writer.wal_state().unwrap().max_frame; writer .execute("insert into test values (1, 'hello')") .unwrap(); - let watermark1 = writer.wal_frame_count().unwrap(); + let watermark1 = writer.wal_state().unwrap().max_frame; writer .execute("insert into test values (2, 'turso')") .unwrap(); - let watermark2 = writer.wal_frame_count().unwrap(); + let watermark2 = writer.wal_state().unwrap().max_frame; let expected = [ vec![ turso_core::types::Value::Integer(1), @@ -525,7 +528,7 @@ fn test_wal_upper_bound_truncate() { writer .execute("insert into test values (1, 'hello')") .unwrap(); - let watermark = writer.wal_frame_count().unwrap(); + let watermark = writer.wal_state().unwrap().max_frame; writer .execute("insert into test values (2, 'turso')") .unwrap(); @@ -538,3 +541,41 @@ fn test_wal_upper_bound_truncate() { LimboError::Busy )); } + +#[test] +fn test_wal_state_checkpoint_seq() { + let db = TempDatabase::new_empty(false); + let writer = db.connect_limbo(); + + writer + .execute("create table test(id integer primary key, value text)") + .unwrap(); + writer + .execute("insert into test values (1, 'hello')") + .unwrap(); + writer + .checkpoint(CheckpointMode::Truncate { + upper_bound_inclusive: None, + }) + .unwrap(); + writer + .execute("insert into test values (2, 'turso')") + .unwrap(); + writer + .checkpoint(CheckpointMode::Truncate { + upper_bound_inclusive: None, + }) + .unwrap(); + writer + .execute("insert into test values (3, 'limbo')") + .unwrap(); + + let state = writer.wal_state().unwrap(); + assert_eq!( + state, + WalState { + checkpoint_seq_no: 2, + max_frame: 1 + } + ); +}