From 8161badbf42fa3c36cac0c242fa2ccca31fedc48 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 10 Sep 2025 07:22:17 +0300 Subject: [PATCH] core/vdbe: Don't rollback transaction when write upgrade fails If upgrade from read to write transaction fails, don't roll back the transaction. Instead restore the transaction into its original state, which allows deferred transactions that have not read anything to restart automatically. Fixes #2984 --- core/vdbe/execute.rs | 11 ++- tests/integration/fuzz_transaction/mod.rs | 27 +---- .../query_processing/test_transactions.rs | 99 +++++++++++++++++++ 3 files changed, 108 insertions(+), 29 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index d5697b73c..86ae514a4 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2187,9 +2187,14 @@ pub fn op_transaction( match pager.begin_write_tx()? { IOResult::Done(r) => { if let LimboResult::Busy = r { - pager.end_read_tx()?; - conn.transaction_state.replace(TransactionState::None); - conn.auto_commit.replace(true); + // We failed to upgrade to write transaction so put the transaction into its original state. + // That is, if the transaction had not started, end the read transaction so that next time we + // start a new one. + if matches!(current_state, TransactionState::None) { + pager.end_read_tx()?; + conn.transaction_state.replace(TransactionState::None); + } + assert_eq!(conn.transaction_state.get(), current_state); return Ok(InsnFunctionStepResult::Busy); } } diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 1b61daac0..006ae7225 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -572,11 +572,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } - // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during commit: {e}"); @@ -597,9 +592,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - // Check if it's an acceptable error if !e.to_string().contains("Busy") && !e.to_string().contains("database is locked") @@ -646,10 +638,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during insert: {e}"); @@ -687,10 +675,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during update: {e}"); @@ -723,10 +707,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during delete: {e}"); @@ -803,12 +783,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); // Check if it's an acceptable error - if e.to_string().contains("database is locked") { - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } - } else { + if !e.to_string().contains("database is locked") { panic!("Unexpected error during alter table: {e}"); } } diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index d621edf45..bb1f2e77b 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -2,6 +2,105 @@ use turso_core::{LimboError, Result, StepResult, Value}; use crate::common::TempDatabase; +// Test a scenario where there are two concurrent deferred transactions: +// +// 1. Both transactions T1 and T2 start at the same time. +// 2. T1 writes to the database succesfully, but does not commit. +// 3. T2 attempts to write to the database, but gets busy error. +// 4. T1 commits +// 5. T2 attempts to write again and succeeds. This is because the transaction +// was still fresh (no reads or writes happened). +#[test] +fn test_deferred_transaction_restart() { + let tmp_db = TempDatabase::new("test_deferred_tx.db", true); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN").unwrap(); + conn2.execute("BEGIN").unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +} + +// Test a scenario where a deferred transaction cannot restart due to prior reads: +// +// 1. Both transactions T1 and T2 start at the same time. +// 2. T2 performs a SELECT (establishes a read snapshot). +// 3. T1 writes to the database successfully, but does not commit. +// 4. T2 attempts to write to the database, but gets busy error. +// 5. T1 commits (invalidating T2's snapshot). +// 6. T2 attempts to write again but still gets BUSY - it cannot restart +// because it has performed reads and has a committed snapshot. +#[test] +fn test_deferred_transaction_no_restart() { + let tmp_db = TempDatabase::new("test_deferred_tx_no_restart.db", true); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN").unwrap(); + conn2.execute("BEGIN").unwrap(); + + // T2 performs a read - this establishes a snapshot and prevents restart + let mut stmt = conn2.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(0)); + } + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + // T2 still cannot write because its snapshot is stale and it cannot restart + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + // T2 must rollback and start fresh + conn2.execute("ROLLBACK").unwrap(); + conn2.execute("BEGIN").unwrap(); + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +} + #[test] fn test_txn_error_doesnt_rollback_txn() -> Result<()> { let tmp_db = TempDatabase::new_with_rusqlite("create table t (x);", false);