mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-07 18:24:20 +01:00
Merge 'test/fuzz/transactions: add "PRAGMA wal_checkpoint" to txn isolation fuzz test' from Jussi Saurio
This PR is now ready, since i cannot find any new bugs with the fuzzer. ## Changes - adds `PRAGMA wal_checkpoint` to transaction isolation fuzz test ## Fixes extracted as separate PRs from this one: #2360 #2362 #2365 #2366 #2367 #2380 Closes #2364
This commit is contained in:
@@ -67,7 +67,10 @@ impl ShadowDb {
|
||||
|
||||
fn commit_transaction(&mut self, tx_id: usize) {
|
||||
if let Some(tx_state) = self.transactions.remove(&tx_id) {
|
||||
let tx_state = tx_state.unwrap();
|
||||
let Some(tx_state) = tx_state else {
|
||||
// Transaction hasn't accessed the DB yet -> do nothing
|
||||
return;
|
||||
};
|
||||
// Apply pending changes to committed state
|
||||
for op in tx_state.pending_changes {
|
||||
match op {
|
||||
@@ -158,7 +161,10 @@ impl ShadowDb {
|
||||
return self.committed_rows.values().cloned().collect();
|
||||
};
|
||||
if let Some(tx_state) = self.transactions.get(&tx_id) {
|
||||
let tx_state = tx_state.as_ref().unwrap();
|
||||
let Some(tx_state) = tx_state.as_ref() else {
|
||||
// Transaction hasn't accessed the DB yet -> see committed state
|
||||
return self.committed_rows.values().cloned().collect();
|
||||
};
|
||||
tx_state.visible_rows.values().cloned().collect()
|
||||
} else {
|
||||
// No transaction - see committed state
|
||||
@@ -167,6 +173,23 @@ impl ShadowDb {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum CheckpointMode {
|
||||
Passive,
|
||||
Restart,
|
||||
Truncate,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for CheckpointMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
CheckpointMode::Passive => write!(f, "PASSIVE"),
|
||||
CheckpointMode::Restart => write!(f, "RESTART"),
|
||||
CheckpointMode::Truncate => write!(f, "TRUNCATE"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Operation {
|
||||
Begin,
|
||||
@@ -175,6 +198,7 @@ enum Operation {
|
||||
Insert { id: i64, text: String },
|
||||
Update { id: i64, text: String },
|
||||
Delete { id: i64 },
|
||||
Checkpoint { mode: CheckpointMode },
|
||||
Select,
|
||||
}
|
||||
|
||||
@@ -192,6 +216,7 @@ impl std::fmt::Display for Operation {
|
||||
}
|
||||
Operation::Delete { id } => write!(f, "DELETE FROM test_table WHERE id = {id}"),
|
||||
Operation::Select => write!(f, "SELECT * FROM test_table"),
|
||||
Operation::Checkpoint { mode } => write!(f, "PRAGMA wal_checkpoint({mode})"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -254,19 +279,8 @@ async fn test_multiple_connections_fuzz() {
|
||||
for op_num in 0..OPERATIONS_PER_CONNECTION {
|
||||
for (conn, conn_id, current_tx_id) in &mut connections {
|
||||
// Generate operation based on current transaction state
|
||||
let visible_rows = if let Some(tx_id) = *current_tx_id {
|
||||
// Take snapshot during first operation after a BEGIN, not immediately at BEGIN (the semantics is BEGIN DEFERRED)
|
||||
let tx_state = shared_shadow_db.transactions.get(&tx_id).unwrap();
|
||||
if tx_state.is_none() {
|
||||
shared_shadow_db.take_snapshot(tx_id);
|
||||
}
|
||||
shared_shadow_db.get_visible_rows(Some(tx_id))
|
||||
} else {
|
||||
shared_shadow_db.get_visible_rows(None) // No transaction
|
||||
};
|
||||
|
||||
let operation =
|
||||
generate_operation(&mut rng, current_tx_id.is_some(), &visible_rows);
|
||||
let (operation, visible_rows) =
|
||||
generate_operation(&mut rng, *current_tx_id, &mut shared_shadow_db);
|
||||
|
||||
println!("Connection {conn_id}(op={op_num}): {operation}");
|
||||
|
||||
@@ -484,6 +498,34 @@ async fn test_multiple_connections_fuzz() {
|
||||
);
|
||||
}
|
||||
}
|
||||
Operation::Checkpoint { mode } => {
|
||||
let query = format!("PRAGMA wal_checkpoint({mode})");
|
||||
let mut rows = conn.query(&query, ()).await.unwrap();
|
||||
|
||||
match rows.next().await {
|
||||
Ok(Some(row)) => {
|
||||
let checkpoint_ok = matches!(row.get_value(0).unwrap(), Value::Integer(0));
|
||||
let wal_page_count = match row.get_value(1).unwrap() {
|
||||
Value::Integer(count) => count.to_string(),
|
||||
Value::Null => "NULL".to_string(),
|
||||
_ => panic!("Unexpected value for wal_page_count: {:?}", row.get_value(1)),
|
||||
};
|
||||
let checkpoint_count = match row.get_value(2).unwrap() {
|
||||
Value::Integer(count) => count.to_string(),
|
||||
Value::Null => "NULL".to_string(),
|
||||
_ => panic!("Unexpected value for checkpoint_count: {:?}", row.get_value(2)),
|
||||
};
|
||||
println!("Connection {conn_id}(op={op_num}) Checkpoint {mode}: OK: {checkpoint_ok}, wal_page_count: {wal_page_count}, checkpointed_count: {checkpoint_count}");
|
||||
}
|
||||
Ok(None) => panic!("Connection {conn_id}(op={op_num}) Checkpoint {mode}: No rows returned"),
|
||||
Err(e) => {
|
||||
println!("Connection {conn_id}(op={op_num}) FAILED: {e}");
|
||||
if !e.to_string().contains("database is locked") && !e.to_string().contains("database table is locked") {
|
||||
panic!("Unexpected error during checkpoint: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -492,36 +534,61 @@ async fn test_multiple_connections_fuzz() {
|
||||
|
||||
fn generate_operation(
|
||||
rng: &mut ChaCha8Rng,
|
||||
in_transaction: bool,
|
||||
visible_rows: &[DbRow],
|
||||
) -> Operation {
|
||||
current_tx_id: Option<usize>,
|
||||
shadow_db: &mut ShadowDb,
|
||||
) -> (Operation, Vec<DbRow>) {
|
||||
let in_transaction = current_tx_id.is_some();
|
||||
let mut get_visible_rows = |accesses_db: bool| {
|
||||
if let Some(tx_id) = current_tx_id {
|
||||
let tx_state = shadow_db.transactions.get(&tx_id).unwrap();
|
||||
// Take snapshot during first operation that accesses the DB after a BEGIN, not immediately at BEGIN (the semantics is BEGIN DEFERRED)
|
||||
if accesses_db && tx_state.is_none() {
|
||||
shadow_db.take_snapshot(tx_id);
|
||||
}
|
||||
shadow_db.get_visible_rows(Some(tx_id))
|
||||
} else {
|
||||
shadow_db.get_visible_rows(None) // No transaction
|
||||
}
|
||||
};
|
||||
match rng.gen_range(0..100) {
|
||||
// 10% chance to begin transaction
|
||||
0..=9 => {
|
||||
if !in_transaction {
|
||||
Operation::Begin
|
||||
(Operation::Begin, get_visible_rows(false))
|
||||
} else {
|
||||
generate_data_operation(rng, visible_rows)
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
}
|
||||
}
|
||||
// 5% chance to commit
|
||||
10..=14 => {
|
||||
if in_transaction {
|
||||
Operation::Commit
|
||||
(Operation::Commit, get_visible_rows(false))
|
||||
} else {
|
||||
generate_data_operation(rng, visible_rows)
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
}
|
||||
}
|
||||
// 5% chance to rollback
|
||||
15..=19 => {
|
||||
if in_transaction {
|
||||
Operation::Rollback
|
||||
(Operation::Rollback, get_visible_rows(false))
|
||||
} else {
|
||||
generate_data_operation(rng, visible_rows)
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
}
|
||||
}
|
||||
20..=22 => {
|
||||
let mode = match rng.gen_range(0..3) {
|
||||
0 => CheckpointMode::Passive,
|
||||
1 => CheckpointMode::Restart,
|
||||
2 => CheckpointMode::Truncate,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
(Operation::Checkpoint { mode }, get_visible_rows(false))
|
||||
}
|
||||
// 80% chance for data operations
|
||||
_ => generate_data_operation(rng, visible_rows),
|
||||
_ => {
|
||||
let visible_rows = get_visible_rows(true);
|
||||
(generate_data_operation(rng, &visible_rows), visible_rows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user