From b3555680236489623df435200ce0f64770cda3ce Mon Sep 17 00:00:00 2001 From: alpaylan Date: Thu, 19 Dec 2024 23:40:04 -0500 Subject: [PATCH] use ticks as the main simulator driver, handle disconnects correctly, add multi-connection setup --- simulator/generation/plan.rs | 62 +++++++++++++++++- simulator/main.rs | 122 +++++++++++++++++++++-------------- 2 files changed, 136 insertions(+), 48 deletions(-) diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index d7c309b93..b1a233f9e 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -9,7 +9,7 @@ use crate::{ query::{Create, Insert, Predicate, Query, Select}, table::Value, }, - SimulatorEnv, SimulatorOpts, + SimConnection, SimulatorEnv, SimulatorOpts, }; use crate::generation::{frequency, Arbitrary, ArbitraryFrom}; @@ -21,6 +21,7 @@ pub(crate) type ResultSet = Vec>; pub(crate) struct InteractionPlan { pub(crate) plan: Vec, pub(crate) stack: Vec, + pub(crate) interaction_pointer: usize, } impl Display for InteractionPlan { @@ -31,6 +32,7 @@ impl Display for InteractionPlan { Interaction::Assertion(assertion) => { write!(f, "-- ASSERT: {};\n", assertion.message)? } + Interaction::Fault(fault) => write!(f, "-- FAULT: {};\n", fault)?, } } @@ -58,6 +60,7 @@ impl Display for InteractionStats { pub(crate) enum Interaction { Query(Query), Assertion(Assertion), + Fault(Fault), } impl Display for Interaction { @@ -65,6 +68,7 @@ impl Display for Interaction { match self { Interaction::Query(query) => write!(f, "{}", query), Interaction::Assertion(assertion) => write!(f, "ASSERT: {}", assertion.message), + Interaction::Fault(fault) => write!(f, "FAULT: {}", fault), } } } @@ -74,6 +78,18 @@ pub(crate) struct Assertion { pub(crate) message: String, } +pub(crate) enum Fault { + Disconnect, +} + +impl Display for Fault { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Fault::Disconnect => write!(f, "DISCONNECT"), + } + } +} + pub(crate) struct Interactions(Vec); impl Interactions { @@ -96,6 +112,7 @@ impl Interactions { Query::Select(_) => {} }, Interaction::Assertion(_) => {} + Interaction::Fault(_) => {} } } } @@ -106,6 +123,7 @@ impl InteractionPlan { InteractionPlan { plan: Vec::new(), stack: Vec::new(), + interaction_pointer: 0, } } @@ -127,6 +145,7 @@ impl InteractionPlan { Query::Create(_) => {} }, Interaction::Assertion(_) => {} + Interaction::Fault(_) => {} } } @@ -223,6 +242,9 @@ impl Interaction { Interaction::Assertion(_) => { unreachable!("unexpected: this function should only be called on queries") } + Interaction::Fault(fault) => { + unreachable!("unexpected: this function should only be called on queries") + } } } @@ -239,6 +261,38 @@ impl Interaction { } Ok(()) } + Interaction::Fault(_) => { + unreachable!("unexpected: this function should only be called on assertions") + } + } + } + + pub(crate) fn execute_fault(&self, env: &mut SimulatorEnv, conn_index: usize) -> Result<()> { + match self { + Interaction::Query(_) => { + unreachable!("unexpected: this function should only be called on faults") + } + Interaction::Assertion(_) => { + unreachable!("unexpected: this function should only be called on faults") + } + Interaction::Fault(fault) => { + match fault { + Fault::Disconnect => { + match env.connections[conn_index] { + SimConnection::Connected(ref mut conn) => { + conn.close()?; + } + SimConnection::Disconnected => { + return Err(limbo_core::LimboError::InternalError( + "Tried to disconnect a disconnected connection".to_string(), + )); + } + } + env.connections[conn_index] = SimConnection::Disconnected; + } + } + Ok(()) + } } } } @@ -307,6 +361,11 @@ fn random_write(rng: &mut R, env: &SimulatorEnv) -> Interactions { Interactions(vec![insert_query]) } +fn random_fault(rng: &mut R, env: &SimulatorEnv) -> Interactions { + let fault = Interaction::Fault(Fault::Disconnect); + Interactions(vec![fault]) +} + impl ArbitraryFrom<(&SimulatorEnv, InteractionStats)> for Interactions { fn arbitrary_from( rng: &mut R, @@ -334,6 +393,7 @@ impl ArbitraryFrom<(&SimulatorEnv, InteractionStats)> for Interactions { Box::new(|rng: &mut R| random_write(rng, env)), ), (1, Box::new(|rng: &mut R| create_table(rng, env))), + (1, Box::new(|rng: &mut R| random_fault(rng, env))), ], rng, ) diff --git a/simulator/main.rs b/simulator/main.rs index 67d9b92f9..dfdc974d3 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -1,4 +1,4 @@ -use generation::plan::{Interaction, ResultSet}; +use generation::plan::{Interaction, InteractionPlan, ResultSet}; use generation::{pick, pick_index, Arbitrary, ArbitraryFrom}; use limbo_core::{Connection, Database, File, OpenFlags, PlatformIO, Result, RowResult, IO}; use model::query::{Create, Insert, Predicate, Query, Select}; @@ -66,7 +66,7 @@ fn main() { }; let opts = SimulatorOpts { - ticks: rng.gen_range(0..4096), + ticks: rng.gen_range(0..1024), max_connections: 1, // TODO: for now let's use one connection as we didn't implement // correct transactions procesing max_tables: rng.gen_range(0..128), @@ -74,7 +74,7 @@ fn main() { write_percent, delete_percent, page_size: 4096, // TODO: randomize this too - max_interactions: rng.gen_range(0..10000), + max_interactions: rng.gen_range(0..1024), }; let io = Arc::new(SimulatorIO::new(seed, opts.page_size).unwrap()); @@ -101,63 +101,87 @@ fn main() { println!("Initial opts {:?}", env.opts); log::info!("Generating database interaction plan..."); - let mut plan = generation::plan::InteractionPlan::arbitrary_from(&mut env.rng.clone(), &env); + let mut plans = (1..=env.opts.max_connections) + .map(|_| InteractionPlan::arbitrary_from(&mut env.rng.clone(), &env)) + .collect::>(); - log::info!("{}", plan.stats()); + log::info!("{}", plans[0].stats()); - for interaction in &plan.plan { - let connection_index = pick_index(env.connections.len(), &mut env.rng); - let mut connection = env.connections[connection_index].clone(); + log::info!("Executing database interaction plan..."); + let result = execute_plans(&mut env, &mut plans); - if matches!(connection, SimConnection::Disconnected) { - connection = SimConnection::Connected(env.db.connect()); - env.connections[connection_index] = connection.clone(); - } - - match &mut connection { - SimConnection::Connected(conn) => { - let disconnect = env.rng.gen_ratio(1, 100); - if disconnect { - log::info!("disconnecting {}", connection_index); - let _ = conn.close(); - env.connections[connection_index] = SimConnection::Disconnected; - } else { - match process_connection(conn, interaction, &mut plan.stack) { - Ok(_) => { - log::info!("connection {} processed", connection_index); - } - Err(err) => { - log::error!("error {}", err); - log::debug!("db is at {:?}", path); - // save the interaction plan - let mut path = TempDir::new().unwrap().into_path(); - path.push("simulator.plan"); - let mut f = std::fs::File::create(path.clone()).unwrap(); - f.write(plan.to_string().as_bytes()).unwrap(); - log::debug!("plan saved at {:?}", path); - log::debug!("seed was {}", seed); - break; - } - } - } - } - SimConnection::Disconnected => { - log::info!("disconnecting {}", connection_index); - env.connections[connection_index] = SimConnection::Connected(env.db.connect()); - } - } + if result.is_err() { + log::error!("error executing plans: {:?}", result.err()); } + log::info!("db is at {:?}", path); + let mut path = TempDir::new().unwrap().into_path(); + path.push("simulator.plan"); + let mut f = std::fs::File::create(path.clone()).unwrap(); + f.write(plans[0].to_string().as_bytes()).unwrap(); + log::info!("plan saved at {:?}", path); + log::info!("seed was {}", seed); env.io.print_stats(); } -fn process_connection( - conn: &mut Rc, +fn execute_plans(env: &mut SimulatorEnv, plans: &mut Vec) -> Result<()> { + // todo: add history here by recording which interaction was executed at which tick + for _tick in 0..env.opts.ticks { + // Pick the connection to interact with + let connection_index = pick_index(env.connections.len(), &mut env.rng); + // Execute the interaction for the selected connection + execute_plan(env, connection_index, plans)?; + } + + Ok(()) +} + +fn execute_plan( + env: &mut SimulatorEnv, + connection_index: usize, + plans: &mut Vec, +) -> Result<()> { + let connection = &env.connections[connection_index]; + let plan = &mut plans[connection_index]; + + if plan.interaction_pointer >= plan.plan.len() { + return Ok(()); + } + + let interaction = &plan.plan[plan.interaction_pointer]; + + if let SimConnection::Disconnected = connection { + log::info!("connecting {}", connection_index); + env.connections[connection_index] = SimConnection::Connected(env.db.connect()); + } else { + match execute_interaction(env, connection_index, interaction, &mut plan.stack) { + Ok(_) => { + log::debug!("connection {} processed", connection_index); + plan.interaction_pointer += 1; + } + Err(err) => { + log::error!("error {}", err); + return Err(err); + } + } + } + + Ok(()) +} + +fn execute_interaction( + env: &mut SimulatorEnv, + connection_index: usize, interaction: &Interaction, stack: &mut Vec, ) -> Result<()> { match interaction { generation::plan::Interaction::Query(_) => { + let conn = match &mut env.connections[connection_index] { + SimConnection::Connected(conn) => conn, + SimConnection::Disconnected => unreachable!(), + }; + log::debug!("{}", interaction); let results = interaction.execute_query(conn)?; log::debug!("{:?}", results); @@ -165,6 +189,10 @@ fn process_connection( } generation::plan::Interaction::Assertion(_) => { interaction.execute_assertion(stack)?; + stack.clear(); + } + Interaction::Fault(_) => { + interaction.execute_fault(env, connection_index)?; } }