From 010fb1c12ac6eccf768ff1e75af2f671a50642f1 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Sun, 20 Jul 2025 19:35:54 +0300 Subject: [PATCH] fix/pager/cacheflush: cacheflush shouldn't commit --- core/storage/pager.rs | 1 - core/storage/wal.rs | 23 +++--- tests/integration/common.rs | 147 ++++++++++++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 11 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a4c6265f8..7be8bb390 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -863,7 +863,6 @@ impl Pager { let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); if in_flight == 0 { self.flush_info.borrow_mut().state = CacheFlushState::Start; - self.wal.borrow_mut().finish_append_frames_commit()?; return Ok(IOResult::Done(())); } else { return Ok(IOResult::IO); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 45c202a76..45d0c0553 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -925,19 +925,22 @@ impl Wal for WalFile { let shared = self.get_shared(); let max_frame = shared.max_frame.load(Ordering::SeqCst); tracing::debug!(to_max_frame = max_frame); - let mut frame_cache = shared.frame_cache.lock(); - for (_, frames) in frame_cache.iter_mut() { - let mut last_valid_frame = frames.len(); - for frame in frames.iter().rev() { - if *frame <= max_frame { - break; + { + let mut frame_cache = shared.frame_cache.lock(); + for (_, frames) in frame_cache.iter_mut() { + let mut last_valid_frame = frames.len(); + for frame in frames.iter().rev() { + if *frame <= max_frame { + break; + } + last_valid_frame -= 1; } - last_valid_frame -= 1; + frames.truncate(last_valid_frame); } - frames.truncate(last_valid_frame); + let mut pages_in_frames = shared.pages_in_frames.lock(); + pages_in_frames.truncate(self.start_pages_in_frames); } - let mut pages_in_frames = shared.pages_in_frames.lock(); - pages_in_frames.truncate(self.start_pages_in_frames); + self.last_checksum = shared.last_checksum; } Ok(()) } diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 51943888f..da5227a58 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -242,6 +242,7 @@ pub(crate) fn limbo_exec_rows_error( mod tests { use std::vec; use tempfile::TempDir; + use turso_core::types::IOResult; use super::{limbo_exec_rows, limbo_exec_rows_error, TempDatabase}; use rusqlite::types::Value; @@ -384,4 +385,150 @@ mod tests { Ok(()) } + + #[test] + /// Test that a transaction cannot read uncommitted changes of another transaction (no: READ UNCOMMITTED) + fn test_tx_isolation_no_dirty_reads() -> anyhow::Result<()> { + let path = TempDir::new() + .unwrap() + .keep() + .join("temp_transaction_isolation"); + let db = TempDatabase::new_with_existent(&path, true); + + // Create two separate connections + let conn1 = db.connect_limbo(); + + // Create test table + let _ = limbo_exec_rows(&db, &conn1, "CREATE TABLE t(x INTEGER)"); + + // Begin transaction on first connection and insert a value + let _ = limbo_exec_rows(&db, &conn1, "BEGIN"); + let _ = limbo_exec_rows(&db, &conn1, "INSERT INTO t VALUES (42)"); + while matches!(conn1.cacheflush().unwrap(), IOResult::IO) { + db.io.run_once().unwrap(); + } + + // Second connection should not see uncommitted changes + let conn2 = db.connect_limbo(); + let ret = limbo_exec_rows(&db, &conn2, "SELECT x FROM t"); + assert!( + ret.is_empty(), + "DIRTY READ: Second connection saw uncommitted changes: {ret:?}" + ); + + Ok(()) + } + + #[test] + #[ignore] + /// FIXME: This test fails. + /// Test that a transaction cannot read committed changes that were committed after the transaction started (no: READ COMMITTED) + fn test_tx_isolation_no_read_committed() -> anyhow::Result<()> { + let path = TempDir::new() + .unwrap() + .keep() + .join("temp_transaction_isolation"); + let db = TempDatabase::new_with_existent(&path, true); + + // Create two separate connections + let conn1 = db.connect_limbo(); + + // Create test table + let _ = limbo_exec_rows(&db, &conn1, "CREATE TABLE t(x INTEGER)"); + + // Begin transaction on first connection + let _ = limbo_exec_rows(&db, &conn1, "BEGIN"); + + // Commit a value from the second connection + let conn2 = db.connect_limbo(); + let _ = limbo_exec_rows(&db, &conn2, "BEGIN"); + let _ = limbo_exec_rows(&db, &conn2, "INSERT INTO t VALUES (42)"); + let _ = limbo_exec_rows(&db, &conn2, "COMMIT"); + + // First connection should not see the committed value + let ret = limbo_exec_rows(&db, &conn1, "SELECT x FROM t"); + assert!( + ret.is_empty(), + "SNAPSHOT ISOLATION VIOLATION: Older txn saw committed changes from newer txn: {ret:?}" + ); + + Ok(()) + } + + #[test] + /// Test that a txn can write a row, flush to WAL without committing, then rollback, and finally commit a second row. + /// Reopening database should show only the second row. + fn test_tx_isolation_cacheflush_rollback_commit() -> anyhow::Result<()> { + let path = TempDir::new() + .unwrap() + .keep() + .join("temp_transaction_isolation"); + let db = TempDatabase::new_with_existent(&path, true); + + let conn = db.connect_limbo(); + + // Create test table + let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x INTEGER)"); + + // Begin transaction on first connection and insert a value + let _ = limbo_exec_rows(&db, &conn, "BEGIN"); + let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)"); + while matches!(conn.cacheflush().unwrap(), IOResult::IO) { + db.io.run_once().unwrap(); + } + + // Rollback the transaction + let _ = limbo_exec_rows(&db, &conn, "ROLLBACK"); + + // Now actually commit a row + let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (69)"); + + // Reopen the database + let db = TempDatabase::new_with_existent(&path, true); + let conn = db.connect_limbo(); + + // Should only see the last committed value + let ret = limbo_exec_rows(&db, &conn, "SELECT x FROM t"); + assert_eq!( + ret, + vec![vec![Value::Integer(69)]], + "Expected 1 row but got {ret:?}" + ); + + Ok(()) + } + + #[test] + /// Test that a txn can write a row and flush to WAL without committing, then reopen DB and not see the row + fn test_tx_isolation_cacheflush_reopen() -> anyhow::Result<()> { + let path = TempDir::new() + .unwrap() + .keep() + .join("temp_transaction_isolation"); + let db = TempDatabase::new_with_existent(&path, true); + + let conn = db.connect_limbo(); + + // Create test table + let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x INTEGER)"); + + // Begin transaction and insert a value + let _ = limbo_exec_rows(&db, &conn, "BEGIN"); + let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)"); + + // Flush to WAL but don't commit + while matches!(conn.cacheflush().unwrap(), IOResult::IO) { + db.io.run_once().unwrap(); + } + + // Reopen the database without committing + let db = TempDatabase::new_with_existent(&path, true); + let conn = db.connect_limbo(); + + // Should see no rows since transaction was never committed + let ret = limbo_exec_rows(&db, &conn, "SELECT x FROM t"); + assert!(ret.is_empty(), "Expected 0 rows but got {ret:?}"); + + Ok(()) + } }