Merge 'Add support to load log file with stress test' from Pere Diaz Bou

run with: `RUST_BACKTRACE=1 cargo run -p limbo_stress -- -t 1 -l`
and then if you want to repeat same plan: `RUST_BACKTRACE=1 cargo run -p
limbo_stress -- -t 1 -L`

Closes #1296
This commit is contained in:
Pere Diaz Bou
2025-04-10 16:01:11 +02:00
2 changed files with 104 additions and 27 deletions

View File

@@ -10,8 +10,15 @@ use opts::Opts;
use serde_json::json;
use std::collections::HashSet;
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::io::{Read, Write};
use std::sync::Arc;
pub struct Plan {
pub ddl_statements: Vec<String>,
pub queries_per_thread: Vec<Vec<String>>,
pub nr_iterations: usize,
pub nr_threads: usize,
}
/// Represents a column in a SQLite table
#[derive(Debug, Clone)]
@@ -157,9 +164,9 @@ impl ArbitrarySchema {
col_def
})
.collect::<Vec<_>>()
.join(",\n");
.join(",");
format!("CREATE TABLE {} (\n{}\n);", table.name, columns)
format!("CREATE TABLE {} ({});", table.name, columns)
})
.collect()
}
@@ -287,6 +294,76 @@ fn generate_random_statement(schema: &ArbitrarySchema) -> String {
}
}
fn generate_plan(opts: &Opts) -> Result<Plan, Box<dyn std::error::Error + Send + Sync>> {
let schema = gen_schema();
// Write DDL statements to log file
let mut log_file = File::create(&opts.log_file)?;
let ddl_statements = schema.to_sql();
let mut plan = Plan {
ddl_statements: vec![],
queries_per_thread: vec![],
nr_iterations: opts.nr_iterations,
nr_threads: opts.nr_threads,
};
writeln!(log_file, "{}", opts.nr_threads)?;
writeln!(log_file, "{}", opts.nr_iterations)?;
writeln!(log_file, "{}", ddl_statements.len())?;
for stmt in &ddl_statements {
writeln!(log_file, "{}", stmt)?;
}
plan.ddl_statements = ddl_statements;
for _ in 0..opts.nr_threads {
let mut queries = vec![];
for _ in 0..opts.nr_iterations {
let sql = generate_random_statement(&schema);
writeln!(log_file, "{}", sql)?;
queries.push(sql);
}
}
Ok(plan)
}
fn read_plan_from_log_file(opts: &Opts) -> Result<Plan, Box<dyn std::error::Error + Send + Sync>> {
let mut file = File::open(&opts.log_file)?;
let mut buf = String::new();
let mut plan = Plan {
ddl_statements: vec![],
queries_per_thread: vec![],
nr_iterations: 0,
nr_threads: 0,
};
file.read_to_string(&mut buf).unwrap();
let mut lines = buf.lines();
plan.nr_threads = lines.next().expect("missing threads").parse().unwrap();
plan.nr_iterations = lines
.next()
.expect("missing nr_iterations")
.parse()
.unwrap();
let nr_ddl = lines
.next()
.expect("number of ddl statements")
.parse()
.unwrap();
for _ in 0..nr_ddl {
plan.ddl_statements
.push(lines.next().expect("expected ddl statement").to_string());
}
for _ in 0..plan.nr_threads {
let mut queries = vec![];
for _ in 0..plan.nr_iterations {
queries.push(
lines
.next()
.expect("missing query for thread {}")
.to_string(),
);
}
plan.queries_per_thread.push(queries);
}
Ok(plan)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (num_nodes, main_id) = (1, "n-001");
@@ -297,29 +374,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
lifecycle::setup_complete(&startup_data);
antithesis_init();
let schema = gen_schema();
let ddl_statements = schema.to_sql();
let mut opts = Opts::parse();
let opts = Opts::parse();
let log_file = File::create(&opts.log_file)?;
let log_file = Arc::new(Mutex::new(log_file));
// Write DDL statements to log file
{
let mut file = log_file.lock().unwrap();
for stmt in &ddl_statements {
writeln!(file, "{}", stmt)?;
}
}
let plan = if opts.load_log {
read_plan_from_log_file(&mut opts)?
} else {
generate_plan(&opts)?
};
let mut handles = Vec::with_capacity(opts.nr_threads);
let plan = Arc::new(plan);
for _ in 0..opts.nr_threads {
for thread in 0..opts.nr_threads {
let db = Arc::new(Builder::new_local(&opts.db_file).build().await?);
let plan = plan.clone();
let conn = db.connect()?;
// Apply each DDL statement individually
for stmt in &ddl_statements {
for stmt in &plan.ddl_statements {
println!("executing ddl {}", stmt);
if let Err(e) = conn.execute(stmt, ()).await {
println!("Error creating table: {}", e);
}
@@ -327,17 +400,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let nr_iterations = opts.nr_iterations;
let db = db.clone();
let schema = schema.clone();
let log_file = log_file.clone();
let handle = tokio::spawn(async move {
let conn = db.connect()?;
for _ in 0..nr_iterations {
let sql = generate_random_statement(&schema);
{
let mut file = log_file.lock().unwrap();
writeln!(file, "{}", sql)?;
}
for query_index in 0..nr_iterations {
let sql = &plan.queries_per_thread[thread][query_index];
println!("executing: {}", sql);
if let Err(e) = conn.execute(&sql, ()).await {
println!("Error: {}", e);
}

View File

@@ -26,6 +26,15 @@ pub struct Opts {
)]
pub log_file: String,
/// Load log file instead of creating a new one
#[clap(
short = 'L',
long = "load-log",
help = "load log file instead of creating a new one",
default_value_t = false
)]
pub load_log: bool,
/// Database file
#[clap(
short = 'd',