diff --git a/tests/integration/functions/test_wal_api.rs b/tests/integration/functions/test_wal_api.rs index cf1548494..2534223e8 100644 --- a/tests/integration/functions/test_wal_api.rs +++ b/tests/integration/functions/test_wal_api.rs @@ -3,6 +3,7 @@ use std::{collections::HashSet, path::PathBuf, sync::Arc}; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use rusqlite::types::Value; +use tempfile::TempDir; use turso_core::{ types::{WalFrameInfo, WalState}, CheckpointMode, LimboError, StepResult, @@ -53,7 +54,7 @@ fn test_wal_frame_transfer_no_schema_changes() { conn2.wal_insert_frame(frame_id, &frame).unwrap(); } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); assert_eq!(conn2.wal_state().unwrap().max_frame, 15); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), @@ -84,7 +85,7 @@ fn test_wal_frame_transfer_various_schema_changes() { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); synced_frame = last_frame; }; @@ -150,7 +151,7 @@ fn test_wal_frame_transfer_schema_changes() { commits += 1; } } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); assert_eq!(commits, 3); assert_eq!(conn2.wal_state().unwrap().max_frame, 15); assert_eq!( @@ -186,7 +187,7 @@ fn test_wal_frame_transfer_no_schema_changes_rollback() { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); assert_eq!(conn2.wal_state().unwrap().max_frame, 2); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), @@ -221,7 +222,7 @@ fn test_wal_frame_transfer_schema_changes_rollback() { conn1.wal_get_frame(frame_id, &mut frame).unwrap(); conn2.wal_insert_frame(frame_id, &frame).unwrap(); } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); assert_eq!(conn2.wal_state().unwrap().max_frame, 2); assert_eq!( limbo_exec_rows(&db2, &conn2, "SELECT x, length(y) FROM t"), @@ -320,7 +321,7 @@ fn test_wal_frame_api_no_schema_changes_fuzz() { conn1.wal_get_frame(frame_no, &mut frame).unwrap(); conn2.wal_insert_frame(frame_no, &frame[..]).unwrap(); } - conn2.wal_insert_end().unwrap(); + conn2.wal_insert_end(false).unwrap(); for (i, committed) in commit_frames.iter().enumerate() { if *committed <= next_frame { size = size.max(i); @@ -413,7 +414,7 @@ fn revert_to(conn: &Arc, frame_watermark: u64) -> turso_ frame_no += 1; conn.wal_insert_frame(frame_no, &frame)?; } - conn.wal_insert_end()?; + conn.wal_insert_end(false)?; Ok(()) } @@ -664,7 +665,7 @@ fn test_wal_revert_change_db_size() { frames_count += 1; writer.wal_insert_frame(frames_count, &frame).unwrap(); } - writer.wal_insert_end().unwrap(); + writer.wal_insert_end(false).unwrap(); writer .execute("insert into t values (3, randomblob(30 * 4096))") @@ -674,3 +675,254 @@ fn test_wal_revert_change_db_size() { vec![vec![Value::Integer(3), Value::Integer(30 * 4096)]] ); } + +#[test] +fn test_wal_api_exec_commit() { + let db = TempDatabase::new_empty(false); + let writer = db.connect_limbo(); + + writer + .execute("create table test(id integer primary key, value text)") + .unwrap(); + + writer.wal_insert_begin().unwrap(); + + writer + .execute("insert into test values (1, 'hello')") + .unwrap(); + writer + .execute("insert into test values (2, 'turso')") + .unwrap(); + + writer.wal_insert_end(true).unwrap(); + + let mut stmt = writer.prepare("select * from test").unwrap(); + let mut rows: Vec> = Vec::new(); + loop { + let result = stmt.step(); + match result { + Ok(StepResult::Row) => rows.push(stmt.row().unwrap().get_values().cloned().collect()), + Ok(StepResult::IO) => db.io.run_once().unwrap(), + Ok(StepResult::Done) => break, + result => panic!("unexpected step result: {result:?}"), + } + } + tracing::info!("rows: {:?}", rows); + assert_eq!( + rows, + vec![ + vec![ + turso_core::types::Value::Integer(1), + turso_core::types::Value::Text(turso_core::types::Text::new("hello")), + ], + vec![ + turso_core::types::Value::Integer(2), + turso_core::types::Value::Text(turso_core::types::Text::new("turso")), + ], + ] + ); +} + +#[test] +fn test_wal_api_exec_rollback() { + let db = TempDatabase::new_empty(false); + let writer = db.connect_limbo(); + + writer + .execute("create table test(id integer primary key, value text)") + .unwrap(); + + writer.wal_insert_begin().unwrap(); + + writer + .execute("insert into test values (1, 'hello')") + .unwrap(); + writer + .execute("insert into test values (2, 'turso')") + .unwrap(); + + writer.wal_insert_end(false).unwrap(); + + let mut stmt = writer.prepare("select * from test").unwrap(); + let mut rows: Vec> = Vec::new(); + loop { + let result = stmt.step(); + match result { + Ok(StepResult::Row) => rows.push(stmt.row().unwrap().get_values().cloned().collect()), + Ok(StepResult::IO) => db.io.run_once().unwrap(), + Ok(StepResult::Done) => break, + result => panic!("unexpected step result: {result:?}"), + } + } + tracing::info!("rows: {:?}", rows); + assert_eq!(rows, vec![] as Vec>); +} + +#[test] +fn test_wal_api_insert_exec_mix() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + + conn.execute("create table a(x, y)").unwrap(); + conn.execute("insert into a values (1, randomblob(1 * 4096))") + .unwrap(); + let watermark = conn.wal_state().unwrap().max_frame; + conn.execute("create table b(x, y)").unwrap(); + conn.execute("insert into b values (2, randomblob(2 * 4096))") + .unwrap(); + + let pages = conn.wal_changed_pages_after(watermark).unwrap(); + let mut frames = Vec::new(); + let mut frame = [0u8; 4096 + 24]; + for page_no in pages { + let page = &mut frame[24..]; + if !conn + .try_wal_watermark_read_page(page_no, page, Some(watermark)) + .unwrap() + { + continue; + } + let info = WalFrameInfo { + db_size: 0, + page_no: page_no, + }; + info.put_to_frame_header(&mut frame); + frames.push(frame); + } + + let schema_version = conn.read_schema_version().unwrap(); + conn.wal_insert_begin().unwrap(); + + let frames_cnt = conn.wal_state().unwrap().max_frame; + for (i, frame) in frames.iter().enumerate() { + conn.wal_insert_frame(frames_cnt + i as u64 + 1, frame) + .unwrap(); + } + conn.write_schema_version(schema_version + 1).unwrap(); + conn.execute("insert into a values (3, randomblob(3 * 4096))") + .unwrap(); + conn.execute("create table b(x, y)").unwrap(); + conn.execute("insert into b values (4, randomblob(4 * 4096))") + .unwrap(); + + conn.wal_insert_end(true).unwrap(); + + let mut stmt = conn.prepare("select x, length(y) from a").unwrap(); + let mut rows: Vec> = Vec::new(); + loop { + let result = stmt.step(); + match result { + Ok(StepResult::Row) => rows.push(stmt.row().unwrap().get_values().cloned().collect()), + Ok(StepResult::IO) => db.io.run_once().unwrap(), + Ok(StepResult::Done) => break, + result => panic!("unexpected step result: {result:?}"), + } + } + tracing::info!("rows: {:?}", rows); + assert_eq!( + rows, + vec![ + vec![ + turso_core::types::Value::Integer(1), + turso_core::types::Value::Integer(1 * 4096), + ], + vec![ + turso_core::types::Value::Integer(3), + turso_core::types::Value::Integer(3 * 4096), + ], + ] + ); + + let mut stmt = conn.prepare("select x, length(y) from b").unwrap(); + let mut rows: Vec> = Vec::new(); + loop { + let result = stmt.step(); + match result { + Ok(StepResult::Row) => rows.push(stmt.row().unwrap().get_values().cloned().collect()), + Ok(StepResult::IO) => db.io.run_once().unwrap(), + Ok(StepResult::Done) => break, + result => panic!("unexpected step result: {result:?}"), + } + } + tracing::info!("rows: {:?}", rows); + assert_eq!( + rows, + vec![vec![ + turso_core::types::Value::Integer(4), + turso_core::types::Value::Integer(4 * 4096), + ]] + ); +} + +#[test] +fn test_db_share_same_file() { + let mut path = TempDir::new().unwrap().keep(); + let (mut rng, _) = rng_from_time(); + path.push(format!("test-{}.db", rng.next_u32())); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let db_file = io + .open_file(path.to_str().unwrap(), turso_core::OpenFlags::Create, false) + .unwrap(); + let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + let db1 = turso_core::Database::open_with_flags( + io.clone(), + path.to_str().unwrap(), + db_file.clone(), + turso_core::OpenFlags::Create, + false, + true, + false, + ) + .unwrap(); + let conn1 = db1.connect().unwrap(); + conn1.wal_auto_checkpoint_disable(); + + conn1.execute("create table a(x, y)").unwrap(); + conn1 + .execute("insert into a values (1, randomblob(1 * 4096))") + .unwrap(); + conn1 + .checkpoint(CheckpointMode::Truncate { + upper_bound_inclusive: None, + }) + .unwrap(); + + conn1 + .execute("insert into a values (2, randomblob(2 * 4096))") + .unwrap(); + + let db2 = turso_core::Database::open_with_flags_bypass_registry( + io.clone(), + path.to_str().unwrap(), + &format!("{}-wal-copy", path.to_str().unwrap()), + db_file.clone(), + turso_core::OpenFlags::empty(), + false, + true, + false, + ) + .unwrap(); + let conn2 = db2.connect().unwrap(); + conn2.wal_auto_checkpoint_disable(); + + let mut stmt = conn2.prepare("select x, length(y) from a").unwrap(); + let mut rows: Vec> = Vec::new(); + loop { + let result = stmt.step(); + match result { + Ok(StepResult::Row) => rows.push(stmt.row().unwrap().get_values().cloned().collect()), + Ok(StepResult::IO) => db2.io.run_once().unwrap(), + Ok(StepResult::Done) => break, + result => panic!("unexpected step result: {result:?}"), + } + } + tracing::info!("rows: {:?}", rows); + assert_eq!( + rows, + vec![vec![ + turso_core::types::Value::Integer(1), + turso_core::types::Value::Integer(1 * 4096), + ]] + ); +} diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 8ae4b37a6..e2cbfa06e 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -275,7 +275,6 @@ fn test_statement_reset() -> anyhow::Result<()> { } #[test] -#[ignore] fn test_wal_checkpoint() -> anyhow::Result<()> { let _ = env_logger::try_init(); let tmp_db =