simplify run_simualtion signature + remove watch mode file as it shares logic with default run simulation logic

This commit is contained in:
pedrocarlo
2025-09-16 16:03:40 -03:00
parent 35ddcb4270
commit 4d9e676f65
5 changed files with 21 additions and 171 deletions

View File

@@ -42,7 +42,7 @@ impl InteractionPlan {
/// delete interactions from the human readable file, and this function uses the JSON file as
/// a baseline to detect with interactions were deleted and constructs the plan from the
/// remaining interactions.
pub(crate) fn compute_via_diff(plan_path: &Path) -> Vec<Vec<Interaction>> {
pub(crate) fn compute_via_diff(plan_path: &Path) -> Vec<Interaction> {
let interactions = std::fs::read_to_string(plan_path).unwrap();
let interactions = interactions.lines().collect::<Vec<_>>();
@@ -97,7 +97,7 @@ impl InteractionPlan {
}
}
let _ = plan.split_off(j);
plan
plan.into_iter().flatten().collect()
}
pub fn interactions_list(&self) -> impl Iterator<Item = Interaction> {

View File

@@ -7,9 +7,9 @@ use notify::{EventKind, RecursiveMode, Watcher};
use rand::prelude::*;
use runner::bugbase::{Bug, BugBase, LoadedBug};
use runner::cli::{SimulatorCLI, SimulatorCommand};
use runner::differential;
use runner::env::SimulatorEnv;
use runner::execution::{Execution, ExecutionHistory, ExecutionResult, execute_interactions};
use runner::{differential, watch};
use std::any::Any;
use std::backtrace::Backtrace;
use std::fs::OpenOptions;
@@ -168,7 +168,7 @@ fn watch_mode(env: SimulatorEnv) -> notify::Result<()> {
// below will be monitored for changes.
watcher.watch(&env.get_plan_path(), RecursiveMode::NonRecursive)?;
// Block forever, printing out events as they come in
let last_execution = Arc::new(Mutex::new(Execution::new(0, 0, 0)));
let last_execution = Arc::new(Mutex::new(Execution::new(0, 0)));
for res in rx {
match res {
Ok(event) => {
@@ -179,8 +179,7 @@ fn watch_mode(env: SimulatorEnv) -> notify::Result<()> {
let result = SandboxedResult::from(
std::panic::catch_unwind(move || {
let mut env = env;
let plan: Vec<Vec<Interaction>> =
InteractionPlan::compute_via_diff(&env.get_plan_path());
let plan = InteractionPlan::compute_via_diff(&env.get_plan_path());
tracing::error!("plan_len: {}", plan.len());
env.clear();
@@ -191,7 +190,7 @@ fn watch_mode(env: SimulatorEnv) -> notify::Result<()> {
// });
let env = Arc::new(Mutex::new(env.clone_without_connections()));
watch::run_simulation(env, &mut [plan], last_execution_.clone())
run_simulation_default(env, plan, last_execution_.clone())
}),
last_execution.clone(),
);
@@ -236,11 +235,12 @@ fn run_simulator(
tracing::error!("captured backtrace:\n{}", bt);
}));
let last_execution = Arc::new(Mutex::new(Execution::new(0, 0, 0)));
let last_execution = Arc::new(Mutex::new(Execution::new(0, 0)));
let env = Arc::new(Mutex::new(env));
let result = SandboxedResult::from(
std::panic::catch_unwind(|| {
run_simulation(env.clone(), &mut plan.clone(), last_execution.clone())
let interactions = plan.interactions_list().collect::<Vec<_>>();
run_simulation(env.clone(), interactions, last_execution.clone())
}),
last_execution.clone(),
);
@@ -270,10 +270,8 @@ fn run_simulator(
for execution in history.history.iter() {
writeln!(
f,
"{} {} {}",
execution.connection_index,
execution.interaction_index,
execution.secondary_index
"{} {}",
execution.connection_index, execution.interaction_index,
)
.unwrap();
}
@@ -308,17 +306,15 @@ fn run_simulator(
let env = Arc::new(Mutex::new(env));
let shrunk = SandboxedResult::from(
std::panic::catch_unwind(|| {
run_simulation(
env.clone(),
&mut shrunk_plan.clone(),
last_execution.clone(),
)
let interactions = shrunk_plan.interactions_list().collect::<Vec<_>>();
run_simulation(env.clone(), interactions, last_execution.clone())
}),
last_execution,
);
(shrunk_plan, shrunk)
} else {
(plan, result.clone())
(plan.clone(), result.clone())
};
match (&shrunk, &result) {
@@ -541,7 +537,7 @@ fn setup_simulation(
fn run_simulation(
env: Arc<Mutex<SimulatorEnv>>,
plan: &mut InteractionPlan,
plan: Vec<Interaction>,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
let simulation_type = {
@@ -574,7 +570,7 @@ fn run_simulation(
fn run_simulation_default(
env: Arc<Mutex<SimulatorEnv>>,
plan: &mut InteractionPlan,
plan: Vec<Interaction>,
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
tracing::info!("Executing database interaction plan...");
@@ -592,11 +588,9 @@ fn run_simulation_default(
interaction_pointer: 0,
};
let interactions = plan.interactions_list().collect::<Vec<_>>();
let mut result = execute_interactions(
env.clone(),
interactions,
plan,
&mut state,
&mut conn_states,
last_execution,

View File

@@ -9,7 +9,6 @@ pub mod execution;
pub mod file;
pub mod io;
pub mod memory;
pub mod watch;
pub const FAULT_ERROR_MSG: &str = "Injected Fault";

View File

@@ -1,145 +0,0 @@
use std::sync::{Arc, Mutex};
use sql_generation::generation::pick_index;
use crate::{
generation::plan::{Interaction, InteractionPlanState},
integrity_check,
runner::execution::ExecutionContinuation,
};
use super::{
env::{SimConnection, SimulatorEnv},
execution::{Execution, ExecutionHistory, ExecutionResult, execute_interaction},
};
pub(crate) fn run_simulation(
env: Arc<Mutex<SimulatorEnv>>,
plans: &mut [Vec<Vec<Interaction>>],
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
let mut states = plans
.iter()
.map(|_| InteractionPlanState {
stack: vec![],
interaction_pointer: 0,
secondary_pointer: 0,
})
.collect::<Vec<_>>();
let mut result = execute_plans(env.clone(), plans, &mut states, last_execution);
let env = env.lock().unwrap();
env.io.print_stats();
tracing::info!("Simulation completed");
if result.error.is_none() {
let ic = integrity_check(&env.get_db_path());
if let Err(err) = ic {
tracing::error!("integrity check failed: {}", err);
result.error = Some(turso_core::LimboError::InternalError(err.to_string()));
} else {
tracing::info!("integrity check passed");
}
}
result
}
pub(crate) fn execute_plans(
env: Arc<Mutex<SimulatorEnv>>,
plans: &mut [Vec<Vec<Interaction>>],
states: &mut [InteractionPlanState],
last_execution: Arc<Mutex<Execution>>,
) -> ExecutionResult {
let mut history = ExecutionHistory::new();
let now = std::time::Instant::now();
let mut env = env.lock().unwrap();
for _tick in 0..env.opts.ticks {
// Pick the connection to interact with
let connection_index = pick_index(env.connections.len(), &mut env.rng);
let state = &mut states[connection_index];
history.history.push(Execution::new(
connection_index,
state.interaction_pointer,
state.secondary_pointer,
));
let mut last_execution = last_execution.lock().unwrap();
last_execution.connection_index = connection_index;
last_execution.interaction_index = state.interaction_pointer;
last_execution.secondary_index = state.secondary_pointer;
// Execute the interaction for the selected connection
match execute_plan(&mut env, connection_index, plans, states) {
Ok(_) => {}
Err(err) => {
return ExecutionResult::new(history, Some(err));
}
}
// Check if the maximum time for the simulation has been reached
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
return ExecutionResult::new(
history,
Some(turso_core::LimboError::InternalError(
"maximum time for simulation reached".into(),
)),
);
}
}
ExecutionResult::new(history, None)
}
fn execute_plan(
env: &mut SimulatorEnv,
connection_index: usize,
plans: &mut [Vec<Vec<Interaction>>],
states: &mut [InteractionPlanState],
) -> turso_core::Result<()> {
let connection = &env.connections[connection_index];
let plan = &mut plans[connection_index];
let state = &mut states[connection_index];
if state.interaction_pointer >= plan.len() {
return Ok(());
}
let interaction = &plan[state.interaction_pointer][state.secondary_pointer];
if let SimConnection::Disconnected = connection {
tracing::debug!("connecting {}", connection_index);
env.connections[connection_index] = SimConnection::LimboConnection(
env.db.as_ref().expect("db to be Some").connect().unwrap(),
);
} else {
match execute_interaction(env, connection_index, interaction, &mut state.stack) {
Ok(next_execution) => {
tracing::debug!("connection {} processed", connection_index);
// Move to the next interaction or property
match next_execution {
ExecutionContinuation::NextInteraction => {
if state.secondary_pointer + 1 >= plan[state.interaction_pointer].len() {
// If we have reached the end of the interactions for this property, move to the next property
state.interaction_pointer += 1;
state.secondary_pointer = 0;
} else {
// Otherwise, move to the next interaction
state.secondary_pointer += 1;
}
}
ExecutionContinuation::NextProperty => {
// Skip to the next property
state.interaction_pointer += 1;
state.secondary_pointer = 0;
}
}
}
Err(err) => {
tracing::error!("error {}", err);
return Err(err);
}
}
}
Ok(())
}

View File

@@ -275,7 +275,9 @@ impl InteractionPlan {
let last_execution = Arc::new(Mutex::new(*failing_execution));
let result = SandboxedResult::from(
std::panic::catch_unwind(|| {
run_simulation(env.clone(), &mut test_plan.clone(), last_execution.clone())
let interactions = test_plan.interactions_list().collect::<Vec<_>>();
run_simulation(env.clone(), interactions, last_execution.clone())
}),
last_execution,
);