diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 2a687ade2..e3df43a07 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -63,6 +63,23 @@ impl TempDatabase { ) } + pub fn new_with_existent_with_opts(db_path: &Path, opts: turso_core::DatabaseOpts) -> Self { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let db = Database::open_file_with_flags( + io.clone(), + db_path.to_str().unwrap(), + turso_core::OpenFlags::default(), + opts, + None, + ) + .unwrap(); + Self { + path: db_path.to_path_buf(), + io, + db, + } + } + pub fn new_with_existent_with_flags( db_path: &Path, flags: turso_core::OpenFlags, diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index 6ba648944..37ddefa79 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -1,4 +1,6 @@ -use turso_core::{LimboError, Result, StepResult, Value}; +use std::sync::Arc; + +use turso_core::{Connection, LimboError, Result, Statement, StepResult, Value}; use crate::common::TempDatabase; @@ -539,6 +541,111 @@ fn test_mvcc_checkpoint_works() { ); } +fn execute_and_log(conn: &Arc, query: &str) -> Result<()> { + tracing::info!("Executing query: {}", query); + conn.execute(query) +} + +fn query_and_log(conn: &Arc, query: &str) -> Result> { + tracing::info!("Executing query: {}", query); + conn.query(query) +} + +#[test] +fn test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works() { + let tmp_db = TempDatabase::new_with_opts( + "test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works.db", + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Create first table and insert rows + execute_and_log( + &conn, + "CREATE TABLE test1 (id INTEGER PRIMARY KEY, value INTEGER)", + ) + .unwrap(); + + let mut expected_rows1 = Vec::new(); + for i in 0..10 { + let value = i * 10; + execute_and_log( + &conn, + &format!("INSERT INTO test1 (id, value) VALUES ({}, {})", i, value), + ) + .unwrap(); + expected_rows1.push((i, value)); + } + + // Checkpoint + execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap(); + + // Create second table and insert rows + execute_and_log( + &conn, + "CREATE TABLE test2 (id INTEGER PRIMARY KEY, value INTEGER)", + ) + .unwrap(); + + let mut expected_rows2 = Vec::new(); + for i in 0..5 { + let value = i * 20; + execute_and_log( + &conn, + &format!("INSERT INTO test2 (id, value) VALUES ({}, {})", i, value), + ) + .unwrap(); + expected_rows2.push((i, value)); + } + + // Sort expected rows + expected_rows1.sort_by(|a, b| match a.0.cmp(&b.0) { + std::cmp::Ordering::Equal => a.1.cmp(&b.1), + other => other, + }); + expected_rows2.sort_by(|a, b| match a.0.cmp(&b.0) { + std::cmp::Ordering::Equal => a.1.cmp(&b.1), + other => other, + }); + + let path = tmp_db.path.clone(); + drop(conn); + drop(tmp_db); + + // Close and reopen database + let tmp_db = TempDatabase::new_with_existent_with_opts( + &path, + turso_core::DatabaseOpts::new().with_mvcc(true), + ); + let conn = tmp_db.connect_limbo(); + + // Verify table1 rows + let stmt = query_and_log(&conn, "SELECT * FROM test1 ORDER BY id, value") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + + let expected1: Vec> = expected_rows1 + .into_iter() + .map(|(id, value)| vec![Value::Integer(id as i64), Value::Integer(value as i64)]) + .collect(); + + assert_eq!(rows, expected1); + + // Verify table2 rows + let stmt = query_and_log(&conn, "SELECT * FROM test2 ORDER BY id, value") + .unwrap() + .unwrap(); + let rows = helper_read_all_rows(stmt); + + let expected2: Vec> = expected_rows2 + .into_iter() + .map(|(id, value)| vec![Value::Integer(id as i64), Value::Integer(value as i64)]) + .collect(); + + assert_eq!(rows, expected2); +} + fn helper_read_all_rows(mut stmt: turso_core::Statement) -> Vec> { let mut ret = Vec::new(); loop {