mvcc: add test to verify that reading both checkpointed and non-checkpointed tables works

This commit is contained in:
Jussi Saurio
2025-09-30 14:11:39 +03:00
parent 9788f6d005
commit a1bdad58b6
2 changed files with 125 additions and 1 deletions

View File

@@ -63,6 +63,23 @@ impl TempDatabase {
) )
} }
pub fn new_with_existent_with_opts(db_path: &Path, opts: turso_core::DatabaseOpts) -> Self {
let io: Arc<dyn IO + Send> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db = Database::open_file_with_flags(
io.clone(),
db_path.to_str().unwrap(),
turso_core::OpenFlags::default(),
opts,
None,
)
.unwrap();
Self {
path: db_path.to_path_buf(),
io,
db,
}
}
pub fn new_with_existent_with_flags( pub fn new_with_existent_with_flags(
db_path: &Path, db_path: &Path,
flags: turso_core::OpenFlags, flags: turso_core::OpenFlags,

View File

@@ -1,4 +1,6 @@
use turso_core::{LimboError, Result, StepResult, Value}; use std::sync::Arc;
use turso_core::{Connection, LimboError, Result, Statement, StepResult, Value};
use crate::common::TempDatabase; use crate::common::TempDatabase;
@@ -539,6 +541,111 @@ fn test_mvcc_checkpoint_works() {
); );
} }
fn execute_and_log(conn: &Arc<Connection>, query: &str) -> Result<()> {
tracing::info!("Executing query: {}", query);
conn.execute(query)
}
fn query_and_log(conn: &Arc<Connection>, query: &str) -> Result<Option<Statement>> {
tracing::info!("Executing query: {}", query);
conn.query(query)
}
#[test]
fn test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works() {
let tmp_db = TempDatabase::new_with_opts(
"test_mvcc_recovery_of_both_checkpointed_and_noncheckpointed_tables_works.db",
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn = tmp_db.connect_limbo();
// Create first table and insert rows
execute_and_log(
&conn,
"CREATE TABLE test1 (id INTEGER PRIMARY KEY, value INTEGER)",
)
.unwrap();
let mut expected_rows1 = Vec::new();
for i in 0..10 {
let value = i * 10;
execute_and_log(
&conn,
&format!("INSERT INTO test1 (id, value) VALUES ({}, {})", i, value),
)
.unwrap();
expected_rows1.push((i, value));
}
// Checkpoint
execute_and_log(&conn, "PRAGMA wal_checkpoint(TRUNCATE)").unwrap();
// Create second table and insert rows
execute_and_log(
&conn,
"CREATE TABLE test2 (id INTEGER PRIMARY KEY, value INTEGER)",
)
.unwrap();
let mut expected_rows2 = Vec::new();
for i in 0..5 {
let value = i * 20;
execute_and_log(
&conn,
&format!("INSERT INTO test2 (id, value) VALUES ({}, {})", i, value),
)
.unwrap();
expected_rows2.push((i, value));
}
// Sort expected rows
expected_rows1.sort_by(|a, b| match a.0.cmp(&b.0) {
std::cmp::Ordering::Equal => a.1.cmp(&b.1),
other => other,
});
expected_rows2.sort_by(|a, b| match a.0.cmp(&b.0) {
std::cmp::Ordering::Equal => a.1.cmp(&b.1),
other => other,
});
let path = tmp_db.path.clone();
drop(conn);
drop(tmp_db);
// Close and reopen database
let tmp_db = TempDatabase::new_with_existent_with_opts(
&path,
turso_core::DatabaseOpts::new().with_mvcc(true),
);
let conn = tmp_db.connect_limbo();
// Verify table1 rows
let stmt = query_and_log(&conn, "SELECT * FROM test1 ORDER BY id, value")
.unwrap()
.unwrap();
let rows = helper_read_all_rows(stmt);
let expected1: Vec<Vec<Value>> = expected_rows1
.into_iter()
.map(|(id, value)| vec![Value::Integer(id as i64), Value::Integer(value as i64)])
.collect();
assert_eq!(rows, expected1);
// Verify table2 rows
let stmt = query_and_log(&conn, "SELECT * FROM test2 ORDER BY id, value")
.unwrap()
.unwrap();
let rows = helper_read_all_rows(stmt);
let expected2: Vec<Vec<Value>> = expected_rows2
.into_iter()
.map(|(id, value)| vec![Value::Integer(id as i64), Value::Integer(value as i64)])
.collect();
assert_eq!(rows, expected2);
}
fn helper_read_all_rows(mut stmt: turso_core::Statement) -> Vec<Vec<Value>> { fn helper_read_all_rows(mut stmt: turso_core::Statement) -> Vec<Vec<Value>> {
let mut ret = Vec::new(); let mut ret = Vec::new();
loop { loop {