Merge 'test/fuzz: improve maintainability/usability of tx isolation test' from Jussi Saurio

**test/fuzz: introduce fuzzoptions to tx isolation test**
this makes it significantly easier to tweak the tx isolation test
parameters, and also makes it much easier to run the MVCC version of the
test without manually tweaking code inline to make it work.
introduces default options for the non-mvcc and mvcc test variants.
---
**test/fuzz: improve error handling in tx isolation fuzz test**
- extract out common behavior for checking acceptable errors
- add functionality to check which errors require rolling back
  a transaction

Closes #3118
This commit is contained in:
Jussi Saurio
2025-09-15 09:21:49 +03:00
committed by GitHub

View File

@@ -60,14 +60,19 @@ struct ShadowDb {
committed_rows: HashMap<i64, DbRow>,
// Transaction states
transactions: HashMap<usize, Option<TransactionState>>,
query_gen_options: QueryGenOptions,
}
impl ShadowDb {
fn new(initial_schema: HashMap<String, TableSchema>) -> Self {
fn new(
initial_schema: HashMap<String, TableSchema>,
query_gen_options: QueryGenOptions,
) -> Self {
Self {
schema: initial_schema,
committed_rows: HashMap::new(),
transactions: HashMap::new(),
query_gen_options,
}
}
@@ -388,7 +393,9 @@ impl std::fmt::Display for AlterTableOp {
#[derive(Debug, Clone)]
enum Operation {
Begin,
Begin {
concurrent: bool,
},
Commit,
Rollback,
Insert {
@@ -423,7 +430,9 @@ fn value_to_sql(v: &Value) -> String {
impl std::fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Operation::Begin => write!(f, "BEGIN"),
Operation::Begin { concurrent } => {
write!(f, "BEGIN{}", if *concurrent { " CONCURRENT" } else { "" })
}
Operation::Commit => write!(f, "COMMIT"),
Operation::Rollback => write!(f, "ROLLBACK"),
Operation::Insert { id, other_columns } => {
@@ -477,31 +486,115 @@ fn rng_from_time_or_env() -> (ChaCha8Rng, u64) {
/// Verify translation isolation semantics with multiple concurrent connections.
/// This test is ignored because it still fails sometimes; unsure if it fails due to a bug in the test or a bug in the implementation.
async fn test_multiple_connections_fuzz() {
multiple_connections_fuzz(false).await
multiple_connections_fuzz(FuzzOptions::default()).await
}
#[tokio::test]
#[ignore = "MVCC is currently under development, it is expected to fail"]
// Same as test_multiple_connections_fuzz, but with MVCC enabled.
async fn test_multiple_connections_fuzz_mvcc() {
multiple_connections_fuzz(true).await
let mvcc_fuzz_options = FuzzOptions {
mvcc_enabled: true,
max_num_connections: 2,
query_gen_options: QueryGenOptions {
weight_begin_deferred: 8,
weight_begin_concurrent: 8,
weight_commit: 8,
weight_rollback: 8,
weight_checkpoint: 0,
weight_ddl: 0,
weight_dml: 76,
dml_gen_options: DmlGenOptions {
weight_insert: 34,
weight_delete: 33,
weight_select: 33,
weight_update: 0,
},
},
..FuzzOptions::default()
};
multiple_connections_fuzz(mvcc_fuzz_options).await
}
async fn multiple_connections_fuzz(mvcc_enabled: bool) {
#[derive(Debug, Clone)]
struct FuzzOptions {
num_iterations: usize,
operations_per_connection: usize,
max_num_connections: usize,
query_gen_options: QueryGenOptions,
mvcc_enabled: bool,
}
#[derive(Debug, Clone)]
struct QueryGenOptions {
weight_begin_deferred: usize,
weight_begin_concurrent: usize,
weight_commit: usize,
weight_rollback: usize,
weight_checkpoint: usize,
weight_ddl: usize,
weight_dml: usize,
dml_gen_options: DmlGenOptions,
}
#[derive(Debug, Clone)]
struct DmlGenOptions {
weight_insert: usize,
weight_update: usize,
weight_delete: usize,
weight_select: usize,
}
impl Default for FuzzOptions {
fn default() -> Self {
Self {
num_iterations: 50,
operations_per_connection: 30,
max_num_connections: 8,
query_gen_options: QueryGenOptions::default(),
mvcc_enabled: false,
}
}
}
impl Default for QueryGenOptions {
fn default() -> Self {
Self {
weight_begin_deferred: 10,
weight_begin_concurrent: 0,
weight_commit: 10,
weight_rollback: 10,
weight_checkpoint: 5,
weight_ddl: 5,
weight_dml: 55,
dml_gen_options: DmlGenOptions::default(),
}
}
}
impl Default for DmlGenOptions {
fn default() -> Self {
Self {
weight_insert: 25,
weight_update: 25,
weight_delete: 25,
weight_select: 25,
}
}
}
async fn multiple_connections_fuzz(opts: FuzzOptions) {
let (mut rng, seed) = rng_from_time_or_env();
println!("Multiple connections fuzz test seed: {seed}");
const NUM_ITERATIONS: usize = 50;
const OPERATIONS_PER_CONNECTION: usize = 30;
const MAX_NUM_CONNECTIONS: usize = 8;
for iteration in 0..NUM_ITERATIONS {
let num_connections = rng.random_range(2..=MAX_NUM_CONNECTIONS);
for iteration in 0..opts.num_iterations {
let num_connections = rng.random_range(2..=opts.max_num_connections);
println!("--- Seed {seed} Iteration {iteration} ---");
println!("Options: {opts:?}");
// Create a fresh database for each iteration
let tempfile = tempfile::NamedTempFile::new().unwrap();
let db = Builder::new_local(tempfile.path().to_str().unwrap())
.with_mvcc(mvcc_enabled)
.with_mvcc(opts.mvcc_enabled)
.build()
.await
.unwrap();
@@ -525,7 +618,7 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
],
},
);
let mut shared_shadow_db = ShadowDb::new(schema);
let mut shared_shadow_db = ShadowDb::new(schema, opts.query_gen_options.clone());
let mut next_tx_id = 0;
// Create connections
@@ -544,26 +637,67 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
connections.push((conn, conn_id, None::<usize>)); // (connection, conn_id, current_tx_id)
}
let is_acceptable_error = |e: &turso::Error| -> bool {
let e_string = e.to_string();
e_string.contains("is locked")
|| e_string.contains("busy")
|| e_string.contains("Write-write conflict")
};
let requires_rollback = |e: &turso::Error| -> bool {
let e_string = e.to_string();
e_string.contains("Write-write conflict")
};
let handle_error = |e: &turso::Error,
tx_id: &mut Option<usize>,
conn_id: usize,
op_num: usize,
shadow_db: &mut ShadowDb| {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
if requires_rollback(e) {
if let Some(tx_id) = tx_id {
println!("Connection {conn_id}(op={op_num}) rolling back transaction {tx_id}");
shadow_db.rollback_transaction(*tx_id);
}
*tx_id = None;
}
if is_acceptable_error(e) {
return;
}
panic!("Unexpected error: {e}");
};
// Interleave operations between all connections
for op_num in 0..OPERATIONS_PER_CONNECTION {
for op_num in 0..opts.operations_per_connection {
for (conn, conn_id, current_tx_id) in &mut connections {
// Generate operation based on current transaction state
let (operation, visible_rows) =
generate_operation(&mut rng, *current_tx_id, &mut shared_shadow_db);
let is_in_tx = current_tx_id.is_some();
let is_in_tx_str = if is_in_tx {
format!("true(tx_id={:?})", current_tx_id.unwrap())
} else {
"false".to_string()
};
let has_snapshot = current_tx_id.is_some_and(|tx_id| {
shared_shadow_db.transactions.get(&tx_id).unwrap().is_some()
});
println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx}, has_snapshot={has_snapshot}");
println!("Connection {conn_id}(op={op_num}): {operation}, is_in_tx={is_in_tx_str}, has_snapshot={has_snapshot}");
match operation {
Operation::Begin => {
Operation::Begin { concurrent } => {
shared_shadow_db.begin_transaction(next_tx_id, false);
if concurrent {
// in tursodb, BEGIN CONCURRENT immediately starts a transaction.
shared_shadow_db.take_snapshot_if_not_exists(next_tx_id);
}
*current_tx_id = Some(next_tx_id);
next_tx_id += 1;
conn.execute("BEGIN", ()).await.unwrap();
let query = operation.to_string();
conn.execute(query.as_str(), ()).await.unwrap();
}
Operation::Commit => {
let Some(tx_id) = *current_tx_id else {
@@ -578,13 +712,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
shared_shadow_db.commit_transaction(tx_id);
*current_tx_id = None;
}
Err(e) => {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
// Check if it's an acceptable error
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during commit: {e}");
}
}
Err(e) => handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
),
}
}
Operation::Rollback => {
@@ -598,15 +732,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
shared_shadow_db.rollback_transaction(tx_id);
*current_tx_id = None;
}
Err(e) => {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
// Check if it's an acceptable error
if !e.to_string().contains("Busy")
&& !e.to_string().contains("database is locked")
{
panic!("Unexpected error during rollback: {e}");
}
}
Err(e) => handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
),
}
}
}
@@ -645,13 +777,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
next_tx_id += 1;
}
}
Err(e) => {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
// Check if it's an acceptable error
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during insert: {e}");
}
}
Err(e) => handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
),
}
}
Operation::Update { id, other_columns } => {
@@ -683,13 +815,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
next_tx_id += 1;
}
}
Err(e) => {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
// Check if it's an acceptable error
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during update: {e}");
}
}
Err(e) => handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
),
}
}
Operation::Delete { id } => {
@@ -716,13 +848,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
next_tx_id += 1;
}
}
Err(e) => {
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
// Check if it's an acceptable error
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during delete: {e}");
}
}
Err(e) => handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
),
}
}
Operation::Select => {
@@ -735,9 +867,13 @@ async fn multiple_connections_fuzz(mvcc_enabled: bool) {
let ok = loop {
match rows.next().await {
Err(e) => {
if !e.to_string().contains("database is locked") {
panic!("Unexpected error during select: {e}");
}
handle_error(
&e,
current_tx_id,
*conn_id,
op_num,
&mut shared_shadow_db,
);
break false;
}
Ok(None) => {
@@ -879,107 +1015,156 @@ fn generate_operation(
shadow_db.get_visible_rows(None) // No transaction
}
};
match rng.random_range(0..100) {
0..=9 => {
if !in_transaction {
(Operation::Begin, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
)
}
}
10..=14 => {
if in_transaction {
(Operation::Commit, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
)
}
}
15..=19 => {
if in_transaction {
(Operation::Rollback, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
visible_rows,
)
}
}
20..=22 => {
let mode = match rng.random_range(0..=3) {
0 => CheckpointMode::Passive,
1 => CheckpointMode::Restart,
2 => CheckpointMode::Truncate,
3 => CheckpointMode::Full,
_ => unreachable!(),
};
(Operation::Checkpoint { mode }, get_visible_rows())
}
23..=26 => {
let op = match rng.random_range(0..6) {
0..=2 => AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
},
3..=4 => {
let table_schema = schema_clone.get("test_table").unwrap();
let columns_no_id = table_schema
.columns
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
}
} else {
let column = columns_no_id.choose(rng).unwrap();
AlterTableOp::DropColumn {
name: column.name.clone(),
}
}
}
5 => {
let columns_no_id = schema_clone
.get("test_table")
.unwrap()
.columns
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
}
} else {
let column = columns_no_id.choose(rng).unwrap();
AlterTableOp::RenameColumn {
old_name: column.name.clone(),
new_name: format!("col_{}", rng.random_range(1..i64::MAX)),
}
}
}
_ => unreachable!(),
};
(Operation::AlterTable { op }, get_visible_rows())
}
_ => {
let mut start = 0;
let range_begin_deferred = start..start + shadow_db.query_gen_options.weight_begin_deferred;
start += shadow_db.query_gen_options.weight_begin_deferred;
let range_begin_concurrent = start..start + shadow_db.query_gen_options.weight_begin_concurrent;
start += shadow_db.query_gen_options.weight_begin_concurrent;
let range_commit = start..start + shadow_db.query_gen_options.weight_commit;
start += shadow_db.query_gen_options.weight_commit;
let range_rollback = start..start + shadow_db.query_gen_options.weight_rollback;
start += shadow_db.query_gen_options.weight_rollback;
let range_checkpoint = start..start + shadow_db.query_gen_options.weight_checkpoint;
start += shadow_db.query_gen_options.weight_checkpoint;
let range_ddl = start..start + shadow_db.query_gen_options.weight_ddl;
start += shadow_db.query_gen_options.weight_ddl;
let range_dml = start..start + shadow_db.query_gen_options.weight_dml;
start += shadow_db.query_gen_options.weight_dml;
let random_val = rng.random_range(0..start);
if range_begin_deferred.contains(&random_val) {
if !in_transaction {
(Operation::Begin { concurrent: false }, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(rng, &visible_rows, &schema_clone),
generate_data_operation(
rng,
&visible_rows,
&schema_clone,
&shadow_db.query_gen_options.dml_gen_options,
),
visible_rows,
)
}
} else if range_begin_concurrent.contains(&random_val) {
if !in_transaction {
(Operation::Begin { concurrent: true }, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(
rng,
&visible_rows,
&schema_clone,
&shadow_db.query_gen_options.dml_gen_options,
),
visible_rows,
)
}
} else if range_commit.contains(&random_val) {
if in_transaction {
(Operation::Commit, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(
rng,
&visible_rows,
&schema_clone,
&shadow_db.query_gen_options.dml_gen_options,
),
visible_rows,
)
}
} else if range_rollback.contains(&random_val) {
if in_transaction {
(Operation::Rollback, get_visible_rows())
} else {
let visible_rows = get_visible_rows();
(
generate_data_operation(
rng,
&visible_rows,
&schema_clone,
&shadow_db.query_gen_options.dml_gen_options,
),
visible_rows,
)
}
} else if range_checkpoint.contains(&random_val) {
let mode = match rng.random_range(0..=3) {
0 => CheckpointMode::Passive,
1 => CheckpointMode::Restart,
2 => CheckpointMode::Truncate,
3 => CheckpointMode::Full,
_ => unreachable!(),
};
(Operation::Checkpoint { mode }, get_visible_rows())
} else if range_ddl.contains(&random_val) {
let op = match rng.random_range(0..6) {
0..=2 => AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
},
3..=4 => {
let table_schema = schema_clone.get("test_table").unwrap();
let columns_no_id = table_schema
.columns
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
}
} else {
let column = columns_no_id.choose(rng).unwrap();
AlterTableOp::DropColumn {
name: column.name.clone(),
}
}
}
5 => {
let columns_no_id = schema_clone
.get("test_table")
.unwrap()
.columns
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
AlterTableOp::AddColumn {
name: format!("col_{}", rng.random_range(1..i64::MAX)),
ty: "TEXT".to_string(),
}
} else {
let column = columns_no_id.choose(rng).unwrap();
AlterTableOp::RenameColumn {
old_name: column.name.clone(),
new_name: format!("col_{}", rng.random_range(1..i64::MAX)),
}
}
}
_ => unreachable!(),
};
(Operation::AlterTable { op }, get_visible_rows())
} else if range_dml.contains(&random_val) {
let visible_rows = get_visible_rows();
(
generate_data_operation(
rng,
&visible_rows,
&schema_clone,
&shadow_db.query_gen_options.dml_gen_options,
),
visible_rows,
)
} else {
unreachable!()
}
}
@@ -987,10 +1172,10 @@ fn generate_data_operation(
rng: &mut ChaCha8Rng,
visible_rows: &[DbRow],
schema: &HashMap<String, TableSchema>,
dml_gen_options: &DmlGenOptions,
) -> Operation {
let table_schema = schema.get("test_table").unwrap();
let op_num = rng.random_range(0..4);
let mut generate_insert_operation = || {
let generate_insert_operation = |rng: &mut ChaCha8Rng| {
let id = rng.random_range(1..i64::MAX);
let mut other_columns = HashMap::new();
for column in table_schema.columns.iter() {
@@ -1009,61 +1194,65 @@ fn generate_data_operation(
}
Operation::Insert { id, other_columns }
};
match op_num {
0 => {
// Insert
generate_insert_operation()
}
1 => {
// Update
if visible_rows.is_empty() {
// No rows to update, try insert instead
generate_insert_operation()
} else {
let columns_no_id = table_schema
.columns
let mut start = 0;
let range_insert = start..start + dml_gen_options.weight_insert;
start += dml_gen_options.weight_insert;
let range_update = start..start + dml_gen_options.weight_update;
start += dml_gen_options.weight_update;
let range_delete = start..start + dml_gen_options.weight_delete;
start += dml_gen_options.weight_delete;
let range_select = start..start + dml_gen_options.weight_select;
start += dml_gen_options.weight_select;
let random_val = rng.random_range(0..start);
if range_insert.contains(&random_val) {
generate_insert_operation(rng)
} else if range_update.contains(&random_val) {
if visible_rows.is_empty() {
// No rows to update, try insert instead
generate_insert_operation(rng)
} else {
let columns_no_id = table_schema
.columns
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
// No columns to update, try insert instead
return generate_insert_operation(rng);
}
let id = visible_rows.choose(rng).unwrap().id;
let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone();
let mut other_columns = HashMap::new();
other_columns.insert(
col_name_to_update.clone(),
match columns_no_id
.iter()
.filter(|c| c.name != "id")
.collect::<Vec<_>>();
if columns_no_id.is_empty() {
// No columns to update, try insert instead
return generate_insert_operation();
}
let id = visible_rows.choose(rng).unwrap().id;
let col_name_to_update = columns_no_id.choose(rng).unwrap().name.clone();
let mut other_columns = HashMap::new();
other_columns.insert(
col_name_to_update.clone(),
match columns_no_id
.iter()
.find(|c| c.name == col_name_to_update)
.unwrap()
.ty
.as_str()
{
"TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))),
"INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)),
"REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64),
_ => Value::Null,
},
);
Operation::Update { id, other_columns }
}
.find(|c| c.name == col_name_to_update)
.unwrap()
.ty
.as_str()
{
"TEXT" => Value::Text(format!("updated_{}", rng.random_range(1..i64::MAX))),
"INTEGER" => Value::Integer(rng.random_range(1..i64::MAX)),
"REAL" => Value::Real(rng.random_range(1..i64::MAX) as f64),
_ => Value::Null,
},
);
Operation::Update { id, other_columns }
}
2 => {
// Delete
if visible_rows.is_empty() {
// No rows to delete, try insert instead
generate_insert_operation()
} else {
let id = visible_rows.choose(rng).unwrap().id;
Operation::Delete { id }
}
} else if range_delete.contains(&random_val) {
if visible_rows.is_empty() {
// No rows to delete, try insert instead
generate_insert_operation(rng)
} else {
let id = visible_rows.choose(rng).unwrap().id;
Operation::Delete { id }
}
3 => {
// Select
Operation::Select
}
_ => unreachable!(),
} else if range_select.contains(&random_val) {
Operation::Select
} else {
unreachable!()
}
}