diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 8fbde36cc..2348cc503 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -126,6 +126,48 @@ impl ShadowDb { Operation::Delete { id } => { self.committed_rows.remove(&id); } + Operation::AlterTable { op } => match op { + AlterTableOp::AddColumn { name, ty } => { + let table_columns = + &mut self.schema.get_mut("test_table").unwrap().columns; + table_columns.push(Column { + name: name.clone(), + ty: ty.clone(), + primary_key: false, + }); + for row in self.committed_rows.values_mut() { + row.other_columns.insert(name.clone(), Value::Null); + } + } + AlterTableOp::DropColumn { name } => { + let table_columns = + &mut self.schema.get_mut("test_table").unwrap().columns; + table_columns.retain(|c| c.name != name); + for row in self.committed_rows.values_mut() { + row.other_columns.remove(&name); + } + } + AlterTableOp::RenameColumn { old_name, new_name } => { + let table_columns = + &mut self.schema.get_mut("test_table").unwrap().columns; + let col_type = table_columns + .iter() + .find(|c| c.name == old_name) + .unwrap() + .ty + .clone(); + table_columns.retain(|c| c.name != old_name); + table_columns.push(Column { + name: new_name.clone(), + ty: col_type, + primary_key: false, + }); + for row in self.committed_rows.values_mut() { + let value = row.other_columns.remove(&old_name).unwrap(); + row.other_columns.insert(new_name.clone(), value); + } + } + }, _ => unreachable!("Unexpected operation: {op}"), } } @@ -212,6 +254,78 @@ impl ShadowDb { } } + fn alter_table(&mut self, tx_id: usize, op: AlterTableOp) -> Result<(), String> { + if let Some(tx_state) = self.transactions.get_mut(&tx_id) { + let table_columns = &mut tx_state + .as_mut() + .unwrap() + .schema + .get_mut("test_table") + .unwrap() + .columns; + match op { + AlterTableOp::AddColumn { name, ty } => { + table_columns.push(Column { + name: name.clone(), + ty: ty.clone(), + primary_key: false, + }); + let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes; + pending_changes.push(Operation::AlterTable { + op: AlterTableOp::AddColumn { + name: name.clone(), + ty: ty.clone(), + }, + }); + let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows; + for visible_row in visible_rows.values_mut() { + visible_row.other_columns.insert(name.clone(), Value::Null); + } + } + AlterTableOp::DropColumn { name } => { + table_columns.retain(|c| c.name != name); + let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes; + pending_changes.push(Operation::AlterTable { + op: AlterTableOp::DropColumn { name: name.clone() }, + }); + let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows; + for visible_row in visible_rows.values_mut() { + visible_row.other_columns.remove(&name); + } + } + AlterTableOp::RenameColumn { old_name, new_name } => { + let col_type = table_columns + .iter() + .find(|c| c.name == old_name) + .unwrap() + .ty + .clone(); + table_columns.retain(|c| c.name != old_name); + table_columns.push(Column { + name: new_name.clone(), + ty: col_type, + primary_key: false, + }); + let pending_changes = &mut tx_state.as_mut().unwrap().pending_changes; + pending_changes.push(Operation::AlterTable { + op: AlterTableOp::RenameColumn { + old_name: old_name.clone(), + new_name: new_name.clone(), + }, + }); + let visible_rows = &mut tx_state.as_mut().unwrap().visible_rows; + for visible_row in visible_rows.values_mut() { + let value = visible_row.other_columns.remove(&old_name).unwrap(); + visible_row.other_columns.insert(new_name.clone(), value); + } + } + } + Ok(()) + } else { + Err("No active transaction".to_string()) + } + } + fn get_visible_rows(&self, tx_id: Option) -> Vec { let Some(tx_id) = tx_id else { // No transaction - see committed state @@ -247,6 +361,26 @@ impl std::fmt::Display for CheckpointMode { } } +#[derive(Debug, Clone)] +#[allow(clippy::enum_variant_names)] +enum AlterTableOp { + AddColumn { name: String, ty: String }, + DropColumn { name: String }, + RenameColumn { old_name: String, new_name: String }, +} + +impl std::fmt::Display for AlterTableOp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AlterTableOp::AddColumn { name, ty } => write!(f, "ADD COLUMN {name} {ty}"), + AlterTableOp::DropColumn { name } => write!(f, "DROP COLUMN {name}"), + AlterTableOp::RenameColumn { old_name, new_name } => { + write!(f, "RENAME COLUMN {old_name} TO {new_name}") + } + } + } +} + #[derive(Debug, Clone)] enum Operation { Begin, @@ -266,6 +400,9 @@ enum Operation { Checkpoint { mode: CheckpointMode, }, + AlterTable { + op: AlterTableOp, + }, Select, } @@ -311,6 +448,7 @@ impl std::fmt::Display for Operation { Operation::Delete { id } => write!(f, "DELETE FROM test_table WHERE id = {id}"), Operation::Select => write!(f, "SELECT * FROM test_table"), Operation::Checkpoint { mode } => write!(f, "PRAGMA wal_checkpoint({mode})"), + Operation::AlterTable { op } => write!(f, "ALTER TABLE test_table {op}"), } } } @@ -629,6 +767,39 @@ async fn test_multiple_connections_fuzz() { ); } } + Operation::AlterTable { op } => { + let query = format!("ALTER TABLE test_table {op}"); + let result = conn.execute(&query, ()).await; + + match result { + Ok(_) => { + if let Some(tx_id) = *current_tx_id { + // In transaction - update transaction's view + shared_shadow_db.alter_table(tx_id, op).unwrap(); + } else { + // Auto-commit - update shadow DB committed state + shared_shadow_db.begin_transaction(next_tx_id, true); + shared_shadow_db + .alter_table(next_tx_id, op.clone()) + .unwrap(); + shared_shadow_db.commit_transaction(next_tx_id); + next_tx_id += 1; + } + } + Err(e) => { + println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); + // Check if it's an acceptable error + if e.to_string().contains("database is locked") { + if let Some(tx_id) = *current_tx_id { + shared_shadow_db.rollback_transaction(tx_id); + *current_tx_id = None; + } + } else { + panic!("Unexpected error during alter table: {e}"); + } + } + } + } Operation::Checkpoint { mode } => { let query = format!("PRAGMA wal_checkpoint({mode})"); let mut rows = conn.query(&query, ()).await.unwrap(); @@ -733,6 +904,56 @@ fn generate_operation( }; (Operation::Checkpoint { mode }, get_visible_rows(false)) } + 23..=26 => { + let op = match rng.random_range(0..6) { + 0..=2 => AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + }, + 3..=4 => { + let table_schema = schema_clone.get("test_table").unwrap(); + let columns_no_id = table_schema + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + } + } else { + let column = columns_no_id.choose(rng).unwrap(); + AlterTableOp::DropColumn { + name: column.name.clone(), + } + } + } + 5 => { + let columns_no_id = schema_clone + .get("test_table") + .unwrap() + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + } + } else { + let column = columns_no_id.choose(rng).unwrap(); + AlterTableOp::RenameColumn { + old_name: column.name.clone(), + new_name: format!("col_{}", rng.random_range(1..i64::MAX)), + } + } + } + _ => unreachable!(), + }; + (Operation::AlterTable { op }, get_visible_rows(true)) + } _ => { let visible_rows = get_visible_rows(true); (