mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-20 15:35:29 +01:00
Merge 'fix/pager/cacheflush: cacheflush shouldn't commit' from Jussi Saurio
Closes #2188 Closes #2194 Doesn't fix #2192 but adds an ignored test Reviewed-by: Pere Diaz Bou <pere-altea@homail.com> Closes #2189
This commit is contained in:
@@ -863,7 +863,6 @@ impl Pager {
|
||||
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
|
||||
if in_flight == 0 {
|
||||
self.flush_info.borrow_mut().state = CacheFlushState::Start;
|
||||
self.wal.borrow_mut().finish_append_frames_commit()?;
|
||||
return Ok(IOResult::Done(()));
|
||||
} else {
|
||||
return Ok(IOResult::IO);
|
||||
|
||||
@@ -925,19 +925,22 @@ impl Wal for WalFile {
|
||||
let shared = self.get_shared();
|
||||
let max_frame = shared.max_frame.load(Ordering::SeqCst);
|
||||
tracing::debug!(to_max_frame = max_frame);
|
||||
let mut frame_cache = shared.frame_cache.lock();
|
||||
for (_, frames) in frame_cache.iter_mut() {
|
||||
let mut last_valid_frame = frames.len();
|
||||
for frame in frames.iter().rev() {
|
||||
if *frame <= max_frame {
|
||||
break;
|
||||
{
|
||||
let mut frame_cache = shared.frame_cache.lock();
|
||||
for (_, frames) in frame_cache.iter_mut() {
|
||||
let mut last_valid_frame = frames.len();
|
||||
for frame in frames.iter().rev() {
|
||||
if *frame <= max_frame {
|
||||
break;
|
||||
}
|
||||
last_valid_frame -= 1;
|
||||
}
|
||||
last_valid_frame -= 1;
|
||||
frames.truncate(last_valid_frame);
|
||||
}
|
||||
frames.truncate(last_valid_frame);
|
||||
let mut pages_in_frames = shared.pages_in_frames.lock();
|
||||
pages_in_frames.truncate(self.start_pages_in_frames);
|
||||
}
|
||||
let mut pages_in_frames = shared.pages_in_frames.lock();
|
||||
pages_in_frames.truncate(self.start_pages_in_frames);
|
||||
self.last_checksum = shared.last_checksum;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -242,6 +242,7 @@ pub(crate) fn limbo_exec_rows_error(
|
||||
mod tests {
|
||||
use std::vec;
|
||||
use tempfile::TempDir;
|
||||
use turso_core::types::IOResult;
|
||||
|
||||
use super::{limbo_exec_rows, limbo_exec_rows_error, TempDatabase};
|
||||
use rusqlite::types::Value;
|
||||
@@ -384,4 +385,150 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Test that a transaction cannot read uncommitted changes of another transaction (no: READ UNCOMMITTED)
|
||||
fn test_tx_isolation_no_dirty_reads() -> anyhow::Result<()> {
|
||||
let path = TempDir::new()
|
||||
.unwrap()
|
||||
.keep()
|
||||
.join("temp_transaction_isolation");
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
|
||||
// Create two separate connections
|
||||
let conn1 = db.connect_limbo();
|
||||
|
||||
// Create test table
|
||||
let _ = limbo_exec_rows(&db, &conn1, "CREATE TABLE t(x INTEGER)");
|
||||
|
||||
// Begin transaction on first connection and insert a value
|
||||
let _ = limbo_exec_rows(&db, &conn1, "BEGIN");
|
||||
let _ = limbo_exec_rows(&db, &conn1, "INSERT INTO t VALUES (42)");
|
||||
while matches!(conn1.cacheflush().unwrap(), IOResult::IO) {
|
||||
db.io.run_once().unwrap();
|
||||
}
|
||||
|
||||
// Second connection should not see uncommitted changes
|
||||
let conn2 = db.connect_limbo();
|
||||
let ret = limbo_exec_rows(&db, &conn2, "SELECT x FROM t");
|
||||
assert!(
|
||||
ret.is_empty(),
|
||||
"DIRTY READ: Second connection saw uncommitted changes: {ret:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
/// FIXME: This test fails.
|
||||
/// Test that a transaction cannot read committed changes that were committed after the transaction started (no: READ COMMITTED)
|
||||
fn test_tx_isolation_no_read_committed() -> anyhow::Result<()> {
|
||||
let path = TempDir::new()
|
||||
.unwrap()
|
||||
.keep()
|
||||
.join("temp_transaction_isolation");
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
|
||||
// Create two separate connections
|
||||
let conn1 = db.connect_limbo();
|
||||
|
||||
// Create test table
|
||||
let _ = limbo_exec_rows(&db, &conn1, "CREATE TABLE t(x INTEGER)");
|
||||
|
||||
// Begin transaction on first connection
|
||||
let _ = limbo_exec_rows(&db, &conn1, "BEGIN");
|
||||
|
||||
// Commit a value from the second connection
|
||||
let conn2 = db.connect_limbo();
|
||||
let _ = limbo_exec_rows(&db, &conn2, "BEGIN");
|
||||
let _ = limbo_exec_rows(&db, &conn2, "INSERT INTO t VALUES (42)");
|
||||
let _ = limbo_exec_rows(&db, &conn2, "COMMIT");
|
||||
|
||||
// First connection should not see the committed value
|
||||
let ret = limbo_exec_rows(&db, &conn1, "SELECT x FROM t");
|
||||
assert!(
|
||||
ret.is_empty(),
|
||||
"SNAPSHOT ISOLATION VIOLATION: Older txn saw committed changes from newer txn: {ret:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Test that a txn can write a row, flush to WAL without committing, then rollback, and finally commit a second row.
|
||||
/// Reopening database should show only the second row.
|
||||
fn test_tx_isolation_cacheflush_rollback_commit() -> anyhow::Result<()> {
|
||||
let path = TempDir::new()
|
||||
.unwrap()
|
||||
.keep()
|
||||
.join("temp_transaction_isolation");
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
|
||||
let conn = db.connect_limbo();
|
||||
|
||||
// Create test table
|
||||
let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x INTEGER)");
|
||||
|
||||
// Begin transaction on first connection and insert a value
|
||||
let _ = limbo_exec_rows(&db, &conn, "BEGIN");
|
||||
let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)");
|
||||
while matches!(conn.cacheflush().unwrap(), IOResult::IO) {
|
||||
db.io.run_once().unwrap();
|
||||
}
|
||||
|
||||
// Rollback the transaction
|
||||
let _ = limbo_exec_rows(&db, &conn, "ROLLBACK");
|
||||
|
||||
// Now actually commit a row
|
||||
let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (69)");
|
||||
|
||||
// Reopen the database
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
let conn = db.connect_limbo();
|
||||
|
||||
// Should only see the last committed value
|
||||
let ret = limbo_exec_rows(&db, &conn, "SELECT x FROM t");
|
||||
assert_eq!(
|
||||
ret,
|
||||
vec![vec![Value::Integer(69)]],
|
||||
"Expected 1 row but got {ret:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// Test that a txn can write a row and flush to WAL without committing, then reopen DB and not see the row
|
||||
fn test_tx_isolation_cacheflush_reopen() -> anyhow::Result<()> {
|
||||
let path = TempDir::new()
|
||||
.unwrap()
|
||||
.keep()
|
||||
.join("temp_transaction_isolation");
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
|
||||
let conn = db.connect_limbo();
|
||||
|
||||
// Create test table
|
||||
let _ = limbo_exec_rows(&db, &conn, "CREATE TABLE t(x INTEGER)");
|
||||
|
||||
// Begin transaction and insert a value
|
||||
let _ = limbo_exec_rows(&db, &conn, "BEGIN");
|
||||
let _ = limbo_exec_rows(&db, &conn, "INSERT INTO t VALUES (42)");
|
||||
|
||||
// Flush to WAL but don't commit
|
||||
while matches!(conn.cacheflush().unwrap(), IOResult::IO) {
|
||||
db.io.run_once().unwrap();
|
||||
}
|
||||
|
||||
// Reopen the database without committing
|
||||
let db = TempDatabase::new_with_existent(&path, true);
|
||||
let conn = db.connect_limbo();
|
||||
|
||||
// Should see no rows since transaction was never committed
|
||||
let ret = limbo_exec_rows(&db, &conn, "SELECT x FROM t");
|
||||
assert!(ret.is_empty(), "Expected 0 rows but got {ret:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user