From 6aaa105321f4b67e53838e36c161a8ef298ba9b6 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:23:53 +0300 Subject: [PATCH 1/6] stress: Add schema generation support --- Cargo.lock | 1 + stress/Cargo.toml | 1 + stress/main.rs | 183 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 185 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 96aec95db..f9f912ea3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1894,6 +1894,7 @@ dependencies = [ name = "limbo_stress" version = "0.0.19-pre.4" dependencies = [ + "anarchist-readable-name-generator-lib", "antithesis_sdk", "clap", "limbo", diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 59c4e8256..3bd7c295b 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -20,3 +20,4 @@ clap = { version = "4.5", features = ["derive"] } limbo = { path = "../bindings/rust" } serde_json = "1.0.139" tokio = { version = "1.29.1", features = ["full"] } +anarchist-readable-name-generator-lib = "0.1.0" diff --git a/stress/main.rs b/stress/main.rs index c62714e63..c46215481 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -1,12 +1,188 @@ mod opts; +use anarchist_readable_name_generator_lib::readable_name_custom; +use antithesis_sdk::random::{get_random, AntithesisRng}; use antithesis_sdk::*; use clap::Parser; use limbo::{Builder, Value}; use opts::Opts; use serde_json::json; +use std::collections::HashSet; use std::sync::Arc; +/// Represents a column in a SQLite table +#[derive(Debug, Clone)] +pub struct Column { + pub name: String, + pub data_type: DataType, + pub constraints: Vec<Constraint>, +} + +/// Represents SQLite data types +#[derive(Debug, Clone)] +pub enum DataType { + Integer, + Real, + Text, + Blob, + Numeric, +} + +/// Represents column constraints +#[derive(Debug, Clone)] +pub enum Constraint { + PrimaryKey, + NotNull, + Unique, +} + +/// Represents a table in a SQLite schema +#[derive(Debug, Clone)] +pub struct Table { + pub name: String, + pub columns: Vec<Column>, +} + +/// Represents a complete
SQLite schema +#[derive(Debug, Clone)] +pub struct ArbitrarySchema { + pub tables: Vec<Table>, +} + +// Helper functions for generating random data +fn generate_random_identifier() -> String { + readable_name_custom("_", AntithesisRng).replace('-', "_") +} + +fn generate_random_data_type() -> DataType { + match get_random() % 5 { + 0 => DataType::Integer, + 1 => DataType::Real, + 2 => DataType::Text, + 3 => DataType::Blob, + _ => DataType::Numeric, + } +} + +fn generate_random_constraint() -> Constraint { + match get_random() % 2 { + 0 => Constraint::NotNull, + _ => Constraint::Unique, + } +} + +fn generate_random_column() -> Column { + let name = generate_random_identifier(); + let data_type = generate_random_data_type(); + + let constraint_count = (get_random() % 3) as usize; + let mut constraints = Vec::with_capacity(constraint_count); + + for _ in 0..constraint_count { + constraints.push(generate_random_constraint()); + } + + Column { + name, + data_type, + constraints, + } +} + +fn generate_random_table() -> Table { + let name = generate_random_identifier(); + let column_count = (get_random() % 10 + 1) as usize; + let mut columns = Vec::with_capacity(column_count); + let mut column_names = HashSet::new(); + + // First, generate all columns without primary keys + for _ in 0..column_count { + let mut column = generate_random_column(); + + // Ensure column names are unique within the table + while column_names.contains(&column.name) { + column.name = generate_random_identifier(); + } + + column_names.insert(column.name.clone()); + columns.push(column); + } + + // Then, randomly select one column to be the primary key + let pk_index = (get_random() % column_count as u64) as usize; + columns[pk_index].constraints.push(Constraint::PrimaryKey); + + Table { name, columns } +} + +pub fn gen_schema() -> ArbitrarySchema { + let table_count = (get_random() % 10 + 1) as usize; + let mut tables = Vec::with_capacity(table_count); + let mut table_names = HashSet::new(); + + for _ in
0..table_count { + let mut table = generate_random_table(); + + // Ensure table names are unique + while table_names.contains(&table.name) { + table.name = generate_random_identifier(); + } + + table_names.insert(table.name.clone()); + tables.push(table); + } + + ArbitrarySchema { tables } +} + +impl ArbitrarySchema { + pub fn to_sql(&self) -> String { + let mut sql = String::new(); + + for table in &self.tables { + sql.push_str(&format!("CREATE TABLE {} (\n", table.name)); + + for (i, column) in table.columns.iter().enumerate() { + if i > 0 { + sql.push_str(",\n"); + } + + sql.push_str(&format!( + " {} {}", + column.name, + data_type_to_sql(&column.data_type) + )); + + for constraint in &column.constraints { + sql.push_str(&format!(" {}", constraint_to_sql(constraint))); + } + } + + sql.push_str("\n);\n\n"); + } + + sql + } +} + +fn data_type_to_sql(data_type: &DataType) -> &'static str { + match data_type { + DataType::Integer => "INTEGER", + DataType::Real => "REAL", + DataType::Text => "TEXT", + DataType::Blob => "BLOB", + DataType::Numeric => "NUMERIC", + } +} + +fn constraint_to_sql(constraint: &Constraint) -> String { + match constraint { + Constraint::PrimaryKey => "PRIMARY KEY".to_string(), + Constraint::NotNull => "NOT NULL".to_string(), + Constraint::Unique => "UNIQUE".to_string(), + } +} + #[tokio::main] async fn main() { let (num_nodes, main_id) = (1, "n-001"); @@ -17,12 +193,19 @@ async fn main() { lifecycle::setup_complete(&startup_data); antithesis_init(); + let schema = gen_schema(); + + let schema_sql = schema.to_sql(); + + println!("{}", schema_sql); let opts = Opts::parse(); let mut handles = Vec::new(); for _ in 0..opts.nr_threads { // TODO: share the database between threads let db = Arc::new(Builder::new_local(":memory:").build().await.unwrap()); + let conn = db.connect().unwrap(); + conn.execute(&schema_sql, ()).await.unwrap(); let nr_iterations = opts.nr_iterations; let db = db.clone(); let handle = tokio::spawn(async move { From 
207563208f1734380b7a09e021f43c7e45a4a375 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:51:04 +0300 Subject: [PATCH 2/6] stress: Add support for INSERT, DELETE, and UPDATE --- Cargo.lock | 1 + bindings/rust/src/lib.rs | 6 +- stress/Cargo.toml | 1 + stress/main.rs | 142 ++++++++++++++++++++++++++++++++++----- 4 files changed, 131 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f9f912ea3..14c1df80e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1897,6 +1897,7 @@ dependencies = [ "anarchist-readable-name-generator-lib", "antithesis_sdk", "clap", + "hex", "limbo", "serde_json", "tokio", diff --git a/bindings/rust/src/lib.rs b/bindings/rust/src/lib.rs index 60a7ffd77..61e6271c9 100644 --- a/bindings/rust/src/lib.rs +++ b/bindings/rust/src/lib.rs @@ -17,11 +17,13 @@ pub enum Error { ToSqlConversionFailure(BoxError), #[error("Mutex lock error: {0}")] MutexError(String), + #[error("SQL execution failure: `{0}`")] + SqlExecutionFailure(String), } impl From<limbo_core::LimboError> for Error { - fn from(_err: limbo_core::LimboError) -> Self { - todo!(); + fn from(err: limbo_core::LimboError) -> Self { + Error::SqlExecutionFailure(err.to_string()) } } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 3bd7c295b..6f7a0a9e9 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -21,3 +21,4 @@ limbo = { path = "../bindings/rust" } serde_json = "1.0.139" tokio = { version = "1.29.1", features = ["full"] } anarchist-readable-name-generator-lib = "0.1.0" +hex = "0.4" diff --git a/stress/main.rs b/stress/main.rs index c46215481..3b1f53cb8 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -4,7 +4,8 @@ use anarchist_readable_name_generator_lib::readable_name_custom; use antithesis_sdk::random::{get_random, AntithesisRng}; use antithesis_sdk::*; use clap::Parser; -use limbo::{Builder, Value}; +use hex; +use limbo::Builder; use opts::Opts; use serde_json::json; use std::collections::HashSet; @@ -29,7 +30,7 @@ pub enum DataType { } /// Represents column
constraints -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum Constraint { PrimaryKey, NotNull, @@ -183,8 +184,112 @@ fn constraint_to_sql(constraint: &Constraint) -> String { } } +/// Generate a random value for a given data type +fn generate_random_value(data_type: &DataType) -> String { + match data_type { + DataType::Integer => (get_random() % 1000).to_string(), + DataType::Real => format!("{:.2}", (get_random() % 1000) as f64 / 100.0), + DataType::Text => format!("'{}'", generate_random_identifier()), + DataType::Blob => format!("x'{}'", hex::encode(generate_random_identifier())), + DataType::Numeric => (get_random() % 1000).to_string(), + } +} + +/// Generate a random INSERT statement for a table +fn generate_insert(table: &Table) -> String { + let columns = table + .columns + .iter() + .map(|col| col.name.clone()) + .collect::<Vec<_>>() + .join(", "); + + let values = table + .columns + .iter() + .map(|col| generate_random_value(&col.data_type)) + .collect::<Vec<_>>() + .join(", "); + + format!( + "INSERT INTO {} ({}) VALUES ({});", + table.name, columns, values + ) +} + +/// Generate a random UPDATE statement for a table +fn generate_update(table: &Table) -> String { + // Find the primary key column + let pk_column = table + .columns + .iter() + .find(|col| col.constraints.contains(&Constraint::PrimaryKey)) + .expect("Table should have a primary key"); + + // Get all non-primary key columns + let non_pk_columns: Vec<_> = table + .columns + .iter() + .filter(|col| col.name != pk_column.name) + .collect(); + + // If we have no non-PK columns, just update the primary key itself + let set_clause = if non_pk_columns.is_empty() { + format!( + "{} = {}", + pk_column.name, + generate_random_value(&pk_column.data_type) + ) + } else { + non_pk_columns + .iter() + .map(|col| format!("{} = {}", col.name, generate_random_value(&col.data_type))) + .collect::<Vec<_>>() + .join(", ") + }; + + let where_clause = format!( + "{} = {}", + pk_column.name, +
generate_random_value(&pk_column.data_type) + ); + + format!( + "UPDATE {} SET {} WHERE {};", + table.name, set_clause, where_clause + ) +} + +/// Generate a random DELETE statement for a table +fn generate_delete(table: &Table) -> String { + // Find the primary key column + let pk_column = table + .columns + .iter() + .find(|col| col.constraints.contains(&Constraint::PrimaryKey)) + .expect("Table should have a primary key"); + + let where_clause = format!( + "{} = {}", + pk_column.name, + generate_random_value(&pk_column.data_type) + ); + + format!("DELETE FROM {} WHERE {};", table.name, where_clause) +} + +/// Generate a random SQL statement for a schema +fn generate_random_statement(schema: &ArbitrarySchema) -> String { + let table = &schema.tables[get_random() as usize % schema.tables.len()]; + match get_random() % 3 { + 0 => generate_insert(table), + 1 => generate_update(table), + _ => generate_delete(table), + } +} + #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let (num_nodes, main_id) = (1, "n-001"); let startup_data = json!({ "num_nodes": num_nodes, @@ -194,34 +299,37 @@ async fn main() { antithesis_init(); let schema = gen_schema(); - let schema_sql = schema.to_sql(); - println!("{}", schema_sql); + let opts = Opts::parse(); - let mut handles = Vec::new(); + let mut handles = Vec::with_capacity(opts.nr_threads); for _ in 0..opts.nr_threads { - // TODO: share the database between threads - let db = Arc::new(Builder::new_local(":memory:").build().await.unwrap()); - let conn = db.connect().unwrap(); - conn.execute(&schema_sql, ()).await.unwrap(); + let db = Arc::new(Builder::new_local(":memory:").build().await?); + let conn = db.connect()?; + conn.execute(&schema_sql, ()).await?; let nr_iterations = opts.nr_iterations; let db = db.clone(); - let handle = tokio::spawn(async move { - let conn = db.connect().unwrap(); + let schema = schema.clone(); + let handle = tokio::spawn(async move { + let conn = db.connect()?; for _ in
0..nr_iterations { - let mut rows = conn.query("select 1", ()).await.unwrap(); - let row = rows.next().await.unwrap().unwrap(); - let value = row.get_value(0).unwrap(); - assert_always!(matches!(value, Value::Integer(1)), "value is incorrect"); + let sql = generate_random_statement(&schema); + println!("{}", sql); + if let Err(e) = conn.execute(&sql, ()).await { + println!("Error: {}", e); + } } + Ok::<_, Box<dyn std::error::Error + Send + Sync>>(()) }); handles.push(handle); } + for handle in handles { - handle.await.unwrap(); + handle.await??; } println!("Done."); + Ok(()) } From f50662205e93dcf45bbb5c4f09790eb14f95c831 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:52:53 +0300 Subject: [PATCH 3/6] stress: Fix schema creation --- stress/main.rs | 62 +++++++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/stress/main.rs b/stress/main.rs index 3b1f53cb8..6bd7a6bef 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -137,32 +137,29 @@ pub fn gen_schema() -> ArbitrarySchema { } impl ArbitrarySchema { - pub fn to_sql(&self) -> String { - let mut sql = String::new(); + /// Convert the schema to a vector of SQL DDL statements + pub fn to_sql(&self) -> Vec<String> { + self.tables + .iter() + .map(|table| { + let columns = table + .columns + .iter() + .map(|col| { + let mut col_def = + format!(" {} {}", col.name, data_type_to_sql(&col.data_type)); + for constraint in &col.constraints { + col_def.push(' '); + col_def.push_str(&constraint_to_sql(constraint)); + } + col_def + }) + .collect::<Vec<_>>() + .join(",\n"); - for table in &self.tables { - sql.push_str(&format!("CREATE TABLE {} (\n", table.name)); - - for (i, column) in table.columns.iter().enumerate() { - if i > 0 { - sql.push_str(",\n"); - } - - sql.push_str(&format!( - " {} {}", - column.name, - data_type_to_sql(&column.data_type) - )); - - for constraint in &column.constraints { - sql.push_str(&format!(" {}", constraint_to_sql(constraint))); - } - } - - sql.push_str("\n);\n\n"); - }
- - sql + format!("CREATE TABLE {} (\n{}\n);", table.name, columns) + }) + .collect() } } @@ -299,8 +296,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { antithesis_init(); let schema = gen_schema(); - let schema_sql = schema.to_sql(); - println!("{}", schema_sql); + let ddl_statements = schema.to_sql(); + for stmt in &ddl_statements { + println!("{}", stmt); + } let opts = Opts::parse(); let mut handles = Vec::with_capacity(opts.nr_threads); @@ -308,7 +307,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { for _ in 0..opts.nr_threads { let db = Arc::new(Builder::new_local(":memory:").build().await?); let conn = db.connect()?; - conn.execute(&schema_sql, ()).await?; + + // Apply each DDL statement individually + for stmt in &ddl_statements { + if let Err(e) = conn.execute(stmt, ()).await { + println!("Error creating table: {}", e); + } + } + let nr_iterations = opts.nr_iterations; let db = db.clone(); let schema = schema.clone(); From 39cee1b1465bea41eb24a1885717f27eab7b3d96 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:55:03 +0300 Subject: [PATCH 4/6] stress: Increase default number of iterations --- stress/opts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/opts.rs b/stress/opts.rs index 392d79448..da00e1a00 100644 --- a/stress/opts.rs +++ b/stress/opts.rs @@ -10,7 +10,7 @@ pub struct Opts { short = 'i', long, help = "the number of iterations", - default_value_t = 1000 + default_value_t = 100000 )] pub nr_iterations: usize, } From c4d983bcfed58980262abfa45d88f59557e17d8f Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:57:01 +0300 Subject: [PATCH 5/6] stress: Log SQL statements to a file --- stress/main.rs | 26 ++++++++++++++++++++------ stress/opts.rs | 12 ++++++++++++ 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/stress/main.rs b/stress/main.rs index 6bd7a6bef..6180faf30 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -9,7 +9,9 @@ use limbo::Builder; use opts::Opts; use
serde_json::json; use std::collections::HashSet; -use std::sync::Arc; +use std::fs::File; +use std::io::Write; +use std::sync::{Arc, Mutex}; /// Represents a column in a SQLite table #[derive(Debug, Clone)] @@ -297,11 +299,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let schema = gen_schema(); let ddl_statements = schema.to_sql(); - for stmt in &ddl_statements { - println!("{}", stmt); - } let opts = Opts::parse(); + let log_file = File::create(&opts.log_file)?; + let log_file = Arc::new(Mutex::new(log_file)); + + // Write DDL statements to log file + { + let mut file = log_file.lock().unwrap(); + for stmt in &ddl_statements { + writeln!(file, "{}", stmt)?; + } + } + let mut handles = Vec::with_capacity(opts.nr_threads); for _ in 0..opts.nr_threads { @@ -318,12 +328,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let nr_iterations = opts.nr_iterations; let db = db.clone(); let schema = schema.clone(); + let log_file = log_file.clone(); let handle = tokio::spawn(async move { let conn = db.connect()?; for _ in 0..nr_iterations { let sql = generate_random_statement(&schema); - println!("{}", sql); + { + let mut file = log_file.lock().unwrap(); + writeln!(file, "{}", sql)?; + } if let Err(e) = conn.execute(&sql, ()).await { println!("Error: {}", e); } @@ -336,6 +350,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { for handle in handles { handle.await??; } - println!("Done."); + println!("Done.
SQL statements written to {}", opts.log_file); Ok(()) } diff --git a/stress/opts.rs b/stress/opts.rs index da00e1a00..e7799d29a 100644 --- a/stress/opts.rs +++ b/stress/opts.rs @@ -4,8 +4,11 @@ use clap::{command, Parser}; #[command(name = "limbo_stress")] #[command(author, version, about, long_about = None)] pub struct Opts { + /// Number of threads to run #[clap(short = 't', long, help = "the number of threads", default_value_t = 8)] pub nr_threads: usize, + + /// Number of iterations per thread #[clap( short = 'i', long, @@ -13,4 +16,13 @@ pub struct Opts { default_value_t = 100000 )] pub nr_iterations: usize, + + /// Log file for SQL statements + #[clap( + short = 'l', + long, + help = "log file for SQL statements", + default_value = "limbostress.log" + )] + pub log_file: String, } From 441cd637b5f71384bc0ebbd610482dd356425d33 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 10 Apr 2025 11:58:52 +0300 Subject: [PATCH 6/6] stress: Make database file configurable --- stress/main.rs | 3 ++- stress/opts.rs | 9 +++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/stress/main.rs b/stress/main.rs index 6180faf30..c0c42449d 100644 --- a/stress/main.rs +++ b/stress/main.rs @@ -315,7 +315,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let mut handles = Vec::with_capacity(opts.nr_threads); for _ in 0..opts.nr_threads { - let db = Arc::new(Builder::new_local(":memory:").build().await?); + let db = Arc::new(Builder::new_local(&opts.db_file).build().await?); let conn = db.connect()?; // Apply each DDL statement individually @@ -351,5 +351,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { handle.await??; } println!("Done.
SQL statements written to {}", opts.log_file); + println!("Database file: {}", opts.db_file); Ok(()) } diff --git a/stress/opts.rs b/stress/opts.rs index e7799d29a..3084431c5 100644 --- a/stress/opts.rs +++ b/stress/opts.rs @@ -25,4 +25,13 @@ pub struct Opts { default_value = "limbostress.log" )] pub log_file: String, + + /// Database file + #[clap( + short = 'd', + long, + help = "database file", + default_value = "limbostress.db" + )] + pub db_file: String, }