Merge 'whopper: A new DST with concurrency' from Pekka Enberg

Our simulator is currently limited to concurrency of one. This
introduces a much less sophisticated DST with focus on finding
concurrency bugs.

Closes #2985
This commit is contained in:
Pekka Enberg
2025-09-11 18:42:45 +03:00
committed by GitHub
12 changed files with 932 additions and 2 deletions

View File

@@ -189,6 +189,65 @@ What this means is that the behavior of a test run is deterministic based on the
If the simulator catches a bug, you can always reproduce the exact same sequence of events by passing the same seed.
The simulator also performs fault injection to discover interesting bugs.
### Whopper
Whopper is a DST that, unlike `simulator`, performs concurrent query execution.
To run Whopper for your local changes, run:
```console
./whopper/bin/run
```
The output of the simulation run looks as follows:
```
mode = fast
seed = 11621338508193870992
. I/U/D/C
. 22/17/15/0
. 41/34/20/3
| 62/43/27/4
| 88/55/30/5
|╲ 97/58/30/6
╱╲|╱╲ 108/62/30/7
╱╲╱|╲╱╲ 115/67/32/7
╱╲╱╲|╱╲╱╲ 121/74/35/7
╱╲╱╲╱|╲╱╲╱╲ 125/80/38/7
╱╲╱╲╱╲|╱╲╱╲╱╲ 141/94/43/8
real 0m1.250s
user 0m0.843s
sys 0m0.043s
```
The simulator prints ten progress indication lines, regardless of how long a run takes. The progress indicator line shows the following stats:
* `I` -- the number of `INSERT` statements executed
* `U` -- the number of `UPDATE` statements executed
* `D` -- the number of `DELETE` statements executed
* `C` -- the number of `PRAGMA integrity_check` statements executed
This will do a short sanity check run in using the `fast` mode.
If you need to reproduce a run, just defined the `SEED` environment variable as follows:
```console
SEED=1234 ./whopper/bin/run
```
You can also run Whopper in exploration mode to find more serious bugs:
```console
./whopper/bin/explore
```
Note that exploration uses the `chaos` mode so if you need to reproduce a run, use:
```console
SEED=1234 ./whopper/bin/run --mode chaos
```
## Python Bindings
Turso provides Python bindings built on top of the [PyO3](https://pyo3.rs) project.

16
Cargo.lock generated
View File

@@ -4422,6 +4422,22 @@ dependencies = [
"turso_sync_engine",
]
[[package]]
name = "turso_whopper"
version = "0.1.5"
dependencies = [
"anyhow",
"clap",
"memmap2",
"rand 0.9.2",
"rand_chacha 0.9.0",
"sql_generation",
"tracing",
"tracing-subscriber",
"turso_core",
"turso_parser",
]
[[package]]
name = "typenum"
version = "1.18.0"

View File

@@ -29,6 +29,7 @@ members = [
"parser",
"sync/engine",
"sql_generation",
"whopper",
]
exclude = ["perf/latency/limbo"]

View File

@@ -2085,7 +2085,7 @@ impl WalFile {
"We must hold writer and checkpoint locks to restart the log, found: {:?}",
self.checkpoint_guard
);
tracing::info!("restart_log(mode={mode:?})");
tracing::debug!("restart_log(mode={mode:?})");
{
// Block all readers
let mut shared = self.get_shared_mut();

View File

@@ -2226,7 +2226,7 @@ pub fn op_transaction(
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
tracing::debug!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie

View File

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DropIndex {
pub index_name: String,
}
impl std::fmt::Display for DropIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "DROP INDEX {}", self.index_name)
}
}

View File

@@ -2,6 +2,7 @@ pub use create::Create;
pub use create_index::CreateIndex;
pub use delete::Delete;
pub use drop::Drop;
pub use drop_index::DropIndex;
pub use insert::Insert;
pub use select::Select;
@@ -9,6 +10,7 @@ pub mod create;
pub mod create_index;
pub mod delete;
pub mod drop;
pub mod drop_index;
pub mod insert;
pub mod predicate;
pub mod select;

27
whopper/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
# Copyright 2025 the Turso authors. All rights reserved. MIT license.
[package]
name = "turso_whopper"
version.workspace = true
authors.workspace = true
edition = "2024"
license.workspace = true
repository.workspace = true
description = "The Turso deterministic simulator"
publish = false
[[bin]]
name = "turso_whopper"
path = "main.rs"
[dependencies]
anyhow.workspace = true
clap = { version = "4.5", features = ["derive"] }
memmap2 = "0.9"
rand = { workspace = true }
rand_chacha = "0.9.0"
sql_generation = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
turso_core = { path = "../core", features = ["simulator"]}
turso_parser = { workspace = true }

10
whopper/bin/explore Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
set -e
cargo build -p turso_whopper
echo "Running Whopper in an infinite loop in 'chaos' mode..."
while true; do
time RUST_BACKTRACE=full ./target/debug/turso_whopper --mode chaos
done

7
whopper/bin/run Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/bash
set -e
cargo build -p turso_whopper
time RUST_BACKTRACE=full ./target/debug/turso_whopper $*

282
whopper/io.rs Normal file
View File

@@ -0,0 +1,282 @@
use memmap2::{MmapMut, MmapOptions};
use rand::{Rng, RngCore};
use rand_chacha::ChaCha8Rng;
use std::collections::HashSet;
use std::fs::{File as StdFile, OpenOptions};
use std::sync::{Arc, Mutex, Weak};
use tracing::debug;
use turso_core::{Clock, Completion, File, IO, Instant, OpenFlags, Result};
#[derive(Debug, Clone)]
pub struct IOFaultConfig {
/// Probability of a cosmic ray bit flip on write (0.0-1.0)
pub cosmic_ray_probability: f64,
}
impl Default for IOFaultConfig {
fn default() -> Self {
Self {
cosmic_ray_probability: 0.0,
}
}
}
pub struct SimulatorIO {
files: Mutex<Vec<(String, Weak<SimulatorFile>)>>,
keep_files: bool,
rng: Mutex<ChaCha8Rng>,
fault_config: IOFaultConfig,
}
impl SimulatorIO {
pub fn new(keep_files: bool, rng: ChaCha8Rng, fault_config: IOFaultConfig) -> Self {
debug!("SimulatorIO fault config: {:?}", fault_config);
Self {
files: Mutex::new(Vec::new()),
keep_files,
rng: Mutex::new(rng),
fault_config,
}
}
}
impl Drop for SimulatorIO {
fn drop(&mut self) {
let files = self.files.lock().unwrap();
let paths: HashSet<String> = files.iter().map(|(path, _)| path.clone()).collect();
if !self.keep_files {
for path in paths.iter() {
let _ = std::fs::remove_file(path);
}
} else {
for path in paths.iter() {
println!("Keeping file: {path}");
}
}
}
}
impl Clock for SimulatorIO {
fn now(&self) -> Instant {
Instant { secs: 0, micros: 0 } // Simple implementation for now
}
}
impl IO for SimulatorIO {
fn open_file(&self, path: &str, _flags: OpenFlags, _create_new: bool) -> Result<Arc<dyn File>> {
let file = Arc::new(SimulatorFile::new(path));
// Store weak reference to avoid keeping files open forever
let mut files = self.files.lock().unwrap();
files.push((path.to_string(), Arc::downgrade(&file)));
Ok(file as Arc<dyn File>)
}
fn remove_file(&self, path: &str) -> Result<()> {
let mut files = self.files.lock().unwrap();
files.retain(|(p, _)| p != path);
if !self.keep_files {
let _ = std::fs::remove_file(path);
}
Ok(())
}
fn step(&self) -> Result<()> {
// Inject cosmic ray faults with configured probability
if self.fault_config.cosmic_ray_probability > 0.0 {
let mut rng = self.rng.lock().unwrap();
if rng.random::<f64>() < self.fault_config.cosmic_ray_probability {
// Clean up dead weak references and collect live files
let mut files = self.files.lock().unwrap();
files.retain(|(_, weak)| weak.strong_count() > 0);
// Collect files that are still alive
let open_files: Vec<_> = files
.iter()
.filter_map(|(path, weak)| weak.upgrade().map(|file| (path.clone(), file)))
.collect();
if !open_files.is_empty() {
let file_idx = rng.random_range(0..open_files.len());
let (path, file) = &open_files[file_idx];
// Get the actual file size (not the mmap size)
let file_size = *file.size.lock().unwrap();
if file_size > 0 {
// Pick a random offset within the actual file size
let byte_offset = rng.random_range(0..file_size);
let bit_idx = rng.random_range(0..8);
let mut mmap = file.mmap.lock().unwrap();
let old_byte = mmap[byte_offset];
mmap[byte_offset] ^= 1 << bit_idx;
println!(
"Cosmic ray! File: {} - Flipped bit {} at offset {} (0x{:02x} -> 0x{:02x})",
path, bit_idx, byte_offset, old_byte, mmap[byte_offset]
);
}
}
}
}
Ok(())
}
fn wait_for_completion(&self, _completion: Completion) -> Result<()> {
// No-op - completions are already completed immediately in the file operations
Ok(())
}
fn generate_random_number(&self) -> i64 {
let mut rng = self.rng.lock().unwrap();
rng.next_u64() as i64
}
}
const FILE_SIZE: usize = 1 << 30; // 1 GiB
struct SimulatorFile {
mmap: Mutex<MmapMut>,
size: Mutex<usize>,
_file: StdFile,
}
impl SimulatorFile {
fn new(file_path: &str) -> Self {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(file_path)
.unwrap_or_else(|e| panic!("Failed to create file {file_path}: {e}"));
file.set_len(FILE_SIZE as u64)
.unwrap_or_else(|e| panic!("Failed to truncate file {file_path}: {e}"));
let mmap = unsafe {
MmapOptions::new()
.len(FILE_SIZE)
.map_mut(&file)
.unwrap_or_else(|e| panic!("mmap failed for file {file_path}: {e}"))
};
Self {
mmap: Mutex::new(mmap),
size: Mutex::new(0),
_file: file,
}
}
}
impl Drop for SimulatorFile {
fn drop(&mut self) {}
}
unsafe impl Send for SimulatorFile {}
unsafe impl Sync for SimulatorFile {}
impl File for SimulatorFile {
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
let pos = pos as usize;
let read_completion = c.as_read();
let buffer = read_completion.buf_arc();
let len = buffer.len();
if pos + len <= FILE_SIZE {
let mmap = self.mmap.lock().unwrap();
buffer.as_mut_slice().copy_from_slice(&mmap[pos..pos + len]);
c.complete(len as i32);
} else {
c.complete(0);
}
Ok(c)
}
fn pwrite(
&self,
pos: u64,
buffer: Arc<turso_core::Buffer>,
c: Completion,
) -> Result<Completion> {
let pos = pos as usize;
let len = buffer.len();
if pos + len <= FILE_SIZE {
let mut mmap = self.mmap.lock().unwrap();
mmap[pos..pos + len].copy_from_slice(buffer.as_slice());
let mut size = self.size.lock().unwrap();
if pos + len > *size {
*size = pos + len;
}
c.complete(len as i32);
} else {
c.complete(0);
}
Ok(c)
}
fn pwritev(
&self,
pos: u64,
buffers: Vec<Arc<turso_core::Buffer>>,
c: Completion,
) -> Result<Completion> {
let mut offset = pos as usize;
let mut total_written = 0;
{
let mut mmap = self.mmap.lock().unwrap();
for buffer in buffers {
let len = buffer.len();
if offset + len <= FILE_SIZE {
mmap[offset..offset + len].copy_from_slice(buffer.as_slice());
offset += len;
total_written += len;
} else {
break;
}
}
}
// Update the file size if we wrote beyond the current size
if total_written > 0 {
let mut size = self.size.lock().unwrap();
let end_pos = (pos as usize) + total_written;
if end_pos > *size {
*size = end_pos;
}
}
c.complete(total_written as i32);
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Completion> {
// No-op for memory files
c.complete(0);
Ok(c)
}
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let mut size = self.size.lock().unwrap();
*size = len as usize;
c.complete(0);
Ok(c)
}
fn lock_file(&self, _exclusive: bool) -> Result<()> {
// No-op for memory files
Ok(())
}
fn unlock_file(&self) -> Result<()> {
// No-op for memory files
Ok(())
}
fn size(&self) -> Result<u64> {
Ok(*self.size.lock().unwrap() as u64)
}
}

514
whopper/main.rs Normal file
View File

@@ -0,0 +1,514 @@
/// Whopper is a deterministic simulator for testing the Turso database.
use clap::Parser;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use sql_generation::{
generation::{Arbitrary, GenerationContext, Opts},
model::query::{
create::Create, create_index::CreateIndex, delete::Delete, drop_index::DropIndex,
insert::Insert, select::Select, update::Update,
},
model::table::{Column, ColumnType, Table},
};
use std::cell::RefCell;
use std::sync::Arc;
use tracing::trace;
use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt};
use turso_core::{Connection, Database, IO, Statement};
use turso_parser::ast::SortOrder;
mod io;
use io::{IOFaultConfig, SimulatorIO};
#[derive(Parser)]
#[command(name = "turso_whopper")]
#[command(about = "The Turso Whopper Simulator")]
struct Args {
/// Simulation mode (fast, chaos, ragnarök/ragnarok)
#[arg(long, default_value = "fast")]
mode: String,
/// Keep mmap I/O files on disk after run
#[arg(long)]
keep: bool,
}
struct SimulatorConfig {
max_connections: usize,
max_steps: usize,
cosmic_ray_probability: f64,
}
#[derive(Debug)]
enum FiberState {
Idle,
InTx,
}
struct SimulatorFiber {
connection: Arc<Connection>,
state: FiberState,
statement: RefCell<Option<Statement>>,
}
struct SimulatorContext {
fibers: Vec<SimulatorFiber>,
tables: Vec<Table>,
indexes: Vec<String>,
opts: Opts,
stats: Stats,
}
#[derive(Default)]
struct Stats {
inserts: usize,
updates: usize,
deletes: usize,
integrity_checks: usize,
}
fn main() -> anyhow::Result<()> {
init_logger();
let args = Args::parse();
let seed = std::env::var("SEED")
.ok()
.map(|s| s.parse::<u64>().unwrap())
.unwrap_or_else(|| {
let mut rng = rand::rng();
rng.next_u64()
});
println!("mode = {}", args.mode);
println!("seed = {seed}");
let mut rng = ChaCha8Rng::seed_from_u64(seed);
// Create a separate RNG for IO operations with a derived seed
let io_rng = ChaCha8Rng::seed_from_u64(seed.wrapping_add(1));
let config = gen_config(&mut rng, &args.mode)?;
let fault_config = IOFaultConfig {
cosmic_ray_probability: config.cosmic_ray_probability,
};
if config.cosmic_ray_probability > 0.0 {
println!("cosmic ray probability = {}", config.cosmic_ray_probability);
}
let io = Arc::new(SimulatorIO::new(args.keep, io_rng, fault_config)) as Arc<dyn IO>;
let db_path = format!("whopper-{}-{}.db", seed, std::process::id());
let db = match Database::open_file(io.clone(), &db_path, false, true) {
Ok(db) => db,
Err(e) => {
return Err(anyhow::anyhow!("Database open failed: {}", e));
}
};
let boostrap_conn = match db.connect() {
Ok(conn) => conn,
Err(e) => {
return Err(anyhow::anyhow!("Connection failed: {}", e));
}
};
let schema = create_initial_schema(&mut rng);
let tables = schema.iter().map(|t| t.table.clone()).collect::<Vec<_>>();
for create_table in &schema {
let sql = create_table.to_string();
trace!("{}", sql);
boostrap_conn.execute(&sql)?;
}
let indexes = create_initial_indexes(&mut rng, &tables);
for create_index in &indexes {
let sql = create_index.to_string();
trace!("{}", sql);
boostrap_conn.execute(&sql)?;
}
boostrap_conn.close()?;
let mut fibers = Vec::new();
for i in 0..config.max_connections {
let conn = match db.connect() {
Ok(conn) => conn,
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to create fiber connection {}: {}",
i,
e
));
}
};
fibers.push(SimulatorFiber {
connection: conn,
state: FiberState::Idle,
statement: RefCell::new(None),
});
}
let mut context = SimulatorContext {
fibers,
tables,
indexes: indexes.iter().map(|idx| idx.index_name.clone()).collect(),
opts: Opts::default(),
stats: Stats::default(),
};
let progress_interval = config.max_steps / 10;
let progress_stages = [
" . I/U/D/C",
" . ",
" . ",
" | ",
" | ",
" |╲ ",
" ╱╲|╱╲ ",
" ╱╲╱|╲╱╲ ",
" ╱╲╱╲|╱╲╱╲ ",
" ╱╲╱╲╱|╲╱╲╱╲ ",
" ╱╲╱╲╱╲|╱╲╱╲╱╲ ",
];
let mut progress_index = 0;
println!("{}", progress_stages[progress_index]);
progress_index += 1;
for i in 0..config.max_steps {
let fiber_idx = i % context.fibers.len();
perform_work(fiber_idx, &mut rng, &mut context)?;
io.step()?;
if progress_interval > 0 && (i + 1) % progress_interval == 0 {
println!(
"{}{}/{}/{}/{}",
progress_stages[progress_index],
context.stats.inserts,
context.stats.updates,
context.stats.deletes,
context.stats.integrity_checks
);
progress_index += 1;
}
}
Ok(())
}
fn gen_config(rng: &mut ChaCha8Rng, mode: &str) -> anyhow::Result<SimulatorConfig> {
match mode {
"fast" => Ok(SimulatorConfig {
max_connections: rng.random_range(1..=8) as usize,
max_steps: 100000,
cosmic_ray_probability: 0.0,
}),
"chaos" => Ok(SimulatorConfig {
max_connections: rng.random_range(1..=8) as usize,
max_steps: 10000000,
cosmic_ray_probability: 0.0,
}),
"ragnarök" | "ragnarok" => Ok(SimulatorConfig {
max_connections: rng.random_range(1..=8) as usize,
max_steps: 1000000,
cosmic_ray_probability: 0.0001,
}),
_ => Err(anyhow::anyhow!("Unknown mode: {}", mode)),
}
}
fn create_initial_indexes(rng: &mut ChaCha8Rng, tables: &[Table]) -> Vec<CreateIndex> {
let mut indexes = Vec::new();
// Create 0-3 indexes per table
for table in tables {
let num_indexes = rng.random_range(0..=3);
for i in 0..num_indexes {
if !table.columns.is_empty() {
// Pick 1-3 columns for the index
let num_columns = rng.random_range(1..=std::cmp::min(3, table.columns.len()));
let mut selected_columns = Vec::new();
let mut available_columns = table.columns.clone();
for _ in 0..num_columns {
if available_columns.is_empty() {
break;
}
let col_idx = rng.random_range(0..available_columns.len());
let column = available_columns.remove(col_idx);
let sort_order = if rng.random_bool(0.5) {
SortOrder::Asc
} else {
SortOrder::Desc
};
selected_columns.push((column.name, sort_order));
}
if !selected_columns.is_empty() {
let index_name = format!("idx_{}_{}", table.name, i);
let create_index = CreateIndex {
index_name,
table_name: table.name.clone(),
columns: selected_columns,
};
indexes.push(create_index);
}
}
}
}
indexes
}
fn create_initial_schema(rng: &mut ChaCha8Rng) -> Vec<Create> {
let mut schema = Vec::new();
// Generate random number of tables (1-5)
let num_tables = rng.random_range(1..=5);
for i in 0..num_tables {
let table_name = format!("table_{i}");
// Generate random number of columns (2-8)
let num_columns = rng.random_range(2..=8);
let mut columns = Vec::new();
// Always add an id column as primary key
columns.push(Column {
name: "id".to_string(),
column_type: ColumnType::Integer,
primary: true,
unique: false,
});
// Add random columns
for j in 1..num_columns {
let col_type = match rng.random_range(0..3) {
0 => ColumnType::Integer,
1 => ColumnType::Text,
_ => ColumnType::Float,
};
columns.push(Column {
name: format!("col_{j}"),
column_type: col_type,
primary: false,
unique: rng.random_bool(0.2), // 20% chance of unique
});
}
let table = Table {
name: table_name,
columns,
rows: vec![],
indexes: vec![],
};
schema.push(Create { table });
}
schema
}
fn perform_work(
fiber_idx: usize,
rng: &mut ChaCha8Rng,
context: &mut SimulatorContext,
) -> anyhow::Result<()> {
// If we have a statement, step it.
let done = {
let mut stmt_borrow = context.fibers[fiber_idx].statement.borrow_mut();
if let Some(stmt) = stmt_borrow.as_mut() {
match stmt.step() {
Ok(result) => matches!(result, turso_core::StepResult::Done),
Err(e) => {
match e {
turso_core::LimboError::SchemaUpdated => {
trace!("{} Schema changed, rolling back transaction", fiber_idx);
drop(stmt_borrow);
context.fibers[fiber_idx].statement.replace(None);
// Rollback the transaction if we're in one
if matches!(context.fibers[fiber_idx].state, FiberState::InTx) {
if let Ok(rollback_stmt) =
context.fibers[fiber_idx].connection.prepare("ROLLBACK")
{
context.fibers[fiber_idx]
.statement
.replace(Some(rollback_stmt));
context.fibers[fiber_idx].state = FiberState::Idle;
}
}
return Ok(());
}
turso_core::LimboError::Busy => {
trace!("{} Database busy, rolling back transaction", fiber_idx);
drop(stmt_borrow);
context.fibers[fiber_idx].statement.replace(None);
// Rollback the transaction if we're in one
if matches!(context.fibers[fiber_idx].state, FiberState::InTx) {
if let Ok(rollback_stmt) =
context.fibers[fiber_idx].connection.prepare("ROLLBACK")
{
context.fibers[fiber_idx]
.statement
.replace(Some(rollback_stmt));
context.fibers[fiber_idx].state = FiberState::Idle;
}
}
return Ok(());
}
_ => {
return Err(e.into());
}
}
}
}
} else {
true
}
};
// If the statement has more work, we're done for this simulation step
if !done {
return Ok(());
}
context.fibers[fiber_idx].statement.replace(None);
match context.fibers[fiber_idx].state {
FiberState::Idle => {
let action = rng.random_range(0..100);
if action <= 29 {
// Start transaction
// FIXME: use deferred when it's fixed!
if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("BEGIN") {
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.fibers[fiber_idx].state = FiberState::InTx;
trace!("{} BEGIN", fiber_idx);
}
} else if action == 30 {
// Integrity check
if let Ok(stmt) = context.fibers[fiber_idx]
.connection
.prepare("PRAGMA integrity_check")
{
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.stats.integrity_checks += 1;
trace!("{} PRAGMA integrity_check", fiber_idx);
}
}
}
FiberState::InTx => {
let action = rng.random_range(0..100);
match action {
0..=9 => {
// SELECT (10%)
let select = Select::arbitrary(rng, context);
if let Ok(stmt) = context.fibers[fiber_idx]
.connection
.prepare(select.to_string())
{
context.fibers[fiber_idx].statement.replace(Some(stmt));
}
trace!("{} SELECT: {}", fiber_idx, select.to_string());
}
10..=39 => {
// INSERT (30%)
let insert = Insert::arbitrary(rng, context);
if let Ok(stmt) = context.fibers[fiber_idx]
.connection
.prepare(insert.to_string())
{
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.stats.inserts += 1;
}
trace!("{} INSERT: {}", fiber_idx, insert.to_string());
}
40..=59 => {
// UPDATE (20%)
let update = Update::arbitrary(rng, context);
if let Ok(stmt) = context.fibers[fiber_idx]
.connection
.prepare(update.to_string())
{
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.stats.updates += 1;
}
trace!("{} UPDATE: {}", fiber_idx, update.to_string());
}
60..=69 => {
// DELETE (10%)
let delete = Delete::arbitrary(rng, context);
if let Ok(stmt) = context.fibers[fiber_idx]
.connection
.prepare(delete.to_string())
{
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.stats.deletes += 1;
}
trace!("{} DELETE: {}", fiber_idx, delete.to_string());
}
70..=71 => {
// CREATE INDEX (2%)
let create_index = CreateIndex::arbitrary(rng, context);
let sql = create_index.to_string();
if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) {
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.indexes.push(create_index.index_name.clone());
}
trace!("{} CREATE INDEX: {}", fiber_idx, sql);
}
72..=73 => {
// DROP INDEX (2%)
if !context.indexes.is_empty() {
let index_idx = rng.random_range(0..context.indexes.len());
let index_name = context.indexes.remove(index_idx);
let drop_index = DropIndex { index_name };
let sql = drop_index.to_string();
if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) {
context.fibers[fiber_idx].statement.replace(Some(stmt));
}
trace!("{} DROP INDEX: {}", fiber_idx, sql);
}
}
74..=86 => {
// COMMIT (13%)
if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("COMMIT") {
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.fibers[fiber_idx].state = FiberState::Idle;
}
trace!("{} COMMIT", fiber_idx);
}
87..=99 => {
// ROLLBACK (13%)
if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("ROLLBACK") {
context.fibers[fiber_idx].statement.replace(Some(stmt));
context.fibers[fiber_idx].state = FiberState::Idle;
}
trace!("{} ROLLBACK", fiber_idx);
}
_ => {}
}
}
}
Ok(())
}
impl GenerationContext for SimulatorContext {
fn tables(&self) -> &Vec<Table> {
&self.tables
}
fn opts(&self) -> &Opts {
&self.opts
}
}
fn init_logger() {
let _ = tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_line_number(true)
.without_time()
.with_thread_ids(false),
)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.try_init();
}