From a874530db8205c06182207848eac6f2e144a3e99 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Wed, 10 Sep 2025 11:01:18 +0300 Subject: [PATCH] Fix semantics of transaction isolation test - Busy errors do not rollback the transaction - Transaction takes snapshot of DB state after its first successful access of the DB, not before its first attempt to access the DB --- tests/integration/fuzz_transaction/mod.rs | 46 ++++++++++++++--------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 006ae7225..2d83b36ac 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -86,9 +86,12 @@ impl ShadowDb { ); } - fn take_snapshot(&mut self, tx_id: usize) { + fn take_snapshot_if_not_exists(&mut self, tx_id: usize) { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { - assert!(tx_state.is_none()); + if tx_state.is_some() { + // tx already has snapshot + return; + } tx_state.replace(TransactionState { schema: self.schema.clone(), visible_rows: self.committed_rows.clone(), @@ -547,7 +550,11 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { let (operation, visible_rows) = generate_operation(&mut rng, *current_tx_id, &mut shared_shadow_db); - println!("Connection {conn_id}(op={op_num}): {operation}"); + let is_in_tx = current_tx_id.is_some(); + let has_snapshot = current_tx_id.is_some_and(|tx_id| { + shared_shadow_db.transactions.get(&tx_id).unwrap().is_some() + }); + println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx}, has_snapshot={has_snapshot}"); match operation { Operation::Begin => { @@ -622,6 +629,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { Ok(_) => { // Success - update shadow DB if let Some(tx_id) = *current_tx_id { + shared_shadow_db.take_snapshot_if_not_exists(tx_id); // In transaction - update transaction's view shared_shadow_db .insert(tx_id, id, other_columns.clone()) @@ -659,6 +667,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { Ok(_) => { // Success - update shadow DB if let Some(tx_id) = *current_tx_id { + shared_shadow_db.take_snapshot_if_not_exists(tx_id); // In transaction - update transaction's view shared_shadow_db .update(tx_id, id, other_columns.clone()) @@ -695,6 +704,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { Ok(_) => { // Success - update shadow DB if let Some(tx_id) = *current_tx_id { + shared_shadow_db.take_snapshot_if_not_exists(tx_id); // In transaction - update transaction's view shared_shadow_db.delete(tx_id, id).unwrap(); } else { @@ -720,6 +730,10 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { let columns = stmt.columns(); let mut rows = stmt.query(()).await.unwrap(); + if let Some(tx_id) = *current_tx_id { + shared_shadow_db.take_snapshot_if_not_exists(tx_id); + } + let mut real_rows = Vec::new(); while let Some(row) = rows.next().await.unwrap() { let Value::Integer(id) = row.get_value(0).unwrap() else { @@ -768,6 +782,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { match result { Ok(_) => { if let Some(tx_id) = *current_tx_id { + shared_shadow_db.take_snapshot_if_not_exists(tx_id); // In transaction - update transaction's view shared_shadow_db.alter_table(tx_id, op).unwrap(); } else { @@ -838,13 +853,8 @@ fn generate_operation( } else { shadow_db.schema.clone() }; - let mut get_visible_rows = |accesses_db: bool| { + let get_visible_rows = || { if let Some(tx_id) = current_tx_id { - let tx_state = shadow_db.transactions.get(&tx_id).unwrap(); - // Take snapshot during first operation that accesses the DB after a BEGIN, not immediately at BEGIN (the semantics is BEGIN DEFERRED) - if accesses_db && tx_state.is_none() { - shadow_db.take_snapshot(tx_id); - } shadow_db.get_visible_rows(Some(tx_id)) } else { shadow_db.get_visible_rows(None) // No transaction @@ -853,9 +863,9 @@ fn generate_operation( match rng.random_range(0..100) { 0..=9 => { if !in_transaction { - (Operation::Begin, get_visible_rows(false)) + (Operation::Begin, get_visible_rows()) } else { - let visible_rows = get_visible_rows(true); + let visible_rows = get_visible_rows(); ( generate_data_operation(rng, &visible_rows, &schema_clone), visible_rows, @@ -864,9 +874,9 @@ fn generate_operation( } 10..=14 => { if in_transaction { - (Operation::Commit, get_visible_rows(false)) + (Operation::Commit, get_visible_rows()) } else { - let visible_rows = get_visible_rows(true); + let visible_rows = get_visible_rows(); ( generate_data_operation(rng, &visible_rows, &schema_clone), visible_rows, @@ -875,9 +885,9 @@ fn generate_operation( } 15..=19 => { if in_transaction { - (Operation::Rollback, get_visible_rows(false)) + (Operation::Rollback, get_visible_rows()) } else { - let visible_rows = get_visible_rows(true); + let visible_rows = get_visible_rows(); ( generate_data_operation(rng, &visible_rows, &schema_clone), visible_rows, @@ -892,7 +902,7 @@ fn generate_operation( 3 => CheckpointMode::Full, _ => unreachable!(), }; - (Operation::Checkpoint { mode }, get_visible_rows(false)) + (Operation::Checkpoint { mode }, get_visible_rows()) } 23..=26 => { let op = match rng.random_range(0..6) { @@ -942,10 +952,10 @@ fn generate_operation( } _ => unreachable!(), }; - (Operation::AlterTable { op }, get_visible_rows(true)) + (Operation::AlterTable { op }, get_visible_rows()) } _ => { - let visible_rows = get_visible_rows(true); + let visible_rows = get_visible_rows(); ( generate_data_operation(rng, &visible_rows, &schema_clone), visible_rows,