From f795a9e331b4b337e6df6ebcadc25d2b0a36ac61 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 10 Apr 2025 13:41:10 +0200 Subject: [PATCH 1/2] Add support to load log file with stress test --- stress/main.rs | 120 ++++++++++++++++++++++++++++++++++++++----------- stress/opts.rs | 9 ++++ 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/stress/main.rs b/stress/main.rs index c0c42449d..cbef2d42e 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -10,9 +10,16 @@ use opts::Opts; use serde_json::json; use std::collections::HashSet; use std::fs::File; -use std::io::Write; +use std::io::{Read, Write}; use std::sync::{Arc, Mutex}; +pub struct Plan { + pub ddl_statements: Vec, + pub queries_per_thread: Vec>, + pub nr_iterations: usize, + pub nr_threads: usize, +} + /// Represents a column in a SQLite table #[derive(Debug, Clone)] pub struct Column { @@ -157,9 +164,9 @@ impl ArbitrarySchema { col_def }) .collect::>() - .join(",\n"); + .join(","); - format!("CREATE TABLE {} (\n{}\n);", table.name, columns) + format!("CREATE TABLE {} ({});", table.name, columns) }) .collect() } @@ -287,6 +294,76 @@ fn generate_random_statement(schema: &ArbitrarySchema) -> String { } } +fn generate_plan(opts: &Opts) -> Result> { + let schema = gen_schema(); + // Write DDL statements to log file + let mut log_file = File::create(&opts.log_file)?; + let ddl_statements = schema.to_sql(); + let mut plan = Plan { + ddl_statements: vec![], + queries_per_thread: vec![], + nr_iterations: opts.nr_iterations, + nr_threads: opts.nr_threads, + }; + writeln!(log_file, "{}", opts.nr_threads)?; + writeln!(log_file, "{}", opts.nr_iterations)?; + writeln!(log_file, "{}", ddl_statements.len())?; + for stmt in &ddl_statements { + writeln!(log_file, "{}", stmt)?; + } + plan.ddl_statements = ddl_statements; + for _ in 0..opts.nr_threads { + let mut queries = vec![]; + for _ in 0..opts.nr_iterations { + let sql = generate_random_statement(&schema); + writeln!(log_file, "{}", sql)?; + queries.push(sql); + } + } + Ok(plan) +} + +fn read_plan_from_log_file(opts: &Opts) -> Result> { + let mut file = File::open(&opts.log_file)?; + let mut buf = String::new(); + let mut plan = Plan { + ddl_statements: vec![], + queries_per_thread: vec![], + nr_iterations: 0, + nr_threads: 0, + }; + file.read_to_string(&mut buf).unwrap(); + let mut lines = buf.lines(); + plan.nr_threads = lines.next().expect("missing threads").parse().unwrap(); + plan.nr_iterations = lines + .next() + .expect("missing nr_iterations") + .parse() + .unwrap(); + let nr_ddl = lines + .next() + .expect("number of ddl statements") + .parse() + .unwrap(); + for _ in 0..nr_ddl { + plan.ddl_statements + .push(lines.next().expect("expected ddl statement").to_string()); + } + for i in 0..plan.nr_threads { + let mut queries = vec![]; + for _ in 0..plan.nr_iterations { + queries.push( + lines + .next() + .expect(format!("missing query for thread {}", i).as_str()) + .to_string(), + ); + } + plan.queries_per_thread.push(queries); + } + Ok(plan) +} + #[tokio::main] async fn main() -> Result<(), Box> { let (num_nodes, main_id) = (1, "n-001"); @@ -297,29 +374,25 @@ async fn main() -> Result<(), Box> { lifecycle::setup_complete(&startup_data); antithesis_init(); - let schema = gen_schema(); - let ddl_statements = schema.to_sql(); + let mut opts = Opts::parse(); - let opts = Opts::parse(); - let log_file = File::create(&opts.log_file)?; - let log_file = Arc::new(Mutex::new(log_file)); - - // Write DDL statements to log file - { - let mut file = log_file.lock().unwrap(); - for stmt in &ddl_statements { - writeln!(file, "{}", stmt)?; - } - } + let plan = if opts.load_log { + read_plan_from_log_file(&mut opts)? + } else { + generate_plan(&opts)? + }; let mut handles = Vec::with_capacity(opts.nr_threads); + let plan = Arc::new(plan); - for _ in 0..opts.nr_threads { + for thread in 0..opts.nr_threads { let db = Arc::new(Builder::new_local(&opts.db_file).build().await?); + let plan = plan.clone(); let conn = db.connect()?; // Apply each DDL statement individually - for stmt in &ddl_statements { + for stmt in &plan.ddl_statements { + println!("executing ddl {}", stmt); if let Err(e) = conn.execute(stmt, ()).await { println!("Error creating table: {}", e); } @@ -327,17 +400,12 @@ async fn main() -> Result<(), Box> { let nr_iterations = opts.nr_iterations; let db = db.clone(); - let schema = schema.clone(); - let log_file = log_file.clone(); let handle = tokio::spawn(async move { let conn = db.connect()?; - for _ in 0..nr_iterations { - let sql = generate_random_statement(&schema); - { - let mut file = log_file.lock().unwrap(); - writeln!(file, "{}", sql)?; - } + for query_index in 0..nr_iterations { + let sql = &plan.queries_per_thread[thread][query_index]; + println!("executing: {}", sql); if let Err(e) = conn.execute(&sql, ()).await { println!("Error: {}", e); } diff --git a/stress/opts.rs b/stress/opts.rs index 3084431c5..a8cbb5b2a 100644 --- a/stress/opts.rs +++ b/stress/opts.rs @@ -26,6 +26,15 @@ pub struct Opts { )] pub log_file: String, + /// Load log file instead of creating a new one + #[clap( + short = 'L', + long = "load-log", + help = "load log file instead of creating a new one", + default_value_t = false + )] + pub load_log: bool, + /// Database file #[clap( short = 'd', From cdcbcafbdda681a4d2a978eb3264e166f2fcb855 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 10 Apr 2025 13:46:40 +0200 Subject: [PATCH 2/2] clipppy --- stress/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/stress/main.rs b/stress/main.rs index cbef2d42e..bd687a231 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -11,7 +11,7 @@ use serde_json::json; use std::collections::HashSet; use std::fs::File; use std::io::{Read, Write}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; pub struct Plan { pub ddl_statements: Vec, @@ -349,13 +349,13 @@ fn read_plan_from_log_file(opts: &Opts) -> Result