diff --git a/Cargo.lock b/Cargo.lock index 0518d66bb..75b33b791 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,6 +321,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitmaps" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6" + [[package]] name = "blake3" version = "1.7.0" @@ -2331,6 +2337,7 @@ name = "limbo_sim" version = "0.2.0-pre.11" dependencies = [ "anyhow", + "bitmaps", "chrono", "clap", "dirs 6.0.0", diff --git a/Cargo.toml b/Cargo.toml index 4e25b69ea..60f18f1a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ regex = "1.11.1" regex-syntax = { version = "0.8.5", default-features = false } similar = { version = "2.7.0" } similar-asserts = { version = "1.7.0" } +bitmaps = { version = "3.2.1", default-features = false } [profile.dev.package.similar] opt-level = 3 diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index 84d67304e..a5d4daba8 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -47,3 +47,4 @@ indexmap = { workspace = true } either = "1.15.0" similar = { workspace = true } similar-asserts = { workspace = true } +bitmaps = { workspace = true } diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index 3390a1a8a..25edf404c 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -36,8 +36,10 @@ pub(crate) type ResultSet = Result>>; #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct InteractionPlan { - pub plan: Vec, + plan: Vec, pub mvcc: bool, + // Len should not count transactions statements, just so we can generate more meaningful interactions per run + len: usize, } impl InteractionPlan { @@ -45,11 +47,16 @@ impl InteractionPlan { Self { plan: Vec::new(), mvcc, + len: 0, } } pub fn new_with(plan: Vec, mvcc: bool) -> Self { - Self { plan, mvcc } + let len = plan + .iter() + .filter(|interaction| !interaction.is_transaction()) + .count(); + Self { plan, mvcc, len } } #[inline] @@ -57,23 +64,17 @@ impl InteractionPlan { &self.plan } - // TODO: this is just simplified logic so we can get something rolling with begin concurrent - // transactions in the simulator. Ideally when we generate the plan we will have begin and commits statements across interactions + /// Length of interactions that are not transaction statements + #[inline] + pub fn len(&self) -> usize { + self.len + } + pub fn push(&mut self, interactions: Interactions) { - if self.mvcc { - let conn_index = interactions.connection_index; - let begin = Interactions::new( - conn_index, - InteractionsType::Query(Query::Begin(Begin::Concurrent)), - ); - let commit = - Interactions::new(conn_index, InteractionsType::Query(Query::Commit(Commit))); - self.plan.push(begin); - self.plan.push(interactions); - self.plan.push(commit); - } else { - self.plan.push(interactions); + if !interactions.is_transaction() { + self.len += 1; } + self.plan.push(interactions); } /// Compute via diff computes a a plan from a given `.plan` file without the need to parse @@ -218,7 +219,7 @@ impl InteractionPlan { let create_query = Create::arbitrary(&mut env.rng.clone(), &env.connection_context(0)); // initial query starts at 0th connection - plan.plan.push(Interactions::new( + plan.push(Interactions::new( 0, InteractionsType::Query(Query::Create(create_query)), )); @@ -234,19 +235,69 @@ impl InteractionPlan { ) -> Option> { let num_interactions = env.opts.max_interactions as usize; if self.len() < num_interactions { - tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions); - let interactions = { - let conn_index = env.choose_conn(rng); + let conn_index = env.choose_conn(rng); + let interactions = if self.mvcc && !env.conn_in_transaction(conn_index) { + let query = Query::Begin(Begin::Concurrent); + Interactions::new(conn_index, InteractionsType::Query(query)) + } else if self.mvcc + && env.conn_in_transaction(conn_index) + && env.has_conn_executed_query_after_transaction(conn_index) + && rng.random_bool(0.4) + { + let query = Query::Commit(Commit); + Interactions::new(conn_index, InteractionsType::Query(query)) + } else { let conn_ctx = &env.connection_context(conn_index); Interactions::arbitrary_from(rng, conn_ctx, (env, self.stats(), conn_index)) }; - let out_interactions = interactions.interactions(); + tracing::debug!("Generating interaction {}/{}", self.len(), num_interactions); + + let mut out_interactions = interactions.interactions(); + assert!(!out_interactions.is_empty()); + + let out_interactions = if self.mvcc + && out_interactions + .iter() + .any(|interaction| interaction.is_ddl()) + { + // DDL statements must be serial, so commit all connections and then execute the DDL + let mut commit_interactions = (0..env.connections.len()) + .filter(|&idx| env.conn_in_transaction(idx)) + .map(|idx| { + let query = Query::Commit(Commit); + let interaction = Interactions::new(idx, InteractionsType::Query(query)); + let out_interactions = interaction.interactions(); + self.push(interaction); + out_interactions + }) + .fold( + Vec::with_capacity(env.connections.len()), + |mut accum, mut curr| { + accum.append(&mut curr); + accum + }, + ); + commit_interactions.append(&mut out_interactions); + commit_interactions + } else { + out_interactions + }; + self.push(interactions); Some(out_interactions) } else { - None + // after we generated all interactions if some connection is still in a transaction, commit + (0..env.connections.len()) + .find(|idx| env.conn_in_transaction(*idx)) + .map(|conn_index| { + let query = Query::Commit(Commit); + let interaction = Interactions::new(conn_index, InteractionsType::Query(query)); + let out_interactions = interaction.interactions(); + self.push(interaction); + out_interactions + }) } } @@ -271,7 +322,7 @@ impl InteractionPlan { } impl Deref for InteractionPlan { - type Target = [Interactions]; + type Target = Vec; fn deref(&self) -> &Self::Target { &self.plan @@ -294,11 +345,32 @@ impl IntoIterator for InteractionPlan { } } +impl<'a> IntoIterator for &'a InteractionPlan { + type Item = &'a Interactions; + + type IntoIter = <&'a Vec as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.plan.iter() + } +} + +impl<'a> IntoIterator for &'a mut InteractionPlan { + type Item = &'a mut Interactions; + + type IntoIter = <&'a mut Vec as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.plan.iter_mut() + } +} + pub trait InteractionPlanIterator { fn next(&mut self, env: &mut SimulatorEnv) -> Option; } impl InteractionPlanIterator for &mut T { + #[inline] fn next(&mut self, env: &mut SimulatorEnv) -> Option { T::next(self, env) } @@ -339,6 +411,7 @@ impl InteractionPlanIterator for PlanIterator where I: Iterator, { + #[inline] fn next(&mut self, _env: &mut SimulatorEnv) -> Option { self.iter.next() } @@ -367,6 +440,13 @@ impl Interactions { interactions, } } + + pub fn get_extensional_queries(&mut self) -> Option<&mut Vec> { + match &mut self.interactions { + InteractionsType::Property(property) => property.get_extensional_queries(), + InteractionsType::Query(..) | InteractionsType::Fault(..) => None, + } + } } impl Deref for Interactions { @@ -390,26 +470,11 @@ pub enum InteractionsType { Fault(Fault), } -impl Shadow for Interactions { - type Result = (); - - fn shadow(&self, tables: &mut ShadowTablesMut) { - match &self.interactions { - InteractionsType::Property(property) => { - let initial_tables = tables.clone(); - for interaction in property.interactions(self.connection_index) { - let res = interaction.shadow(tables); - if res.is_err() { - // If any interaction fails, we reset the tables to the initial state - **tables = initial_tables.clone(); - break; - } - } - } - InteractionsType::Query(query) => { - let _ = query.shadow(tables); - } - InteractionsType::Fault(_) => {} +impl InteractionsType { + pub fn is_transaction(&self) -> bool { + match self { + InteractionsType::Query(query) => query.is_transaction(), + _ => false, } } } @@ -489,7 +554,7 @@ impl Display for InteractionPlan { writeln!(f, "-- FAULT '{fault}'")?; } InteractionsType::Query(query) => { - writeln!(f, "{query};")?; + writeln!(f, "{query}; -- {}", interactions.connection_index)?; } } } @@ -575,7 +640,7 @@ impl Assertion { } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub(crate) enum Fault { +pub enum Fault { Disconnect, ReopenDatabase, } @@ -593,6 +658,7 @@ impl Display for Fault { pub struct Interaction { pub connection_index: usize, pub interaction: InteractionType, + pub ignore_error: bool, } impl Deref for Interaction { @@ -614,6 +680,15 @@ impl Interaction { Self { connection_index, interaction, + ignore_error: false, + } + } + + pub fn new_ignore_error(connection_index: usize, interaction: InteractionType) -> Self { + Self { + connection_index, + interaction, + ignore_error: true, } } } @@ -633,7 +708,7 @@ pub enum InteractionType { // FIXME: add the connection index here later impl Display for Interaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.interaction) + write!(f, "{}; -- {}", self.interaction, self.connection_index) } } @@ -641,13 +716,13 @@ impl Display for InteractionType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Query(query) => write!(f, "{query}"), - Self::Assumption(assumption) => write!(f, "-- ASSUME {};", assumption.name), + Self::Assumption(assumption) => write!(f, "-- ASSUME {}", assumption.name), Self::Assertion(assertion) => { write!(f, "-- ASSERT {};", assertion.name) } - Self::Fault(fault) => write!(f, "-- FAULT '{fault}';"), + Self::Fault(fault) => write!(f, "-- FAULT '{fault}'"), Self::FsyncQuery(query) => { - writeln!(f, "-- FSYNC QUERY;")?; + writeln!(f, "-- FSYNC QUERY")?; writeln!(f, "{query};")?; write!(f, "{query};") } @@ -660,19 +735,31 @@ impl Shadow for InteractionType { type Result = anyhow::Result>>; fn shadow(&self, env: &mut ShadowTablesMut) -> Self::Result { match self { - Self::Query(query) => query.shadow(env), - Self::FsyncQuery(query) => { - let mut first = query.shadow(env)?; - first.extend(query.shadow(env)?); - Ok(first) - } - Self::Assumption(_) | Self::Assertion(_) | Self::Fault(_) | Self::FaultyQuery(_) => { - Ok(vec![]) + Self::Query(query) => { + if !query.is_transaction() { + env.add_query(query); + } + query.shadow(env) } + Self::Assumption(_) + | Self::Assertion(_) + | Self::Fault(_) + | Self::FaultyQuery(_) + | Self::FsyncQuery(_) => Ok(vec![]), } } } + impl InteractionType { + pub fn is_ddl(&self) -> bool { + match self { + InteractionType::Query(query) + | InteractionType::FsyncQuery(query) + | InteractionType::FaultyQuery(query) => query.is_ddl(), + _ => false, + } + } + pub(crate) fn execute_query(&self, conn: &mut Arc) -> ResultSet { if let Self::Query(query) = self { let query_str = query.to_string(); @@ -779,13 +866,15 @@ impl InteractionType { match fault { Fault::Disconnect => { if env.connections[conn_index].is_connected() { + if env.conn_in_transaction(conn_index) { + env.rollback_conn(conn_index); + } env.connections[conn_index].disconnect(); } else { return Err(turso_core::LimboError::InternalError( "connection already disconnected".into(), )); } - env.connections[conn_index] = SimConnection::Disconnected; } Fault::ReopenDatabase => { reopen_database(env); diff --git a/simulator/generation/property.rs b/simulator/generation/property.rs index 7e42444cd..d741748fa 100644 --- a/simulator/generation/property.rs +++ b/simulator/generation/property.rs @@ -27,7 +27,7 @@ use super::plan::{Assertion, Interaction, InteractionStats, ResultSet}; /// Properties are representations of executable specifications /// about the database behavior. #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) enum Property { +pub enum Property { /// Insert-Select is a property in which the inserted row /// must be in the resulting rows of a select query that has a /// where clause that matches the inserted row. @@ -190,6 +190,10 @@ pub(crate) enum Property { query: Query, tables: Vec, }, + /// Property used to subsititute a property with its queries only + Queries { + queries: Vec, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -213,8 +217,27 @@ impl Property { Property::FsyncNoWait { .. } => "FsyncNoWait", Property::FaultyQuery { .. } => "FaultyQuery", Property::UNIONAllPreservesCardinality { .. } => "UNION-All-Preserves-Cardinality", + Property::Queries { .. } => "Queries", } } + + pub fn get_extensional_queries(&mut self) -> Option<&mut Vec> { + match self { + Property::InsertValuesSelect { queries, .. } + | Property::DoubleCreateFailure { queries, .. } + | Property::DeleteSelect { queries, .. } + | Property::DropSelect { queries, .. } + | Property::Queries { queries } => Some(queries), + Property::FsyncNoWait { .. } | Property::FaultyQuery { .. } => None, + Property::SelectLimit { .. } + | Property::SelectSelectOptimizer { .. } + | Property::WhereTrueFalseNull { .. } + | Property::UNIONAllPreservesCardinality { .. } + | Property::ReadYourUpdatesBack { .. } + | Property::TableHasExpectedContent { .. } => None, + } + } + /// interactions construct a list of interactions, which is an executable representation of the property. /// the requirement of property -> vec conversion emerges from the need to serialize the property, /// and `interaction` cannot be serialized directly. @@ -447,12 +470,14 @@ impl Property { let table_name = create.table.name.clone(); let assertion = InteractionType::Assertion(Assertion::new("creating two tables with the name should result in a failure for the second query" - .to_string(), move |stack: &Vec, _| { + .to_string(), move |stack: &Vec, env| { let last = stack.last().unwrap(); match last { Ok(success) => Ok(Err(format!("expected table creation to fail but it succeeded: {success:?}"))), Err(e) => { if e.to_string().to_lowercase().contains(&format!("table {table_name} already exists")) { + // On error we rollback the transaction if there is any active here + env.rollback_conn(connection_index); Ok(Ok(())) } else { Ok(Err(format!("expected table already exists error, got: {e}"))) @@ -470,7 +495,7 @@ impl Property { .into_iter() .map(|q| Interaction::new(connection_index, InteractionType::Query(q))), ); - interactions.push(Interaction::new(connection_index, cq2)); + interactions.push(Interaction::new_ignore_error(connection_index, cq2)); interactions.push(Interaction::new(connection_index, assertion)); interactions @@ -804,6 +829,9 @@ impl Property { // We cannot make any assumptions about the error content; all we are about is, if the statement errored, // we don't shadow the results into the simulator env, i.e. we assume whatever the statement did was rolled back. tracing::error!("Fault injection produced error: {err}"); + + // On error we rollback the transaction if there is any active here + env.rollback_conn(connection_index); Ok(Ok(())) } } @@ -1023,6 +1051,11 @@ impl Property { )), ].into_iter().map(|i| Interaction::new(connection_index, i)).collect() } + Property::Queries { queries } => queries + .clone() + .into_iter() + .map(|query| Interaction::new(connection_index, InteractionType::Query(query))) + .collect(), } } } @@ -1159,6 +1192,7 @@ fn property_insert_values_select( rng: &mut R, remaining: &Remaining, ctx: &impl GenerationContext, + mvcc: bool, ) -> Property { // Get a random table let table = pick(ctx.tables(), rng); @@ -1178,7 +1212,7 @@ fn property_insert_values_select( }; // Choose if we want queries to be executed in an interactive transaction - let interactive = if rng.random_bool(0.5) { + let interactive = if !mvcc && rng.random_bool(0.5) { Some(InteractiveQueryInfo { start_with_immediate: rng.random_bool(0.5), end_with_commit: rng.random_bool(0.5), @@ -1536,7 +1570,14 @@ impl ArbitraryFrom<(&SimulatorEnv, &InteractionStats)> for Property { } else { 0 }, - Box::new(|rng: &mut R| property_insert_values_select(rng, &remaining_, conn_ctx)), + Box::new(|rng: &mut R| { + property_insert_values_select( + rng, + &remaining_, + conn_ctx, + env.profile.experimental_mvcc, + ) + }), ), ( remaining_.select.max(1), diff --git a/simulator/model/mod.rs b/simulator/model/mod.rs index 9a3c81e9f..551c08b1d 100644 --- a/simulator/model/mod.rs +++ b/simulator/model/mod.rs @@ -61,6 +61,22 @@ impl Query { Query::Begin(..) | Query::Commit(..) | Query::Rollback(..) => vec![], } } + + #[inline] + pub fn is_transaction(&self) -> bool { + matches!( + self, + Self::Begin(..) | Self::Commit(..) | Self::Rollback(..) + ) + } + + #[inline] + pub fn is_ddl(&self) -> bool { + matches!( + self, + Self::Create(..) | Self::CreateIndex(..) | Self::Drop(..) + ) + } } impl Display for Query { diff --git a/simulator/profiles/mod.rs b/simulator/profiles/mod.rs index f9dfff282..071b4ed20 100644 --- a/simulator/profiles/mod.rs +++ b/simulator/profiles/mod.rs @@ -28,7 +28,7 @@ pub struct Profile { #[garde(skip)] /// Experimental MVCC feature pub experimental_mvcc: bool, - #[garde(range(min = 1))] + #[garde(range(min = 1, max = 64))] pub max_connections: usize, #[garde(dive)] pub io: IOProfile, diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index d52810b27..9a23663b0 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -5,13 +5,17 @@ use std::panic::UnwindSafe; use std::path::{Path, PathBuf}; use std::sync::Arc; +use bitmaps::Bitmap; use garde::Validate; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha8Rng; use sql_generation::generation::GenerationContext; +use sql_generation::model::query::transaction::Rollback; use sql_generation::model::table::Table; use turso_core::Database; +use crate::generation::Shadow; +use crate::model::Query; use crate::profiles::Profile; use crate::runner::SimIO; use crate::runner::io::SimulatorIO; @@ -32,21 +36,37 @@ pub(crate) enum SimulationPhase { Shrink, } +#[derive(Debug, Clone)] +pub struct TransactionTables { + current_tables: Vec, + pending_changes: Vec, +} + +impl TransactionTables { + pub fn new(tables: Vec
) -> Self { + Self { + current_tables: tables, + pending_changes: Vec::new(), + } + } +} + #[derive(Debug)] pub struct ShadowTables<'a> { commited_tables: &'a Vec
, - transaction_tables: Option<&'a Vec
>, + transaction_tables: Option<&'a TransactionTables>, } #[derive(Debug)] pub struct ShadowTablesMut<'a> { commited_tables: &'a mut Vec
, - transaction_tables: &'a mut Option>, + transaction_tables: &'a mut Option, } impl<'a> ShadowTables<'a> { fn tables(&self) -> &'a Vec
{ - self.transaction_tables.map_or(self.commited_tables, |v| v) + self.transaction_tables + .map_or(self.commited_tables, |v| &v.current_tables) } } @@ -65,30 +85,48 @@ where fn tables(&'a self) -> &'a Vec
{ self.transaction_tables .as_ref() + .map(|t| &t.current_tables) .unwrap_or(self.commited_tables) } fn tables_mut(&'b mut self) -> &'b mut Vec
{ self.transaction_tables .as_mut() + .map(|t| &mut t.current_tables) .unwrap_or(self.commited_tables) } pub fn create_snapshot(&mut self) { - *self.transaction_tables = Some(self.commited_tables.clone()); + *self.transaction_tables = Some(TransactionTables::new(self.commited_tables.clone())); } pub fn apply_snapshot(&mut self) { // TODO: as we do not have concurrent tranasactions yet in the simulator // there is no conflict we are ignoring conflict problems right now if let Some(transation_tables) = self.transaction_tables.take() { - *self.commited_tables = transation_tables + let mut shadow_table = ShadowTablesMut { + commited_tables: self.commited_tables, + transaction_tables: &mut None, + }; + + for query in transation_tables.pending_changes { + // TODO: maybe panic on shadow error here + let _ = query.shadow(&mut shadow_table); + } } } pub fn delete_snapshot(&mut self) { *self.transaction_tables = None; } + + /// Append non transaction queries to the shadow tables + pub fn add_query(&mut self, query: &Query) { + assert!(!query.is_transaction()); + if let Some(transaction_tables) = self.transaction_tables { + transaction_tables.pending_changes.push(query.clone()); + } + } } impl<'a> Deref for ShadowTablesMut<'a> { @@ -105,36 +143,6 @@ impl<'a> DerefMut for ShadowTablesMut<'a> { } } -#[derive(Debug, Clone)] -pub(crate) struct SimulatorTables { - pub(crate) tables: Vec
, - pub(crate) snapshot: Option>, -} -impl SimulatorTables { - pub(crate) fn new() -> Self { - Self { - tables: Vec::new(), - snapshot: None, - } - } - - pub(crate) fn clear(&mut self) { - self.tables.clear(); - self.snapshot = None; - } - - pub(crate) fn push(&mut self, table: Table) { - self.tables.push(table); - } -} -impl Deref for SimulatorTables { - type Target = Vec
; - - fn deref(&self) -> &Self::Target { - &self.tables - } -} - pub(crate) struct SimulatorEnv { pub(crate) opts: SimulatorOpts, pub profile: Profile, @@ -150,7 +158,13 @@ pub(crate) struct SimulatorEnv { pub memory_io: bool, /// If connection state is None, means we are not in a transaction - pub connection_tables: Vec>>, + pub connection_tables: Vec>, + /// Bit map indicating whether a connection has executed a query that is not transaction related + /// + /// E.g Select, Insert, Create + /// and not Begin, Commit, Rollback \ + /// Has max size of 64 to accomodate 64 connections + connection_last_query: Bitmap<64>, // Table data that is committed into the database or wal pub committed_tables: Vec
, } @@ -175,6 +189,7 @@ impl SimulatorEnv { .collect(), // TODO: not sure if connection_tables should be recreated instead connection_tables: self.connection_tables.clone(), + connection_last_query: self.connection_last_query, committed_tables: self.committed_tables.clone(), } } @@ -393,6 +408,7 @@ impl SimulatorEnv { profile: profile.clone(), committed_tables: Vec::new(), connection_tables: vec![None; profile.max_connections], + connection_last_query: Bitmap::new(), } } @@ -430,11 +446,8 @@ impl SimulatorEnv { /// Clears the commited tables and the connection tables pub fn clear_tables(&mut self) { self.committed_tables.clear(); - self.connection_tables.iter_mut().for_each(|t| { - if let Some(t) = t { - t.clear(); - } - }); + self.connection_tables.iter_mut().for_each(|t| *t = None); + self.connection_last_query = Bitmap::new(); } // TODO: does not yet create the appropriate context to avoid WriteWriteConflitcs @@ -462,14 +475,41 @@ impl SimulatorEnv { } } - pub fn get_conn_tables<'a>(&'a self, conn_index: usize) -> ShadowTables<'a> { + pub fn conn_in_transaction(&self, conn_index: usize) -> bool { + self.connection_tables + .get(conn_index) + .is_some_and(|t| t.is_some()) + } + + pub fn has_conn_executed_query_after_transaction(&self, conn_index: usize) -> bool { + self.connection_last_query.get(conn_index) + } + + pub fn update_conn_last_interaction(&mut self, conn_index: usize, query: Option<&Query>) { + // If the conn will execute a transaction statement then we set the bitmap to false + // to indicate we have not executed any queries yet after the transaction begun + let value = query.is_some_and(|query| { + matches!( + query, + Query::Begin(..) | Query::Commit(..) | Query::Rollback(..) + ) + }); + self.connection_last_query.set(conn_index, value); + } + + pub fn rollback_conn(&mut self, conn_index: usize) { + Rollback.shadow(&mut self.get_conn_tables_mut(conn_index)); + self.update_conn_last_interaction(conn_index, Some(&Query::Rollback(Rollback))); + } + + pub fn get_conn_tables(&self, conn_index: usize) -> ShadowTables<'_> { ShadowTables { transaction_tables: self.connection_tables.get(conn_index).unwrap().as_ref(), commited_tables: &self.committed_tables, } } - pub fn get_conn_tables_mut<'a>(&'a mut self, conn_index: usize) -> ShadowTablesMut<'a> { + pub fn get_conn_tables_mut(&mut self, conn_index: usize) -> ShadowTablesMut<'_> { ShadowTablesMut { transaction_tables: self.connection_tables.get_mut(conn_index).unwrap(), commited_tables: &mut self.committed_tables, diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index a1a7d5736..e877a972f 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -160,7 +160,7 @@ pub fn execute_interaction( } } -#[instrument(skip(env, interaction, stack), fields(seed = %env.opts.seed, interaction = %interaction))] +#[instrument(skip(env, interaction, stack), fields(conn_index = interaction.connection_index, interaction = %interaction))] pub fn execute_interaction_turso( env: &mut SimulatorEnv, interaction: &Interaction, @@ -173,23 +173,29 @@ pub fn execute_interaction_turso( // Leave this empty info! here to print the span of the execution tracing::info!(""); match &interaction.interaction { - InteractionType::Query(_) => { + InteractionType::Query(query) => { tracing::debug!(?interaction); - let results = interaction.execute_query(conn); - if results.is_err() { - tracing::error!(?results); + let results = interaction + .execute_query(conn) + .inspect_err(|err| tracing::error!(?err)); + + if let Err(err) = &results + && !interaction.ignore_error + { + return Err(err.clone()); } stack.push(results); // TODO: skip integrity check with mvcc if !env.profile.experimental_mvcc { limbo_integrity_check(conn)?; } + env.update_conn_last_interaction(interaction.connection_index, Some(query)); } InteractionType::FsyncQuery(query) => { - let results = interaction.execute_fsync_query(conn.clone(), env); - if results.is_err() { - tracing::error!(?results); - } + let results = interaction + .execute_fsync_query(conn.clone(), env) + .inspect_err(|err| tracing::error!(?err)); + stack.push(results); let query_interaction = Interaction::new( @@ -217,10 +223,10 @@ pub fn execute_interaction_turso( } InteractionType::FaultyQuery(_) => { let conn = conn.clone(); - let results = interaction.execute_faulty_query(&conn, env); - if results.is_err() { - tracing::error!(?results); - } + let results = interaction + .execute_faulty_query(&conn, env) + .inspect_err(|err| tracing::error!(?err)); + stack.push(results); // Reset fault injection env.io.inject_fault(false); @@ -296,8 +302,14 @@ fn execute_interaction_rusqlite( let results = execute_query_rusqlite(conn, query).map_err(|e| { turso_core::LimboError::InternalError(format!("error executing query: {e}")) }); + if let Err(err) = &results + && !interaction.ignore_error + { + return Err(err.clone()); + } tracing::debug!("{:?}", results); stack.push(results); + env.update_conn_last_interaction(interaction.connection_index, Some(query)); } InteractionType::FsyncQuery(..) => { unimplemented!("cannot implement fsync query in rusqlite, as we do not control IO"); @@ -332,10 +344,6 @@ fn execute_query_rusqlite( query: &Query, ) -> rusqlite::Result>> { match query { - Query::Create(create) => { - connection.execute(create.to_string().as_str(), ())?; - Ok(vec![]) - } Query::Select(select) => { let mut stmt = connection.prepare(select.to_string().as_str())?; let columns = stmt.column_count(); @@ -360,36 +368,8 @@ fn execute_query_rusqlite( } Ok(result) } - Query::Insert(insert) => { - connection.execute(insert.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Delete(delete) => { - connection.execute(delete.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Drop(drop) => { - connection.execute(drop.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Update(update) => { - connection.execute(update.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::CreateIndex(create_index) => { - connection.execute(create_index.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Begin(begin) => { - connection.execute(begin.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Commit(commit) => { - connection.execute(commit.to_string().as_str(), ())?; - Ok(vec![]) - } - Query::Rollback(rollback) => { - connection.execute(rollback.to_string().as_str(), ())?; + _ => { + connection.execute(query.to_string().as_str(), ())?; Ok(vec![]) } } diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index 7a54eb22e..93f2f1702 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -1,3 +1,5 @@ +use indexmap::IndexSet; + use crate::{ SandboxedResult, SimulatorEnv, generation::{ @@ -8,7 +10,21 @@ use crate::{ run_simulation, runner::execution::Execution, }; -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +fn retain_relevant_queries( + extensional_queries: &mut Vec, + depending_tables: &IndexSet, +) { + extensional_queries.retain(|query| { + query.is_transaction() + || (!matches!(query, Query::Select(..)) + && query.uses().iter().any(|t| depending_tables.contains(t))) + }); +} impl InteractionPlan { /// Create a smaller interaction plan by deleting a property @@ -22,7 +38,7 @@ impl InteractionPlan { let secondary_interactions_index = all_interactions[failing_execution.interaction_index].0; // Index of the parent property where the interaction originated from - let failing_property = &self.plan[secondary_interactions_index]; + let failing_property = &self[secondary_interactions_index]; let mut depending_tables = failing_property.dependencies(); { @@ -57,120 +73,11 @@ impl InteractionPlan { let before = self.len(); // Remove all properties after the failing one - plan.plan.truncate(secondary_interactions_index + 1); + plan.truncate(secondary_interactions_index + 1); // means we errored in some fault on transaction statement so just maintain the statements from before the failing one if !depending_tables.is_empty() { - let mut idx = 0; - // Remove all properties that do not use the failing tables - plan.plan.retain_mut(|interactions| { - let retain = if idx == secondary_interactions_index { - if let InteractionsType::Property( - Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. }, - ) = &mut interactions.interactions - { - tables.retain(|table| depending_tables.contains(table)); - } - true - } else if matches!( - interactions.interactions, - InteractionsType::Query(Query::Begin(..)) - | InteractionsType::Query(Query::Commit(..)) - | InteractionsType::Query(Query::Rollback(..)) - ) { - true - } else { - let mut has_table = interactions - .uses() - .iter() - .any(|t| depending_tables.contains(t)); - - if has_table { - // Remove the extensional parts of the properties - if let InteractionsType::Property(p) = &mut interactions.interactions { - match p { - Property::InsertValuesSelect { queries, .. } - | Property::DoubleCreateFailure { queries, .. } - | Property::DeleteSelect { queries, .. } - | Property::DropSelect { queries, .. } => { - queries.clear(); - } - Property::FsyncNoWait { tables, query } - | Property::FaultyQuery { tables, query } => { - if !query.uses().iter().any(|t| depending_tables.contains(t)) { - tables.clear(); - } else { - tables.retain(|table| depending_tables.contains(table)); - } - } - Property::SelectLimit { .. } - | Property::SelectSelectOptimizer { .. } - | Property::WhereTrueFalseNull { .. } - | Property::UNIONAllPreservesCardinality { .. } - | Property::ReadYourUpdatesBack { .. } - | Property::TableHasExpectedContent { .. } => {} - } - } - // Check again after query clear if the interactions still uses the failing table - has_table = interactions - .uses() - .iter() - .any(|t| depending_tables.contains(t)); - } - let is_fault = matches!(interactions.interactions, InteractionsType::Fault(..)); - is_fault - || (has_table - && !matches!( - interactions.interactions, - InteractionsType::Query(Query::Select(_)) - | InteractionsType::Property(Property::SelectLimit { .. }) - | InteractionsType::Property( - Property::SelectSelectOptimizer { .. } - ) - )) - }; - idx += 1; - retain - }); - - // Comprise of idxs of Begin interactions - let mut begin_idx = Vec::new(); - // Comprise of idxs of the intereactions Commit and Rollback - let mut end_tx_idx = Vec::new(); - - for (idx, interactions) in plan.plan.iter().enumerate() { - match &interactions.interactions { - InteractionsType::Query(Query::Begin(..)) => { - begin_idx.push(idx); - } - InteractionsType::Query(Query::Commit(..)) - | InteractionsType::Query(Query::Rollback(..)) => { - let last_begin = begin_idx.last().unwrap() + 1; - if last_begin == idx { - end_tx_idx.push(idx); - } - } - _ => {} - } - } - - // remove interactions if its just a Begin Commit/Rollback with no queries in the middle - let mut range_transactions = end_tx_idx.into_iter().peekable(); - let mut idx = 0; - plan.plan.retain_mut(|_| { - let mut retain = true; - - if let Some(txn_interaction_idx) = range_transactions.peek().copied() { - if txn_interaction_idx == idx { - range_transactions.next(); - } - if txn_interaction_idx == idx || txn_interaction_idx.saturating_sub(1) == idx { - retain = false; - } - } - idx += 1; - retain - }); + plan.remove_properties(&depending_tables, secondary_interactions_index); } let after = plan.len(); @@ -183,6 +90,166 @@ impl InteractionPlan { plan } + + /// Remove all properties that do not use the failing tables + fn remove_properties( + &mut self, + depending_tables: &IndexSet, + failing_interaction_index: usize, + ) { + let mut idx = 0; + // Remove all properties that do not use the failing tables + self.retain_mut(|interactions| { + let retain = if idx == failing_interaction_index { + if let InteractionsType::Property( + Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. }, + ) = &mut interactions.interactions + { + tables.retain(|table| depending_tables.contains(table)); + } + true + } else { + let mut has_table = interactions + .uses() + .iter() + .any(|t| depending_tables.contains(t)); + + if has_table { + // will contain extensional queries that reference the depending tables + let mut extensional_queries = Vec::new(); + + // Remove the extensional parts of the properties + if let InteractionsType::Property(p) = &mut interactions.interactions { + match p { + Property::InsertValuesSelect { queries, .. } + | Property::DoubleCreateFailure { queries, .. } + | Property::DeleteSelect { queries, .. } + | Property::DropSelect { queries, .. } + | Property::Queries { queries } => { + extensional_queries.append(queries); + } + Property::FsyncNoWait { tables, query } + | Property::FaultyQuery { tables, query } => { + if !query.uses().iter().any(|t| depending_tables.contains(t)) { + tables.clear(); + } else { + tables.retain(|table| depending_tables.contains(table)); + } + } + Property::SelectLimit { .. } + | Property::SelectSelectOptimizer { .. } + | Property::WhereTrueFalseNull { .. } + | Property::UNIONAllPreservesCardinality { .. } + | Property::ReadYourUpdatesBack { .. } + | Property::TableHasExpectedContent { .. } => {} + } + } + // Check again after query clear if the interactions still uses the failing table + has_table = interactions + .uses() + .iter() + .any(|t| depending_tables.contains(t)); + + // means the queries in the original property are present in the depending tables regardless of the extensional queries + if has_table { + if let Some(queries) = interactions.get_extensional_queries() { + retain_relevant_queries(&mut extensional_queries, depending_tables); + queries.append(&mut extensional_queries); + } + } else { + // original property without extensional queries does not reference the tables so convert the property to + // `Property::Queries` if `extensional_queries` is not empty + retain_relevant_queries(&mut extensional_queries, depending_tables); + if !extensional_queries.is_empty() { + has_table = true; + *interactions = Interactions::new( + interactions.connection_index, + InteractionsType::Property(Property::Queries { + queries: extensional_queries, + }), + ); + } + } + } + let is_fault = matches!(interactions.interactions, InteractionsType::Fault(..)); + let is_transaction = matches!( + interactions.interactions, + InteractionsType::Query(Query::Begin(..)) + | InteractionsType::Query(Query::Commit(..)) + | InteractionsType::Query(Query::Rollback(..)) + ); + is_fault + || is_transaction + || (has_table + && !matches!( + interactions.interactions, + InteractionsType::Query(Query::Select(_)) + | InteractionsType::Property(Property::SelectLimit { .. }) + | InteractionsType::Property( + Property::SelectSelectOptimizer { .. } + ) + )) + }; + idx += 1; + retain + }); + + // Comprises of idxs of Begin interactions + let mut begin_idx: HashMap> = HashMap::new(); + // Comprises of idxs of Commit and Rollback intereactions + let mut end_tx_idx: HashMap> = HashMap::new(); + + for (idx, interactions) in self.iter().enumerate() { + match &interactions.interactions { + InteractionsType::Query(Query::Begin(..)) => { + begin_idx + .entry(interactions.connection_index) + .or_insert_with(|| vec![idx]); + } + InteractionsType::Query(Query::Commit(..)) + | InteractionsType::Query(Query::Rollback(..)) => { + let last_begin = begin_idx + .get(&interactions.connection_index) + .and_then(|list| list.last()) + .unwrap() + + 1; + if last_begin == idx { + end_tx_idx + .entry(interactions.connection_index) + .or_insert_with(|| vec![idx]); + } + } + _ => {} + } + } + + // remove interactions if its just a Begin Commit/Rollback with no queries in the middle + let mut range_transactions = end_tx_idx + .into_iter() + .map(|(conn_index, list)| (conn_index, list.into_iter().peekable())) + .collect::>(); + let mut idx = 0; + self.retain_mut(|interactions| { + let mut retain = true; + + let iter = range_transactions.get_mut(&interactions.connection_index); + + if let Some(iter) = iter { + if let Some(txn_interaction_idx) = iter.peek().copied() { + if txn_interaction_idx == idx { + iter.next(); + } + if txn_interaction_idx == idx || txn_interaction_idx.saturating_sub(1) == idx { + retain = false; + } + } + } + + idx += 1; + retain + }); + } + /// Create a smaller interaction plan by deleting a property pub(crate) fn brute_shrink_interaction_plan( &self, @@ -235,16 +302,17 @@ impl InteractionPlan { let before = self.len(); - plan.plan.truncate(secondary_interactions_index + 1); + plan.truncate(secondary_interactions_index + 1); // phase 1: shrink extensions - for interaction in &mut plan.plan { + for interaction in &mut plan { if let InteractionsType::Property(property) = &mut interaction.interactions { match property { Property::InsertValuesSelect { queries, .. } | Property::DoubleCreateFailure { queries, .. } | Property::DeleteSelect { queries, .. } - | Property::DropSelect { queries, .. } => { + | Property::DropSelect { queries, .. } + | Property::Queries { queries } => { let mut temp_plan = InteractionPlan::new_with( queries .iter() @@ -321,7 +389,7 @@ impl InteractionPlan { } let mut test_plan = plan.clone(); - test_plan.plan.remove(i); + test_plan.remove(i); if Self::test_shrunk_plan(&test_plan, failing_execution, old_result, env.clone()) { plan = test_plan;