mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-08 09:44:21 +01:00
simplify code for differential testing
This commit is contained in:
@@ -108,11 +108,12 @@ impl InteractionPlan {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct InteractionPlanState {
|
||||
pub interaction_pointer: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ConnectionState {
|
||||
pub stack: Vec<ResultSet>,
|
||||
}
|
||||
|
||||
@@ -1,54 +1,41 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use sql_generation::{generation::pick_index, model::table::SimValue};
|
||||
use turso_core::Value;
|
||||
|
||||
use crate::{
|
||||
InteractionPlan,
|
||||
generation::{
|
||||
Shadow as _,
|
||||
plan::{Interaction, InteractionPlanState, InteractionType, ResultSet},
|
||||
},
|
||||
model::Query,
|
||||
runner::execution::ExecutionContinuation,
|
||||
};
|
||||
use crate::generation::plan::{ConnectionState, Interaction, InteractionPlanState};
|
||||
|
||||
use super::{
|
||||
env::{SimConnection, SimulatorEnv},
|
||||
execution::{Execution, ExecutionHistory, ExecutionResult, execute_interaction},
|
||||
env::SimulatorEnv,
|
||||
execution::{Execution, ExecutionHistory, ExecutionResult},
|
||||
};
|
||||
|
||||
pub(crate) fn run_simulation(
|
||||
pub fn run_simulation(
|
||||
env: Arc<Mutex<SimulatorEnv>>,
|
||||
rusqlite_env: Arc<Mutex<SimulatorEnv>>,
|
||||
plans: &mut [InteractionPlan],
|
||||
plan: Vec<Interaction>,
|
||||
last_execution: Arc<Mutex<Execution>>,
|
||||
) -> ExecutionResult {
|
||||
tracing::info!("Executing database interaction plan...");
|
||||
|
||||
let mut states = plans
|
||||
.iter()
|
||||
.map(|_| InteractionPlanState {
|
||||
stack: vec![],
|
||||
interaction_pointer: 0,
|
||||
secondary_pointer: 0,
|
||||
})
|
||||
let num_conns = {
|
||||
let env = env.lock().unwrap();
|
||||
env.connections.len()
|
||||
};
|
||||
|
||||
let mut conn_states = (0..num_conns)
|
||||
.map(|_| ConnectionState::default())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut rusqlite_states = plans
|
||||
.iter()
|
||||
.map(|_| InteractionPlanState {
|
||||
stack: vec![],
|
||||
interaction_pointer: 0,
|
||||
secondary_pointer: 0,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut rusqlite_states = conn_states.clone();
|
||||
|
||||
let result = execute_plans(
|
||||
let mut state = InteractionPlanState {
|
||||
interaction_pointer: 0,
|
||||
};
|
||||
|
||||
let result = execute_interactions(
|
||||
env,
|
||||
rusqlite_env,
|
||||
plans,
|
||||
&mut states,
|
||||
plan,
|
||||
&mut state,
|
||||
&mut conn_states,
|
||||
&mut rusqlite_states,
|
||||
last_execution,
|
||||
);
|
||||
@@ -58,80 +45,13 @@ pub(crate) fn run_simulation(
|
||||
result
|
||||
}
|
||||
|
||||
fn execute_query_rusqlite(
|
||||
connection: &rusqlite::Connection,
|
||||
query: &Query,
|
||||
) -> rusqlite::Result<Vec<Vec<SimValue>>> {
|
||||
match query {
|
||||
Query::Create(create) => {
|
||||
connection.execute(create.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Select(select) => {
|
||||
let mut stmt = connection.prepare(select.to_string().as_str())?;
|
||||
let columns = stmt.column_count();
|
||||
let rows = stmt.query_map([], |row| {
|
||||
let mut values = vec![];
|
||||
for i in 0..columns {
|
||||
let value = row.get_unwrap(i);
|
||||
let value = match value {
|
||||
rusqlite::types::Value::Null => Value::Null,
|
||||
rusqlite::types::Value::Integer(i) => Value::Integer(i),
|
||||
rusqlite::types::Value::Real(f) => Value::Float(f),
|
||||
rusqlite::types::Value::Text(s) => Value::build_text(s),
|
||||
rusqlite::types::Value::Blob(b) => Value::Blob(b),
|
||||
};
|
||||
values.push(SimValue(value));
|
||||
}
|
||||
Ok(values)
|
||||
})?;
|
||||
let mut result = vec![];
|
||||
for row in rows {
|
||||
result.push(row?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
Query::Insert(insert) => {
|
||||
connection.execute(insert.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Delete(delete) => {
|
||||
connection.execute(delete.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Drop(drop) => {
|
||||
connection.execute(drop.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Update(update) => {
|
||||
connection.execute(update.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::CreateIndex(create_index) => {
|
||||
connection.execute(create_index.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Begin(begin) => {
|
||||
connection.execute(begin.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Commit(commit) => {
|
||||
connection.execute(commit.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Rollback(rollback) => {
|
||||
connection.execute(rollback.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn execute_plans(
|
||||
pub(crate) fn execute_interactions(
|
||||
env: Arc<Mutex<SimulatorEnv>>,
|
||||
rusqlite_env: Arc<Mutex<SimulatorEnv>>,
|
||||
plans: &mut [InteractionPlan],
|
||||
states: &mut [InteractionPlanState],
|
||||
rusqlite_states: &mut [InteractionPlanState],
|
||||
interactions: Vec<Interaction>,
|
||||
state: &mut InteractionPlanState,
|
||||
conn_states: &mut [ConnectionState],
|
||||
rusqlite_states: &mut [ConnectionState],
|
||||
last_execution: Arc<Mutex<Execution>>,
|
||||
) -> ExecutionResult {
|
||||
let mut history = ExecutionHistory::new();
|
||||
@@ -140,34 +60,57 @@ pub(crate) fn execute_plans(
|
||||
let mut env = env.lock().unwrap();
|
||||
let mut rusqlite_env = rusqlite_env.lock().unwrap();
|
||||
|
||||
for _tick in 0..env.opts.ticks {
|
||||
// Pick the connection to interact with
|
||||
let connection_index = pick_index(env.connections.len(), &mut env.rng);
|
||||
let state = &mut states[connection_index];
|
||||
env.tables.clear();
|
||||
rusqlite_env.tables.clear();
|
||||
|
||||
history.history.push(Execution::new(
|
||||
connection_index,
|
||||
state.interaction_pointer,
|
||||
state.secondary_pointer,
|
||||
));
|
||||
for _tick in 0..env.opts.ticks {
|
||||
if state.interaction_pointer >= interactions.len() {
|
||||
break;
|
||||
}
|
||||
|
||||
let interaction = &interactions[state.interaction_pointer];
|
||||
|
||||
let connection_index = interaction.connection_index;
|
||||
let turso_conn_state = &mut conn_states[connection_index];
|
||||
let rusqlite_conn_state = &mut rusqlite_states[connection_index];
|
||||
|
||||
history
|
||||
.history
|
||||
.push(Execution::new(connection_index, state.interaction_pointer));
|
||||
let mut last_execution = last_execution.lock().unwrap();
|
||||
last_execution.connection_index = connection_index;
|
||||
last_execution.interaction_index = state.interaction_pointer;
|
||||
last_execution.secondary_index = state.secondary_pointer;
|
||||
// Execute the interaction for the selected connection
|
||||
match execute_plan(
|
||||
|
||||
let mut turso_state = state.clone();
|
||||
|
||||
// first execute turso
|
||||
let turso_res = super::execution::execute_plan(
|
||||
&mut env,
|
||||
&mut rusqlite_env,
|
||||
connection_index,
|
||||
plans,
|
||||
states,
|
||||
rusqlite_states,
|
||||
interaction,
|
||||
turso_conn_state,
|
||||
&mut turso_state,
|
||||
);
|
||||
|
||||
let mut rusqlite_state = state.clone();
|
||||
|
||||
// second execute rusqlite
|
||||
let rusqlite_res = super::execution::execute_plan(
|
||||
&mut env,
|
||||
interaction,
|
||||
rusqlite_conn_state,
|
||||
&mut rusqlite_state,
|
||||
);
|
||||
|
||||
// Compare results
|
||||
if let Err(err) = compare_results(
|
||||
turso_res,
|
||||
turso_conn_state,
|
||||
rusqlite_res,
|
||||
rusqlite_conn_state,
|
||||
) {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
return ExecutionResult::new(history, Some(err));
|
||||
}
|
||||
return ExecutionResult::new(history, Some(err));
|
||||
}
|
||||
|
||||
// Check if the maximum time for the simulation has been reached
|
||||
if now.elapsed().as_secs() >= env.opts.max_time_simulation as u64 {
|
||||
return ExecutionResult::new(
|
||||
@@ -182,257 +125,122 @@ pub(crate) fn execute_plans(
|
||||
ExecutionResult::new(history, None)
|
||||
}
|
||||
|
||||
fn execute_plan(
|
||||
env: &mut SimulatorEnv,
|
||||
rusqlite_env: &mut SimulatorEnv,
|
||||
connection_index: usize,
|
||||
plans: &mut [InteractionPlan],
|
||||
states: &mut [InteractionPlanState],
|
||||
rusqlite_states: &mut [InteractionPlanState],
|
||||
fn compare_results(
|
||||
turso_res: turso_core::Result<()>,
|
||||
turso_conn_state: &mut ConnectionState,
|
||||
rusqlite_res: turso_core::Result<()>,
|
||||
rusqlite_conn_state: &mut ConnectionState,
|
||||
) -> turso_core::Result<()> {
|
||||
let connection = &env.connections[connection_index];
|
||||
let rusqlite_connection = &rusqlite_env.connections[connection_index];
|
||||
let plan = &mut plans[connection_index];
|
||||
let state = &mut states[connection_index];
|
||||
let rusqlite_state = &mut rusqlite_states[connection_index];
|
||||
if state.interaction_pointer >= plan.plan.len() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interaction = &plan.plan[state.interaction_pointer].interactions()[state.secondary_pointer];
|
||||
|
||||
tracing::debug!(
|
||||
"execute_plan(connection_index={}, interaction={})",
|
||||
connection_index,
|
||||
interaction
|
||||
);
|
||||
tracing::debug!(
|
||||
"connection: {}, rusqlite_connection: {}",
|
||||
connection,
|
||||
rusqlite_connection
|
||||
);
|
||||
match (connection, rusqlite_connection) {
|
||||
(SimConnection::Disconnected, SimConnection::Disconnected) => {
|
||||
tracing::debug!("connecting {}", connection_index);
|
||||
env.connect(connection_index);
|
||||
rusqlite_env.connect(connection_index);
|
||||
}
|
||||
(SimConnection::LimboConnection(_), SimConnection::SQLiteConnection(_)) => {
|
||||
let limbo_result =
|
||||
execute_interaction(env, connection_index, interaction, &mut state.stack);
|
||||
let ruqlite_result = execute_interaction_rusqlite(
|
||||
rusqlite_env,
|
||||
connection_index,
|
||||
interaction,
|
||||
&mut rusqlite_state.stack,
|
||||
);
|
||||
match (limbo_result, ruqlite_result) {
|
||||
(Ok(next_execution), Ok(next_execution_rusqlite)) => {
|
||||
if next_execution != next_execution_rusqlite {
|
||||
tracing::error!(
|
||||
"expected next executions of limbo and rusqlite do not match"
|
||||
);
|
||||
tracing::debug!(
|
||||
"limbo result: {:?}, rusqlite result: {:?}",
|
||||
next_execution,
|
||||
next_execution_rusqlite
|
||||
);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"expected next executions of limbo and rusqlite do not match".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let limbo_values = state.stack.last();
|
||||
let rusqlite_values = rusqlite_state.stack.last();
|
||||
match (turso_res, rusqlite_res) {
|
||||
(Ok(..), Ok(..)) => {
|
||||
let limbo_values = turso_conn_state.stack.last();
|
||||
let rusqlite_values = rusqlite_conn_state.stack.last();
|
||||
match (limbo_values, rusqlite_values) {
|
||||
(Some(limbo_values), Some(rusqlite_values)) => {
|
||||
match (limbo_values, rusqlite_values) {
|
||||
(Some(limbo_values), Some(rusqlite_values)) => {
|
||||
match (limbo_values, rusqlite_values) {
|
||||
(Ok(limbo_values), Ok(rusqlite_values)) => {
|
||||
if limbo_values != rusqlite_values {
|
||||
tracing::error!(
|
||||
"returned values from limbo and rusqlite results do not match"
|
||||
);
|
||||
let diff = limbo_values
|
||||
.iter()
|
||||
.zip(rusqlite_values.iter())
|
||||
.enumerate()
|
||||
.filter(|(_, (l, r))| l != r)
|
||||
.collect::<Vec<_>>();
|
||||
(Ok(limbo_values), Ok(rusqlite_values)) => {
|
||||
if limbo_values != rusqlite_values {
|
||||
tracing::error!(
|
||||
"returned values from limbo and rusqlite results do not match"
|
||||
);
|
||||
let diff = limbo_values
|
||||
.iter()
|
||||
.zip(rusqlite_values.iter())
|
||||
.enumerate()
|
||||
.filter(|(_, (l, r))| l != r)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let diff = diff
|
||||
.iter()
|
||||
.flat_map(|(i, (l, r))| {
|
||||
let mut diffs = vec![];
|
||||
for (j, (l, r)) in
|
||||
l.iter().zip(r.iter()).enumerate()
|
||||
{
|
||||
if l != r {
|
||||
tracing::debug!(
|
||||
"difference at index {}, {}: {} != {}",
|
||||
i,
|
||||
j,
|
||||
l.to_string(),
|
||||
r.to_string()
|
||||
);
|
||||
diffs
|
||||
.push(((i, j), (l.clone(), r.clone())));
|
||||
}
|
||||
}
|
||||
diffs
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
tracing::debug!("limbo values {:?}", limbo_values);
|
||||
tracing::debug!("rusqlite values {:?}", rusqlite_values);
|
||||
tracing::debug!(
|
||||
"differences: {}",
|
||||
diff.iter()
|
||||
.map(|((i, j), (l, r))| format!(
|
||||
"\t({i}, {j}): ({l}) != ({r})"
|
||||
))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"returned values from limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
(Err(limbo_err), Err(rusqlite_err)) => {
|
||||
tracing::warn!(
|
||||
"limbo and rusqlite both fail, requires manual check"
|
||||
);
|
||||
tracing::warn!("limbo error {}", limbo_err);
|
||||
tracing::warn!("rusqlite error {}", rusqlite_err);
|
||||
}
|
||||
(Ok(limbo_result), Err(rusqlite_err)) => {
|
||||
tracing::error!(
|
||||
"limbo and rusqlite results do not match, limbo returned values but rusqlite failed"
|
||||
);
|
||||
tracing::error!("limbo values {:?}", limbo_result);
|
||||
tracing::error!("rusqlite error {}", rusqlite_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
(Err(limbo_err), Ok(_)) => {
|
||||
tracing::error!(
|
||||
"limbo and rusqlite results do not match, limbo failed but rusqlite returned values"
|
||||
);
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
let diff = diff
|
||||
.iter()
|
||||
.flat_map(|(i, (l, r))| {
|
||||
let mut diffs = vec![];
|
||||
for (j, (l, r)) in l.iter().zip(r.iter()).enumerate() {
|
||||
if l != r {
|
||||
tracing::debug!(
|
||||
"difference at index {}, {}: {} != {}",
|
||||
i,
|
||||
j,
|
||||
l.to_string(),
|
||||
r.to_string()
|
||||
);
|
||||
diffs.push(((i, j), (l.clone(), r.clone())));
|
||||
}
|
||||
}
|
||||
diffs
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
tracing::debug!("limbo values {:?}", limbo_values);
|
||||
tracing::debug!("rusqlite values {:?}", rusqlite_values);
|
||||
tracing::debug!(
|
||||
"differences: {}",
|
||||
diff.iter()
|
||||
.map(|((i, j), (l, r))| format!(
|
||||
"\t({i}, {j}): ({l}) != ({r})"
|
||||
))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"returned values from limbo and rusqlite results do not match"
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
(None, None) => {}
|
||||
_ => {
|
||||
tracing::error!("limbo and rusqlite results do not match");
|
||||
(Err(limbo_err), Err(rusqlite_err)) => {
|
||||
tracing::warn!("limbo and rusqlite both fail, requires manual check");
|
||||
tracing::warn!("limbo error {}", limbo_err);
|
||||
tracing::warn!("rusqlite error {}", rusqlite_err);
|
||||
}
|
||||
(Ok(limbo_result), Err(rusqlite_err)) => {
|
||||
tracing::error!(
|
||||
"limbo and rusqlite results do not match, limbo returned values but rusqlite failed"
|
||||
);
|
||||
tracing::error!("limbo values {:?}", limbo_result);
|
||||
tracing::error!("rusqlite error {}", rusqlite_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
(Err(limbo_err), Ok(_)) => {
|
||||
tracing::error!(
|
||||
"limbo and rusqlite results do not match, limbo failed but rusqlite returned values"
|
||||
);
|
||||
tracing::error!("limbo error {}", limbo_err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Move to the next interaction or property
|
||||
match next_execution {
|
||||
ExecutionContinuation::NextInteraction => {
|
||||
if state.secondary_pointer + 1
|
||||
>= plan.plan[state.interaction_pointer].interactions().len()
|
||||
{
|
||||
// If we have reached the end of the interactions for this property, move to the next property
|
||||
state.interaction_pointer += 1;
|
||||
state.secondary_pointer = 0;
|
||||
} else {
|
||||
// Otherwise, move to the next interaction
|
||||
state.secondary_pointer += 1;
|
||||
}
|
||||
}
|
||||
ExecutionContinuation::NextProperty => {
|
||||
// Skip to the next property
|
||||
state.interaction_pointer += 1;
|
||||
state.secondary_pointer = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
(Err(err), Ok(_)) => {
|
||||
(None, None) => {}
|
||||
_ => {
|
||||
tracing::error!("limbo and rusqlite results do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Ok(val), Err(err)) => {
|
||||
tracing::error!("limbo and rusqlite results do not match");
|
||||
tracing::error!("limbo {:?}", val);
|
||||
tracing::error!("rusqlite error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Err(err), Err(err_rusqlite)) => {
|
||||
tracing::error!("limbo and rusqlite both fail, requires manual check");
|
||||
tracing::error!("limbo error {}", err);
|
||||
tracing::error!("rusqlite error {}", err_rusqlite);
|
||||
// todo: Previously, we returned an error here, but now we just log it.
|
||||
// The problem is that the errors might be different, and we cannot
|
||||
// just assume both of them being errors has the same semantics.
|
||||
// return Err(err);
|
||||
return Err(turso_core::LimboError::InternalError(
|
||||
"limbo and rusqlite results do not match".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unreachable!("{} vs {}", connection, rusqlite_connection),
|
||||
(Err(err), Ok(_)) => {
|
||||
tracing::error!("limbo and rusqlite results do not match");
|
||||
tracing::error!("limbo error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Ok(val), Err(err)) => {
|
||||
tracing::error!("limbo and rusqlite results do not match");
|
||||
tracing::error!("limbo {:?}", val);
|
||||
tracing::error!("rusqlite error {}", err);
|
||||
return Err(err);
|
||||
}
|
||||
(Err(err), Err(err_rusqlite)) => {
|
||||
tracing::error!("limbo and rusqlite both fail, requires manual check");
|
||||
tracing::error!("limbo error {}", err);
|
||||
tracing::error!("rusqlite error {}", err_rusqlite);
|
||||
// todo: Previously, we returned an error here, but now we just log it.
|
||||
// The problem is that the errors might be different, and we cannot
|
||||
// just assume both of them being errors has the same semantics.
|
||||
// return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_interaction_rusqlite(
|
||||
env: &mut SimulatorEnv,
|
||||
connection_index: usize,
|
||||
interaction: &Interaction,
|
||||
stack: &mut Vec<ResultSet>,
|
||||
) -> turso_core::Result<ExecutionContinuation> {
|
||||
tracing::trace!(
|
||||
"execute_interaction_rusqlite(connection_index={}, interaction={})",
|
||||
connection_index,
|
||||
interaction
|
||||
);
|
||||
match &interaction.interaction {
|
||||
InteractionType::Query(query) => {
|
||||
let conn = match &mut env.connections[connection_index] {
|
||||
SimConnection::SQLiteConnection(conn) => conn,
|
||||
SimConnection::LimboConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
tracing::debug!("{}", interaction);
|
||||
let results = execute_query_rusqlite(conn, query).map_err(|e| {
|
||||
turso_core::LimboError::InternalError(format!("error executing query: {e}"))
|
||||
});
|
||||
tracing::debug!("{:?}", results);
|
||||
stack.push(results);
|
||||
}
|
||||
InteractionType::FsyncQuery(..) => {
|
||||
unimplemented!("cannot implement fsync query in rusqlite, as we do not control IO");
|
||||
}
|
||||
InteractionType::Assertion(_) => {
|
||||
interaction.execute_assertion(stack, env)?;
|
||||
stack.clear();
|
||||
}
|
||||
InteractionType::Assumption(_) => {
|
||||
let assumption_result = interaction.execute_assumption(stack, env);
|
||||
stack.clear();
|
||||
|
||||
if assumption_result.is_err() {
|
||||
tracing::warn!("assumption failed: {:?}", assumption_result);
|
||||
return Ok(ExecutionContinuation::NextProperty);
|
||||
}
|
||||
}
|
||||
InteractionType::Fault(_) => {
|
||||
interaction.execute_fault(env, connection_index)?;
|
||||
}
|
||||
InteractionType::FaultyQuery(_) => {
|
||||
unimplemented!("cannot implement faulty query in rusqlite, as we do not control IO");
|
||||
}
|
||||
}
|
||||
|
||||
let _ = interaction.shadow(&mut env.tables);
|
||||
Ok(ExecutionContinuation::NextInteraction)
|
||||
}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use sql_generation::model::table::SimValue;
|
||||
use tracing::instrument;
|
||||
use turso_core::{Connection, LimboError, Result, StepResult};
|
||||
use turso_core::{Connection, LimboError, Result, StepResult, Value};
|
||||
|
||||
use crate::generation::{
|
||||
Shadow as _,
|
||||
plan::{ConnectionState, Interaction, InteractionPlanState, InteractionType, ResultSet},
|
||||
use crate::{
|
||||
generation::{
|
||||
Shadow as _,
|
||||
plan::{ConnectionState, Interaction, InteractionPlanState, InteractionType, ResultSet},
|
||||
},
|
||||
model::Query,
|
||||
};
|
||||
|
||||
use super::env::{SimConnection, SimulatorEnv};
|
||||
@@ -102,7 +106,7 @@ pub(crate) fn execute_interactions(
|
||||
ExecutionResult::new(history, None)
|
||||
}
|
||||
|
||||
fn execute_plan(
|
||||
pub fn execute_plan(
|
||||
env: &mut SimulatorEnv,
|
||||
interaction: &Interaction,
|
||||
conn_state: &mut ConnectionState,
|
||||
@@ -112,9 +116,7 @@ fn execute_plan(
|
||||
let connection = &mut env.connections[connection_index];
|
||||
if let SimConnection::Disconnected = connection {
|
||||
tracing::debug!("connecting {}", connection_index);
|
||||
*connection = SimConnection::LimboConnection(
|
||||
env.db.as_ref().expect("db to be Some").connect().unwrap(),
|
||||
);
|
||||
env.connect(connection_index);
|
||||
} else {
|
||||
tracing::debug!("connection {} already connected", connection_index);
|
||||
match execute_interaction(env, interaction, &mut conn_state.stack) {
|
||||
@@ -149,22 +151,35 @@ pub(crate) enum ExecutionContinuation {
|
||||
// NextProperty,
|
||||
}
|
||||
|
||||
#[instrument(skip(env, interaction, stack), fields(seed = %env.opts.seed, interaction = %interaction))]
|
||||
pub(crate) fn execute_interaction(
|
||||
pub fn execute_interaction(
|
||||
env: &mut SimulatorEnv,
|
||||
interaction: &Interaction,
|
||||
stack: &mut Vec<ResultSet>,
|
||||
) -> Result<ExecutionContinuation> {
|
||||
// Leave this empty info! here to print the span of the execution
|
||||
let connection = &mut env.connections[interaction.connection_index];
|
||||
match connection {
|
||||
SimConnection::LimboConnection(..) => execute_interaction_turso(env, interaction, stack),
|
||||
SimConnection::SQLiteConnection(..) => {
|
||||
execute_interaction_rusqlite(env, interaction, stack)
|
||||
}
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(env, interaction, stack), fields(seed = %env.opts.seed, interaction = %interaction))]
|
||||
pub fn execute_interaction_turso(
|
||||
env: &mut SimulatorEnv,
|
||||
interaction: &Interaction,
|
||||
stack: &mut Vec<ResultSet>,
|
||||
) -> Result<ExecutionContinuation> {
|
||||
let SimConnection::LimboConnection(conn) = &mut env.connections[interaction.connection_index]
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
// Leave this empty info! here to print the span of the execution
|
||||
tracing::info!("");
|
||||
match &interaction.interaction {
|
||||
InteractionType::Query(_) => {
|
||||
let conn = match connection {
|
||||
SimConnection::LimboConnection(conn) => conn,
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
tracing::debug!(?interaction);
|
||||
let results = interaction.execute_query(conn);
|
||||
if results.is_err() {
|
||||
@@ -174,12 +189,6 @@ pub(crate) fn execute_interaction(
|
||||
limbo_integrity_check(conn)?;
|
||||
}
|
||||
InteractionType::FsyncQuery(query) => {
|
||||
let conn = match &connection {
|
||||
SimConnection::LimboConnection(conn) => conn.clone(),
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
let results = interaction.execute_fsync_query(conn.clone(), env);
|
||||
if results.is_err() {
|
||||
tracing::error!(?results);
|
||||
@@ -211,12 +220,7 @@ pub(crate) fn execute_interaction(
|
||||
interaction.execute_fault(env, interaction.connection_index)?;
|
||||
}
|
||||
InteractionType::FaultyQuery(_) => {
|
||||
let conn = match &connection {
|
||||
SimConnection::LimboConnection(conn) => conn.clone(),
|
||||
SimConnection::SQLiteConnection(_) => unreachable!(),
|
||||
SimConnection::Disconnected => unreachable!(),
|
||||
};
|
||||
|
||||
let conn = conn.clone();
|
||||
let results = interaction.execute_faulty_query(&conn, env);
|
||||
if results.is_err() {
|
||||
tracing::error!(?results);
|
||||
@@ -272,3 +276,123 @@ fn limbo_integrity_check(conn: &Arc<Connection>) -> Result<()> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_interaction_rusqlite(
|
||||
env: &mut SimulatorEnv,
|
||||
interaction: &Interaction,
|
||||
stack: &mut Vec<ResultSet>,
|
||||
) -> turso_core::Result<ExecutionContinuation> {
|
||||
tracing::trace!(
|
||||
"execute_interaction_rusqlite(connection_index={}, interaction={})",
|
||||
interaction.connection_index,
|
||||
interaction
|
||||
);
|
||||
let SimConnection::SQLiteConnection(conn) = &mut env.connections[interaction.connection_index]
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
match &interaction.interaction {
|
||||
InteractionType::Query(query) => {
|
||||
tracing::debug!("{}", interaction);
|
||||
let results = execute_query_rusqlite(conn, query).map_err(|e| {
|
||||
turso_core::LimboError::InternalError(format!("error executing query: {e}"))
|
||||
});
|
||||
tracing::debug!("{:?}", results);
|
||||
stack.push(results);
|
||||
}
|
||||
InteractionType::FsyncQuery(..) => {
|
||||
unimplemented!("cannot implement fsync query in rusqlite, as we do not control IO");
|
||||
}
|
||||
InteractionType::Assertion(_) => {
|
||||
interaction.execute_assertion(stack, env)?;
|
||||
stack.clear();
|
||||
}
|
||||
InteractionType::Assumption(_) => {
|
||||
let assumption_result = interaction.execute_assumption(stack, env);
|
||||
stack.clear();
|
||||
|
||||
if assumption_result.is_err() {
|
||||
tracing::warn!("assumption failed: {:?}", assumption_result);
|
||||
todo!("remove assumptions");
|
||||
// return Ok(ExecutionContinuation::NextProperty);
|
||||
}
|
||||
}
|
||||
InteractionType::Fault(_) => {
|
||||
interaction.execute_fault(env, interaction.connection_index)?;
|
||||
}
|
||||
InteractionType::FaultyQuery(_) => {
|
||||
unimplemented!("cannot implement faulty query in rusqlite, as we do not control IO");
|
||||
}
|
||||
}
|
||||
|
||||
let _ = interaction.shadow(&mut env.tables);
|
||||
Ok(ExecutionContinuation::NextInteraction)
|
||||
}
|
||||
|
||||
fn execute_query_rusqlite(
|
||||
connection: &rusqlite::Connection,
|
||||
query: &Query,
|
||||
) -> rusqlite::Result<Vec<Vec<SimValue>>> {
|
||||
match query {
|
||||
Query::Create(create) => {
|
||||
connection.execute(create.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Select(select) => {
|
||||
let mut stmt = connection.prepare(select.to_string().as_str())?;
|
||||
let columns = stmt.column_count();
|
||||
let rows = stmt.query_map([], |row| {
|
||||
let mut values = vec![];
|
||||
for i in 0..columns {
|
||||
let value = row.get_unwrap(i);
|
||||
let value = match value {
|
||||
rusqlite::types::Value::Null => Value::Null,
|
||||
rusqlite::types::Value::Integer(i) => Value::Integer(i),
|
||||
rusqlite::types::Value::Real(f) => Value::Float(f),
|
||||
rusqlite::types::Value::Text(s) => Value::build_text(s),
|
||||
rusqlite::types::Value::Blob(b) => Value::Blob(b),
|
||||
};
|
||||
values.push(SimValue(value));
|
||||
}
|
||||
Ok(values)
|
||||
})?;
|
||||
let mut result = vec![];
|
||||
for row in rows {
|
||||
result.push(row?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
Query::Insert(insert) => {
|
||||
connection.execute(insert.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Delete(delete) => {
|
||||
connection.execute(delete.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Drop(drop) => {
|
||||
connection.execute(drop.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Update(update) => {
|
||||
connection.execute(update.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::CreateIndex(create_index) => {
|
||||
connection.execute(create_index.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Begin(begin) => {
|
||||
connection.execute(begin.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Commit(commit) => {
|
||||
connection.execute(commit.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
Query::Rollback(rollback) => {
|
||||
connection.execute(rollback.to_string().as_str(), ())?;
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user