From ca51a60b3cea31e4fc314472fa6a634b52fdd212 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 9 Sep 2025 15:02:40 +0300 Subject: [PATCH 1/7] core/storage: Demote restart_log() logging to debug --- core/storage/wal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d21267bbb..61663f0e2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -2083,7 +2083,7 @@ impl WalFile { "We must hold writer and checkpoint locks to restart the log, found: {:?}", self.checkpoint_guard ); - tracing::info!("restart_log(mode={mode:?})"); + tracing::debug!("restart_log(mode={mode:?})"); { // Block all readers let mut shared = self.get_shared_mut(); From b572366a2b78bc68d5f3e2c6a2022278030c2306 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 9 Sep 2025 18:48:41 +0300 Subject: [PATCH 2/7] core/vbe: Demote op_transaction() logging to debug --- core/vdbe/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 83e164adc..de8c28144 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2226,7 +2226,7 @@ pub fn op_transaction( match res { Ok(header_schema_cookie) => { if header_schema_cookie != *schema_cookie { - tracing::info!( + tracing::debug!( "schema changed, force reprepare: {} != {}", header_schema_cookie, *schema_cookie From 74c14efdfa089784b05f18fd8997d920ca202d6c Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 9 Sep 2025 18:57:41 +0300 Subject: [PATCH 3/7] sql_generation: Add support for DROP INDEX --- sql_generation/model/query/drop_index.rs | 12 ++++++++++++ sql_generation/model/query/mod.rs | 2 ++ 2 files changed, 14 insertions(+) create mode 100644 sql_generation/model/query/drop_index.rs diff --git a/sql_generation/model/query/drop_index.rs b/sql_generation/model/query/drop_index.rs new file mode 100644 index 000000000..18cadb12d --- /dev/null +++ b/sql_generation/model/query/drop_index.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct DropIndex { + pub index_name: String, +} + +impl std::fmt::Display for DropIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DROP INDEX {}", self.index_name) + } +} diff --git a/sql_generation/model/query/mod.rs b/sql_generation/model/query/mod.rs index 11d117a53..98ec2bdfd 100644 --- a/sql_generation/model/query/mod.rs +++ b/sql_generation/model/query/mod.rs @@ -2,6 +2,7 @@ pub use create::Create; pub use create_index::CreateIndex; pub use delete::Delete; pub use drop::Drop; +pub use drop_index::DropIndex; pub use insert::Insert; pub use select::Select; @@ -9,6 +10,7 @@ pub mod create; pub mod create_index; pub mod delete; pub mod drop; +pub mod drop_index; pub mod insert; pub mod predicate; pub mod select; From a9694c87b1a07fd69038a766c38baad8ce0967a1 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 9 Sep 2025 13:20:40 +0300 Subject: [PATCH 4/7] whopper: A new DST with concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a new deterministic simulator for Turso that focuses on finding concurrency bugs. You can run whopper with: ```console penberg@vonneumann turso % SEED=1234 ./whopper/bin/run Compiling turso_whopper v0.1.5-pre.3 (/Users/penberg/src/tursodatabase/turso/whopper) Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.22s seed = 1234 . I/U/D/C . 44/19/11/2 . 
68/33/21/2 | 104/51/29/3 | 121/69/41/3 ╱|╲ 150/84/51/3 ╱╲|╱╲ 184/97/59/3 ╱╲╱|╲╱╲ 199/105/64/4 ╱╲╱╲|╱╲╱╲ 206/115/69/5 ╱╲╱╲╱|╲╱╲╱╲ 234/138/82/6 ╱╲╱╲╱╲|╱╲╱╲╱╲ 269/164/91/7 ``` --- CONTRIBUTING.md | 59 ++++++ Cargo.lock | 16 ++ Cargo.toml | 1 + whopper/Cargo.toml | 27 +++ whopper/bin/explore | 10 + whopper/bin/run | 7 + whopper/io.rs | 232 +++++++++++++++++++++ whopper/main.rs | 498 ++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 850 insertions(+) create mode 100644 whopper/Cargo.toml create mode 100755 whopper/bin/explore create mode 100755 whopper/bin/run create mode 100644 whopper/io.rs create mode 100644 whopper/main.rs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b9bcf3afb..d42f1c4ec 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -189,6 +189,65 @@ What this means is that the behavior of a test run is deterministic based on the If the simulator catches a bug, you can always reproduce the exact same sequence of events by passing the same seed. The simulator also performs fault injection to discover interesting bugs. +### Whopper + +Whopper is a DST that, unlike `simulator`, performs concurrent query execution. + +To run Whopper for your local changes, run: + +```console +./whopper/bin/run +``` + +The output of the simulation run looks as follows: + +``` +mode = fast +seed = 11621338508193870992 + . I/U/D/C + . 22/17/15/0 + . 41/34/20/3 + | 62/43/27/4 + | 88/55/30/5 + ╱|╲ 97/58/30/6 + ╱╲|╱╲ 108/62/30/7 + ╱╲╱|╲╱╲ 115/67/32/7 + ╱╲╱╲|╱╲╱╲ 121/74/35/7 + ╱╲╱╲╱|╲╱╲╱╲ 125/80/38/7 + ╱╲╱╲╱╲|╱╲╱╲╱╲ 141/94/43/8 + +real 0m1.250s +user 0m0.843s +sys 0m0.043s +``` + +The simulator prints ten progress indication lines, regardless of how long a run takes. The progress indicator line shows the following stats: + +* `I` -- the number of `INSERT` statements executed +* `U` -- the number of `UPDATE` statements executed +* `D` -- the number of `DELETE` statements executed +* `C` -- the number of `PRAGMA integrity_check` statements executed + +This will do a short sanity check run in using the `fast` mode. + +If you need to reproduce a run, just defined the `SEED` environment variable as follows: + +```console +SEED=1234 ./whopper/bin/run +``` + +You can also run Whopper in exploration mode to find more serious bugs: + +```console +./whopper/bin/explore +``` + +Note that exploration uses the `chaos` mode so if you need to reproduce a run, use: + +```console +SEED=1234 ./whopper/bin/run --mode chaos +``` + ## Python Bindings Turso provides Python bindings built on top of the [PyO3](https://pyo3.rs) project. diff --git a/Cargo.lock b/Cargo.lock index e72c7b5f2..bb493b066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4440,6 +4440,22 @@ dependencies = [ "turso_sync_engine", ] +[[package]] +name = "turso_whopper" +version = "0.1.5" +dependencies = [ + "anyhow", + "clap", + "libc", + "rand 0.9.2", + "rand_chacha 0.9.0", + "sql_generation", + "tracing", + "tracing-subscriber", + "turso_core", + "turso_parser", +] + [[package]] name = "typenum" version = "1.18.0" diff --git a/Cargo.toml b/Cargo.toml index 28059424e..47a031414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "sync/engine", "sync/javascript", "sql_generation", + "whopper", ] exclude = ["perf/latency/limbo"] diff --git a/whopper/Cargo.toml b/whopper/Cargo.toml new file mode 100644 index 000000000..f1dc683c3 --- /dev/null +++ b/whopper/Cargo.toml @@ -0,0 +1,27 @@ +# Copyright 2025 the Turso authors. All rights reserved. MIT license. 
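+# This manifest defines the standalone `turso_whopper` binary (built from
+# main.rs below); the whopper/bin/run and whopper/bin/explore wrappers build it
+# with `cargo build -p turso_whopper` before invoking it.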
+ +[package] +name = "turso_whopper" +version.workspace = true +authors.workspace = true +edition = "2024" +license.workspace = true +repository.workspace = true +description = "The Turso deterministic simulator" +publish = false + +[[bin]] +name = "turso_whopper" +path = "main.rs" + +[dependencies] +anyhow.workspace = true +clap = { version = "4.5", features = ["derive"] } +libc = "0.2" +rand = { workspace = true } +rand_chacha = "0.9.0" +sql_generation = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +turso_core = { path = "../core", features = ["simulator"]} +turso_parser = { workspace = true } diff --git a/whopper/bin/explore b/whopper/bin/explore new file mode 100755 index 000000000..72d3b66de --- /dev/null +++ b/whopper/bin/explore @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e + +cargo build -p turso_whopper + +echo "Running Whopper in an infinite loop in 'chaos' mode..." +while true; do + time RUST_BACKTRACE=full ./target/debug/turso_whopper --mode chaos +done diff --git a/whopper/bin/run b/whopper/bin/run new file mode 100755 index 000000000..152ad7e36 --- /dev/null +++ b/whopper/bin/run @@ -0,0 +1,7 @@ +#!/bin/bash + +set -e + +cargo build -p turso_whopper + +time RUST_BACKTRACE=full ./target/debug/turso_whopper $* diff --git a/whopper/io.rs b/whopper/io.rs new file mode 100644 index 000000000..596588af7 --- /dev/null +++ b/whopper/io.rs @@ -0,0 +1,232 @@ +use libc::{ + MAP_SHARED, O_CREAT, O_RDWR, PROT_READ, PROT_WRITE, close, ftruncate, mmap, munmap, open, +}; +use rand::RngCore; +use rand_chacha::ChaCha8Rng; +use std::collections::HashSet; +use std::ptr; +use std::sync::{Arc, Mutex}; +use turso_core::{Clock, Completion, File, IO, Instant, OpenFlags, Result}; + +pub struct SimulatorIO { + files: Mutex>, + keep_files: bool, + rng: Mutex, +} + +impl SimulatorIO { + pub fn new(keep_files: bool, rng: ChaCha8Rng) -> Self { + Self { + files: Mutex::new(HashSet::new()), + keep_files, + rng: Mutex::new(rng), + } + } +} + +impl Drop for SimulatorIO { + fn drop(&mut self) { + let files = self.files.lock().unwrap(); + if !self.keep_files { + for path in files.iter() { + unsafe { + let c_path = std::ffi::CString::new(path.clone()).unwrap(); + libc::unlink(c_path.as_ptr()); + } + } + } else { + for path in files.iter() { + println!("Keeping file: {}", path); + } + } + } +} + +impl Clock for SimulatorIO { + fn now(&self) -> Instant { + Instant { secs: 0, micros: 0 } // Simple implementation for now + } +} + +impl IO for SimulatorIO { + fn open_file(&self, path: &str, _flags: OpenFlags, _create_new: bool) -> Result> { + let file = Arc::new(SimulatorFile::new(path)); + self.files.lock().unwrap().insert(path.to_string()); + Ok(file) + } + + fn remove_file(&self, path: &str) -> Result<()> { + let mut files = self.files.lock().unwrap(); + if files.remove(path) { + // File descriptor and mmap will be cleaned up when the Arc is dropped + if !self.keep_files { + unsafe { + let c_path = std::ffi::CString::new(path).unwrap(); + libc::unlink(c_path.as_ptr()); + } + } + } + Ok(()) + } + + fn step(&self) -> Result<()> { + // No-op for now + Ok(()) + } + + fn wait_for_completion(&self, _completion: Completion) -> Result<()> { + // No-op - completions are already completed immediately in the file operations + Ok(()) + } + + fn generate_random_number(&self) -> i64 { + let mut rng = self.rng.lock().unwrap(); + rng.next_u64() as i64 + } +} + +const FILE_SIZE: usize = 1 << 30; // 1 GiB + +struct SimulatorFile { + data: *mut u8, + size: 
usize, + fd: i32, +} + +impl SimulatorFile { + fn new(file_path: &str) -> Self { + unsafe { + let c_path = std::ffi::CString::new(file_path).unwrap(); + + let fd = open(c_path.as_ptr(), O_CREAT | O_RDWR, 0o644); + if fd == -1 { + let errno = std::io::Error::last_os_error(); + panic!("Failed to create file {}: {}", file_path, errno); + } + + if ftruncate(fd, FILE_SIZE as i64) == -1 { + let errno = std::io::Error::last_os_error(); + panic!("Failed to truncate file {}: {}", file_path, errno); + } + + let data = mmap( + ptr::null_mut(), + FILE_SIZE, + PROT_READ | PROT_WRITE, + MAP_SHARED, + fd, + 0, + ) as *mut u8; + + if data == libc::MAP_FAILED as *mut u8 { + let errno = std::io::Error::last_os_error(); + panic!("mmap failed for file {}: {}", file_path, errno); + } + + Self { data, size: 0, fd } + } + } +} + +impl Drop for SimulatorFile { + fn drop(&mut self) { + unsafe { + munmap(self.data as *mut libc::c_void, FILE_SIZE); + close(self.fd); + } + } +} + +unsafe impl Send for SimulatorFile {} +unsafe impl Sync for SimulatorFile {} + +impl File for SimulatorFile { + fn pread(&self, pos: u64, c: Completion) -> Result { + let pos = pos as usize; + let read_completion = c.as_read(); + let buffer = read_completion.buf_arc(); + let len = buffer.len(); + + unsafe { + if pos + len <= FILE_SIZE { + ptr::copy_nonoverlapping(self.data.add(pos), buffer.as_mut_ptr(), len); + c.complete(len as i32); + } else { + c.complete(0); + } + } + Ok(c) + } + + fn pwrite( + &self, + pos: u64, + buffer: Arc, + c: Completion, + ) -> Result { + let pos = pos as usize; + let len = buffer.len(); + + unsafe { + if pos + len <= FILE_SIZE { + ptr::copy_nonoverlapping(buffer.as_ptr(), self.data.add(pos), len); + c.complete(len as i32); + } else { + c.complete(0); + } + } + Ok(c) + } + + fn pwritev( + &self, + pos: u64, + buffers: Vec>, + c: Completion, + ) -> Result { + let mut offset = pos as usize; + let mut total_written = 0; + + unsafe { + for buffer in buffers { + let len = buffer.len(); + if offset + len <= FILE_SIZE { + ptr::copy_nonoverlapping(buffer.as_ptr(), self.data.add(offset), len); + offset += len; + total_written += len; + } else { + break; + } + } + } + + c.complete(total_written as i32); + Ok(c) + } + + fn sync(&self, c: Completion) -> Result { + // No-op for memory files + c.complete(0); + Ok(c) + } + + fn truncate(&self, _len: u64, c: Completion) -> Result { + // Simple truncate implementation + c.complete(0); + Ok(c) + } + + fn lock_file(&self, _exclusive: bool) -> Result<()> { + // No-op for memory files + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + // No-op for memory files + Ok(()) + } + + fn size(&self) -> Result { + Ok(self.size as u64) + } +} diff --git a/whopper/main.rs b/whopper/main.rs new file mode 100644 index 000000000..d30d5ab69 --- /dev/null +++ b/whopper/main.rs @@ -0,0 +1,498 @@ +/// Whopper is a deterministic simulator for testing the Turso database. 
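+///
+/// A run is driven by a single seed, taken from the SEED environment variable
+/// or generated and printed at startup, which seeds the ChaCha8 RNGs used for
+/// query generation and for I/O, so a failing run can be replayed exactly.
+/// The simulator opens one database over the mmap-backed SimulatorIO, spawns a
+/// pool of fibers that each own a connection and at most one in-flight
+/// prepared statement, and round-robins over them in the main loop, advancing
+/// one statement step per simulation step.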
+use clap::Parser; +use rand::{Rng, RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use sql_generation::{ + generation::{Arbitrary, GenerationContext, Opts}, + model::query::{ + create::Create, create_index::CreateIndex, delete::Delete, drop_index::DropIndex, + insert::Insert, select::Select, update::Update, + }, + model::table::{Column, ColumnType, Table}, +}; +use std::cell::RefCell; +use std::sync::Arc; +use tracing::trace; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; +use turso_core::{Connection, Database, IO, Statement}; +use turso_parser::ast::SortOrder; + +mod io; +use io::SimulatorIO; + +#[derive(Parser)] +#[command(name = "turso_whopper")] +#[command(about = "The Turso Whopper Simulator")] +struct Args { + /// Simulation mode + #[arg(long, default_value = "fast")] + mode: String, + /// Keep mmap I/O files on disk after run + #[arg(long)] + keep: bool, +} + +struct SimulatorConfig { + max_connections: usize, + max_steps: usize, +} + +#[derive(Debug)] +enum FiberState { + Idle, + InTx, +} + +struct SimulatorFiber { + connection: Arc, + state: FiberState, + statement: RefCell>, +} + +struct SimulatorContext { + fibers: Vec, + tables: Vec, + indexes: Vec, + opts: Opts, + stats: Stats, +} + +#[derive(Default)] +struct Stats { + inserts: usize, + updates: usize, + deletes: usize, + integrity_checks: usize, +} + +fn main() -> anyhow::Result<()> { + init_logger(); + + let args = Args::parse(); + + let seed = std::env::var("SEED") + .ok() + .map(|s| s.parse::().unwrap()) + .unwrap_or_else(|| { + let mut rng = rand::rng(); + rng.next_u64() + }); + + println!("mode = {}", args.mode); + println!("seed = {}", seed); + + let mut rng = ChaCha8Rng::seed_from_u64(seed); + + // Create a separate RNG for IO operations with a derived seed + let io_rng = ChaCha8Rng::seed_from_u64(seed.wrapping_add(1)); + + let config = gen_config(&mut rng, &args.mode)?; + + let io = Arc::new(SimulatorIO::new(args.keep, io_rng)) as Arc; + + let db_path = format!("whopper-{}-{}.db", seed, std::process::id()); + + let db = match Database::open_file(io.clone(), &db_path, false, true) { + Ok(db) => db, + Err(e) => { + return Err(anyhow::anyhow!("Database open failed: {}", e)); + } + }; + let boostrap_conn = match db.connect() { + Ok(conn) => conn, + Err(e) => { + return Err(anyhow::anyhow!("Connection failed: {}", e)); + } + }; + + let schema = create_initial_schema(&mut rng); + let tables = schema.iter().map(|t| t.table.clone()).collect::>(); + for create_table in &schema { + let sql = create_table.to_string(); + trace!("{}", sql); + boostrap_conn.execute(&sql)?; + } + + let indexes = create_initial_indexes(&mut rng, &tables); + for create_index in &indexes { + let sql = create_index.to_string(); + trace!("{}", sql); + boostrap_conn.execute(&sql)?; + } + + boostrap_conn.close()?; + + let mut fibers = Vec::new(); + for i in 0..config.max_connections { + let conn = match db.connect() { + Ok(conn) => conn, + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to create fiber connection {}: {}", + i, + e + )); + } + }; + fibers.push(SimulatorFiber { + connection: conn, + state: FiberState::Idle, + statement: RefCell::new(None), + }); + } + + let mut context = SimulatorContext { + fibers, + tables, + indexes: indexes.iter().map(|idx| idx.index_name.clone()).collect(), + opts: Opts::default(), + stats: Stats::default(), + }; + + let progress_interval = config.max_steps / 10; + let progress_stages = [ + " . I/U/D/C", + " . ", + " . 
", + " | ", + " | ", + " ╱|╲ ", + " ╱╲|╱╲ ", + " ╱╲╱|╲╱╲ ", + " ╱╲╱╲|╱╲╱╲ ", + " ╱╲╱╲╱|╲╱╲╱╲ ", + " ╱╲╱╲╱╲|╱╲╱╲╱╲ ", + ]; + let mut progress_index = 0; + println!("{}", progress_stages[progress_index]); + progress_index += 1; + + for i in 0..config.max_steps { + let fiber_idx = i % context.fibers.len(); + perform_work(fiber_idx, &mut rng, &mut context)?; + io.step()?; + if progress_interval > 0 && (i + 1) % progress_interval == 0 { + println!( + "{}{}/{}/{}/{}", + progress_stages[progress_index], + context.stats.inserts, + context.stats.updates, + context.stats.deletes, + context.stats.integrity_checks + ); + progress_index += 1; + } + } + Ok(()) +} + +fn gen_config(rng: &mut ChaCha8Rng, mode: &str) -> anyhow::Result { + match mode { + "fast" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 100000, + }), + "chaos" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 10000000, + }), + _ => Err(anyhow::anyhow!("Unknown mode: {}", mode)), + } +} + +fn create_initial_indexes(rng: &mut ChaCha8Rng, tables: &[Table]) -> Vec { + let mut indexes = Vec::new(); + + // Create 0-3 indexes per table + for table in tables { + let num_indexes = rng.random_range(0..=3); + for i in 0..num_indexes { + if !table.columns.is_empty() { + // Pick 1-3 columns for the index + let num_columns = rng.random_range(1..=std::cmp::min(3, table.columns.len())); + let mut selected_columns = Vec::new(); + let mut available_columns = table.columns.clone(); + + for _ in 0..num_columns { + if available_columns.is_empty() { + break; + } + let col_idx = rng.random_range(0..available_columns.len()); + let column = available_columns.remove(col_idx); + let sort_order = if rng.random_bool(0.5) { + SortOrder::Asc + } else { + SortOrder::Desc + }; + selected_columns.push((column.name, sort_order)); + } + + if !selected_columns.is_empty() { + let index_name = format!("idx_{}_{}", table.name, i); + let create_index = CreateIndex { + index_name, + table_name: table.name.clone(), + columns: selected_columns, + }; + indexes.push(create_index); + } + } + } + } + + indexes +} + +fn create_initial_schema(rng: &mut ChaCha8Rng) -> Vec { + let mut schema = Vec::new(); + + // Generate random number of tables (1-5) + let num_tables = rng.random_range(1..=5); + + for i in 0..num_tables { + let table_name = format!("table_{}", i); + + // Generate random number of columns (2-8) + let num_columns = rng.random_range(2..=8); + let mut columns = Vec::new(); + + // Always add an id column as primary key + columns.push(Column { + name: "id".to_string(), + column_type: ColumnType::Integer, + primary: true, + unique: false, + }); + + // Add random columns + for j in 1..num_columns { + let col_type = match rng.random_range(0..3) { + 0 => ColumnType::Integer, + 1 => ColumnType::Text, + _ => ColumnType::Float, + }; + + columns.push(Column { + name: format!("col_{}", j), + column_type: col_type, + primary: false, + unique: rng.random_bool(0.2), // 20% chance of unique + }); + } + + let table = Table { + name: table_name, + columns, + rows: vec![], + indexes: vec![], + }; + + schema.push(Create { table }); + } + + schema +} + +fn perform_work( + fiber_idx: usize, + rng: &mut ChaCha8Rng, + context: &mut SimulatorContext, +) -> anyhow::Result<()> { + // If we have a statement, step it. 
+ let done = { + let mut stmt_borrow = context.fibers[fiber_idx].statement.borrow_mut(); + if let Some(stmt) = stmt_borrow.as_mut() { + match stmt.step() { + Ok(result) => matches!(result, turso_core::StepResult::Done), + Err(e) => { + match e { + turso_core::LimboError::SchemaUpdated => { + trace!("{} Schema changed, rolling back transaction", fiber_idx); + drop(stmt_borrow); + context.fibers[fiber_idx].statement.replace(None); + // Rollback the transaction if we're in one + if matches!(context.fibers[fiber_idx].state, FiberState::InTx) { + if let Ok(rollback_stmt) = + context.fibers[fiber_idx].connection.prepare("ROLLBACK") + { + context.fibers[fiber_idx] + .statement + .replace(Some(rollback_stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + } + return Ok(()); + } + turso_core::LimboError::Busy => { + trace!("{} Database busy, rolling back transaction", fiber_idx); + drop(stmt_borrow); + context.fibers[fiber_idx].statement.replace(None); + // Rollback the transaction if we're in one + if matches!(context.fibers[fiber_idx].state, FiberState::InTx) { + if let Ok(rollback_stmt) = + context.fibers[fiber_idx].connection.prepare("ROLLBACK") + { + context.fibers[fiber_idx] + .statement + .replace(Some(rollback_stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + } + return Ok(()); + } + _ => { + return Err(e.into()); + } + } + } + } + } else { + true + } + }; + // If the statement has more work, we're done for this simulation step + if !done { + return Ok(()); + } + context.fibers[fiber_idx].statement.replace(None); + match context.fibers[fiber_idx].state { + FiberState::Idle => { + let action = rng.random_range(0..100); + if action <= 29 { + // Start transaction + // FIXME: use deferred when it's fixed! + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("BEGIN") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::InTx; + trace!("{} BEGIN", fiber_idx); + } + } else if action == 30 { + // Integrity check + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare("PRAGMA integrity_check") + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.integrity_checks += 1; + trace!("{} PRAGMA integrity_check", fiber_idx); + } + } + } + FiberState::InTx => { + let action = rng.random_range(0..100); + match action { + 0..=9 => { + // SELECT (10%) + let select = Select::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(&select.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + } + trace!("{} SELECT: {}", fiber_idx, select.to_string()); + } + 10..=39 => { + // INSERT (30%) + let insert = Insert::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(&insert.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.inserts += 1; + } + trace!("{} INSERT: {}", fiber_idx, insert.to_string()); + } + 40..=59 => { + // UPDATE (20%) + let update = Update::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(&update.to_string()) + { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.updates += 1; + } + trace!("{} UPDATE: {}", fiber_idx, update.to_string()); + } + 60..=69 => { + // DELETE (10%) + let delete = Delete::arbitrary(rng, context); + if let Ok(stmt) = context.fibers[fiber_idx] + .connection + .prepare(&delete.to_string()) + { + 
context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.stats.deletes += 1; + } + trace!("{} DELETE: {}", fiber_idx, delete.to_string()); + } + 70..=71 => { + // CREATE INDEX (2%) + let create_index = CreateIndex::arbitrary(rng, context); + let sql = create_index.to_string(); + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.indexes.push(create_index.index_name.clone()); + } + trace!("{} CREATE INDEX: {}", fiber_idx, sql); + } + 72..=73 => { + // DROP INDEX (2%) + if !context.indexes.is_empty() { + let index_idx = rng.random_range(0..context.indexes.len()); + let index_name = context.indexes.remove(index_idx); + let drop_index = DropIndex { index_name }; + let sql = drop_index.to_string(); + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare(&sql) { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + } + trace!("{} DROP INDEX: {}", fiber_idx, sql); + } + } + 74..=86 => { + // COMMIT (13%) + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("COMMIT") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + trace!("{} COMMIT", fiber_idx); + } + 87..=99 => { + // ROLLBACK (13%) + if let Ok(stmt) = context.fibers[fiber_idx].connection.prepare("ROLLBACK") { + context.fibers[fiber_idx].statement.replace(Some(stmt)); + context.fibers[fiber_idx].state = FiberState::Idle; + } + trace!("{} ROLLBACK", fiber_idx); + } + _ => {} + } + } + } + Ok(()) +} + +impl GenerationContext for SimulatorContext { + fn tables(&self) -> &Vec
{ + &self.tables + } + + fn opts(&self) -> &Opts { + &self.opts + } +} + +fn init_logger() { + let _ = tracing_subscriber::registry() + .with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_line_number(true) + .without_time() + .with_thread_ids(false), + ) + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) + .try_init(); +} From ae920c435d00e7c52aca929e184d708e3b62087c Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 10 Sep 2025 15:51:47 +0300 Subject: [PATCH 5/7] =?UTF-8?q?whopper:=20Ragnar=C3=B6k=20mode=20with=20co?= =?UTF-8?q?smic=20rays?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- whopper/io.rs | 126 +++++++++++++++++++++++++++++++++++++++--------- whopper/main.rs | 22 +++++++-- 2 files changed, 123 insertions(+), 25 deletions(-) diff --git a/whopper/io.rs b/whopper/io.rs index 596588af7..8d27cc094 100644 --- a/whopper/io.rs +++ b/whopper/io.rs @@ -1,25 +1,43 @@ use libc::{ MAP_SHARED, O_CREAT, O_RDWR, PROT_READ, PROT_WRITE, close, ftruncate, mmap, munmap, open, }; -use rand::RngCore; +use rand::{Rng, RngCore}; use rand_chacha::ChaCha8Rng; use std::collections::HashSet; use std::ptr; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; +use tracing::debug; use turso_core::{Clock, Completion, File, IO, Instant, OpenFlags, Result}; +#[derive(Debug, Clone)] +pub struct IOFaultConfig { + /// Probability of a cosmic ray bit flip on write (0.0-1.0) + pub cosmic_ray_probability: f64, +} + +impl Default for IOFaultConfig { + fn default() -> Self { + Self { + cosmic_ray_probability: 0.0, + } + } +} + pub struct SimulatorIO { - files: Mutex>, + files: Mutex)>>, keep_files: bool, rng: Mutex, + fault_config: IOFaultConfig, } impl SimulatorIO { - pub fn new(keep_files: bool, rng: ChaCha8Rng) -> Self { + pub fn new(keep_files: bool, rng: ChaCha8Rng, fault_config: IOFaultConfig) -> Self { + debug!("SimulatorIO fault config: {:?}", fault_config); Self { - files: Mutex::new(HashSet::new()), + files: Mutex::new(Vec::new()), keep_files, rng: Mutex::new(rng), + fault_config, } } } @@ -27,15 +45,16 @@ impl SimulatorIO { impl Drop for SimulatorIO { fn drop(&mut self) { let files = self.files.lock().unwrap(); + let paths: HashSet = files.iter().map(|(path, _)| path.clone()).collect(); if !self.keep_files { - for path in files.iter() { + for path in paths.iter() { unsafe { let c_path = std::ffi::CString::new(path.clone()).unwrap(); libc::unlink(c_path.as_ptr()); } } } else { - for path in files.iter() { + for path in paths.iter() { println!("Keeping file: {}", path); } } @@ -51,26 +70,70 @@ impl Clock for SimulatorIO { impl IO for SimulatorIO { fn open_file(&self, path: &str, _flags: OpenFlags, _create_new: bool) -> Result> { let file = Arc::new(SimulatorFile::new(path)); - self.files.lock().unwrap().insert(path.to_string()); - Ok(file) + + // Store weak reference to avoid keeping files open forever + let mut files = self.files.lock().unwrap(); + files.push((path.to_string(), Arc::downgrade(&file))); + + Ok(file as Arc) } fn remove_file(&self, path: &str) -> Result<()> { let mut files = self.files.lock().unwrap(); - if files.remove(path) { - // File descriptor and mmap will be cleaned up when the Arc is dropped - if !self.keep_files { - unsafe { - let c_path = std::ffi::CString::new(path).unwrap(); - libc::unlink(c_path.as_ptr()); - } + files.retain(|(p, _)| p != path); + + // File descriptor and mmap will be cleaned up when the Arc is dropped + if !self.keep_files { + unsafe { + let c_path 
= std::ffi::CString::new(path).unwrap(); + libc::unlink(c_path.as_ptr()); } } Ok(()) } fn step(&self) -> Result<()> { - // No-op for now + // Inject cosmic ray faults with configured probability + if self.fault_config.cosmic_ray_probability > 0.0 { + let mut rng = self.rng.lock().unwrap(); + if rng.random::() < self.fault_config.cosmic_ray_probability { + // Clean up dead weak references and collect live files + let mut files = self.files.lock().unwrap(); + files.retain(|(_, weak)| weak.strong_count() > 0); + + // Collect files that are still alive + let open_files: Vec<_> = files + .iter() + .filter_map(|(path, weak)| weak.upgrade().map(|file| (path.clone(), file))) + .collect(); + + if !open_files.is_empty() { + let file_idx = rng.random_range(0..open_files.len()); + let (path, file) = &open_files[file_idx]; + + // Get the actual file size (not the mmap size) + let file_size = *file.size.lock().unwrap(); + if file_size > 0 { + // Pick a random offset within the actual file size + let byte_offset = rng.random_range(0..file_size); + let bit_idx = rng.random_range(0..8); + + unsafe { + let old_byte = *file.data.add(byte_offset); + *file.data.add(byte_offset) ^= 1 << bit_idx; + println!( + "Cosmic ray! File: {} - Flipped bit {} at offset {} (0x{:02x} -> 0x{:02x})", + path, + bit_idx, + byte_offset, + old_byte, + *file.data.add(byte_offset) + ); + } + } + } + } + } Ok(()) } @@ -89,7 +152,7 @@ const FILE_SIZE: usize = 1 << 30; // 1 GiB struct SimulatorFile { data: *mut u8, - size: usize, + size: Mutex, fd: i32, } @@ -123,7 +186,11 @@ impl SimulatorFile { panic!("mmap failed for file {}: {}", file_path, errno); } - Self { data, size: 0, fd } + Self { + data, + size: Mutex::new(0), + fd, + } } } } @@ -170,6 +237,11 @@ impl File for SimulatorFile { unsafe { if pos + len <= FILE_SIZE { ptr::copy_nonoverlapping(buffer.as_ptr(), self.data.add(pos), len); + // Update the file size if we wrote beyond the current size + let mut size = self.size.lock().unwrap(); + if pos + len > *size { + *size = pos + len; + } c.complete(len as i32); } else { c.complete(0); @@ -200,6 +272,15 @@ impl File for SimulatorFile { } } + // Update the file size if we wrote beyond the current size + if total_written > 0 { + let mut size = self.size.lock().unwrap(); + let end_pos = (pos as usize) + total_written; + if end_pos > *size { + *size = end_pos; + } + } + c.complete(total_written as i32); Ok(c) } @@ -210,8 +291,9 @@ impl File for SimulatorFile { Ok(c) } - fn truncate(&self, _len: u64, c: Completion) -> Result { - // Simple truncate implementation + fn truncate(&self, len: u64, c: Completion) -> Result { + let mut size = self.size.lock().unwrap(); + *size = len as usize; c.complete(0); Ok(c) } @@ -227,6 +309,6 @@ impl File for SimulatorFile { } fn size(&self) -> Result { - Ok(self.size as u64) + Ok(*self.size.lock().unwrap() as u64) } } diff --git a/whopper/main.rs b/whopper/main.rs index d30d5ab69..64b6b47d0 100644 --- a/whopper/main.rs +++ b/whopper/main.rs @@ -18,13 +18,13 @@ use turso_core::{Connection, Database, IO, Statement}; use turso_parser::ast::SortOrder; mod io; -use io::SimulatorIO; +use io::{IOFaultConfig, SimulatorIO}; #[derive(Parser)] #[command(name = "turso_whopper")] #[command(about = "The Turso Whopper Simulator")] struct Args { - /// Simulation mode + /// Simulation mode (fast, chaos, ragnarök/ragnarok) #[arg(long, default_value = "fast")] mode: String, /// Keep mmap I/O files on disk after run @@ -35,6 +35,7 @@ struct Args { struct SimulatorConfig { max_connections: usize, max_steps: usize, + 
cosmic_ray_probability: f64, } #[derive(Debug)] @@ -88,7 +89,15 @@ fn main() -> anyhow::Result<()> { let config = gen_config(&mut rng, &args.mode)?; - let io = Arc::new(SimulatorIO::new(args.keep, io_rng)) as Arc; + let fault_config = IOFaultConfig { + cosmic_ray_probability: config.cosmic_ray_probability, + }; + + if config.cosmic_ray_probability > 0.0 { + println!("cosmic ray probability = {}", config.cosmic_ray_probability); + } + + let io = Arc::new(SimulatorIO::new(args.keep, io_rng, fault_config)) as Arc; let db_path = format!("whopper-{}-{}.db", seed, std::process::id()); @@ -191,10 +200,17 @@ fn gen_config(rng: &mut ChaCha8Rng, mode: &str) -> anyhow::Result Ok(SimulatorConfig { max_connections: rng.random_range(1..=8) as usize, max_steps: 100000, + cosmic_ray_probability: 0.0, }), "chaos" => Ok(SimulatorConfig { max_connections: rng.random_range(1..=8) as usize, max_steps: 10000000, + cosmic_ray_probability: 0.0, + }), + "ragnarök" | "ragnarok" => Ok(SimulatorConfig { + max_connections: rng.random_range(1..=8) as usize, + max_steps: 1000000, + cosmic_ray_probability: 0.0001, }), _ => Err(anyhow::anyhow!("Unknown mode: {}", mode)), } From c5ca259abcc11565921a1fa59b80b3be04067bf8 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 10 Sep 2025 16:13:53 +0300 Subject: [PATCH 6/7] whopper: Run cargo clippy --fix --- whopper/io.rs | 8 ++++---- whopper/main.rs | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/whopper/io.rs b/whopper/io.rs index 8d27cc094..6df9d2447 100644 --- a/whopper/io.rs +++ b/whopper/io.rs @@ -55,7 +55,7 @@ impl Drop for SimulatorIO { } } else { for path in paths.iter() { - println!("Keeping file: {}", path); + println!("Keeping file: {path}"); } } } @@ -164,12 +164,12 @@ impl SimulatorFile { let fd = open(c_path.as_ptr(), O_CREAT | O_RDWR, 0o644); if fd == -1 { let errno = std::io::Error::last_os_error(); - panic!("Failed to create file {}: {}", file_path, errno); + panic!("Failed to create file {file_path}: {errno}"); } if ftruncate(fd, FILE_SIZE as i64) == -1 { let errno = std::io::Error::last_os_error(); - panic!("Failed to truncate file {}: {}", file_path, errno); + panic!("Failed to truncate file {file_path}: {errno}"); } let data = mmap( @@ -183,7 +183,7 @@ impl SimulatorFile { if data == libc::MAP_FAILED as *mut u8 { let errno = std::io::Error::last_os_error(); - panic!("mmap failed for file {}: {}", file_path, errno); + panic!("mmap failed for file {file_path}: {errno}"); } Self { diff --git a/whopper/main.rs b/whopper/main.rs index 64b6b47d0..e1a7f9cd2 100644 --- a/whopper/main.rs +++ b/whopper/main.rs @@ -80,7 +80,7 @@ fn main() -> anyhow::Result<()> { }); println!("mode = {}", args.mode); - println!("seed = {}", seed); + println!("seed = {seed}"); let mut rng = ChaCha8Rng::seed_from_u64(seed); @@ -266,7 +266,7 @@ fn create_initial_schema(rng: &mut ChaCha8Rng) -> Vec { let num_tables = rng.random_range(1..=5); for i in 0..num_tables { - let table_name = format!("table_{}", i); + let table_name = format!("table_{i}"); // Generate random number of columns (2-8) let num_columns = rng.random_range(2..=8); @@ -289,7 +289,7 @@ fn create_initial_schema(rng: &mut ChaCha8Rng) -> Vec { }; columns.push(Column { - name: format!("col_{}", j), + name: format!("col_{j}"), column_type: col_type, primary: false, unique: rng.random_bool(0.2), // 20% chance of unique @@ -402,7 +402,7 @@ fn perform_work( let select = Select::arbitrary(rng, context); if let Ok(stmt) = context.fibers[fiber_idx] .connection - .prepare(&select.to_string()) + 
.prepare(select.to_string()) { context.fibers[fiber_idx].statement.replace(Some(stmt)); } @@ -413,7 +413,7 @@ fn perform_work( let insert = Insert::arbitrary(rng, context); if let Ok(stmt) = context.fibers[fiber_idx] .connection - .prepare(&insert.to_string()) + .prepare(insert.to_string()) { context.fibers[fiber_idx].statement.replace(Some(stmt)); context.stats.inserts += 1; @@ -425,7 +425,7 @@ fn perform_work( let update = Update::arbitrary(rng, context); if let Ok(stmt) = context.fibers[fiber_idx] .connection - .prepare(&update.to_string()) + .prepare(update.to_string()) { context.fibers[fiber_idx].statement.replace(Some(stmt)); context.stats.updates += 1; @@ -437,7 +437,7 @@ fn perform_work( let delete = Delete::arbitrary(rng, context); if let Ok(stmt) = context.fibers[fiber_idx] .connection - .prepare(&delete.to_string()) + .prepare(delete.to_string()) { context.fibers[fiber_idx].statement.replace(Some(stmt)); context.stats.deletes += 1; From 89d1c9a4214486a13a061ef577add0c5893b96bd Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 11 Sep 2025 07:42:45 +0300 Subject: [PATCH 7/7] whopper: Switch to mmap2 crate to fix Windows build --- Cargo.lock | 2 +- whopper/Cargo.toml | 2 +- whopper/io.rs | 134 +++++++++++++++++---------------------------- 3 files changed, 53 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb493b066..fcffbbbb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4446,7 +4446,7 @@ version = "0.1.5" dependencies = [ "anyhow", "clap", - "libc", + "memmap2", "rand 0.9.2", "rand_chacha 0.9.0", "sql_generation", diff --git a/whopper/Cargo.toml b/whopper/Cargo.toml index f1dc683c3..0cd4abcd7 100644 --- a/whopper/Cargo.toml +++ b/whopper/Cargo.toml @@ -17,7 +17,7 @@ path = "main.rs" [dependencies] anyhow.workspace = true clap = { version = "4.5", features = ["derive"] } -libc = "0.2" +memmap2 = "0.9" rand = { workspace = true } rand_chacha = "0.9.0" sql_generation = { workspace = true } diff --git a/whopper/io.rs b/whopper/io.rs index 6df9d2447..af972c2bc 100644 --- a/whopper/io.rs +++ b/whopper/io.rs @@ -1,10 +1,8 @@ -use libc::{ - MAP_SHARED, O_CREAT, O_RDWR, PROT_READ, PROT_WRITE, close, ftruncate, mmap, munmap, open, -}; +use memmap2::{MmapMut, MmapOptions}; use rand::{Rng, RngCore}; use rand_chacha::ChaCha8Rng; use std::collections::HashSet; -use std::ptr; +use std::fs::{File as StdFile, OpenOptions}; use std::sync::{Arc, Mutex, Weak}; use tracing::debug; use turso_core::{Clock, Completion, File, IO, Instant, OpenFlags, Result}; @@ -48,10 +46,7 @@ impl Drop for SimulatorIO { let paths: HashSet = files.iter().map(|(path, _)| path.clone()).collect(); if !self.keep_files { for path in paths.iter() { - unsafe { - let c_path = std::ffi::CString::new(path.clone()).unwrap(); - libc::unlink(c_path.as_ptr()); - } + let _ = std::fs::remove_file(path); } } else { for path in paths.iter() { @@ -82,12 +77,8 @@ impl IO for SimulatorIO { let mut files = self.files.lock().unwrap(); files.retain(|(p, _)| p != path); - // File descriptor and mmap will be cleaned up when the Arc is dropped if !self.keep_files { - unsafe { - let c_path = std::ffi::CString::new(path).unwrap(); - libc::unlink(c_path.as_ptr()); - } + let _ = std::fs::remove_file(path); } Ok(()) } @@ -118,18 +109,13 @@ impl IO for SimulatorIO { let byte_offset = rng.random_range(0..file_size); let bit_idx = rng.random_range(0..8); - unsafe { - let old_byte = *file.data.add(byte_offset); - *file.data.add(byte_offset) ^= 1 << bit_idx; - println!( - "Cosmic ray! 
File: {} - Flipped bit {} at offset {} (0x{:02x} -> 0x{:02x})", - path, - bit_idx, - byte_offset, - old_byte, - *file.data.add(byte_offset) - ); - } + let mut mmap = file.mmap.lock().unwrap(); + let old_byte = mmap[byte_offset]; + mmap[byte_offset] ^= 1 << bit_idx; + println!( + "Cosmic ray! File: {} - Flipped bit {} at offset {} (0x{:02x} -> 0x{:02x})", + path, bit_idx, byte_offset, old_byte, mmap[byte_offset] + ); } } } @@ -151,57 +137,41 @@ impl IO for SimulatorIO { const FILE_SIZE: usize = 1 << 30; // 1 GiB struct SimulatorFile { - data: *mut u8, + mmap: Mutex, size: Mutex, - fd: i32, + _file: StdFile, } impl SimulatorFile { fn new(file_path: &str) -> Self { - unsafe { - let c_path = std::ffi::CString::new(file_path).unwrap(); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(file_path) + .unwrap_or_else(|e| panic!("Failed to create file {file_path}: {e}")); - let fd = open(c_path.as_ptr(), O_CREAT | O_RDWR, 0o644); - if fd == -1 { - let errno = std::io::Error::last_os_error(); - panic!("Failed to create file {file_path}: {errno}"); - } + file.set_len(FILE_SIZE as u64) + .unwrap_or_else(|e| panic!("Failed to truncate file {file_path}: {e}")); - if ftruncate(fd, FILE_SIZE as i64) == -1 { - let errno = std::io::Error::last_os_error(); - panic!("Failed to truncate file {file_path}: {errno}"); - } + let mmap = unsafe { + MmapOptions::new() + .len(FILE_SIZE) + .map_mut(&file) + .unwrap_or_else(|e| panic!("mmap failed for file {file_path}: {e}")) + }; - let data = mmap( - ptr::null_mut(), - FILE_SIZE, - PROT_READ | PROT_WRITE, - MAP_SHARED, - fd, - 0, - ) as *mut u8; - - if data == libc::MAP_FAILED as *mut u8 { - let errno = std::io::Error::last_os_error(); - panic!("mmap failed for file {file_path}: {errno}"); - } - - Self { - data, - size: Mutex::new(0), - fd, - } + Self { + mmap: Mutex::new(mmap), + size: Mutex::new(0), + _file: file, } } } impl Drop for SimulatorFile { - fn drop(&mut self) { - unsafe { - munmap(self.data as *mut libc::c_void, FILE_SIZE); - close(self.fd); - } - } + fn drop(&mut self) {} } unsafe impl Send for SimulatorFile {} @@ -214,13 +184,12 @@ impl File for SimulatorFile { let buffer = read_completion.buf_arc(); let len = buffer.len(); - unsafe { - if pos + len <= FILE_SIZE { - ptr::copy_nonoverlapping(self.data.add(pos), buffer.as_mut_ptr(), len); - c.complete(len as i32); - } else { - c.complete(0); - } + if pos + len <= FILE_SIZE { + let mmap = self.mmap.lock().unwrap(); + buffer.as_mut_slice().copy_from_slice(&mmap[pos..pos + len]); + c.complete(len as i32); + } else { + c.complete(0); } Ok(c) } @@ -234,18 +203,16 @@ impl File for SimulatorFile { let pos = pos as usize; let len = buffer.len(); - unsafe { - if pos + len <= FILE_SIZE { - ptr::copy_nonoverlapping(buffer.as_ptr(), self.data.add(pos), len); - // Update the file size if we wrote beyond the current size - let mut size = self.size.lock().unwrap(); - if pos + len > *size { - *size = pos + len; - } - c.complete(len as i32); - } else { - c.complete(0); + if pos + len <= FILE_SIZE { + let mut mmap = self.mmap.lock().unwrap(); + mmap[pos..pos + len].copy_from_slice(buffer.as_slice()); + let mut size = self.size.lock().unwrap(); + if pos + len > *size { + *size = pos + len; } + c.complete(len as i32); + } else { + c.complete(0); } Ok(c) } @@ -259,11 +226,12 @@ impl File for SimulatorFile { let mut offset = pos as usize; let mut total_written = 0; - unsafe { + { + let mut mmap = self.mmap.lock().unwrap(); for buffer in buffers { let len = buffer.len(); if 
offset + len <= FILE_SIZE { - ptr::copy_nonoverlapping(buffer.as_ptr(), self.data.add(offset), len); + mmap[offset..offset + len].copy_from_slice(buffer.as_slice()); offset += len; total_written += len; } else {