From 0288f4aac62e47392d9f7e68b5bb4564dfb34c9d Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Fri, 27 Jun 2025 23:49:47 -0300 Subject: [PATCH] introduce fsync interaction + property --- simulator/generation/plan.rs | 143 ++++++++++++++++++++----------- simulator/generation/property.rs | 63 +++++++++++++- simulator/runner/differential.rs | 9 ++ simulator/runner/execution.rs | 18 +++- simulator/shrink/plan.rs | 3 +- 5 files changed, 185 insertions(+), 51 deletions(-) diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index c0e7e55bc..24b76c331 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -188,6 +188,10 @@ impl Display for InteractionPlan { writeln!(f, "-- ASSERT {};", assertion.message)? } Interaction::Fault(fault) => writeln!(f, "-- FAULT '{}';", fault)?, + Interaction::FsyncQuery(query) => { + writeln!(f, "-- FSYNC QUERY;")?; + writeln!(f, "{};", query)? + } } } writeln!(f, "-- end testing '{}'", name)?; @@ -238,6 +242,9 @@ pub(crate) enum Interaction { Assumption(Assertion), Assertion(Assertion), Fault(Fault), + /// Will attempt to run any random query. However, when the connection tries to sync it will + /// close all connections and reopen the database and assert that no data was lost + FsyncQuery(Query), } impl Display for Interaction { @@ -247,6 +254,7 @@ impl Display for Interaction { Self::Assumption(assumption) => write!(f, "ASSUME {}", assumption.message), Self::Assertion(assertion) => write!(f, "ASSERT {}", assertion.message), Self::Fault(fault) => write!(f, "FAULT '{}'", fault), + Self::FsyncQuery(query) => write!(f, "{}", query), } } } @@ -373,6 +381,8 @@ impl Interactions { select1.shadow(env); select2.shadow(env); } + // Nothing should change + Property::FsyncNoWait { .. } => {} } for interaction in property.interactions() { match interaction { @@ -402,6 +412,8 @@ impl Interactions { Interaction::Assertion(_) => {} Interaction::Assumption(_) => {} Interaction::Fault(_) => {} + // FsyncQuery should not shadow as we are not going to run it to completion + Interaction::FsyncQuery(_) => {} } } } @@ -503,7 +515,9 @@ impl Interaction { pub(crate) fn shadow(&self, env: &mut SimulatorEnv) -> Vec> { match self { Self::Query(query) => query.shadow(env), - Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) => vec![], + Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) | Self::FsyncQuery(_) => { + vec![] + } } } pub(crate) fn execute_query(&self, conn: &mut Arc, io: &SimulatorIO) -> ResultSet { @@ -557,9 +571,6 @@ impl Interaction { env: &SimulatorEnv, ) -> Result<()> { match self { - Self::Query(_) => { - unreachable!("unexpected: this function should only be called on assertions") - } Self::Assertion(assertion) => { let result = assertion.func.as_ref()(stack, env); match result { @@ -573,10 +584,7 @@ impl Interaction { ))), } } - Self::Assumption(_) => { - unreachable!("unexpected: this function should only be called on assertions") - } - Self::Fault(_) => { + _ => { unreachable!("unexpected: this function should only be called on assertions") } } @@ -588,12 +596,6 @@ impl Interaction { env: &SimulatorEnv, ) -> Result<()> { match self { - Self::Query(_) => { - unreachable!("unexpected: this function should only be called on assumptions") - } - Self::Assertion(_) => { - unreachable!("unexpected: this function should only be called on assumptions") - } Self::Assumption(assumption) => { let result = assumption.func.as_ref()(stack, env); match result { @@ -607,7 +609,7 @@ impl Interaction { ))), } } - Self::Fault(_) => { + _ => { unreachable!("unexpected: this function should only be called on assumptions") } } @@ -615,15 +617,6 @@ impl Interaction { pub(crate) fn execute_fault(&self, env: &mut SimulatorEnv, conn_index: usize) -> Result<()> { match self { - Self::Query(_) => { - unreachable!("unexpected: this function should only be called on faults") - } - Self::Assertion(_) => { - unreachable!("unexpected: this function should only be called on faults") - } - Self::Assumption(_) => { - unreachable!("unexpected: this function should only be called on faults") - } Self::Fault(fault) => { match fault { Fault::Disconnect => { @@ -637,36 +630,90 @@ impl Interaction { env.connections[conn_index] = SimConnection::Disconnected; } Fault::ReopenDatabase => { - // 1. Close all connections without default checkpoint-on-close behavior - // to expose bugs related to how we handle WAL - let num_conns = env.connections.len(); - env.connections.clear(); - - // 2. Re-open database - let db_path = env.db_path.clone(); - let db = match turso_core::Database::open_file( - env.io.clone(), - &db_path, - false, - false, - ) { - Ok(db) => db, - Err(e) => { - panic!("error opening simulator test file {:?}: {:?}", db_path, e); - } - }; - env.db = db; - - for _ in 0..num_conns { - env.connections - .push(SimConnection::LimboConnection(env.db.connect().unwrap())); - } + reopen_database(env); } } Ok(()) } + _ => { + unreachable!("unexpected: this function should only be called on faults") + } } } + + pub(crate) fn execute_fsync_query( + &self, + conn: Arc, + env: &mut SimulatorEnv, + ) -> Result<()> { + if let Self::FsyncQuery(query) = self { + let query_str = query.to_string(); + let rows = conn.query(&query_str); + if rows.is_err() { + let err = rows.err(); + tracing::debug!( + "Error running query '{}': {:?}", + &query_str[0..query_str.len().min(4096)], + err + ); + return Err(err.unwrap()); + } + let rows = rows?; + assert!(rows.is_some()); + let mut rows = rows.unwrap(); + while let Ok(row) = rows.step() { + match row { + StepResult::IO => { + let syncing = { + let files = env.io.files.borrow(); + // TODO: currently assuming we only have 1 file that is syncing + files + .iter() + .any(|file| file.sync_completion.borrow().is_some()) + }; + if syncing { + reopen_database(env); + } else { + env.io.run_once().unwrap(); + } + } + StepResult::Done => { + break; + } + StepResult::Row | StepResult::Interrupt | StepResult::Busy => {} + } + } + + Ok(()) + } else { + unreachable!("unexpected: this function should only be called on queries") + } + } +} + +fn reopen_database(env: &mut SimulatorEnv) { + // 1. Close all connections without default checkpoint-on-close behavior + // to expose bugs related to how we handle WAL + let num_conns = env.connections.len(); + env.connections.clear(); + + // Clear all open files + env.io.files.borrow_mut().clear(); + + // 2. Re-open database + let db_path = env.db_path.clone(); + let db = match turso_core::Database::open_file(env.io.clone(), &db_path, false, false) { + Ok(db) => db, + Err(e) => { + panic!("error opening simulator test file {:?}: {:?}", db_path, e); + } + }; + env.db = db; + + for _ in 0..num_conns { + env.connections + .push(SimConnection::LimboConnection(env.db.connect().unwrap())); + } } fn random_create(rng: &mut R, _env: &SimulatorEnv) -> Interactions { diff --git a/simulator/generation/property.rs b/simulator/generation/property.rs index 5eccf0ec1..e846634dd 100644 --- a/simulator/generation/property.rs +++ b/simulator/generation/property.rs @@ -9,7 +9,7 @@ use crate::{ select::{Distinctness, ResultColumn}, Create, Delete, Drop, Insert, Query, Select, }, - table::SimValue, + table::{SimValue, Table}, }, runner::env::SimulatorEnv, }; @@ -127,6 +127,10 @@ pub(crate) enum Property { table: String, predicate: Predicate, }, + FsyncNoWait { + query: Query, + tables: Vec, + }, } impl Property { @@ -138,6 +142,7 @@ impl Property { Property::DeleteSelect { .. } => "Delete-Select", Property::DropSelect { .. } => "Drop-Select", Property::SelectSelectOptimizer { .. } => "Select-Select-Optimizer", + Property::FsyncNoWait { .. } => "FsyncNoWait", } } /// interactions construct a list of interactions, which is an executable representation of the property. @@ -428,6 +433,47 @@ impl Property { vec![assumption, select1, select2, assertion] } + Property::FsyncNoWait { query, tables } => { + let checks = tables + .iter() + .map(|table| { + let select = Interaction::Query(Query::Select(Select { + table: table.name.clone(), + result_columns: vec![ResultColumn::Star], + predicate: Predicate::true_(), + limit: None, + distinct: Distinctness::All, + })); + let assertion = Interaction::Assertion(Assertion { + message: format!( + "table {} contains all of its values after the wal reopened", + table.name + ), + func: Box::new({ + let table = table.clone(); + move |stack: &Vec, _: &SimulatorEnv| { + let last = stack.last().unwrap(); + match last { + Ok(vals) => { + if *vals != table.rows { + tracing::error!(table.name, ?vals, ?table.rows, "values mismatch after wal reopen"); + } + Ok(*vals == table.rows) + } + Err(err) => { + Err(LimboError::InternalError(format!("{}", err))) + } + } + } + }), + }); + [select, assertion].into_iter() + }) + .flatten(); + Vec::from_iter( + std::iter::once(Interaction::FsyncQuery(query.clone())).chain(checks), + ) + } } } } @@ -697,6 +743,17 @@ fn property_select_select_optimizer(rng: &mut R, env: &SimulatorEn } } +fn property_fsync_no_wait( + rng: &mut R, + env: &SimulatorEnv, + remaining: &Remaining, +) -> Property { + Property::FsyncNoWait { + query: Query::arbitrary_from(rng, (env, remaining)), + tables: env.tables.clone(), + } +} + impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { fn arbitrary_from( rng: &mut R, @@ -754,6 +811,10 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { }, Box::new(|rng: &mut R| property_select_select_optimizer(rng, env)), ), + ( + 50.0, // Freestyle number + Box::new(|rng: &mut R| property_fsync_no_wait(rng, env, &remaining_)), + ), ], rng, ) diff --git a/simulator/runner/differential.rs b/simulator/runner/differential.rs index 6d48ca99a..511767128 100644 --- a/simulator/runner/differential.rs +++ b/simulator/runner/differential.rs @@ -337,6 +337,15 @@ fn execute_interaction_rusqlite( tracing::debug!("{:?}", results); stack.push(results); } + Interaction::FsyncQuery(_) => { + let conn = match &env.connections[connection_index] { + SimConnection::LimboConnection(conn) => conn.clone(), + SimConnection::SQLiteConnection(_) => unreachable!(), + SimConnection::Disconnected => unreachable!(), + }; + + interaction.execute_fsync_query(conn.clone(), env)?; + } Interaction::Assertion(_) => { interaction.execute_assertion(stack, env)?; stack.clear(); diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index fcf9e6314..334fe719e 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -196,6 +196,22 @@ pub(crate) fn execute_interaction( stack.push(results); limbo_integrity_check(conn)?; } + Interaction::FsyncQuery(_) => { + let conn = match &env.connections[connection_index] { + SimConnection::LimboConnection(conn) => conn.clone(), + SimConnection::SQLiteConnection(_) => unreachable!(), + SimConnection::Disconnected => unreachable!(), + }; + + interaction.execute_fsync_query(conn.clone(), env)?; + + let conn = match &env.connections[connection_index] { + SimConnection::LimboConnection(conn) => conn, + SimConnection::SQLiteConnection(_) => unreachable!(), + SimConnection::Disconnected => unreachable!(), + }; + limbo_integrity_check(conn)?; + } Interaction::Assertion(_) => { interaction.execute_assertion(stack, env)?; stack.clear(); @@ -217,7 +233,7 @@ pub(crate) fn execute_interaction( Ok(ExecutionContinuation::NextInteraction) } -fn limbo_integrity_check(conn: &mut Arc) -> Result<()> { +fn limbo_integrity_check(conn: &Arc) -> Result<()> { let mut rows = conn.query("PRAGMA integrity_check;")?.unwrap(); let mut result = Vec::new(); diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index a42f32836..521d93810 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -71,7 +71,8 @@ impl InteractionPlan { queries.clear(); } Property::SelectLimit { .. } - | Property::SelectSelectOptimizer { .. } => {} + | Property::SelectSelectOptimizer { .. } + | Property::FsyncNoWait { .. } => {} } } // Check again after query clear if the interactions still uses the failing table