From 04154bf368a12806cf9dff83c727672067cbe732 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 16 Sep 2025 22:53:06 -0300 Subject: [PATCH] simplify code for doublecheck testing --- simulator/runner/differential.rs | 7 +- simulator/runner/doublecheck.rs | 333 +++++++++++++------------------ 2 files changed, 143 insertions(+), 197 deletions(-) diff --git a/simulator/runner/differential.rs b/simulator/runner/differential.rs index ef25147c3..6dd5803ee 100644 --- a/simulator/runner/differential.rs +++ b/simulator/runner/differential.rs @@ -55,7 +55,6 @@ pub(crate) fn execute_interactions( last_execution: Arc>, ) -> ExecutionResult { let mut history = ExecutionHistory::new(); - let now = std::time::Instant::now(); let mut env = env.lock().unwrap(); let mut rusqlite_env = rusqlite_env.lock().unwrap(); @@ -63,6 +62,8 @@ pub(crate) fn execute_interactions( env.tables.clear(); rusqlite_env.tables.clear(); + let now = std::time::Instant::now(); + for _tick in 0..env.opts.ticks { if state.interaction_pointer >= interactions.len() { break; @@ -95,7 +96,7 @@ pub(crate) fn execute_interactions( // second execute rusqlite let rusqlite_res = super::execution::execute_plan( - &mut env, + &mut rusqlite_env, interaction, rusqlite_conn_state, &mut rusqlite_state, @@ -111,6 +112,8 @@ pub(crate) fn execute_interactions( return ExecutionResult::new(history, Some(err)); } + state.interaction_pointer += 1; + // Check if the maximum time for the simulation has been reached if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 { return ExecutionResult::new( diff --git a/simulator/runner/doublecheck.rs b/simulator/runner/doublecheck.rs index 25adbfba3..a2c98b424 100644 --- a/simulator/runner/doublecheck.rs +++ b/simulator/runner/doublecheck.rs @@ -3,36 +3,34 @@ use std::{ sync::{Arc, Mutex}, }; -use sql_generation::generation::pick_index; - -use crate::{ - InteractionPlan, generation::plan::InteractionPlanState, - runner::execution::ExecutionContinuation, -}; +use crate::generation::plan::{ConnectionState, Interaction, InteractionPlanState}; use super::{ - env::{SimConnection, SimulatorEnv}, - execution::{Execution, ExecutionHistory, ExecutionResult, execute_interaction}, + env::SimulatorEnv, + execution::{Execution, ExecutionHistory, ExecutionResult}, }; -pub(crate) fn run_simulation( +pub fn run_simulation( env: Arc>, doublecheck_env: Arc>, - plan: &mut InteractionPlan, + plan: Vec, last_execution: Arc>, ) -> ExecutionResult { tracing::info!("Executing database interaction plan..."); - let mut state = InteractionPlanState { - stack: vec![], - interaction_pointer: 0, - secondary_pointer: 0, + let num_conns = { + let env = env.lock().unwrap(); + env.connections.len() }; - let mut doublecheck_state = InteractionPlanState { - stack: vec![], + let mut conn_states = (0..num_conns) + .map(|_| ConnectionState::default()) + .collect::>(); + + let mut doublecheck_states = conn_states.clone(); + + let mut state = InteractionPlanState { interaction_pointer: 0, - secondary_pointer: 0, }; let mut result = execute_plans( @@ -40,7 +38,8 @@ pub(crate) fn run_simulation( doublecheck_env.clone(), plan, &mut state, - &mut doublecheck_state, + &mut conn_states, + &mut doublecheck_states, last_execution, ); @@ -79,44 +78,72 @@ pub(crate) fn run_simulation( pub(crate) fn execute_plans( env: Arc>, doublecheck_env: Arc>, - plans: &mut InteractionPlan, - states: &mut InteractionPlanState, - doublecheck_states: &mut InteractionPlanState, + interactions: Vec, + state: &mut InteractionPlanState, + conn_states: &mut [ConnectionState], + doublecheck_states: &mut [ConnectionState], last_execution: Arc>, ) -> ExecutionResult { let mut history = ExecutionHistory::new(); - let now = std::time::Instant::now(); let mut env = env.lock().unwrap(); let mut doublecheck_env = doublecheck_env.lock().unwrap(); - for _tick in 0..env.opts.ticks { - // Pick the connection to interact with - let connection_index = pick_index(env.connections.len(), &mut env.rng); + env.tables.clear(); + doublecheck_env.tables.clear(); - history.history.push(Execution::new( - connection_index, - state.interaction_pointer, - state.secondary_pointer, - )); + let now = std::time::Instant::now(); + + for _tick in 0..env.opts.ticks { + if state.interaction_pointer >= interactions.len() { + break; + } + + let interaction = &interactions[state.interaction_pointer]; + + let connection_index = interaction.connection_index; + let turso_conn_state = &mut conn_states[connection_index]; + let doublecheck_conn_state = &mut doublecheck_states[connection_index]; + + history + .history + .push(Execution::new(connection_index, state.interaction_pointer)); let mut last_execution = last_execution.lock().unwrap(); last_execution.connection_index = connection_index; last_execution.interaction_index = state.interaction_pointer; - last_execution.secondary_index = state.secondary_pointer; - // Execute the interaction for the selected connection - match execute_plan( + + let mut turso_state = state.clone(); + + // first execute turso + let turso_res = super::execution::execute_plan( &mut env, + interaction, + turso_conn_state, + &mut turso_state, + ); + + let mut doublecheck_state = state.clone(); + + // second execute doublecheck + let doublecheck_res = super::execution::execute_plan( &mut doublecheck_env, - connection_index, - plans, - states, - doublecheck_states, + interaction, + doublecheck_conn_state, + &mut doublecheck_state, + ); + + // Compare results + if let Err(err) = compare_results( + turso_res, + turso_conn_state, + doublecheck_res, + doublecheck_conn_state, ) { - Ok(_) => {} - Err(err) => { - return ExecutionResult::new(history, Some(err)); - } + return ExecutionResult::new(history, Some(err)); } + + state.interaction_pointer += 1; + // Check if the maximum time for the simulation has been reached if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 { return ExecutionResult::new( @@ -131,176 +158,92 @@ pub(crate) fn execute_plans( ExecutionResult::new(history, None) } -fn execute_plan( - env: &mut SimulatorEnv, - doublecheck_env: &mut SimulatorEnv, - connection_index: usize, - plans: &mut [InteractionPlan], - states: &mut [InteractionPlanState], - doublecheck_states: &mut [InteractionPlanState], +fn compare_results( + turso_res: turso_core::Result<()>, + turso_state: &mut ConnectionState, + doublecheck_res: turso_core::Result<()>, + doublecheck_state: &mut ConnectionState, ) -> turso_core::Result<()> { - let connection = &env.connections[connection_index]; - let doublecheck_connection = &doublecheck_env.connections[connection_index]; - let plan = &mut plans[connection_index]; - - let state = &mut states[connection_index]; - let doublecheck_state = &mut doublecheck_states[connection_index]; - - if state.interaction_pointer >= plan.plan.len() { - return Ok(()); - } - - let interaction = &plan.plan[state.interaction_pointer].interactions()[state.secondary_pointer]; - - tracing::debug!( - "execute_plan(connection_index={}, interaction={})", - connection_index, - interaction - ); - tracing::debug!( - "connection: {}, doublecheck_connection: {}", - connection, - doublecheck_connection - ); - match (connection, doublecheck_connection) { - (SimConnection::Disconnected, SimConnection::Disconnected) => { - tracing::debug!("connecting {}", connection_index); - env.connect(connection_index); - doublecheck_env.connect(connection_index); - } - (SimConnection::LimboConnection(_), SimConnection::LimboConnection(_)) => { - let limbo_result = - execute_interaction(env, connection_index, interaction, &mut state.stack); - let doublecheck_result = execute_interaction( - doublecheck_env, - connection_index, - interaction, - &mut doublecheck_state.stack, - ); - match (limbo_result, doublecheck_result) { - (Ok(next_execution), Ok(next_execution_doublecheck)) => { - if next_execution != next_execution_doublecheck { - tracing::error!( - "expected next executions of limbo and doublecheck do not match" - ); - tracing::debug!( - "limbo result: {:?}, doublecheck result: {:?}", - next_execution, - next_execution_doublecheck - ); - return Err(turso_core::LimboError::InternalError( - "expected next executions of limbo and doublecheck do not match".into(), - )); - } - - let limbo_values = state.stack.last(); - let doublecheck_values = doublecheck_state.stack.last(); + match (turso_res, doublecheck_res) { + (Ok(..), Ok(..)) => { + let turso_values = turso_state.stack.last(); + let doublecheck_values = doublecheck_state.stack.last(); + match (turso_values, doublecheck_values) { + (Some(limbo_values), Some(doublecheck_values)) => { match (limbo_values, doublecheck_values) { - (Some(limbo_values), Some(doublecheck_values)) => { - match (limbo_values, doublecheck_values) { - (Ok(limbo_values), Ok(doublecheck_values)) => { - if limbo_values != doublecheck_values { - tracing::error!( - "returned values from limbo and doublecheck results do not match" - ); - tracing::debug!("limbo values {:?}", limbo_values); - tracing::debug!( - "doublecheck values {:?}", - doublecheck_values - ); - return Err(turso_core::LimboError::InternalError( + (Ok(limbo_values), Ok(doublecheck_values)) => { + if limbo_values != doublecheck_values { + tracing::error!( + "returned values from limbo and doublecheck results do not match" + ); + tracing::debug!("limbo values {:?}", limbo_values); + tracing::debug!("doublecheck values {:?}", doublecheck_values); + return Err(turso_core::LimboError::InternalError( "returned values from limbo and doublecheck results do not match".into(), )); - } - } - (Err(limbo_err), Err(doublecheck_err)) => { - if limbo_err.to_string() != doublecheck_err.to_string() { - tracing::error!( - "limbo and doublecheck errors do not match" - ); - tracing::error!("limbo error {}", limbo_err); - tracing::error!("doublecheck error {}", doublecheck_err); - return Err(turso_core::LimboError::InternalError( - "limbo and doublecheck errors do not match".into(), - )); - } - } - (Ok(limbo_result), Err(doublecheck_err)) => { - tracing::error!( - "limbo and doublecheck results do not match, limbo returned values but doublecheck failed" - ); - tracing::error!("limbo values {:?}", limbo_result); - tracing::error!("doublecheck error {}", doublecheck_err); - return Err(turso_core::LimboError::InternalError( - "limbo and doublecheck results do not match".into(), - )); - } - (Err(limbo_err), Ok(_)) => { - tracing::error!( - "limbo and doublecheck results do not match, limbo failed but doublecheck returned values" - ); - tracing::error!("limbo error {}", limbo_err); - return Err(turso_core::LimboError::InternalError( - "limbo and doublecheck results do not match".into(), - )); - } } } - (None, None) => {} - _ => { - tracing::error!("limbo and doublecheck results do not match"); + (Err(limbo_err), Err(doublecheck_err)) => { + if limbo_err.to_string() != doublecheck_err.to_string() { + tracing::error!("limbo and doublecheck errors do not match"); + tracing::error!("limbo error {}", limbo_err); + tracing::error!("doublecheck error {}", doublecheck_err); + return Err(turso_core::LimboError::InternalError( + "limbo and doublecheck errors do not match".into(), + )); + } + } + (Ok(limbo_result), Err(doublecheck_err)) => { + tracing::error!( + "limbo and doublecheck results do not match, limbo returned values but doublecheck failed" + ); + tracing::error!("limbo values {:?}", limbo_result); + tracing::error!("doublecheck error {}", doublecheck_err); + return Err(turso_core::LimboError::InternalError( + "limbo and doublecheck results do not match".into(), + )); + } + (Err(limbo_err), Ok(_)) => { + tracing::error!( + "limbo and doublecheck results do not match, limbo failed but doublecheck returned values" + ); + tracing::error!("limbo error {}", limbo_err); return Err(turso_core::LimboError::InternalError( "limbo and doublecheck results do not match".into(), )); } } - - // Move to the next interaction or property - match next_execution { - ExecutionContinuation::NextInteraction => { - if state.secondary_pointer + 1 - >= plan.plan[state.interaction_pointer].interactions().len() - { - // If we have reached the end of the interactions for this property, move to the next property - state.interaction_pointer += 1; - state.secondary_pointer = 0; - } else { - // Otherwise, move to the next interaction - state.secondary_pointer += 1; - } - } - ExecutionContinuation::NextProperty => { - // Skip to the next property - state.interaction_pointer += 1; - state.secondary_pointer = 0; - } - } } - (Err(err), Ok(_)) => { + (None, None) => {} + _ => { tracing::error!("limbo and doublecheck results do not match"); - tracing::error!("limbo error {}", err); - return Err(err); - } - (Ok(val), Err(err)) => { - tracing::error!("limbo and doublecheck results do not match"); - tracing::error!("limbo {:?}", val); - tracing::error!("doublecheck error {}", err); - return Err(err); - } - (Err(err), Err(err_doublecheck)) => { - if err.to_string() != err_doublecheck.to_string() { - tracing::error!("limbo and doublecheck errors do not match"); - tracing::error!("limbo error {}", err); - tracing::error!("doublecheck error {}", err_doublecheck); - return Err(turso_core::LimboError::InternalError( - "limbo and doublecheck errors do not match".into(), - )); - } + return Err(turso_core::LimboError::InternalError( + "limbo and doublecheck results do not match".into(), + )); } } } - _ => unreachable!("{} vs {}", connection, doublecheck_connection), + (Err(err), Ok(_)) => { + tracing::error!("limbo and doublecheck results do not match"); + tracing::error!("limbo error {}", err); + return Err(err); + } + (Ok(val), Err(err)) => { + tracing::error!("limbo and doublecheck results do not match"); + tracing::error!("limbo {:?}", val); + tracing::error!("doublecheck error {}", err); + return Err(err); + } + (Err(err), Err(err_doublecheck)) => { + if err.to_string() != err_doublecheck.to_string() { + tracing::error!("limbo and doublecheck errors do not match"); + tracing::error!("limbo error {}", err); + tracing::error!("doublecheck error {}", err_doublecheck); + return Err(turso_core::LimboError::InternalError( + "limbo and doublecheck errors do not match".into(), + )); + } + } } - Ok(()) }