diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index c8f507c0d..8fbde36cc 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -5,28 +5,57 @@ use std::collections::HashMap; use turso::{Builder, Value}; // In-memory representation of the database state -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] struct DbRow { id: i64, - text: String, + other_columns: HashMap, } impl std::fmt::Display for DbRow { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "(id={}, text={})", self.id, self.text) + if self.other_columns.is_empty() { + write!(f, "(id={})", self.id) + } else { + write!( + f, + "(id={},{})", + self.id, + self.other_columns + .iter() + .map(|(k, v)| format!("{k}={v:?}")) + .collect::>() + .join(", "), + ) + } } } #[derive(Debug, Clone)] struct TransactionState { - // What this transaction can see (snapshot) + // The schema this transaction can see (snapshot) + schema: HashMap, + // The rows this transaction can see (snapshot) visible_rows: HashMap, // Pending changes in this transaction pending_changes: Vec, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct Column { + name: String, + ty: String, + primary_key: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct TableSchema { + columns: Vec, +} + #[derive(Debug)] struct ShadowDb { + // Schema + schema: HashMap, // Committed state (what's actually in the database) committed_rows: HashMap, // Transaction states @@ -34,8 +63,9 @@ struct ShadowDb { } impl ShadowDb { - fn new() -> Self { + fn new(initial_schema: HashMap) -> Self { Self { + schema: initial_schema, committed_rows: HashMap::new(), transactions: HashMap::new(), } @@ -46,6 +76,7 @@ impl ShadowDb { tx_id, if immediate { Some(TransactionState { + schema: self.schema.clone(), visible_rows: self.committed_rows.clone(), pending_changes: Vec::new(), }) @@ -59,6 +90,7 @@ impl ShadowDb { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { assert!(tx_state.is_none()); tx_state.replace(TransactionState { + schema: self.schema.clone(), visible_rows: self.committed_rows.clone(), pending_changes: Vec::new(), }); @@ -74,16 +106,27 @@ impl ShadowDb { // Apply pending changes to committed state for op in tx_state.pending_changes { match op { - Operation::Insert { id, text } => { - self.committed_rows.insert(id, DbRow { id, text }); + Operation::Insert { id, other_columns } => { + assert!( + other_columns.len() + == self.schema.get("test_table").unwrap().columns.len() - 1, + "Inserted row has {} columns, expected {}", + other_columns.len() + 1, + self.schema.get("test_table").unwrap().columns.len() + ); + self.committed_rows.insert(id, DbRow { id, other_columns }); } - Operation::Update { id, text } => { - self.committed_rows.insert(id, DbRow { id, text }); + Operation::Update { id, other_columns } => { + let mut row_to_update = self.committed_rows.get(&id).unwrap().clone(); + for (k, v) in other_columns { + row_to_update.other_columns.insert(k, v); + } + self.committed_rows.insert(id, row_to_update); } Operation::Delete { id } => { self.committed_rows.remove(&id); } - other => unreachable!("Unexpected operation: {other}"), + _ => unreachable!("Unexpected operation: {op}"), } } } @@ -93,7 +136,12 @@ impl ShadowDb { self.transactions.remove(&tx_id); } - fn insert(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> { + fn insert( + &mut self, + tx_id: usize, + id: i64, + other_columns: HashMap, + ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state if tx_state.as_ref().unwrap().visible_rows.contains_key(&id) { @@ -101,13 +149,13 @@ impl ShadowDb { } let row = DbRow { id, - text: text.clone(), + other_columns: other_columns.clone(), }; tx_state .as_mut() .unwrap() .pending_changes - .push(Operation::Insert { id, text }); + .push(Operation::Insert { id, other_columns }); tx_state.as_mut().unwrap().visible_rows.insert(id, row); Ok(()) } else { @@ -115,22 +163,31 @@ impl ShadowDb { } } - fn update(&mut self, tx_id: usize, id: i64, text: String) -> Result<(), String> { + fn update( + &mut self, + tx_id: usize, + id: i64, + other_columns: HashMap, + ) -> Result<(), String> { if let Some(tx_state) = self.transactions.get_mut(&tx_id) { // Check if row exists in visible state - if !tx_state.as_ref().unwrap().visible_rows.contains_key(&id) { + let visible_rows = &tx_state.as_ref().unwrap().visible_rows; + if !visible_rows.contains_key(&id) { return Err("Row not found".to_string()); } - let row = DbRow { - id, - text: text.clone(), - }; + let mut new_row = visible_rows.get(&id).unwrap().clone(); + for (k, v) in other_columns { + new_row.other_columns.insert(k, v); + } tx_state .as_mut() .unwrap() .pending_changes - .push(Operation::Update { id, text }); - tx_state.as_mut().unwrap().visible_rows.insert(id, row); + .push(Operation::Update { + id, + other_columns: new_row.other_columns.clone(), + }); + tx_state.as_mut().unwrap().visible_rows.insert(id, new_row); Ok(()) } else { Err("No active transaction".to_string()) @@ -195,24 +252,61 @@ enum Operation { Begin, Commit, Rollback, - Insert { id: i64, text: String }, - Update { id: i64, text: String }, - Delete { id: i64 }, - Checkpoint { mode: CheckpointMode }, + Insert { + id: i64, + other_columns: HashMap, + }, + Update { + id: i64, + other_columns: HashMap, + }, + Delete { + id: i64, + }, + Checkpoint { + mode: CheckpointMode, + }, Select, } +fn value_to_sql(v: &Value) -> String { + match v { + Value::Integer(i) => i.to_string(), + Value::Text(s) => format!("'{s}'"), + Value::Null => "NULL".to_string(), + _ => unreachable!(), + } +} + impl std::fmt::Display for Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Operation::Begin => write!(f, "BEGIN"), Operation::Commit => write!(f, "COMMIT"), Operation::Rollback => write!(f, "ROLLBACK"), - Operation::Insert { id, text } => { - write!(f, "INSERT INTO test_table (id, text) VALUES ({id}, {text})") + Operation::Insert { id, other_columns } => { + let col_names = other_columns.keys().cloned().collect::>().join(", "); + let col_values = other_columns + .values() + .map(value_to_sql) + .collect::>() + .join(", "); + if col_names.is_empty() { + write!(f, "INSERT INTO test_table (id) VALUES ({id})") + } else { + write!( + f, + "INSERT INTO test_table (id, {col_names}) VALUES ({id}, {col_values})" + ) + } } - Operation::Update { id, text } => { - write!(f, "UPDATE test_table SET text = {text} WHERE id = {id}") + Operation::Update { id, other_columns } => { + let update_set = other_columns + .iter() + .map(|(k, v)| format!("{k}={}", value_to_sql(v))) + .collect::>() + .join(", "); + write!(f, "UPDATE test_table SET {update_set} WHERE id = {id}") } Operation::Delete { id } => write!(f, "DELETE FROM test_table WHERE id = {id}"), Operation::Select => write!(f, "SELECT * FROM test_table"), @@ -248,6 +342,7 @@ async fn test_multiple_connections_fuzz() { const NUM_CONNECTIONS: usize = 2; for iteration in 0..NUM_ITERATIONS { + println!("--- Seed {seed} Iteration {iteration} ---"); // Create a fresh database for each iteration let tempfile = tempfile::NamedTempFile::new().unwrap(); let db = Builder::new_local(tempfile.path().to_str().unwrap()) @@ -256,7 +351,25 @@ async fn test_multiple_connections_fuzz() { .unwrap(); // SHARED shadow database for all connections - let mut shared_shadow_db = ShadowDb::new(); + let mut schema = HashMap::new(); + schema.insert( + "test_table".to_string(), + TableSchema { + columns: vec![ + Column { + name: "id".to_string(), + ty: "INTEGER".to_string(), + primary_key: true, + }, + Column { + name: "text".to_string(), + ty: "TEXT".to_string(), + primary_key: false, + }, + ], + }, + ); + let mut shared_shadow_db = ShadowDb::new(schema); let mut next_tx_id = 0; // Create connections @@ -345,13 +458,20 @@ async fn test_multiple_connections_fuzz() { } } } - Operation::Insert { id, text } => { - let result = conn - .execute( - "INSERT INTO test_table (id, text) VALUES (?, ?)", - vec![Value::Integer(id), Value::Text(text.clone())], - ) - .await; + Operation::Insert { id, other_columns } => { + let col_names = + other_columns.keys().cloned().collect::>().join(", "); + let col_values = other_columns + .values() + .map(value_to_sql) + .collect::>() + .join(", "); + let query = if col_names.is_empty() { + format!("INSERT INTO test_table (id) VALUES ({id})") + } else { + format!("INSERT INTO test_table (id, {col_names}) VALUES ({id}, {col_values})") + }; + let result = conn.execute(query.as_str(), ()).await; // Check if real DB operation succeeded match result { @@ -359,12 +479,14 @@ async fn test_multiple_connections_fuzz() { // Success - update shadow DB if let Some(tx_id) = *current_tx_id { // In transaction - update transaction's view - shared_shadow_db.insert(tx_id, id, text.clone()).unwrap(); + shared_shadow_db + .insert(tx_id, id, other_columns.clone()) + .unwrap(); } else { // Auto-commit - update shadow DB committed state shared_shadow_db.begin_transaction(next_tx_id, true); shared_shadow_db - .insert(next_tx_id, id, text.clone()) + .insert(next_tx_id, id, other_columns.clone()) .unwrap(); shared_shadow_db.commit_transaction(next_tx_id); next_tx_id += 1; @@ -383,13 +505,14 @@ async fn test_multiple_connections_fuzz() { } } } - Operation::Update { id, text } => { - let result = conn - .execute( - "UPDATE test_table SET text = ? WHERE id = ?", - vec![Value::Text(text.clone()), Value::Integer(id)], - ) - .await; + Operation::Update { id, other_columns } => { + let col_set = other_columns + .iter() + .map(|(k, v)| format!("{k}={}", value_to_sql(v))) + .collect::>() + .join(", "); + let query = format!("UPDATE test_table SET {col_set} WHERE id = {id}"); + let result = conn.execute(query.as_str(), ()).await; // Check if real DB operation succeeded match result { @@ -397,12 +520,14 @@ async fn test_multiple_connections_fuzz() { // Success - update shadow DB if let Some(tx_id) = *current_tx_id { // In transaction - update transaction's view - shared_shadow_db.update(tx_id, id, text.clone()).unwrap(); + shared_shadow_db + .update(tx_id, id, other_columns.clone()) + .unwrap(); } else { // Auto-commit - update shadow DB committed state shared_shadow_db.begin_transaction(next_tx_id, true); shared_shadow_db - .update(next_tx_id, id, text.clone()) + .update(next_tx_id, id, other_columns.clone()) .unwrap(); shared_shadow_db.commit_transaction(next_tx_id); next_tx_id += 1; @@ -424,8 +549,8 @@ async fn test_multiple_connections_fuzz() { Operation::Delete { id } => { let result = conn .execute( - "DELETE FROM test_table WHERE id = ?", - vec![Value::Integer(id)], + format!("DELETE FROM test_table WHERE id = {id}").as_str(), + (), ) .await; @@ -458,17 +583,23 @@ async fn test_multiple_connections_fuzz() { } } Operation::Select => { - let query_str = "SELECT id, text FROM test_table ORDER BY id"; - let mut rows = conn.query(query_str, ()).await.unwrap(); + let query_str = "SELECT * FROM test_table ORDER BY id"; + let mut stmt = conn.prepare(query_str).await.unwrap(); + let columns = stmt.columns(); + let mut rows = stmt.query(()).await.unwrap(); let mut real_rows = Vec::new(); while let Some(row) = rows.next().await.unwrap() { - let id = row.get_value(0).unwrap(); - let text = row.get_value(1).unwrap(); - - if let (Value::Integer(id), Value::Text(text)) = (id, text) { - real_rows.push(DbRow { id, text }); + let Value::Integer(id) = row.get_value(0).unwrap() else { + panic!("Unexpected value for id: {:?}", row.get_value(0)); + }; + let mut other_columns = HashMap::new(); + for i in 1..columns.len() { + let column = columns.get(i).unwrap(); + let value = row.get_value(i).unwrap(); + other_columns.insert(column.name().to_string(), value); } + real_rows.push(DbRow { id, other_columns }); } real_rows.sort_by_key(|r| r.id); @@ -538,6 +669,15 @@ fn generate_operation( shadow_db: &mut ShadowDb, ) -> (Operation, Vec) { let in_transaction = current_tx_id.is_some(); + let schema_clone = if let Some(tx_id) = current_tx_id { + if let Some(Some(tx_state)) = shadow_db.transactions.get(&tx_id) { + tx_state.schema.clone() + } else { + shadow_db.schema.clone() + } + } else { + shadow_db.schema.clone() + }; let mut get_visible_rows = |accesses_db: bool| { if let Some(tx_id) = current_tx_id { let tx_state = shadow_db.transactions.get(&tx_id).unwrap(); @@ -556,7 +696,10 @@ fn generate_operation( (Operation::Begin, get_visible_rows(false)) } else { let visible_rows = get_visible_rows(true); - (generate_data_operation(rng, &visible_rows), visible_rows) + ( + generate_data_operation(rng, &visible_rows, &schema_clone), + visible_rows, + ) } } 10..=14 => { @@ -564,7 +707,10 @@ fn generate_operation( (Operation::Commit, get_visible_rows(false)) } else { let visible_rows = get_visible_rows(true); - (generate_data_operation(rng, &visible_rows), visible_rows) + ( + generate_data_operation(rng, &visible_rows, &schema_clone), + visible_rows, + ) } } 15..=19 => { @@ -572,7 +718,10 @@ fn generate_operation( (Operation::Rollback, get_visible_rows(false)) } else { let visible_rows = get_visible_rows(true); - (generate_data_operation(rng, &visible_rows), visible_rows) + ( + generate_data_operation(rng, &visible_rows, &schema_clone), + visible_rows, + ) } } 20..=22 => { @@ -584,53 +733,97 @@ fn generate_operation( }; (Operation::Checkpoint { mode }, get_visible_rows(false)) } - // 80% chance for data operations _ => { let visible_rows = get_visible_rows(true); - (generate_data_operation(rng, &visible_rows), visible_rows) + ( + generate_data_operation(rng, &visible_rows, &schema_clone), + visible_rows, + ) } } } -fn generate_data_operation(rng: &mut ChaCha8Rng, visible_rows: &[DbRow]) -> Operation { - match rng.random_range(0..4) { +fn generate_data_operation( + rng: &mut ChaCha8Rng, + visible_rows: &[DbRow], + schema: &HashMap, +) -> Operation { + let table_schema = schema.get("test_table").unwrap(); + let op_num = rng.random_range(0..4); + let mut generate_insert_operation = || { + let id = rng.random_range(1..i64::MAX); + let mut other_columns = HashMap::new(); + for column in table_schema.columns.iter() { + if column.name == "id" { + continue; + } + other_columns.insert( + column.name.clone(), + match column.ty.as_str() { + "TEXT" => Value::Text(format!("text_{}", rng.random_range(1..i64::MAX))), + "INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)), + "REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64), + _ => Value::Null, + }, + ); + } + Operation::Insert { id, other_columns } + }; + match op_num { 0 => { - // Insert - generate a new ID that doesn't exist - let id = if visible_rows.is_empty() { - rng.random_range(1..1000) - } else { - let max_id = visible_rows.iter().map(|r| r.id).max().unwrap(); - rng.random_range(max_id + 1..max_id + 100) - }; - let text = format!("text_{}", rng.random_range(1..1000)); - Operation::Insert { id, text } + // Insert + generate_insert_operation() } 1 => { - // Update - only if there are visible rows + // Update if visible_rows.is_empty() { // No rows to update, try insert instead - let id = rng.random_range(1..1000); - let text = format!("text_{}", rng.random_range(1..1000)); - Operation::Insert { id, text } + generate_insert_operation() } else { + let columns_no_id = table_schema + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + // No columns to update, try insert instead + return generate_insert_operation(); + } let id = visible_rows.choose(rng).unwrap().id; - let text = format!("updated_{}", rng.random_range(1..1000)); - Operation::Update { id, text } + let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone(); + let mut other_columns = HashMap::new(); + other_columns.insert( + col_name_to_update.clone(), + match columns_no_id + .iter() + .find(|c| c.name == col_name_to_update) + .unwrap() + .ty + .as_str() + { + "TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))), + "INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)), + "REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64), + _ => Value::Null, + }, + ); + Operation::Update { id, other_columns } } } 2 => { - // Delete - only if there are visible rows + // Delete if visible_rows.is_empty() { // No rows to delete, try insert instead - let id = rng.random_range(1..1000); - let text = format!("text_{}", rng.random_range(1..1000)); - Operation::Insert { id, text } + generate_insert_operation() } else { let id = visible_rows.choose(rng).unwrap().id; Operation::Delete { id } } } - 3 => Operation::Select, + 3 => { + // Select + Operation::Select + } _ => unreachable!(), } }