diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index 644ee847c..81bf4fb81 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -60,14 +60,19 @@ struct ShadowDb { committed_rows: HashMap, // Transaction states transactions: HashMap>, + query_gen_options: QueryGenOptions, } impl ShadowDb { - fn new(initial_schema: HashMap) -> Self { + fn new( + initial_schema: HashMap, + query_gen_options: QueryGenOptions, + ) -> Self { Self { schema: initial_schema, committed_rows: HashMap::new(), transactions: HashMap::new(), + query_gen_options, } } @@ -388,7 +393,9 @@ impl std::fmt::Display for AlterTableOp { #[derive(Debug, Clone)] enum Operation { - Begin, + Begin { + concurrent: bool, + }, Commit, Rollback, Insert { @@ -423,7 +430,9 @@ fn value_to_sql(v: &Value) -> String { impl std::fmt::Display for Operation { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Operation::Begin => write!(f, "BEGIN"), + Operation::Begin { concurrent } => { + write!(f, "BEGIN{}", if *concurrent { " CONCURRENT" } else { "" }) + } Operation::Commit => write!(f, "COMMIT"), Operation::Rollback => write!(f, "ROLLBACK"), Operation::Insert { id, other_columns } => { @@ -477,31 +486,115 @@ fn rng_from_time_or_env() -> (ChaCha8Rng, u64) { /// Verify translation isolation semantics with multiple concurrent connections. /// This test is ignored because it still fails sometimes; unsure if it fails due to a bug in the test or a bug in the implementation. async fn test_multiple_connections_fuzz() { - multiple_connections_fuzz(false).await + multiple_connections_fuzz(FuzzOptions::default()).await } #[tokio::test] #[ignore = "MVCC is currently under development, it is expected to fail"] // Same as test_multiple_connections_fuzz, but with MVCC enabled. async fn test_multiple_connections_fuzz_mvcc() { - multiple_connections_fuzz(true).await + let mvcc_fuzz_options = FuzzOptions { + mvcc_enabled: true, + max_num_connections: 2, + query_gen_options: QueryGenOptions { + weight_begin_deferred: 8, + weight_begin_concurrent: 8, + weight_commit: 8, + weight_rollback: 8, + weight_checkpoint: 0, + weight_ddl: 0, + weight_dml: 76, + dml_gen_options: DmlGenOptions { + weight_insert: 34, + weight_delete: 33, + weight_select: 33, + weight_update: 0, + }, + }, + ..FuzzOptions::default() + }; + multiple_connections_fuzz(mvcc_fuzz_options).await } -async fn multiple_connections_fuzz(mvcc_enabled: bool) { +#[derive(Debug, Clone)] +struct FuzzOptions { + num_iterations: usize, + operations_per_connection: usize, + max_num_connections: usize, + query_gen_options: QueryGenOptions, + mvcc_enabled: bool, +} + +#[derive(Debug, Clone)] +struct QueryGenOptions { + weight_begin_deferred: usize, + weight_begin_concurrent: usize, + weight_commit: usize, + weight_rollback: usize, + weight_checkpoint: usize, + weight_ddl: usize, + weight_dml: usize, + dml_gen_options: DmlGenOptions, +} + +#[derive(Debug, Clone)] +struct DmlGenOptions { + weight_insert: usize, + weight_update: usize, + weight_delete: usize, + weight_select: usize, +} + +impl Default for FuzzOptions { + fn default() -> Self { + Self { + num_iterations: 50, + operations_per_connection: 30, + max_num_connections: 8, + query_gen_options: QueryGenOptions::default(), + mvcc_enabled: false, + } + } +} + +impl Default for QueryGenOptions { + fn default() -> Self { + Self { + weight_begin_deferred: 10, + weight_begin_concurrent: 0, + weight_commit: 10, + weight_rollback: 10, + weight_checkpoint: 5, + weight_ddl: 5, + weight_dml: 55, + dml_gen_options: DmlGenOptions::default(), + } + } +} + +impl Default for DmlGenOptions { + fn default() -> Self { + Self { + weight_insert: 25, + weight_update: 25, + weight_delete: 25, + weight_select: 25, + } + } +} + +async fn multiple_connections_fuzz(opts: FuzzOptions) { let (mut rng, seed) = rng_from_time_or_env(); println!("Multiple connections fuzz test seed: {seed}"); - const NUM_ITERATIONS: usize = 50; - const OPERATIONS_PER_CONNECTION: usize = 30; - const MAX_NUM_CONNECTIONS: usize = 8; - - for iteration in 0..NUM_ITERATIONS { - let num_connections = rng.random_range(2..=MAX_NUM_CONNECTIONS); + for iteration in 0..opts.num_iterations { + let num_connections = rng.random_range(2..=opts.max_num_connections); println!("--- Seed {seed} Iteration {iteration} ---"); + println!("Options: {opts:?}"); // Create a fresh database for each iteration let tempfile = tempfile::NamedTempFile::new().unwrap(); let db = Builder::new_local(tempfile.path().to_str().unwrap()) - .with_mvcc(mvcc_enabled) + .with_mvcc(opts.mvcc_enabled) .build() .await .unwrap(); @@ -525,7 +618,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { ], }, ); - let mut shared_shadow_db = ShadowDb::new(schema); + let mut shared_shadow_db = ShadowDb::new(schema, opts.query_gen_options.clone()); let mut next_tx_id = 0; // Create connections @@ -544,26 +637,67 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { connections.push((conn, conn_id, None::)); // (connection, conn_id, current_tx_id) } + let is_acceptable_error = |e: &turso::Error| -> bool { + let e_string = e.to_string(); + e_string.contains("is locked") + || e_string.contains("busy") + || e_string.contains("Write-write conflict") + }; + let requires_rollback = |e: &turso::Error| -> bool { + let e_string = e.to_string(); + e_string.contains("Write-write conflict") + }; + + let handle_error = |e: &turso::Error, + tx_id: &mut Option, + conn_id: usize, + op_num: usize, + shadow_db: &mut ShadowDb| { + println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); + if requires_rollback(e) { + if let Some(tx_id) = tx_id { + println!("Connection {conn_id}(op={op_num}) rolling back transaction {tx_id}"); + shadow_db.rollback_transaction(*tx_id); + } + *tx_id = None; + } + if is_acceptable_error(e) { + return; + } + panic!("Unexpected error: {e}"); + }; + // Interleave operations between all connections - for op_num in 0..OPERATIONS_PER_CONNECTION { + for op_num in 0..opts.operations_per_connection { for (conn, conn_id, current_tx_id) in &mut connections { // Generate operation based on current transaction state let (operation, visible_rows) = generate_operation(&mut rng, *current_tx_id, &mut shared_shadow_db); let is_in_tx = current_tx_id.is_some(); + let is_in_tx_str = if is_in_tx { + format!("true(tx_id={:?})", current_tx_id.unwrap()) + } else { + "false".to_string() + }; let has_snapshot = current_tx_id.is_some_and(|tx_id| { shared_shadow_db.transactions.get(&tx_id).unwrap().is_some() }); - println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx}, has_snapshot={has_snapshot}"); + println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx_str}, has_snapshot={has_snapshot}"); match operation { - Operation::Begin => { + Operation::Begin { concurrent } => { shared_shadow_db.begin_transaction(next_tx_id, false); + if concurrent { + // in tursodb, BEGIN CONCURRENT immediately starts a transaction. + shared_shadow_db.take_snapshot_if_not_exists(next_tx_id); + } *current_tx_id = Some(next_tx_id); next_tx_id += 1; - conn.execute("BEGIN", ()).await.unwrap(); + let query = operation.to_string(); + + conn.execute(query.as_str(), ()).await.unwrap(); } Operation::Commit => { let Some(tx_id) = *current_tx_id else { @@ -578,13 +712,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { shared_shadow_db.commit_transaction(tx_id); *current_tx_id = None; } - Err(e) => { - println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - // Check if it's an acceptable error - if !e.to_string().contains("database is locked") { - panic!("Unexpected error during commit: {e}"); - } - } + Err(e) => handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ), } } Operation::Rollback => { @@ -598,15 +732,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { shared_shadow_db.rollback_transaction(tx_id); *current_tx_id = None; } - Err(e) => { - println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - // Check if it's an acceptable error - if !e.to_string().contains("Busy") - && !e.to_string().contains("database is locked") - { - panic!("Unexpected error during rollback: {e}"); - } - } + Err(e) => handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ), } } } @@ -645,13 +777,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { next_tx_id += 1; } } - Err(e) => { - println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - // Check if it's an acceptable error - if !e.to_string().contains("database is locked") { - panic!("Unexpected error during insert: {e}"); - } - } + Err(e) => handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ), } } Operation::Update { id, other_columns } => { @@ -683,13 +815,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { next_tx_id += 1; } } - Err(e) => { - println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - // Check if it's an acceptable error - if !e.to_string().contains("database is locked") { - panic!("Unexpected error during update: {e}"); - } - } + Err(e) => handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ), } } Operation::Delete { id } => { @@ -716,13 +848,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { next_tx_id += 1; } } - Err(e) => { - println!("Connection {conn_id}(op={op_num}) FAILED: {e}"); - // Check if it's an acceptable error - if !e.to_string().contains("database is locked") { - panic!("Unexpected error during delete: {e}"); - } - } + Err(e) => handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ), } } Operation::Select => { @@ -735,9 +867,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) { let ok = loop { match rows.next().await { Err(e) => { - if !e.to_string().contains("database is locked") { - panic!("Unexpected error during select: {e}"); - } + handle_error( + &e, + current_tx_id, + *conn_id, + op_num, + &mut shared_shadow_db, + ); break false; } Ok(None) => { @@ -879,107 +1015,156 @@ fn generate_operation( shadow_db.get_visible_rows(None) // No transaction } }; - match rng.random_range(0..100) { - 0..=9 => { - if !in_transaction { - (Operation::Begin, get_visible_rows()) - } else { - let visible_rows = get_visible_rows(); - ( - generate_data_operation(rng, &visible_rows, &schema_clone), - visible_rows, - ) - } - } - 10..=14 => { - if in_transaction { - (Operation::Commit, get_visible_rows()) - } else { - let visible_rows = get_visible_rows(); - ( - generate_data_operation(rng, &visible_rows, &schema_clone), - visible_rows, - ) - } - } - 15..=19 => { - if in_transaction { - (Operation::Rollback, get_visible_rows()) - } else { - let visible_rows = get_visible_rows(); - ( - generate_data_operation(rng, &visible_rows, &schema_clone), - visible_rows, - ) - } - } - 20..=22 => { - let mode = match rng.random_range(0..=3) { - 0 => CheckpointMode::Passive, - 1 => CheckpointMode::Restart, - 2 => CheckpointMode::Truncate, - 3 => CheckpointMode::Full, - _ => unreachable!(), - }; - (Operation::Checkpoint { mode }, get_visible_rows()) - } - 23..=26 => { - let op = match rng.random_range(0..6) { - 0..=2 => AlterTableOp::AddColumn { - name: format!("col_{}", rng.random_range(1..i64::MAX)), - ty: "TEXT".to_string(), - }, - 3..=4 => { - let table_schema = schema_clone.get("test_table").unwrap(); - let columns_no_id = table_schema - .columns - .iter() - .filter(|c| c.name != "id") - .collect::>(); - if columns_no_id.is_empty() { - AlterTableOp::AddColumn { - name: format!("col_{}", rng.random_range(1..i64::MAX)), - ty: "TEXT".to_string(), - } - } else { - let column = columns_no_id.choose(rng).unwrap(); - AlterTableOp::DropColumn { - name: column.name.clone(), - } - } - } - 5 => { - let columns_no_id = schema_clone - .get("test_table") - .unwrap() - .columns - .iter() - .filter(|c| c.name != "id") - .collect::>(); - if columns_no_id.is_empty() { - AlterTableOp::AddColumn { - name: format!("col_{}", rng.random_range(1..i64::MAX)), - ty: "TEXT".to_string(), - } - } else { - let column = columns_no_id.choose(rng).unwrap(); - AlterTableOp::RenameColumn { - old_name: column.name.clone(), - new_name: format!("col_{}", rng.random_range(1..i64::MAX)), - } - } - } - _ => unreachable!(), - }; - (Operation::AlterTable { op }, get_visible_rows()) - } - _ => { + + let mut start = 0; + let range_begin_deferred = start..start + shadow_db.query_gen_options.weight_begin_deferred; + start += shadow_db.query_gen_options.weight_begin_deferred; + let range_begin_concurrent = start..start + shadow_db.query_gen_options.weight_begin_concurrent; + start += shadow_db.query_gen_options.weight_begin_concurrent; + let range_commit = start..start + shadow_db.query_gen_options.weight_commit; + start += shadow_db.query_gen_options.weight_commit; + let range_rollback = start..start + shadow_db.query_gen_options.weight_rollback; + start += shadow_db.query_gen_options.weight_rollback; + let range_checkpoint = start..start + shadow_db.query_gen_options.weight_checkpoint; + start += shadow_db.query_gen_options.weight_checkpoint; + let range_ddl = start..start + shadow_db.query_gen_options.weight_ddl; + start += shadow_db.query_gen_options.weight_ddl; + let range_dml = start..start + shadow_db.query_gen_options.weight_dml; + start += shadow_db.query_gen_options.weight_dml; + + let random_val = rng.random_range(0..start); + + if range_begin_deferred.contains(&random_val) { + if !in_transaction { + (Operation::Begin { concurrent: false }, get_visible_rows()) + } else { let visible_rows = get_visible_rows(); ( - generate_data_operation(rng, &visible_rows, &schema_clone), + generate_data_operation( + rng, + &visible_rows, + &schema_clone, + &shadow_db.query_gen_options.dml_gen_options, + ), visible_rows, ) } + } else if range_begin_concurrent.contains(&random_val) { + if !in_transaction { + (Operation::Begin { concurrent: true }, get_visible_rows()) + } else { + let visible_rows = get_visible_rows(); + ( + generate_data_operation( + rng, + &visible_rows, + &schema_clone, + &shadow_db.query_gen_options.dml_gen_options, + ), + visible_rows, + ) + } + } else if range_commit.contains(&random_val) { + if in_transaction { + (Operation::Commit, get_visible_rows()) + } else { + let visible_rows = get_visible_rows(); + ( + generate_data_operation( + rng, + &visible_rows, + &schema_clone, + &shadow_db.query_gen_options.dml_gen_options, + ), + visible_rows, + ) + } + } else if range_rollback.contains(&random_val) { + if in_transaction { + (Operation::Rollback, get_visible_rows()) + } else { + let visible_rows = get_visible_rows(); + ( + generate_data_operation( + rng, + &visible_rows, + &schema_clone, + &shadow_db.query_gen_options.dml_gen_options, + ), + visible_rows, + ) + } + } else if range_checkpoint.contains(&random_val) { + let mode = match rng.random_range(0..=3) { + 0 => CheckpointMode::Passive, + 1 => CheckpointMode::Restart, + 2 => CheckpointMode::Truncate, + 3 => CheckpointMode::Full, + _ => unreachable!(), + }; + (Operation::Checkpoint { mode }, get_visible_rows()) + } else if range_ddl.contains(&random_val) { + let op = match rng.random_range(0..6) { + 0..=2 => AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + }, + 3..=4 => { + let table_schema = schema_clone.get("test_table").unwrap(); + let columns_no_id = table_schema + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + } + } else { + let column = columns_no_id.choose(rng).unwrap(); + AlterTableOp::DropColumn { + name: column.name.clone(), + } + } + } + 5 => { + let columns_no_id = schema_clone + .get("test_table") + .unwrap() + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + AlterTableOp::AddColumn { + name: format!("col_{}", rng.random_range(1..i64::MAX)), + ty: "TEXT".to_string(), + } + } else { + let column = columns_no_id.choose(rng).unwrap(); + AlterTableOp::RenameColumn { + old_name: column.name.clone(), + new_name: format!("col_{}", rng.random_range(1..i64::MAX)), + } + } + } + _ => unreachable!(), + }; + (Operation::AlterTable { op }, get_visible_rows()) + } else if range_dml.contains(&random_val) { + let visible_rows = get_visible_rows(); + ( + generate_data_operation( + rng, + &visible_rows, + &schema_clone, + &shadow_db.query_gen_options.dml_gen_options, + ), + visible_rows, + ) + } else { + unreachable!() } } @@ -987,10 +1172,10 @@ fn generate_data_operation( rng: &mut ChaCha8Rng, visible_rows: &[DbRow], schema: &HashMap, + dml_gen_options: &DmlGenOptions, ) -> Operation { let table_schema = schema.get("test_table").unwrap(); - let op_num = rng.random_range(0..4); - let mut generate_insert_operation = || { + let generate_insert_operation = |rng: &mut ChaCha8Rng| { let id = rng.random_range(1..i64::MAX); let mut other_columns = HashMap::new(); for column in table_schema.columns.iter() { @@ -1009,61 +1194,65 @@ fn generate_data_operation( } Operation::Insert { id, other_columns } }; - match op_num { - 0 => { - // Insert - generate_insert_operation() - } - 1 => { - // Update - if visible_rows.is_empty() { - // No rows to update, try insert instead - generate_insert_operation() - } else { - let columns_no_id = table_schema - .columns + let mut start = 0; + let range_insert = start..start + dml_gen_options.weight_insert; + start += dml_gen_options.weight_insert; + let range_update = start..start + dml_gen_options.weight_update; + start += dml_gen_options.weight_update; + let range_delete = start..start + dml_gen_options.weight_delete; + start += dml_gen_options.weight_delete; + let range_select = start..start + dml_gen_options.weight_select; + start += dml_gen_options.weight_select; + + let random_val = rng.random_range(0..start); + + if range_insert.contains(&random_val) { + generate_insert_operation(rng) + } else if range_update.contains(&random_val) { + if visible_rows.is_empty() { + // No rows to update, try insert instead + generate_insert_operation(rng) + } else { + let columns_no_id = table_schema + .columns + .iter() + .filter(|c| c.name != "id") + .collect::>(); + if columns_no_id.is_empty() { + // No columns to update, try insert instead + return generate_insert_operation(rng); + } + let id = visible_rows.choose(rng).unwrap().id; + let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone(); + let mut other_columns = HashMap::new(); + other_columns.insert( + col_name_to_update.clone(), + match columns_no_id .iter() - .filter(|c| c.name != "id") - .collect::>(); - if columns_no_id.is_empty() { - // No columns to update, try insert instead - return generate_insert_operation(); - } - let id = visible_rows.choose(rng).unwrap().id; - let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone(); - let mut other_columns = HashMap::new(); - other_columns.insert( - col_name_to_update.clone(), - match columns_no_id - .iter() - .find(|c| c.name == col_name_to_update) - .unwrap() - .ty - .as_str() - { - "TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))), - "INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)), - "REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64), - _ => Value::Null, - }, - ); - Operation::Update { id, other_columns } - } + .find(|c| c.name == col_name_to_update) + .unwrap() + .ty + .as_str() + { + "TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))), + "INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)), + "REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64), + _ => Value::Null, + }, + ); + Operation::Update { id, other_columns } } - 2 => { - // Delete - if visible_rows.is_empty() { - // No rows to delete, try insert instead - generate_insert_operation() - } else { - let id = visible_rows.choose(rng).unwrap().id; - Operation::Delete { id } - } + } else if range_delete.contains(&random_val) { + if visible_rows.is_empty() { + // No rows to delete, try insert instead + generate_insert_operation(rng) + } else { + let id = visible_rows.choose(rng).unwrap().id; + Operation::Delete { id } } - 3 => { - // Select - Operation::Select - } - _ => unreachable!(), + } else if range_select.contains(&random_val) { + Operation::Select + } else { + unreachable!() } }