Merge 'Stress improvements' from Pekka Enberg

Closes #1292
Pekka Enberg
2025-04-10 12:18:53 +03:00
5 changed files with 356 additions and 17 deletions

Cargo.lock (generated)

@@ -1894,8 +1894,10 @@ dependencies = [
 name = "limbo_stress"
 version = "0.0.19-pre.4"
 dependencies = [
+ "anarchist-readable-name-generator-lib",
  "antithesis_sdk",
  "clap",
+ "hex",
  "limbo",
  "serde_json",
  "tokio",


@@ -17,11 +17,13 @@ pub enum Error {
     ToSqlConversionFailure(BoxError),
     #[error("Mutex lock error: {0}")]
     MutexError(String),
+    #[error("SQL execution failure: `{0}`")]
+    SqlExecutionFailure(String),
 }

 impl From<limbo_core::LimboError> for Error {
-    fn from(_err: limbo_core::LimboError) -> Self {
-        todo!();
+    fn from(err: limbo_core::LimboError) -> Self {
+        Error::SqlExecutionFailure(err.to_string())
     }
 }


@@ -20,3 +20,5 @@ clap = { version = "4.5", features = ["derive"] }
 limbo = { path = "../bindings/rust" }
 serde_json = "1.0.139"
 tokio = { version = "1.29.1", features = ["full"] }
+anarchist-readable-name-generator-lib = "0.1.0"
+hex = "0.4"


@@ -1,14 +1,294 @@
 mod opts;

+use anarchist_readable_name_generator_lib::readable_name_custom;
+use antithesis_sdk::random::{get_random, AntithesisRng};
 use antithesis_sdk::*;
 use clap::Parser;
-use limbo::{Builder, Value};
+use hex;
+use limbo::Builder;
 use opts::Opts;
 use serde_json::json;
-use std::sync::Arc;
+use std::collections::HashSet;
+use std::fs::File;
+use std::io::Write;
+use std::sync::{Arc, Mutex};
+
+/// Represents a column in a SQLite table
+#[derive(Debug, Clone)]
+pub struct Column {
+    pub name: String,
+    pub data_type: DataType,
+    pub constraints: Vec<Constraint>,
+}
+
+/// Represents SQLite data types
+#[derive(Debug, Clone)]
+pub enum DataType {
+    Integer,
+    Real,
+    Text,
+    Blob,
+    Numeric,
+}
+
+/// Represents column constraints
+#[derive(Debug, Clone, PartialEq)]
+pub enum Constraint {
+    PrimaryKey,
+    NotNull,
+    Unique,
+}
+
+/// Represents a table in a SQLite schema
+#[derive(Debug, Clone)]
+pub struct Table {
+    pub name: String,
+    pub columns: Vec<Column>,
+}
+
+/// Represents a complete SQLite schema
+#[derive(Debug, Clone)]
+pub struct ArbitrarySchema {
+    pub tables: Vec<Table>,
+}
+
+// Helper functions for generating random data
+fn generate_random_identifier() -> String {
+    readable_name_custom("_", AntithesisRng).replace('-', "_")
+}
+
+fn generate_random_data_type() -> DataType {
+    match get_random() % 5 {
+        0 => DataType::Integer,
+        1 => DataType::Real,
+        2 => DataType::Text,
+        3 => DataType::Blob,
+        _ => DataType::Numeric,
+    }
+}
+
+fn generate_random_constraint() -> Constraint {
+    match get_random() % 2 {
+        0 => Constraint::NotNull,
+        _ => Constraint::Unique,
+    }
+}
+
+fn generate_random_column() -> Column {
+    let name = generate_random_identifier();
+    let data_type = generate_random_data_type();
+    let constraint_count = (get_random() % 3) as usize;
+    let mut constraints = Vec::with_capacity(constraint_count);
+    for _ in 0..constraint_count {
+        constraints.push(generate_random_constraint());
+    }
+    Column {
+        name,
+        data_type,
+        constraints,
+    }
+}
+
+fn generate_random_table() -> Table {
+    let name = generate_random_identifier();
+    let column_count = (get_random() % 10 + 1) as usize;
+    let mut columns = Vec::with_capacity(column_count);
+    let mut column_names = HashSet::new();
+    // First, generate all columns without primary keys
+    for _ in 0..column_count {
+        let mut column = generate_random_column();
+        // Ensure column names are unique within the table
+        while column_names.contains(&column.name) {
+            column.name = generate_random_identifier();
+        }
+        column_names.insert(column.name.clone());
+        columns.push(column);
+    }
+    // Then, randomly select one column to be the primary key
+    let pk_index = (get_random() % column_count as u64) as usize;
+    columns[pk_index].constraints.push(Constraint::PrimaryKey);
+    Table { name, columns }
+}
+
+pub fn gen_schema() -> ArbitrarySchema {
+    let table_count = (get_random() % 10 + 1) as usize;
+    let mut tables = Vec::with_capacity(table_count);
+    let mut table_names = HashSet::new();
+    for _ in 0..table_count {
+        let mut table = generate_random_table();
+        // Ensure table names are unique
+        while table_names.contains(&table.name) {
+            table.name = generate_random_identifier();
+        }
+        table_names.insert(table.name.clone());
+        tables.push(table);
+    }
+    ArbitrarySchema { tables }
+}
+
+impl ArbitrarySchema {
+    /// Convert the schema to a vector of SQL DDL statements
+    pub fn to_sql(&self) -> Vec<String> {
+        self.tables
+            .iter()
+            .map(|table| {
+                let columns = table
+                    .columns
+                    .iter()
+                    .map(|col| {
+                        let mut col_def =
+                            format!(" {} {}", col.name, data_type_to_sql(&col.data_type));
+                        for constraint in &col.constraints {
+                            col_def.push(' ');
+                            col_def.push_str(&constraint_to_sql(constraint));
+                        }
+                        col_def
+                    })
+                    .collect::<Vec<_>>()
+                    .join(",\n");
+                format!("CREATE TABLE {} (\n{}\n);", table.name, columns)
+            })
+            .collect()
+    }
+}
+
+fn data_type_to_sql(data_type: &DataType) -> &'static str {
+    match data_type {
+        DataType::Integer => "INTEGER",
+        DataType::Real => "REAL",
+        DataType::Text => "TEXT",
+        DataType::Blob => "BLOB",
+        DataType::Numeric => "NUMERIC",
+    }
+}
+
+fn constraint_to_sql(constraint: &Constraint) -> String {
+    match constraint {
+        Constraint::PrimaryKey => "PRIMARY KEY".to_string(),
+        Constraint::NotNull => "NOT NULL".to_string(),
+        Constraint::Unique => "UNIQUE".to_string(),
+    }
+}
+
+/// Generate a random value for a given data type
+fn generate_random_value(data_type: &DataType) -> String {
+    match data_type {
+        DataType::Integer => (get_random() % 1000).to_string(),
+        DataType::Real => format!("{:.2}", (get_random() % 1000) as f64 / 100.0),
+        DataType::Text => format!("'{}'", generate_random_identifier()),
+        DataType::Blob => format!("x'{}'", hex::encode(generate_random_identifier())),
+        DataType::Numeric => (get_random() % 1000).to_string(),
+    }
+}
+
+/// Generate a random INSERT statement for a table
+fn generate_insert(table: &Table) -> String {
+    let columns = table
+        .columns
+        .iter()
+        .map(|col| col.name.clone())
+        .collect::<Vec<_>>()
+        .join(", ");
+    let values = table
+        .columns
+        .iter()
+        .map(|col| generate_random_value(&col.data_type))
+        .collect::<Vec<_>>()
+        .join(", ");
+    format!(
+        "INSERT INTO {} ({}) VALUES ({});",
+        table.name, columns, values
+    )
+}
+
+/// Generate a random UPDATE statement for a table
+fn generate_update(table: &Table) -> String {
+    // Find the primary key column
+    let pk_column = table
+        .columns
+        .iter()
+        .find(|col| col.constraints.contains(&Constraint::PrimaryKey))
+        .expect("Table should have a primary key");
+    // Get all non-primary key columns
+    let non_pk_columns: Vec<_> = table
+        .columns
+        .iter()
+        .filter(|col| col.name != pk_column.name)
+        .collect();
+    // If we have no non-PK columns, just update the primary key itself
+    let set_clause = if non_pk_columns.is_empty() {
+        format!(
+            "{} = {}",
+            pk_column.name,
+            generate_random_value(&pk_column.data_type)
+        )
+    } else {
+        non_pk_columns
+            .iter()
+            .map(|col| format!("{} = {}", col.name, generate_random_value(&col.data_type)))
+            .collect::<Vec<_>>()
+            .join(", ")
+    };
+    let where_clause = format!(
+        "{} = {}",
+        pk_column.name,
+        generate_random_value(&pk_column.data_type)
+    );
+    format!(
+        "UPDATE {} SET {} WHERE {};",
+        table.name, set_clause, where_clause
+    )
+}
+
+/// Generate a random DELETE statement for a table
+fn generate_delete(table: &Table) -> String {
+    // Find the primary key column
+    let pk_column = table
+        .columns
+        .iter()
+        .find(|col| col.constraints.contains(&Constraint::PrimaryKey))
+        .expect("Table should have a primary key");
+    let where_clause = format!(
+        "{} = {}",
+        pk_column.name,
+        generate_random_value(&pk_column.data_type)
+    );
+    format!("DELETE FROM {} WHERE {};", table.name, where_clause)
+}
+
+/// Generate a random SQL statement for a schema
+fn generate_random_statement(schema: &ArbitrarySchema) -> String {
+    let table = &schema.tables[get_random() as usize % schema.tables.len()];
+    match get_random() % 3 {
+        0 => generate_insert(table),
+        1 => generate_update(table),
+        _ => generate_delete(table),
+    }
+}
+
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     let (num_nodes, main_id) = (1, "n-001");

     let startup_data = json!({
         "num_nodes": num_nodes,
@@ -17,28 +297,60 @@ async fn main() {
     lifecycle::setup_complete(&startup_data);
     antithesis_init();

+    let schema = gen_schema();
+    let ddl_statements = schema.to_sql();
+
     let opts = Opts::parse();
-    let mut handles = Vec::new();
+
+    let log_file = File::create(&opts.log_file)?;
+    let log_file = Arc::new(Mutex::new(log_file));
+
+    // Write DDL statements to log file
+    {
+        let mut file = log_file.lock().unwrap();
+        for stmt in &ddl_statements {
+            writeln!(file, "{}", stmt)?;
+        }
+    }
+
+    let mut handles = Vec::with_capacity(opts.nr_threads);
     for _ in 0..opts.nr_threads {
         // TODO: share the database between threads
-        let db = Arc::new(Builder::new_local(":memory:").build().await.unwrap());
+        let db = Arc::new(Builder::new_local(&opts.db_file).build().await?);
+        let conn = db.connect()?;
+
+        // Apply each DDL statement individually
+        for stmt in &ddl_statements {
+            if let Err(e) = conn.execute(stmt, ()).await {
+                println!("Error creating table: {}", e);
+            }
+        }
+
         let nr_iterations = opts.nr_iterations;
         let db = db.clone();
-        let handle = tokio::spawn(async move {
-            let conn = db.connect().unwrap();
+        let schema = schema.clone();
+        let log_file = log_file.clone();
+        let handle = tokio::spawn(async move {
+            let conn = db.connect()?;
             for _ in 0..nr_iterations {
-                let mut rows = conn.query("select 1", ()).await.unwrap();
-                let row = rows.next().await.unwrap().unwrap();
-                let value = row.get_value(0).unwrap();
-                assert_always!(matches!(value, Value::Integer(1)), "value is incorrect");
+                let sql = generate_random_statement(&schema);
+                {
+                    let mut file = log_file.lock().unwrap();
+                    writeln!(file, "{}", sql)?;
+                }
+                if let Err(e) = conn.execute(&sql, ()).await {
+                    println!("Error: {}", e);
+                }
             }
+            Ok::<_, Box<dyn std::error::Error + Send + Sync>>(())
         });
         handles.push(handle);
     }
     for handle in handles {
-        handle.await.unwrap();
+        handle.await??;
     }
-    println!("Done.");
+    println!("Done. SQL statements written to {}", opts.log_file);
+    println!("Database file: {}", opts.db_file);
+    Ok(())
 }
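For a concrete sense of what this workload logs, a plausible excerpt follows (illustration only, not part of the diff: the table and column names are invented stand-ins for what the readable-name generator would produce, and the literal values are made up within the ranges the value generators use):

    CREATE TABLE quiet_lantern (
     misty_harbor INTEGER NOT NULL PRIMARY KEY,
     velvet_crow TEXT,
     amber_field REAL UNIQUE
    );
    INSERT INTO quiet_lantern (misty_harbor, velvet_crow, amber_field) VALUES (417, 'pale_orchid', 6.42);
    UPDATE quiet_lantern SET velvet_crow = 'dim_valley', amber_field = 3.07 WHERE misty_harbor = 888;
    DELETE FROM quiet_lantern WHERE misty_harbor = 23;

Note that generate_update and generate_delete draw a fresh random value for the primary-key WHERE clause, so many statements will match no rows; they still exercise parsing, planning, and the write path, which appears to be the point of the stress workload.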


@@ -4,13 +4,34 @@ use clap::{command, Parser};
 #[command(name = "limbo_stress")]
 #[command(author, version, about, long_about = None)]
 pub struct Opts {
+    /// Number of threads to run
     #[clap(short = 't', long, help = "the number of threads", default_value_t = 8)]
     pub nr_threads: usize,
+
+    /// Number of iterations per thread
     #[clap(
         short = 'i',
         long,
         help = "the number of iterations",
-        default_value_t = 1000
+        default_value_t = 100000
     )]
     pub nr_iterations: usize,
+
+    /// Log file for SQL statements
+    #[clap(
+        short = 'l',
+        long,
+        help = "log file for SQL statements",
+        default_value = "limbostress.log"
+    )]
+    pub log_file: String,
+
+    /// Database file
+    #[clap(
+        short = 'd',
+        long,
+        help = "database file",
+        default_value = "limbostress.db"
+    )]
+    pub db_file: String,
 }