Fix semantics of transaction isolation test

- Busy errors do not rollback the transaction
- Transaction takes snapshot of DB state after its first successful
  access of the DB, not before its first attempt to access the DB
This commit is contained in:
Jussi Saurio
2025-09-10 11:01:18 +03:00
parent 8161badbf4
commit a874530db8

View File

@@ -86,9 +86,12 @@ impl ShadowDb {
);
}
fn take_snapshot(&mut self, tx_id: usize) {
fn take_snapshot_if_not_exists(&mut self, tx_id: usize) {
if let Some(tx_state) = self.transactions.get_mut(&tx_id) {
assert!(tx_state.is_none());
if tx_state.is_some() {
// tx already has snapshot
return;
}
tx_state.replace(TransactionState {
schema: self.schema.clone(),
visible_rows: self.committed_rows.clone(),
@@ -547,7 +550,11 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
let (operation, visible_rows) =
generate_operation(&mut rng, *current_tx_id, &mut shared_shadow_db);
println!("Connection {conn_id}(op={op_num}): {operation}");
let is_in_tx = current_tx_id.is_some();
let has_snapshot = current_tx_id.is_some_and(|tx_id| {
shared_shadow_db.transactions.get(&tx_id).unwrap().is_some()
});
println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx}, has_snapshot={has_snapshot}");
match operation {
Operation::Begin => {
@@ -622,6 +629,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
Ok(_) => {
// Success - update shadow DB
if let Some(tx_id) = *current_tx_id {
shared_shadow_db.take_snapshot_if_not_exists(tx_id);
// In transaction - update transaction's view
shared_shadow_db
.insert(tx_id, id, other_columns.clone())
@@ -659,6 +667,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
Ok(_) => {
// Success - update shadow DB
if let Some(tx_id) = *current_tx_id {
shared_shadow_db.take_snapshot_if_not_exists(tx_id);
// In transaction - update transaction's view
shared_shadow_db
.update(tx_id, id, other_columns.clone())
@@ -695,6 +704,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
Ok(_) => {
// Success - update shadow DB
if let Some(tx_id) = *current_tx_id {
shared_shadow_db.take_snapshot_if_not_exists(tx_id);
// In transaction - update transaction's view
shared_shadow_db.delete(tx_id, id).unwrap();
} else {
@@ -720,6 +730,10 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
let columns = stmt.columns();
let mut rows = stmt.query(()).await.unwrap();
if let Some(tx_id) = *current_tx_id {
shared_shadow_db.take_snapshot_if_not_exists(tx_id);
}
let mut real_rows = Vec::new();
while let Some(row) = rows.next().await.unwrap() {
let Value::Integer(id) = row.get_value(0).unwrap() else {
@@ -768,6 +782,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
match result {
Ok(_) => {
if let Some(tx_id) = *current_tx_id {
shared_shadow_db.take_snapshot_if_not_exists(tx_id);
// In transaction - update transaction's view
shared_shadow_db.alter_table(tx_id, op).unwrap();
} else {
@@ -838,13 +853,8 @@ fn generate_operation(
} else {
shadow_db.schema.clone()
};
let mut get_visible_rows = |accesses_db: bool| {
let get_visible_rows = || {
if let Some(tx_id) = current_tx_id {
let tx_state = shadow_db.transactions.get(&tx_id).unwrap();
// Take snapshot during first operation that accesses the DB after a BEGIN, not immediately at BEGIN (the semantics is BEGIN DEFERRED)
if accesses_db && tx_state.is_none() {
shadow_db.take_snapshot(tx_id);
}
shadow_db.get_visible_rows(Some(tx_id))
} else {
shadow_db.get_visible_rows(None) // No transaction
@@ -853,9 +863,9 @@ fn generate_operation(
match rng.random_range(0..100) {
0..=9 => {
if !in_transaction {
(Operation::Begin, get_visible_rows(false))
(Operation::Begin, get_visible_rows())
} else {
let visible_rows = get_visible_rows(true);
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
@@ -864,9 +874,9 @@ fn generate_operation(
}
10..=14 => {
if in_transaction {
(Operation::Commit, get_visible_rows(false))
(Operation::Commit, get_visible_rows())
} else {
let visible_rows = get_visible_rows(true);
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
@@ -875,9 +885,9 @@ fn generate_operation(
}
15..=19 => {
if in_transaction {
(Operation::Rollback, get_visible_rows(false))
(Operation::Rollback, get_visible_rows())
} else {
let visible_rows = get_visible_rows(true);
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
@@ -892,7 +902,7 @@ fn generate_operation(
3 => CheckpointMode::Full,
_ => unreachable!(),
};
(Operation::Checkpoint { mode }, get_visible_rows(false))
(Operation::Checkpoint { mode }, get_visible_rows())
}
23..=26 => {
let op = match rng.random_range(0..6) {
@@ -942,10 +952,10 @@ fn generate_operation(
}
_ => unreachable!(),
};
(Operation::AlterTable { op }, get_visible_rows(true))
(Operation::AlterTable { op }, get_visible_rows())
}
_ => {
let visible_rows = get_visible_rows(true);
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,