diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b9bcf3afb..d42f1c4ec 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -189,6 +189,65 @@ What this means is that the behavior of a test run is deterministic based on the If the simulator catches a bug, you can always reproduce the exact same sequence of events by passing the same seed. The simulator also performs fault injection to discover interesting bugs. +### Whopper + +Whopper is a DST that, unlike `simulator`, performs concurrent query execution. + +To run Whopper for your local changes, run: + +```console +./whopper/bin/run +``` + +The output of the simulation run looks as follows: + +``` +mode = fast +seed = 11621338508193870992 + . I/U/D/C + . 22/17/15/0 + . 41/34/20/3 + | 62/43/27/4 + | 88/55/30/5 + ╱|╲ 97/58/30/6 + ╱╲|╱╲ 108/62/30/7 + ╱╲╱|╲╱╲ 115/67/32/7 + ╱╲╱╲|╱╲╱╲ 121/74/35/7 + ╱╲╱╲╱|╲╱╲╱╲ 125/80/38/7 + ╱╲╱╲╱╲|╱╲╱╲╱╲ 141/94/43/8 + +real 0m1.250s +user 0m0.843s +sys 0m0.043s +``` + +The simulator prints ten progress indication lines, regardless of how long a run takes. The progress indicator line shows the following stats: + +* `I` -- the number of `INSERT` statements executed +* `U` -- the number of `UPDATE` statements executed +* `D` -- the number of `DELETE` statements executed +* `C` -- the number of `PRAGMA integrity_check` statements executed + +This will do a short sanity check run in using the `fast` mode. + +If you need to reproduce a run, just defined the `SEED` environment variable as follows: + +```console +SEED=1234 ./whopper/bin/run +``` + +You can also run Whopper in exploration mode to find more serious bugs: + +```console +./whopper/bin/explore +``` + +Note that exploration uses the `chaos` mode so if you need to reproduce a run, use: + +```console +SEED=1234 ./whopper/bin/run --mode chaos +``` + ## Python Bindings Turso provides Python bindings built on top of the [PyO3](https://pyo3.rs) project. diff --git a/Cargo.lock b/Cargo.lock index 30e8a1fb1..c44ea4996 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4422,6 +4422,22 @@ dependencies = [ "turso_sync_engine", ] +[[package]] +name = "turso_whopper" +version = "0.1.5" +dependencies = [ + "anyhow", + "clap", + "memmap2", + "rand 0.9.2", + "rand_chacha 0.9.0", + "sql_generation", + "tracing", + "tracing-subscriber", + "turso_core", + "turso_parser", +] + [[package]] name = "typenum" version = "1.18.0" diff --git a/Cargo.toml b/Cargo.toml index 037a49bac..e0c00b599 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "parser", "sync/engine", "sql_generation", + "whopper", ] exclude = ["perf/latency/limbo"] diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b5c875cf2..ac9eff3e1 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -2085,7 +2085,7 @@ impl WalFile { "We must hold writer and checkpoint locks to restart the log, found: {:?}", self.checkpoint_guard ); - tracing::info!("restart_log(mode={mode:?})"); + tracing::debug!("restart_log(mode={mode:?})"); { // Block all readers let mut shared = self.get_shared_mut(); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 11257211a..8dfb51719 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2226,7 +2226,7 @@ pub fn op_transaction( match res { Ok(header_schema_cookie) => { if header_schema_cookie != *schema_cookie { - tracing::info!( + tracing::debug!( "schema changed, force reprepare: {} != {}", header_schema_cookie, *schema_cookie diff --git a/sql_generation/model/query/drop_index.rs b/sql_generation/model/query/drop_index.rs new file mode 100644 index 000000000..18cadb12d --- /dev/null +++ b/sql_generation/model/query/drop_index.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct DropIndex { + pub index_name: String, +} + +impl std::fmt::Display for DropIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DROP INDEX {}", self.index_name) + } +} diff --git a/sql_generation/model/query/mod.rs b/sql_generation/model/query/mod.rs index 11d117a53..98ec2bdfd 100644 --- a/sql_generation/model/query/mod.rs +++ b/sql_generation/model/query/mod.rs @@ -2,6 +2,7 @@ pub use create::Create; pub use create_index::CreateIndex; pub use delete::Delete; pub use drop::Drop; +pub use drop_index::DropIndex; pub use insert::Insert; pub use select::Select; @@ -9,6 +10,7 @@ pub mod create; pub mod create_index; pub mod delete; pub mod drop; +pub mod drop_index; pub mod insert; pub mod predicate; pub mod select; diff --git a/whopper/Cargo.toml b/whopper/Cargo.toml new file mode 100644 index 000000000..0cd4abcd7 --- /dev/null +++ b/whopper/Cargo.toml @@ -0,0 +1,27 @@ +# Copyright 2025 the Turso authors. All rights reserved. MIT license. + +[package] +name = "turso_whopper" +version.workspace = true +authors.workspace = true +edition = "2024" +license.workspace = true +repository.workspace = true +description = "The Turso deterministic simulator" +publish = false + +[[bin]] +name = "turso_whopper" +path = "main.rs" + +[dependencies] +anyhow.workspace = true +clap = { version = "4.5", features = ["derive"] } +memmap2 = "0.9" +rand = { workspace = true } +rand_chacha = "0.9.0" +sql_generation = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +turso_core = { path = "../core", features = ["simulator"]} +turso_parser = { workspace = true } diff --git a/whopper/bin/explore b/whopper/bin/explore new file mode 100755 index 000000000..72d3b66de --- /dev/null +++ b/whopper/bin/explore @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e + +cargo build -p turso_whopper + +echo "Running Whopper in an infinite loop in 'chaos' mode..." +while true; do + time RUST_BACKTRACE=full ./target/debug/turso_whopper --mode chaos +done diff --git a/whopper/bin/run b/whopper/bin/run new file mode 100755 index 000000000..152ad7e36 --- /dev/null +++ b/whopper/bin/run @@ -0,0 +1,7 @@ +#!/bin/bash + +set -e + +cargo build -p turso_whopper + +time RUST_BACKTRACE=full ./target/debug/turso_whopper $* diff --git a/whopper/io.rs b/whopper/io.rs new file mode 100644 index 000000000..af972c2bc --- /dev/null +++ b/whopper/io.rs @@ -0,0 +1,282 @@ +use memmap2::{MmapMut, MmapOptions}; +use rand::{Rng, RngCore}; +use rand_chacha::ChaCha8Rng; +use std::collections::HashSet; +use std::fs::{File as StdFile, OpenOptions}; +use std::sync::{Arc, Mutex, Weak}; +use tracing::debug; +use turso_core::{Clock, Completion, File, IO, Instant, OpenFlags, Result}; + +#[derive(Debug, Clone)] +pub struct IOFaultConfig { + /// Probability of a cosmic ray bit flip on write (0.0-1.0) + pub cosmic_ray_probability: f64, +} + +impl Default for IOFaultConfig { + fn default() -> Self { + Self { + cosmic_ray_probability: 0.0, + } + } +} + +pub struct SimulatorIO { + files: Mutex)>>, + keep_files: bool, + rng: Mutex, + fault_config: IOFaultConfig, +} + +impl SimulatorIO { + pub fn new(keep_files: bool, rng: ChaCha8Rng, fault_config: IOFaultConfig) -> Self { + debug!("SimulatorIO fault config: {:?}", fault_config); + Self { + files: Mutex::new(Vec::new()), + keep_files, + rng: Mutex::new(rng), + fault_config, + } + } +} + +impl Drop for SimulatorIO { + fn drop(&mut self) { + let files = self.files.lock().unwrap(); + let paths: HashSet = files.iter().map(|(path, _)| path.clone()).collect(); + if !self.keep_files { + for path in paths.iter() { + let _ = std::fs::remove_file(path); + } + } else { + for path in paths.iter() { + println!("Keeping file: {path}"); + } + } + } +} + +impl Clock for SimulatorIO { + fn now(&self) -> Instant { + Instant { secs: 0, micros: 0 } // Simple implementation for now + } +} + +impl IO for SimulatorIO { + fn open_file(&self, path: &str, _flags: OpenFlags, _create_new: bool) -> Result> { + let file = Arc::new(SimulatorFile::new(path)); + + // Store weak reference to avoid keeping files open forever + let mut files = self.files.lock().unwrap(); + files.push((path.to_string(), Arc::downgrade(&file))); + + Ok(file as Arc) + } + + fn remove_file(&self, path: &str) -> Result<()> { + let mut files = self.files.lock().unwrap(); + files.retain(|(p, _)| p != path); + + if !self.keep_files { + let _ = std::fs::remove_file(path); + } + Ok(()) + } + + fn step(&self) -> Result<()> { + // Inject cosmic ray faults with configured probability + if self.fault_config.cosmic_ray_probability > 0.0 { + let mut rng = self.rng.lock().unwrap(); + if rng.random::() < self.fault_config.cosmic_ray_probability { + // Clean up dead weak references and collect live files + let mut files = self.files.lock().unwrap(); + files.retain(|(_, weak)| weak.strong_count() > 0); + + // Collect files that are still alive + let open_files: Vec<_> = files + .iter() + .filter_map(|(path, weak)| weak.upgrade().map(|file| (path.clone(), file))) + .collect(); + + if !open_files.is_empty() { + let file_idx = rng.random_range(0..open_files.len()); + let (path, file) = &open_files[file_idx]; + + // Get the actual file size (not the mmap size) + let file_size = *file.size.lock().unwrap(); + if file_size > 0 { + // Pick a random offset within the actual file size + let byte_offset = rng.random_range(0..file_size); + let bit_idx = rng.random_range(0..8); + + let mut mmap = file.mmap.lock().unwrap(); + let old_byte = mmap[byte_offset]; + mmap[byte_offset] ^= 1 << bit_idx; + println!( + "Cosmic ray! File: {} - Flipped bit {} at offset {} (0x{:02x} -> 0x{:02x})", + path, bit_idx, byte_offset, old_byte, mmap[byte_offset] + ); + } + } + } + } + Ok(()) + } + + fn wait_for_completion(&self, _completion: Completion) -> Result<()> { + // No-op - completions are already completed immediately in the file operations + Ok(()) + } + + fn generate_random_number(&self) -> i64 { + let mut rng = self.rng.lock().unwrap(); + rng.next_u64() as i64 + } +} + +const FILE_SIZE: usize = 1 << 30; // 1 GiB + +struct SimulatorFile { + mmap: Mutex, + size: Mutex, + _file: StdFile, +} + +impl SimulatorFile { + fn new(file_path: &str) -> Self { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(file_path) + .unwrap_or_else(|e| panic!("Failed to create file {file_path}: {e}")); + + file.set_len(FILE_SIZE as u64) + .unwrap_or_else(|e| panic!("Failed to truncate file {file_path}: {e}")); + + let mmap = unsafe { + MmapOptions::new() + .len(FILE_SIZE) + .map_mut(&file) + .unwrap_or_else(|e| panic!("mmap failed for file {file_path}: {e}")) + }; + + Self { + mmap: Mutex::new(mmap), + size: Mutex::new(0), + _file: file, + } + } +} + +impl Drop for SimulatorFile { + fn drop(&mut self) {} +} + +unsafe impl Send for SimulatorFile {} +unsafe impl Sync for SimulatorFile {} + +impl File for SimulatorFile { + fn pread(&self, pos: u64, c: Completion) -> Result { + let pos = pos as usize; + let read_completion = c.as_read(); + let buffer = read_completion.buf_arc(); + let len = buffer.len(); + + if pos + len <= FILE_SIZE { + let mmap = self.mmap.lock().unwrap(); + buffer.as_mut_slice().copy_from_slice(&mmap[pos..pos + len]); + c.complete(len as i32); + } else { + c.complete(0); + } + Ok(c) + } + + fn pwrite( + &self, + pos: u64, + buffer: Arc, + c: Completion, + ) -> Result { + let pos = pos as usize; + let len = buffer.len(); + + if pos + len <= FILE_SIZE { + let mut mmap = self.mmap.lock().unwrap(); + mmap[pos..pos + len].copy_from_slice(buffer.as_slice()); + let mut size = self.size.lock().unwrap(); + if pos + len > *size { + *size = pos + len; + } + c.complete(len as i32); + } else { + c.complete(0); + } + Ok(c) + } + + fn pwritev( + &self, + pos: u64, + buffers: Vec>, + c: Completion, + ) -> Result { + let mut offset = pos as usize; + let mut total_written = 0; + + { + let mut mmap = self.mmap.lock().unwrap(); + for buffer in buffers { + let len = buffer.len(); + if offset + len <= FILE_SIZE { + mmap[offset..offset + len].copy_from_slice(buffer.as_slice()); + offset += len; + total_written += len; + } else { + break; + } + } + } + + // Update the file size if we wrote beyond the current size + if total_written > 0 { + let mut size = self.size.lock().unwrap(); + let end_pos = (pos as usize) + total_written; + if end_pos > *size { + *size = end_pos; + } + } + + c.complete(total_written as i32); + Ok(c) + } + + fn sync(&self, c: Completion) -> Result { + // No-op for memory files + c.complete(0); + Ok(c) + } + + fn truncate(&self, len: u64, c: Completion) -> Result { + let mut size = self.size.lock().unwrap(); + *size = len as usize; + c.complete(0); + Ok(c) + } + + fn lock_file(&self, _exclusive: bool) -> Result<()> { + // No-op for memory files + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + // No-op for memory files + Ok(()) + } + + fn size(&self) -> Result { + Ok(*self.size.lock().unwrap() as u64) + } +} diff --git a/whopper/main.rs b/whopper/main.rs new file mode 100644 index 000000000..e1a7f9cd2 --- /dev/null +++ b/whopper/main.rs @@ -0,0 +1,514 @@ +/// Whopper is a deterministic simulator for testing the Turso database. +use clap::Parser; +use rand::{Rng, RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use sql_generation::{ + generation::{Arbitrary, GenerationContext, Opts}, + model::query::{ + create::Create, create_index::CreateIndex, delete::Delete, drop_index::DropIndex, + insert::Insert, select::Select, update::Update, + }, + model::table::{Column, ColumnType, Table}, +}; +use std::cell::RefCell; +use std::sync::Arc; +use tracing::trace; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; +use turso_core::{Connection, Database, IO, Statement}; +use turso_parser::ast::SortOrder; + +mod io; +use io::{IOFaultConfig, SimulatorIO}; + +#[derive(Parser)] +#[command(name = "turso_whopper")] +#[command(about = "The Turso Whopper Simulator")] +struct Args { + /// Simulation mode (fast, chaos, ragnarök/ragnarok) + #[arg(long, default_value = "fast")] + mode: String, + /// Keep mmap I/O files on disk after run + #[arg(long)] + keep: bool, +} + +struct SimulatorConfig { + max_connections: usize, + max_steps: usize, + cosmic_ray_probability: f64, +} + +#[derive(Debug)] +enum FiberState { + Idle, + InTx, +} + +struct SimulatorFiber { + connection: Arc, + state: FiberState, + statement: RefCell>, +} + +struct SimulatorContext { + fibers: Vec, + tables: Vec, + indexes: Vec, + opts: Opts, + stats: Stats, +} + +#[derive(Default)] +struct Stats { + inserts: usize, + updates: usize, + deletes: usize, + integrity_checks: usize, +} + +fn main() -> anyhow::Result<()> { + init_logger(); + + let args = Args::parse(); + + let seed = std::env::var("SEED") + .ok() + .map(|s| s.parse::().unwrap()) + .unwrap_or_else(|| { + let mut rng = rand::rng(); + rng.next_u64() + }); + + println!("mode = {}", args.mode); + println!("seed = {seed}"); + + let mut rng = ChaCha8Rng::seed_from_u64(seed); + + // Create a separate RNG for IO operations with a derived seed + let io_rng = ChaCha8Rng::seed_from_u64(seed.wrapping_add(1)); + + let config = gen_config(&mut rng, &args.mode)?; + + let fault_config = IOFaultConfig { + cosmic_ray_probability: config.cosmic_ray_probability, + }; + + if config.cosmic_ray_probability > 0.0 { + println!("cosmic ray probability = {}", config.cosmic_ray_probability); + } + + let io = Arc::new(SimulatorIO::new(args.keep, io_rng, fault_config)) as Arc; + + let db_path = format!("whopper-{}-{}.db", seed, std::process::id()); + + let db = match Database::open_file(io.clone(), &db_path, false, true) { + Ok(db) => db, + Err(e) => { + return Err(anyhow::anyhow!("Database open failed: {}", e)); + } + }; + let boostrap_conn = match db.connect() { + Ok(conn) => conn, + Err(e) => { + return Err(anyhow::anyhow!("Connection failed: {}", e)); + } + }; + + let schema = create_initial_schema(&mut rng); + let tables = schema.iter().map(|t| t.table.clone()).collect::>(); + for create_table in &schema { + let sql = create_table.to_string(); + trace!("{}", sql); + boostrap_conn.execute(&sql)?; + } + + let indexes = create_initial_indexes(&mut rng, &tables); + for create_index in &indexes { + let sql = create_index.to_string(); + trace!("{}", sql); + boostrap_conn.execute(&sql)?; + } + + boostrap_conn.close()?; + + let mut fibers = Vec::new(); + for i in 0..config.max_connections { + let conn = match db.connect() { + Ok(conn) => conn, + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to create fiber connection {}: {}", + i, + e + )); + } + }; + fibers.push(SimulatorFiber { + connection: conn, + state: FiberState::Idle, + statement: RefCell::new(None), + }); + } + + let mut context = SimulatorContext { + fibers, + tables, + indexes: indexes.iter().map(|idx| idx.index_name.clone()).collect(), + opts: Opts::default(), + stats: Stats::default(), + }; + + let progress_interval = config.max_steps / 10; + let progress_stages = [ + " . I/U/D/C", + " . ", + " . ", + " | ", + " | ", + " ╱|╲ ", + " ╱╲|╱╲ ", + " ╱╲╱|╲╱╲ ", + " ╱╲╱╲|╱╲╱╲ ", + " ╱╲╱╲╱|╲╱╲╱╲ ", + " ╱╲╱╲╱╲|╱╲╱╲╱╲ ", + ]; + let mut progress_index = 0; + println!("{}", progress_stages[progress_index]); + progress_index += 1; + + for i in 0..config.max_steps { + let fiber_idx = i % context.fibers.len(); + perform_work(fiber_idx, &mut rng, &mut context)?; + io.step()?; + if progress_interval > 0 && (i + 1) % progress_interval == 0 { + println!( + "{}{}/{}/{}/{}", + progress_stages[progress_index], + context.stats.inserts, + context.stats.updates, + context.stats.deletes, + context.stats.integrity_checks + ); + progress_index += 1; + } + } + Ok(()) +} + +fn gen_config(rng: &mut ChaCha8Rng, mode: &str) -> anyhow::Result { + match mode { + "fast" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 100000, + cosmic_ray_probability: 0.0, + }), + "chaos" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 10000000, + cosmic_ray_probability: 0.0, + }), + "ragnarök" | "ragnarok" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 1000000, + cosmic_ray_probability: 0.0001, + }), + _ => Err(anyhow::anyhow!("Unknown mode: {}", mode)), + } +} + +fn create_initial_indexes(rng: &mut ChaCha8Rng, tables: &[Table]) -> Vec { + let mut indexes = Vec::new(); + + // Create 0-3 indexes per table + for table in tables { + let num_indexes = rng.random_range(0..=3); + for i in 0..num_indexes { + if !table.columns.is_empty() { + // Pick 1-3 columns for the index + let num_columns = rng.random_range(1..=std::cmp::min(3, table.columns.len())); + let mut selected_columns = Vec::new(); + let mut available_columns = table.columns.clone(); + + for _ in 0..num_columns { + if available_columns.is_empty() { + break; + } + let col_idx = rng.random_range(0..available_columns.len()); + let column = available_columns.remove(col_idx); + let sort_order = if rng.random_bool(0.5) { + SortOrder::Asc + } else { + SortOrder::Desc + }; + selected_columns.push((column.name, sort_order)); + } + + if !selected_columns.is_empty() { + let index_name = format!("idx_{}_{}", table.name, i); + let create_index = CreateIndex { + index_name, + table_name: table.name.clone(), + columns: selected_columns, + }; + indexes.push(create_index); + } + } + } + } + + indexes +} + +fn create_initial_schema(rng: &mut ChaCha8Rng) -> Vec { + let mut schema = Vec::new(); + + // Generate random number of tables (1-5) + let num_tables = rng.random_range(1..=5); + + for i in 0..num_tables { + let table_name = format!("table_{i}"); + + // Generate random number of columns (2-8) + let num_columns = rng.random_range(2..=8); + let mut columns = Vec::new(); + + // Always add an id column as primary key + columns.push(Column { + name: "id".to_string(), + column_type: ColumnType::Integer, + primary: true, + unique: false, + }); + + // Add random columns + for j in 1..num_columns { + let col_type = match rng.random_range(0..3) { + 0 => ColumnType::Integer, + 1 => ColumnType::Text, + _ => ColumnType::Float, + }; + + columns.push(Column { + name: format!("col_{j}"), + column_type: col_type, + primary: false, + unique: rng.random_bool(0.2), // 20% chance of unique + }); + } + + let table = Table { + name: table_name, + columns, + rows: vec![], + indexes: vec![], + }; + + schema.push(Create { table }); + } + + schema +} + +fn perform_work( + fiber_idx: usize, + rng: &mut ChaCha8Rng, + context: &mut SimulatorContext, +) -> anyhow::Result<()> { + // If we have a statement, step it. + let done = { + let mut stmt_borrow = context.fibers[fiber_idx].statement.borrow_mut(); + if let Some(stmt) = stmt_borrow.as_mut() { + match stmt.step() { + Ok(result) => matches!(result, turso_core::StepResult::Done), + Err(e) => { + match e { + turso_core::LimboError::SchemaUpdated => { + trace!("{} Schema changed, rolling back transaction", fiber_idx); + drop(stmt_borrow); + context.fibers[fiber_idx].statement.replace(None); + // Rollback the transaction if we're in one + if matches!(context.fibers[fiber_idx].state, FiberState::InTx) { + if let Ok(rollback_stmt) = + context.fibers[fiber_idx].connection.prepare("ROLLBACK") + { + context.fibers[fiber_idx] + .statement + .replace(Some(rollback_stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + } + return Ok(()); + } + turso_core::LimboError::Busy => { + trace!("{} Database busy, rolling back transaction", fiber_idx); + drop(stmt_borrow); + context.fibers[fiber_idx].statement.replace(None); + // Rollback the transaction if we're in one + if matches!(context.fibers[fiber_idx].state, FiberState::InTx) { + if let Ok(rollback_stmt) = + context.fibers[fiber_idx].connection.prepare("ROLLBACK") + { + context.fibers[fiber_idx] + .statement + .replace(Some(rollback_stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + } + return Ok(()); + } + _ => { + return Err(e.into()); + } + } + } + } + } else { + true + } + }; + // If the statement has more work, we're done for this simulation step + if !done { + return Ok(()); + } + context.fibers[fiber_idx].statement.replace(None); + match context.fibers[fiber_idx].state { + FiberState::Idle => { + let action = rng.random_range(0..100); + if action <= 29 { + // Start transaction + // FIXME: use deferred when it's fixed! + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("BEGIN") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::InTx; + trace!("{} BEGIN", fiber_idx); + } + } else if action == 30 { + // Integrity check + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare("PRAGMA integrity_check") + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.integrity_checks += 1; + trace!("{} PRAGMA integrity_check", fiber_idx); + } + } + } + FiberState::InTx => { + let action = rng.random_range(0..100); + match action { + 0..=9 => { + // SELECT (10%) + let select = Select::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(select.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + } + trace!("{} SELECT: {}", fiber_idx, select.to_string()); + } + 10..=39 => { + // INSERT (30%) + let insert = Insert::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(insert.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.inserts += 1; + } + trace!("{} INSERT: {}", fiber_idx, insert.to_string()); + } + 40..=59 => { + // UPDATE (20%) + let update = Update::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(update.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.updates += 1; + } + trace!("{} UPDATE: {}", fiber_idx, update.to_string()); + } + 60..=69 => { + // DELETE (10%) + let delete = Delete::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(delete.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.deletes += 1; + } + trace!("{} DELETE: {}", fiber_idx, delete.to_string()); + } + 70..=71 => { + // CREATE INDEX (2%) + let create_index = CreateIndex::arbitrary(rng, context); + let sql = create_index.to_string(); + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.indexes.push(create_index.index_name.clone()); + } + trace!("{} CREATE INDEX: {}", fiber_idx, sql); + } + 72..=73 => { + // DROP INDEX (2%) + if !context.indexes.is_empty() { + let index_idx = rng.random_range(0..context.indexes.len()); + let index_name = context.indexes.remove(index_idx); + let drop_index = DropIndex { index_name }; + let sql = drop_index.to_string(); + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + } + trace!("{} DROP INDEX: {}", fiber_idx, sql); + } + } + 74..=86 => { + // COMMIT (13%) + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("COMMIT") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + trace!("{} COMMIT", fiber_idx); + } + 87..=99 => { + // ROLLBACK (13%) + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("ROLLBACK") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + trace!("{} ROLLBACK", fiber_idx); + } + _ => {} + } + } + } + Ok(()) +} + +impl GenerationContext for SimulatorContext { + fn tables(&self) -> &Vec
{ + &self.tables + } + + fn opts(&self) -> &Opts { + &self.opts + } +} + +fn init_logger() { + let _ = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_line_number(true) + .without_time() + .with_thread_ids(false), + ) + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) + .try_init(); +}