mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-22 01:24:18 +01:00
simplify code for doublecheck testing
This commit is contained in:
@@ -55,7 +55,6 @@ pub(crate) fn execute_interactions(
|
||||
last_execution: Arc<Mutex<Execution>>,
|
||||
) -> ExecutionResult {
|
||||
let mut history = ExecutionHistory::new();
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
let mut env = env.lock().unwrap();
|
||||
let mut rusqlite_env = rusqlite_env.lock().unwrap();
|
||||
@@ -63,6 +62,8 @@ pub(crate) fn execute_interactions(
|
||||
env.tables.clear();
|
||||
rusqlite_env.tables.clear();
|
||||
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
for _tick in 0..env.opts.ticks {
|
||||
if state.interaction_pointer >= interactions.len() {
|
||||
break;
|
||||
@@ -95,7 +96,7 @@ pub(crate) fn execute_interactions(
|
||||
|
||||
// second execute rusqlite
|
||||
let rusqlite_res = super::execution::execute_plan(
|
||||
&mut env,
|
||||
&mut rusqlite_env,
|
||||
interaction,
|
||||
rusqlite_conn_state,
|
||||
&mut rusqlite_state,
|
||||
@@ -111,6 +112,8 @@ pub(crate) fn execute_interactions(
|
||||
return ExecutionResult::new(history, Some(err));
|
||||
}
|
||||
|
||||
state.interaction_pointer += 1;
|
||||
|
||||
// Check if the maximum time for the simulation has been reached
|
||||
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
|
||||
return ExecutionResult::new(
|
||||
|
||||
@@ -3,36 +3,34 @@ use std::{
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use sql_generation::generation::pick_index;
|
||||
|
||||
use crate::{
|
||||
InteractionPlan, generation::plan::InteractionPlanState,
|
||||
runner::execution::ExecutionContinuation,
|
||||
};
|
||||
use crate::generation::plan::{ConnectionState, Interaction, InteractionPlanState};
|
||||
|
||||
use super::{
|
||||
env::{SimConnection, SimulatorEnv},
|
||||
execution::{Execution, ExecutionHistory, ExecutionResult, execute_interaction},
|
||||
env::SimulatorEnv,
|
||||
execution::{Execution, ExecutionHistory, ExecutionResult},
|
||||
};
|
||||
|
||||
pub(crate) fn run_simulation(
|
||||
pub fn run_simulation(
|
||||
env: Arc<Mutex<SimulatorEnv>>,
|
||||
doublecheck_env: Arc<Mutex<SimulatorEnv>>,
|
||||
plan: &mut InteractionPlan,
|
||||
plan: Vec<Interaction>,
|
||||
last_execution: Arc<Mutex<Execution>>,
|
||||
) -> ExecutionResult {
|
||||
tracing::info!("Executing database interaction plan...");
|
||||
|
||||
let mut state = InteractionPlanState {
|
||||
stack: vec![],
|
||||
interaction_pointer: 0,
|
||||
secondary_pointer: 0,
|
||||
let num_conns = {
|
||||
let env = env.lock().unwrap();
|
||||
env.connections.len()
|
||||
};
|
||||
|
||||
let mut doublecheck_state = InteractionPlanState {
|
||||
stack: vec![],
|
||||
let mut conn_states = (0..num_conns)
|
||||
.map(|_| ConnectionState::default())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut doublecheck_states = conn_states.clone();
|
||||
|
||||
let mut state = InteractionPlanState {
|
||||
interaction_pointer: 0,
|
||||
secondary_pointer: 0,
|
||||
};
|
||||
|
||||
let mut result = execute_plans(
|
||||
@@ -40,7 +38,8 @@ pub(crate) fn run_simulation(
|
||||
doublecheck_env.clone(),
|
||||
plan,
|
||||
&mut state,
|
||||
&mut doublecheck_state,
|
||||
&mut conn_states,
|
||||
&mut doublecheck_states,
|
||||
last_execution,
|
||||
);
|
||||
|
||||
@@ -79,44 +78,72 @@ pub(crate) fn run_simulation(
|
||||
pub(crate) fn execute_plans(
|
||||
env: Arc<Mutex<SimulatorEnv>>,
|
||||
doublecheck_env: Arc<Mutex<SimulatorEnv>>,
|
||||
plans: &mut InteractionPlan,
|
||||
states: &mut InteractionPlanState,
|
||||
doublecheck_states: &mut InteractionPlanState,
|
||||
interactions: Vec<Interaction>,
|
||||
state: &mut InteractionPlanState,
|
||||
conn_states: &mut [ConnectionState],
|
||||
doublecheck_states: &mut [ConnectionState],
|
||||
last_execution: Arc<Mutex<Execution>>,
|
||||
) -> ExecutionResult {
|
||||
let mut history = ExecutionHistory::new();
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
let mut env = env.lock().unwrap();
|
||||
let mut doublecheck_env = doublecheck_env.lock().unwrap();
|
||||
|
||||
for _tick in 0..env.opts.ticks {
|
||||
// Pick the connection to interact with
|
||||
let connection_index = pick_index(env.connections.len(), &mut env.rng);
|
||||
env.tables.clear();
|
||||
doublecheck_env.tables.clear();
|
||||
|
||||
history.history.push(Execution::new(
|
||||
connection_index,
|
||||
state.interaction_pointer,
|
||||
state.secondary_pointer,
|
||||
));
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
for _tick in 0..env.opts.ticks {
|
||||
if state.interaction_pointer >= interactions.len() {
|
||||
break;
|
||||
}
|
||||
|
||||
let interaction = &interactions[state.interaction_pointer];
|
||||
|
||||
let connection_index = interaction.connection_index;
|
||||
let turso_conn_state = &mut conn_states[connection_index];
|
||||
let doublecheck_conn_state = &mut doublecheck_states[connection_index];
|
||||
|
||||
history
|
||||
.history
|
||||
.push(Execution::new(connection_index, state.interaction_pointer));
|
||||
let mut last_execution = last_execution.lock().unwrap();
|
||||
last_execution.connection_index = connection_index;
|
||||
last_execution.interaction_index = state.interaction_pointer;
|
||||
last_execution.secondary_index = state.secondary_pointer;
|
||||
// Execute the interaction for the selected connection
|
||||
match execute_plan(
|
||||
|
||||
let mut turso_state = state.clone();
|
||||
|
||||
// first execute turso
|
||||
let turso_res = super::execution::execute_plan(
|
||||
&mut env,
|
||||
interaction,
|
||||
turso_conn_state,
|
||||
&mut turso_state,
|
||||
);
|
||||
|
||||
let mut doublecheck_state = state.clone();
|
||||
|
||||
// second execute doublecheck
|
||||
let doublecheck_res = super::execution::execute_plan(
|
||||
&mut doublecheck_env,
|
||||
connection_index,
|
||||
plans,
|
||||
states,
|
||||
doublecheck_states,
|
||||
interaction,
|
||||
doublecheck_conn_state,
|
||||
&mut doublecheck_state,
|
||||
);
|
||||
|
||||
// Compare results
|
||||
if let Err(err) = compare_results(
|
||||
turso_res,
|
||||
turso_conn_state,
|
||||
doublecheck_res,
|
||||
doublecheck_conn_state,
|
||||
) {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
return ExecutionResult::new(history, Some(err));
|
||||
}
|
||||
return ExecutionResult::new(history, Some(err));
|
||||
}
|
||||
|
||||
state.interaction_pointer += 1;
|
||||
|
||||
// Check if the maximum time for the simulation has been reached
|
||||
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
|
||||
return ExecutionResult::new(
|
||||
@@ -131,176 +158,92 @@ pub(crate) fn execute_plans(
|
||||
ExecutionResult::new(history, None)
|
||||
}
|
||||
|
||||
fn execute_plan(
|
||||
env: &mut SimulatorEnv,
|
||||
doublecheck_env: &mut SimulatorEnv,
|
||||
connection_index: usize,
|
||||
plans: &mut [InteractionPlan],
|
||||
states: &mut [InteractionPlanState],
|
||||
doublecheck_states: &mut [InteractionPlanState],
|
||||
fn compare_results(
|
||||
turso_res: turso_core::Result<()>,
|
||||
turso_state: &mut ConnectionState,
|
||||
doublecheck_res: turso_core::Result<()>,
|
||||
doublecheck_state: &mut ConnectionState,
|
||||
) -> turso_core::Result<()> {
|
||||
let connection = &env.connections[connection_index];
|
||||
let doublecheck_connection = &doublecheck_env.connections[connection_index];
|
||||
let plan = &mut plans[connection_index];
|
||||
|
||||
let state = &mut states[connection_index];
|
||||
let doublecheck_state = &mut doublecheck_states[connection_index];
|
||||
|
||||
if state.interaction_pointer >= plan.plan.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interaction = &plan.plan[state.interaction_pointer].interactions()[state.secondary_pointer];
|
||||
|
||||
tracing::debug!(
|
||||
"execute_plan(connection_index={}, interaction={})",
|
||||
connection_index,
|
||||
interaction
|
||||
);
|
||||
tracing::debug!(
|
||||
"connection: {}, doublecheck_connection: {}",
|
||||
connection,
|
||||
doublecheck_connection
|
||||
);
|
||||
match (connection, doublecheck_connection) {
|
||||
(SimConnection::Disconnected, SimConnection::Disconnected) => {
|
||||
tracing::debug!("connecting {}", connection_index);
|
||||
env.connect(connection_index);
|
||||
doublecheck_env.connect(connection_index);
|
||||
}
|
||||
(SimConnection::LimboConnection(_), SimConnection::LimboConnection(_)) => {
|
||||
let limbo_result =
|
||||
execute_interaction(env, connection_index, interaction, &mut state.stack);
|
||||
let doublecheck_result = execute_interaction(
|
||||
doublecheck_env,
|
||||
connection_index,
|
||||
interaction,
|
||||
&mut doublecheck_state.stack,
|
||||
);
|
||||
match (limbo_result, doublecheck_result) {
|
||||
(Ok(next_execution), Ok(next_execution_doublecheck)) => {
|
||||
if next_execution != next_execution_doublecheck {
|
||||
tracing::error!(
|
||||
"expected next executions of limbo and doublecheck do not match"
|
||||
);
|
||||
tracing::debug!(
|
||||
"limbo result: {:?}, doublecheck result: {:?}",
|
||||
next_execution,
|
||||
next_execution_doublecheck
|
||||
);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"expected next executions of limbo and doublecheck do not match".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let limbo_values = state.stack.last();
|
||||
let doublecheck_values = doublecheck_state.stack.last();
|
||||
match (turso_res, doublecheck_res) {
|
||||
(Ok(..), Ok(..)) => {
|
||||
let turso_values = turso_state.stack.last();
|
||||
let doublecheck_values = doublecheck_state.stack.last();
|
||||
match (turso_values, doublecheck_values) {
|
||||
(Some(limbo_values), Some(doublecheck_values)) => {
|
||||
match (limbo_values, doublecheck_values) {
|
||||
(Some(limbo_values), Some(doublecheck_values)) => {
|
||||
match (limbo_values, doublecheck_values) {
|
||||
(Ok(limbo_values), Ok(doublecheck_values)) => {
|
||||
if limbo_values != doublecheck_values {
|
||||
tracing::error!(
|
||||
"returned values from limbo and doublecheck results do not match"
|
||||
);
|
||||
tracing::debug!("limbo values {:?}", limbo_values);
|
||||
tracing::debug!(
|
||||
"doublecheck values {:?}",
|
||||
doublecheck_values
|
||||
);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
(Ok(limbo_values), Ok(doublecheck_values)) => {
|
||||
if limbo_values != doublecheck_values {
|
||||
tracing::error!(
|
||||
"returned values from limbo and doublecheck results do not match"
|
||||
);
|
||||
tracing::debug!("limbo values {:?}", limbo_values);
|
||||
tracing::debug!("doublecheck values {:?}", doublecheck_values);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"returned values from limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
(Err(limbo_err), Err(doublecheck_err)) => {
|
||||
if limbo_err.to_string() != doublecheck_err.to_string() {
|
||||
tracing::error!(
|
||||
"limbo and doublecheck errors do not match"
|
||||
);
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
tracing::error!("doublecheck error {}", doublecheck_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck errors do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
(Ok(limbo_result), Err(doublecheck_err)) => {
|
||||
tracing::error!(
|
||||
"limbo and doublecheck results do not match, limbo returned values but doublecheck failed"
|
||||
);
|
||||
tracing::error!("limbo values {:?}", limbo_result);
|
||||
tracing::error!("doublecheck error {}", doublecheck_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
(Err(limbo_err), Ok(_)) => {
|
||||
tracing::error!(
|
||||
"limbo and doublecheck results do not match, limbo failed but doublecheck returned values"
|
||||
);
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
(None, None) => {}
|
||||
_ => {
|
||||
tracing::error!("limbo and doublecheck results do not match");
|
||||
(Err(limbo_err), Err(doublecheck_err)) => {
|
||||
if limbo_err.to_string() != doublecheck_err.to_string() {
|
||||
tracing::error!("limbo and doublecheck errors do not match");
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
tracing::error!("doublecheck error {}", doublecheck_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck errors do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
(Ok(limbo_result), Err(doublecheck_err)) => {
|
||||
tracing::error!(
|
||||
"limbo and doublecheck results do not match, limbo returned values but doublecheck failed"
|
||||
);
|
||||
tracing::error!("limbo values {:?}", limbo_result);
|
||||
tracing::error!("doublecheck error {}", doublecheck_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
(Err(limbo_err), Ok(_)) => {
|
||||
tracing::error!(
|
||||
"limbo and doublecheck results do not match, limbo failed but doublecheck returned values"
|
||||
);
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Move to the next interaction or property
|
||||
match next_execution {
|
||||
ExecutionContinuation::NextInteraction => {
|
||||
if state.secondary_pointer + 1
|
||||
>= plan.plan[state.interaction_pointer].interactions().len()
|
||||
{
|
||||
// If we have reached the end of the interactions for this property, move to the next property
|
||||
state.interaction_pointer += 1;
|
||||
state.secondary_pointer = 0;
|
||||
} else {
|
||||
// Otherwise, move to the next interaction
|
||||
state.secondary_pointer += 1;
|
||||
}
|
||||
}
|
||||
ExecutionContinuation::NextProperty => {
|
||||
// Skip to the next property
|
||||
state.interaction_pointer += 1;
|
||||
state.secondary_pointer = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
(Err(err), Ok(_)) => {
|
||||
(None, None) => {}
|
||||
_ => {
|
||||
tracing::error!("limbo and doublecheck results do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Ok(val), Err(err)) => {
|
||||
tracing::error!("limbo and doublecheck results do not match");
|
||||
tracing::error!("limbo {:?}", val);
|
||||
tracing::error!("doublecheck error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Err(err), Err(err_doublecheck)) => {
|
||||
if err.to_string() != err_doublecheck.to_string() {
|
||||
tracing::error!("limbo and doublecheck errors do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
tracing::error!("doublecheck error {}", err_doublecheck);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck errors do not match".into(),
|
||||
));
|
||||
}
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!("{} vs {}", connection, doublecheck_connection),
|
||||
(Err(err), Ok(_)) => {
|
||||
tracing::error!("limbo and doublecheck results do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Ok(val), Err(err)) => {
|
||||
tracing::error!("limbo and doublecheck results do not match");
|
||||
tracing::error!("limbo {:?}", val);
|
||||
tracing::error!("doublecheck error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Err(err), Err(err_doublecheck)) => {
|
||||
if err.to_string() != err_doublecheck.to_string() {
|
||||
tracing::error!("limbo and doublecheck errors do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
tracing::error!("doublecheck error {}", err_doublecheck);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and doublecheck errors do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user