diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index d5697b73c..86ae514a4 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2187,9 +2187,14 @@ pub fn op_transaction( match pager.begin_write_tx()? { IOResult::Done(r) => { if let LimboResult::Busy = r { - pager.end_read_tx()?; - conn.transaction_state.replace(TransactionState::None); - conn.auto_commit.replace(true); + // We failed to upgrade to write transaction so put the transaction into its original state. + // That is, if the transaction had not started, end the read transaction so that next time we + // start a new one. + if matches!(current_state, TransactionState::None) { + pager.end_read_tx()?; + conn.transaction_state.replace(TransactionState::None); + } + assert_eq!(conn.transaction_state.get(), current_state); return Ok(InsnFunctionStepResult::Busy); } } diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 1b61daac0..006ae7225 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -572,11 +572,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } - // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during commit: {e}"); @@ -597,9 +592,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - // Check if it's an acceptable error if !e.to_string().contains("Busy") && !e.to_string().contains("database is locked") @@ -646,10 +638,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during insert: {e}"); @@ -687,10 +675,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during update: {e}"); @@ -723,10 +707,6 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { } Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } // Check if it's an acceptable error if !e.to_string().contains("database is locked") { panic!("Unexpected error during delete: {e}"); @@ -803,12 +783,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { Err(e) => { println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); // Check if it's an acceptable error - if e.to_string().contains("database is locked") { - if let Some(tx_id) = *current_tx_id { - shared_shadow_db.rollback_transaction(tx_id); - *current_tx_id = None; - } - } else { + if !e.to_string().contains("database is locked") { panic!("Unexpected error during alter table: {e}"); } } diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index d621edf45..bb1f2e77b 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -2,6 +2,105 @@ use turso_core::{LimboError, Result, StepResult, Value}; use crate::common::TempDatabase; +// Test a scenario where there are two concurrent deferred transactions: +// +// 1. Both transactions T1 and T2 start at the same time. +// 2. T1 writes to the database succesfully, but does not commit. +// 3. T2 attempts to write to the database, but gets busy error. +// 4. T1 commits +// 5. T2 attempts to write again and succeeds. This is because the transaction +// was still fresh (no reads or writes happened). +#[test] +fn test_deferred_transaction_restart() { + let tmp_db = TempDatabase::new("test_deferred_tx.db", true); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN").unwrap(); + conn2.execute("BEGIN").unwrap(); + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +} + +// Test a scenario where a deferred transaction cannot restart due to prior reads: +// +// 1. Both transactions T1 and T2 start at the same time. +// 2. T2 performs a SELECT (establishes a read snapshot). +// 3. T1 writes to the database successfully, but does not commit. +// 4. T2 attempts to write to the database, but gets busy error. +// 5. T1 commits (invalidating T2's snapshot). +// 6. T2 attempts to write again but still gets BUSY - it cannot restart +// because it has performed reads and has a committed snapshot. +#[test] +fn test_deferred_transaction_no_restart() { + let tmp_db = TempDatabase::new("test_deferred_tx_no_restart.db", true); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + + conn1 + .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") + .unwrap(); + + conn1.execute("BEGIN").unwrap(); + conn2.execute("BEGIN").unwrap(); + + // T2 performs a read - this establishes a snapshot and prevents restart + let mut stmt = conn2.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(0)); + } + + conn1 + .execute("INSERT INTO test (id, value) VALUES (1, 'first')") + .unwrap(); + + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + conn1.execute("COMMIT").unwrap(); + + // T2 still cannot write because its snapshot is stale and it cannot restart + let result = conn2.execute("INSERT INTO test (id, value) VALUES (2, 'second')"); + assert!(matches!(result, Err(LimboError::Busy))); + + // T2 must rollback and start fresh + conn2.execute("ROLLBACK").unwrap(); + conn2.execute("BEGIN").unwrap(); + conn2 + .execute("INSERT INTO test (id, value) VALUES (2, 'second')") + .unwrap(); + conn2.execute("COMMIT").unwrap(); + + let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); + if let StepResult::Row = stmt.step().unwrap() { + let row = stmt.row().unwrap(); + assert_eq!(*row.get::<&Value>(0).unwrap(), Value::Integer(2)); + } +} + #[test] fn test_txn_error_doesnt_rollback_txn() -> Result<()> { let tmp_db = TempDatabase::new_with_rusqlite("create table t (x);", false);