From d8e9f145e63a07eeaaa8d707218dfc98f0ba0a37 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 10:58:01 -0300 Subject: [PATCH 01/17] create SimIO trait --- simulator/runner/mod.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index 3eef78331..13b5c4258 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -11,3 +11,13 @@ pub mod io; pub mod watch; pub const FAULT_ERROR_MSG: &str = "Injected Fault"; + +pub trait SimIO: turso_core::IO { + fn inject_fault(&self, fault: bool); + + fn print_stats(&self); + + fn syncing(&self) -> bool; + + fn close_files(&self); +} From 8c7da3a7048b1fdb0003860f3ce8caf778a85846 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 11:17:27 -0300 Subject: [PATCH 02/17] impl SimIO for SimulatorIO --- Cargo.lock | 26 +++++++++++++------------- simulator/generation/plan.rs | 24 +++++------------------- simulator/runner/env.rs | 3 ++- simulator/runner/execution.rs | 2 +- simulator/runner/io.rs | 20 +++++++++++++++++--- 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf9df0f9f..023963d8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1800,9 +1800,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.8.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -1822,7 +1822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ "ahash", - "indexmap 2.8.0", + "indexmap 2.10.0", "is-terminal", "itoa", "log", @@ -2265,9 +2265,9 @@ checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" [[package]] name = "lock_api" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" dependencies = [ "autocfg", "scopeguard", @@ -2670,9 +2670,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", "parking_lot_core", @@ -2680,9 +2680,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.10" +version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", @@ -2772,7 +2772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d" dependencies = [ "base64", - "indexmap 2.8.0", + "indexmap 2.10.0", "quick-xml 0.32.0", "serde", "time", @@ -4062,7 +4062,7 @@ version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ae329d1f08c4d17a59bed7ff5b5a769d062e64a62d34a3261b219e62cd5aae" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -4084,7 +4084,7 @@ version = "0.22.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -4369,7 +4369,7 @@ dependencies = [ "cc", "env_logger 0.11.7", "fallible-iterator", - "indexmap 2.8.0", + "indexmap 2.10.0", "log", "memchr", "miette", diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index 365b4cd3d..fcfa07ed4 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -21,10 +21,7 @@ use crate::{ SimulatorEnv, generation::Shadow, model::Query, - runner::{ - env::{SimConnection, SimulationType, SimulatorTables}, - io::SimulatorIO, - }, + runner::env::{SimConnection, SimulationType, SimulatorTables}, }; use super::property::{Property, remaining}; @@ -452,7 +449,7 @@ impl Shadow for Interaction { } } impl Interaction { - pub(crate) fn execute_query(&self, conn: &mut Arc, _io: &SimulatorIO) -> ResultSet { + pub(crate) fn execute_query(&self, conn: &mut Arc) -> ResultSet { if let Self::Query(query) = self { let query_str = query.to_string(); let rows = conn.query(&query_str); @@ -611,13 +608,7 @@ impl Interaction { out.push(r); } StepResult::IO => { - let syncing = { - let files = env.io.files.borrow(); - // TODO: currently assuming we only have 1 file that is syncing - files - .iter() - .any(|file| file.sync_completion.borrow().is_some()) - }; + let syncing = env.io.syncing(); if syncing { reopen_database(env); } else { @@ -666,12 +657,7 @@ impl Interaction { let mut current_prob = 0.05; let mut incr = 0.001; loop { - let syncing = { - let files = env.io.files.borrow(); - files - .iter() - .any(|file| file.sync_completion.borrow().is_some()) - }; + let syncing = env.io.syncing(); let inject_fault = env.rng.random_bool(current_prob); // TODO: avoid for now injecting faults when syncing if inject_fault && !syncing { @@ -722,7 +708,7 @@ fn reopen_database(env: &mut SimulatorEnv) { // Clear all open files // TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file - env.io.files.borrow_mut().clear(); + env.io.close_files(); // 2. Re-open database match env.type_ { diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 567f2bad9..2a971b597 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -13,6 +13,7 @@ use turso_core::Database; use crate::profiles::Profile; use crate::runner::io::SimulatorIO; +use crate::runner::SimIO; use super::cli::SimulatorCLI; @@ -63,7 +64,7 @@ pub(crate) struct SimulatorEnv { pub(crate) opts: SimulatorOpts, pub profile: Profile, pub(crate) connections: Vec, - pub(crate) io: Arc, + pub(crate) io: Arc, pub(crate) db: Option>, pub(crate) rng: ChaCha8Rng, pub(crate) paths: Paths, diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index a7d7aa3d6..c7f31ed88 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -191,7 +191,7 @@ pub(crate) fn execute_interaction( SimConnection::Disconnected => unreachable!(), }; tracing::debug!(?interaction); - let results = interaction.execute_query(conn, &env.io); + let results = interaction.execute_query(conn); if results.is_err() { tracing::error!(?results); } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index fcc23be75..d6cdcace7 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use turso_core::{Clock, IO, Instant, OpenFlags, PlatformIO, Result}; -use crate::runner::{clock::SimulatorClock, file::SimulatorFile}; +use crate::runner::{clock::SimulatorClock, file::SimulatorFile, SimIO}; pub(crate) struct SimulatorIO { pub(crate) inner: Box, @@ -48,15 +48,17 @@ impl SimulatorIO { clock: Arc::new(clock), }) } +} - pub(crate) fn inject_fault(&self, fault: bool) { +impl SimIO for SimulatorIO { + fn inject_fault(&self, fault: bool) { self.fault.replace(fault); for file in self.files.borrow().iter() { file.inject_fault(fault); } } - pub(crate) fn print_stats(&self) { + fn print_stats(&self) { for file in self.files.borrow().iter() { tracing::info!( "\n===========================\n\nPath: {}\n{}", @@ -65,6 +67,18 @@ impl SimulatorIO { ); } } + + fn syncing(&self) -> bool { + let files = self.files.borrow(); + // TODO: currently assuming we only have 1 file that is syncing + files + .iter() + .any(|file| file.sync_completion.borrow().is_some()) + } + + fn close_files(&self) { + self.files.borrow_mut().clear() + } } impl Clock for SimulatorIO { From c01449e71b1e00e07ad44af0becba5c59e2f32fb Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 11:29:23 -0300 Subject: [PATCH 03/17] add parking_lot to simulator --- Cargo.lock | 1 + Cargo.toml | 1 + core/Cargo.toml | 2 +- simulator/Cargo.toml | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 023963d8e..f30d5ffb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2196,6 +2196,7 @@ dependencies = [ "json5", "log", "notify", + "parking_lot", "rand 0.9.2", "rand_chacha 0.9.0", "regex", diff --git a/Cargo.toml b/Cargo.toml index da36927d9..79b6d20d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ rand = "0.9.2" tracing = "0.1.41" schemars = "1.0.4" garde = "0.22" +parking_lot = "0.12.4" [profile.release] debug = "line-tables-only" diff --git a/core/Cargo.toml b/core/Cargo.toml index 37c150524..b53fcfbb0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -61,7 +61,7 @@ libm = "0.2" turso_macros = { workspace = true } miette = "7.6.0" strum = { workspace = true } -parking_lot = "0.12.3" +parking_lot = { workspace = true } crossbeam-skiplist = "0.1.3" tracing = "0.1.41" ryu = "1.0.19" diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index a8f85ca58..b0f372819 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -42,3 +42,4 @@ schemars = { workspace = true } garde = { workspace = true, features = ["derive", "serde"] } json5 = { version = "0.4.1" } strum = { workspace = true } +parking_lot = { workspace = true } From 40de4c0606db8022ac185e2a9a44ff13cc6dffbb Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 11:32:00 -0300 Subject: [PATCH 04/17] initial impl for MemorySim --- Cargo.lock | 1 + simulator/Cargo.toml | 1 + simulator/runner/memory/file.rs | 163 ++++++++++++++++++++ simulator/runner/memory/io.rs | 253 ++++++++++++++++++++++++++++++++ simulator/runner/memory/mod.rs | 2 + simulator/runner/mod.rs | 1 + 6 files changed, 421 insertions(+) create mode 100644 simulator/runner/memory/file.rs create mode 100644 simulator/runner/memory/io.rs create mode 100644 simulator/runner/memory/mod.rs diff --git a/Cargo.lock b/Cargo.lock index f30d5ffb4..3c7ce5329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2192,6 +2192,7 @@ dependencies = [ "env_logger 0.10.2", "garde", "hex", + "indexmap 2.10.0", "itertools 0.14.0", "json5", "log", diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index b0f372819..89ee9634b 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -43,3 +43,4 @@ garde = { workspace = true, features = ["derive", "serde"] } json5 = { version = "0.4.1" } strum = { workspace = true } parking_lot = { workspace = true } +indexmap = "2.10.0" diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs new file mode 100644 index 000000000..158f654b1 --- /dev/null +++ b/simulator/runner/memory/file.rs @@ -0,0 +1,163 @@ +use std::{ + cell::{Cell, RefCell}, + sync::Arc, +}; + +use rand::{Rng as _, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use tracing::{instrument, Level}; +use turso_core::{Completion, File, Result}; + +use crate::runner::{ + clock::SimulatorClock, + memory::io::{CallbackQueue, Fd, Operation, OperationType}, +}; + +pub struct MemorySimFile { + pub callbacks: CallbackQueue, + pub fd: Arc, + pub buffer: RefCell>, + // TODO: add fault map later here + pub closed: Cell, + + /// Number of `pread` function calls (both success and failures). + pub nr_pread_calls: Cell, + /// Number of `pwrite` function calls (both success and failures). + pub nr_pwrite_calls: Cell, + /// Number of `sync` function calls (both success and failures). + pub nr_sync_calls: Cell, + + pub rng: RefCell, + + pub latency_probability: usize, + clock: Arc, +} + +type IoOperation = Box Result>>; + +pub struct DelayedIo { + pub time: turso_core::Instant, + pub op: IoOperation, +} + +unsafe impl Send for MemorySimFile {} +unsafe impl Sync for MemorySimFile {} + +impl MemorySimFile { + pub fn new( + callbacks: CallbackQueue, + fd: Fd, + seed: u64, + latency_probability: usize, + clock: Arc, + ) -> Self { + Self { + callbacks, + fd: Arc::new(fd), + buffer: RefCell::new(Vec::new()), + closed: Cell::new(false), + nr_pread_calls: Cell::new(0), + nr_pwrite_calls: Cell::new(0), + nr_sync_calls: Cell::new(0), + rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), + latency_probability, + clock, + } + } + + pub fn stats_table(&self) -> String { + let sum_calls = + self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get(); + let stats_table = [ + "op calls ".to_string(), + "--------- --------".to_string(), + format!("pread {:8}", self.nr_pread_calls.get()), + format!("pwrite {:8}", self.nr_pwrite_calls.get()), + format!("sync {:8}", self.nr_sync_calls.get()), + "--------- -------- --------".to_string(), + format!("total {sum_calls:8}"), + ]; + + stats_table.join("\n") + } + + #[instrument(skip_all, level = Level::TRACE)] + fn generate_latency(&self) -> Option { + let mut rng = self.rng.borrow_mut(); + // Chance to introduce some latency + rng.random_bool(self.latency_probability as f64 / 100.0) + .then(|| { + let now = self.clock.now(); + let sum = now + std::time::Duration::from_millis(rng.random_range(5..20)); + sum.into() + }) + } +} + +impl File for MemorySimFile { + fn lock_file(&self, _exclusive: bool) -> Result<()> { + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + Ok(()) + } + + fn pread(&self, pos: usize, c: Completion) -> Result { + self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); + + let op = OperationType::Read { + fd: self.fd.clone(), + completion: c.clone(), + offset: pos, + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn pwrite( + &self, + pos: usize, + buffer: Arc, + c: Completion, + ) -> Result { + self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); + let op = OperationType::Write { + fd: self.fd.clone(), + buffer, + completion: c.clone(), + offset: pos, + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn sync(&self, c: Completion) -> Result { + self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); + let op = OperationType::Sync { + fd: self.fd.clone(), + completion: c.clone(), + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn size(&self) -> Result { + // TODO: size operation should also be scheduled. But this requires a change in how we + // Use this function internally in Turso + Ok(self.buffer.borrow().len() as u64) + } + + fn truncate(&self, len: usize, c: Completion) -> Result { + todo!() + } +} diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs new file mode 100644 index 000000000..ad614ba9c --- /dev/null +++ b/simulator/runner/memory/io.rs @@ -0,0 +1,253 @@ +use std::cell::{Cell, RefCell}; +use std::sync::Arc; + +use indexmap::IndexMap; +use parking_lot::Mutex; +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO}; + +use crate::runner::clock::SimulatorClock; +use crate::runner::SimIO; +use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile}; + +/// File descriptor +pub type Fd = String; + +pub enum OperationType { + Read { + fd: Arc, + completion: Completion, + offset: usize, + }, + Write { + fd: Arc, + buffer: Arc, + completion: Completion, + offset: usize, + }, + Sync { + fd: Arc, + completion: Completion, + }, +} + +impl OperationType { + fn get_fd(&self) -> &Fd { + match self { + OperationType::Read { fd, .. } + | OperationType::Write { fd, .. } + | OperationType::Sync { fd, .. } => fd, + } + } +} + +pub struct Operation { + pub time: Option, + pub op: OperationType, +} + +impl Operation { + fn do_operation(self, files: &IndexMap>) { + match self.op { + OperationType::Read { + fd, + completion, + offset, + } => { + let file = files.get(fd.as_str()).unwrap(); + let file_buf = file.buffer.borrow_mut(); + let buffer = completion.as_read().buf.clone(); + let buf_size = { + let buf = buffer.as_mut_slice(); + // TODO: check for sector faults here + + buf.copy_from_slice(&file_buf[offset..][0..buf.len()]); + buf.len() as i32 + }; + completion.complete(buf_size); + } + OperationType::Write { + fd, + buffer, + completion, + offset, + } => { + let file = files.get(fd.as_str()).unwrap(); + let buf_size = { + let mut file_buf = file.buffer.borrow_mut(); + let buf = buffer.as_slice(); + let more_space = if file_buf.len() < offset { + (offset + buf.len()) - file_buf.len() + } else { + buf.len().saturating_sub(file_buf.len() - offset) + }; + if more_space > 0 { + file_buf.reserve(more_space); + for _ in 0..more_space { + file_buf.push(0); + } + } + + file_buf[offset..][0..buf.len()].copy_from_slice(buf); + buf.len() as i32 + }; + completion.complete(buf_size); + } + OperationType::Sync { completion, .. } => { + // There is no Sync for in memory + completion.complete(0); + } + } + } +} + +pub type CallbackQueue = Arc>>; + +pub struct MemorySimIO { + callbacks: CallbackQueue, + timeouts: CallbackQueue, + pub fault: Cell, + pub files: RefCell>>, + pub rng: RefCell, + pub nr_run_once_faults: Cell, + pub page_size: usize, + seed: u64, + latency_probability: usize, + clock: Arc, +} + +unsafe impl Send for MemorySimIO {} +unsafe impl Sync for MemorySimIO {} + +impl MemorySimIO { + pub fn new( + seed: u64, + page_size: usize, + latency_probability: usize, + min_tick: u64, + max_tick: u64, + ) -> Result { + let fault = Cell::new(false); + let files = RefCell::new(IndexMap::new()); + let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); + let nr_run_once_faults = Cell::new(0); + Ok(Self { + callbacks: Arc::new(Mutex::new(Vec::new())), + timeouts: Arc::new(Mutex::new(Vec::new())), + fault, + files, + rng, + nr_run_once_faults, + page_size, + seed, + latency_probability, + clock: Arc::new(SimulatorClock::new( + ChaCha8Rng::seed_from_u64(seed), + min_tick, + max_tick, + )), + }) + } +} + +impl SimIO for MemorySimIO { + fn inject_fault(&self, fault: bool) { + self.fault.replace(fault); + if fault { + tracing::debug!("fault injected"); + } + } + + fn print_stats(&self) { + tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); + for file in self.files.borrow().values() { + tracing::info!("\n===========================\n{}", file.stats_table()); + } + } + + fn syncing(&self) -> bool { + let callbacks = self.callbacks.try_lock().unwrap(); + callbacks + .iter() + .any(|operation| matches!(operation.op, OperationType::Sync { .. })) + } + + fn close_files(&self) { + for file in self.files.borrow().values() { + file.closed.set(true); + } + } +} + +impl Clock for MemorySimIO { + fn now(&self) -> Instant { + self.clock.now().into() + } +} + +impl IO for MemorySimIO { + fn open_file( + &self, + path: &str, + _flags: OpenFlags, // TODO: ignoring open flags for now as we don't test read only mode in the simulator yet + _direct: bool, + ) -> Result> { + let mut files = self.files.borrow_mut(); + let fd = path.to_string(); + let file = if let Some(file) = files.get(path) { + file.closed.set(false); + file.clone() + } else { + let file = Arc::new(MemorySimFile::new( + self.callbacks.clone(), + fd.clone(), + self.seed, + self.latency_probability, + self.clock.clone(), + )); + files.insert(fd, file.clone()); + file + }; + + Ok(file) + } + + fn run_once(&self) -> Result<()> { + let mut callbacks = self.callbacks.lock(); + let mut timeouts = self.timeouts.lock(); + tracing::trace!( + callbacks.len = callbacks.len(), + timeouts.len = timeouts.len() + ); + if self.fault.get() { + self.nr_run_once_faults + .replace(self.nr_run_once_faults.get() + 1); + // TODO: currently we only deal with single threaded execution in one file + // When we support multiple db files, we need to only remove callbacks not relevant to the current file + // and maybe connection + callbacks.clear(); + timeouts.clear(); + return Err(turso_core::LimboError::InternalError( + FAULT_ERROR_MSG.into(), + )); + } + let files = self.files.borrow_mut(); + let now = self.now(); + + callbacks.append(&mut timeouts); + + while let Some(callback) = callbacks.pop() { + if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { + callback.do_operation(&files); + } else { + timeouts.push(callback); + } + } + Ok(()) + } + + fn generate_random_number(&self) -> i64 { + self.rng.borrow_mut().next_u64() as i64 + } +} diff --git a/simulator/runner/memory/mod.rs b/simulator/runner/memory/mod.rs new file mode 100644 index 000000000..d8e462011 --- /dev/null +++ b/simulator/runner/memory/mod.rs @@ -0,0 +1,2 @@ +pub mod file; +pub mod io; diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index 13b5c4258..b729045f0 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -8,6 +8,7 @@ pub mod execution; #[allow(dead_code)] pub mod file; pub mod io; +pub mod memory; pub mod watch; pub const FAULT_ERROR_MSG: &str = "Injected Fault"; From 24c0d24be6be27c4dd692d90c699c258547222d9 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 11:39:58 -0300 Subject: [PATCH 05/17] impl Truncate for MemorySim --- simulator/runner/memory/file.rs | 13 ++++++++++++- simulator/runner/memory/io.rs | 25 +++++++++++++++++++------ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index 158f654b1..ffa0b8fd6 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -14,6 +14,8 @@ use crate::runner::{ }; pub struct MemorySimFile { + // TODO: maybe have a pending queue which is fast to append + // and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue pub callbacks: CallbackQueue, pub fd: Arc, pub buffer: RefCell>, @@ -158,6 +160,15 @@ impl File for MemorySimFile { } fn truncate(&self, len: usize, c: Completion) -> Result { - todo!() + let op = OperationType::Truncate { + fd: self.fd.clone(), + completion: c.clone(), + len, + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) } } diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index ad614ba9c..e1597100f 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -30,6 +30,11 @@ pub enum OperationType { fd: Arc, completion: Completion, }, + Truncate { + fd: Arc, + completion: Completion, + len: usize, + }, } impl OperationType { @@ -37,7 +42,8 @@ impl OperationType { match self { OperationType::Read { fd, .. } | OperationType::Write { fd, .. } - | OperationType::Sync { fd, .. } => fd, + | OperationType::Sync { fd, .. } + | OperationType::Truncate { fd, .. } => fd, } } } @@ -45,6 +51,7 @@ impl OperationType { pub struct Operation { pub time: Option, pub op: OperationType, + // TODO: add a fault field here to signal if an Operation should fault } impl Operation { @@ -98,6 +105,16 @@ impl Operation { // There is no Sync for in memory completion.complete(0); } + OperationType::Truncate { + fd, + completion, + len, + } => { + let file = files.get(fd.as_str()).unwrap(); + let mut file_buf = file.buffer.borrow_mut(); + file_buf.truncate(len); + completion.complete(0); + } } } } @@ -223,11 +240,6 @@ impl IO for MemorySimIO { if self.fault.get() { self.nr_run_once_faults .replace(self.nr_run_once_faults.get() + 1); - // TODO: currently we only deal with single threaded execution in one file - // When we support multiple db files, we need to only remove callbacks not relevant to the current file - // and maybe connection - callbacks.clear(); - timeouts.clear(); return Err(turso_core::LimboError::InternalError( FAULT_ERROR_MSG.into(), )); @@ -239,6 +251,7 @@ impl IO for MemorySimIO { while let Some(callback) = callbacks.pop() { if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { + // TODO: check if we should inject fault in operation here callback.do_operation(&files); } else { timeouts.push(callback); From 117451fba148fc78d71f3073604fbbe5fae77321 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 11:49:34 -0300 Subject: [PATCH 06/17] impl WriteV for MemorySim --- simulator/runner/memory/file.rs | 64 ++++++++++++++++++++++++--------- simulator/runner/memory/io.rs | 46 ++++++++++++++---------- 2 files changed, 75 insertions(+), 35 deletions(-) diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index ffa0b8fd6..a5b956cc4 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -94,6 +94,31 @@ impl MemorySimFile { sum.into() }) } + + fn insert_op(&self, op: OperationType) { + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + } + + pub fn write_buf(&self, buf: &[u8], offset: usize) -> usize { + let mut file_buf = self.buffer.borrow_mut(); + let more_space = if file_buf.len() < offset { + (offset + buf.len()) - file_buf.len() + } else { + buf.len().saturating_sub(file_buf.len() - offset) + }; + if more_space > 0 { + file_buf.reserve(more_space); + for _ in 0..more_space { + file_buf.push(0); + } + } + + file_buf[offset..][0..buf.len()].copy_from_slice(buf); + buf.len() + } } impl File for MemorySimFile { @@ -113,10 +138,7 @@ impl File for MemorySimFile { completion: c.clone(), offset: pos, }; - self.callbacks.lock().push(Operation { - time: self.generate_latency(), - op, - }); + self.insert_op(op); Ok(c) } @@ -133,10 +155,26 @@ impl File for MemorySimFile { completion: c.clone(), offset: pos, }; - self.callbacks.lock().push(Operation { - time: self.generate_latency(), - op, - }); + self.insert_op(op); + Ok(c) + } + + fn pwritev( + &self, + pos: usize, + buffers: Vec>, + c: Completion, + ) -> Result { + if buffers.len() == 1 { + return self.pwrite(pos, buffers[0].clone(), c); + } + let op = OperationType::WriteV { + fd: self.fd.clone(), + buffers, + completion: c.clone(), + offset: pos, + }; + self.insert_op(op); Ok(c) } @@ -146,10 +184,7 @@ impl File for MemorySimFile { fd: self.fd.clone(), completion: c.clone(), }; - self.callbacks.lock().push(Operation { - time: self.generate_latency(), - op, - }); + self.insert_op(op); Ok(c) } @@ -165,10 +200,7 @@ impl File for MemorySimFile { completion: c.clone(), len, }; - self.callbacks.lock().push(Operation { - time: self.generate_latency(), - op, - }); + self.insert_op(op); Ok(c) } } diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index e1597100f..03e1ad0e1 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -26,6 +26,12 @@ pub enum OperationType { completion: Completion, offset: usize, }, + WriteV { + fd: Arc, + buffers: Vec>, + completion: Completion, + offset: usize, + }, Sync { fd: Arc, completion: Completion, @@ -42,6 +48,7 @@ impl OperationType { match self { OperationType::Read { fd, .. } | OperationType::Write { fd, .. } + | OperationType::WriteV { fd, .. } | OperationType::Sync { fd, .. } | OperationType::Truncate { fd, .. } => fd, } @@ -81,25 +88,26 @@ impl Operation { offset, } => { let file = files.get(fd.as_str()).unwrap(); - let buf_size = { - let mut file_buf = file.buffer.borrow_mut(); - let buf = buffer.as_slice(); - let more_space = if file_buf.len() < offset { - (offset + buf.len()) - file_buf.len() - } else { - buf.len().saturating_sub(file_buf.len() - offset) - }; - if more_space > 0 { - file_buf.reserve(more_space); - for _ in 0..more_space { - file_buf.push(0); - } - } - - file_buf[offset..][0..buf.len()].copy_from_slice(buf); - buf.len() as i32 - }; - completion.complete(buf_size); + let buf_size = file.write_buf(buffer.as_slice(), offset); + completion.complete(buf_size as i32); + } + OperationType::WriteV { + fd, + buffers, + completion, + offset, + } => { + if buffers.is_empty() { + return; + } + let file = files.get(fd.as_str()).unwrap(); + let mut pos = offset; + let written = buffers.into_iter().fold(0, |written, buffer| { + let buf_size = file.write_buf(buffer.as_slice(), pos); + pos += buf_size; + written + buf_size + }); + completion.complete(written as i32); } OperationType::Sync { completion, .. } => { // There is no Sync for in memory From fd4e74929a72a1700d82f2bdcdf62e027af236ad Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 12:56:30 -0300 Subject: [PATCH 07/17] Cli option to enable memory IO --- simulator/runner/cli.rs | 6 +++++ simulator/runner/env.rs | 42 +++++++++++++++++++++++++++-------- simulator/runner/memory/io.rs | 6 ++--- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index daa00be38..1db597223 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -130,6 +130,12 @@ pub struct SimulatorCLI { default_value_t = false )] pub keep_files: bool, + #[clap( + long, + help = "Use memory IO for complex simulations", + default_value_t = false + )] + pub memory_io: bool, #[clap(long, default_value_t = ProfileType::Default)] /// Profile selector for Simulation run pub profile: ProfileType, diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 2a971b597..f66cf967d 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -12,8 +12,9 @@ use sql_generation::model::table::Table; use turso_core::Database; use crate::profiles::Profile; -use crate::runner::io::SimulatorIO; use crate::runner::SimIO; +use crate::runner::io::SimulatorIO; +use crate::runner::memory::io::MemorySimIO; use super::cli::SimulatorCLI; @@ -71,6 +72,7 @@ pub(crate) struct SimulatorEnv { pub(crate) type_: SimulationType, pub(crate) phase: SimulationPhase, pub(crate) tables: SimulatorTables, + pub memory_io: bool, } impl UnwindSafe for SimulatorEnv {} @@ -89,6 +91,7 @@ impl SimulatorEnv { paths: self.paths.clone(), type_: self.type_, phase: self.phase, + memory_io: self.memory_io, profile: self.profile.clone(), } } @@ -100,16 +103,26 @@ impl SimulatorEnv { let latency_prof = &self.profile.io.latency; - let io = Arc::new( - SimulatorIO::new( + let io: Arc = if self.memory_io { + Arc::new(MemorySimIO::new( self.opts.seed, self.opts.page_size, latency_prof.latency_probability, latency_prof.min_tick, latency_prof.max_tick, + )) + } else { + Arc::new( + SimulatorIO::new( + self.opts.seed, + self.opts.page_size, + latency_prof.latency_probability, + latency_prof.min_tick, + latency_prof.max_tick, + ) + .unwrap(), ) - .unwrap(), - ); + }; // Remove existing database file let db_path = self.get_db_path(); @@ -283,16 +296,26 @@ impl SimulatorEnv { let latency_prof = &profile.io.latency; - let io = Arc::new( - SimulatorIO::new( + let io: Arc = if cli_opts.memory_io { + Arc::new(MemorySimIO::new( seed, opts.page_size, latency_prof.latency_probability, latency_prof.min_tick, latency_prof.max_tick, + )) + } else { + Arc::new( + SimulatorIO::new( + seed, + opts.page_size, + latency_prof.latency_probability, + latency_prof.min_tick, + latency_prof.max_tick, + ) + .unwrap(), ) - .unwrap(), - ); + }; let db = match Database::open_file( io.clone(), @@ -320,6 +343,7 @@ impl SimulatorEnv { db: Some(db), type_: simulation_type, phase: SimulationPhase::Test, + memory_io: cli_opts.memory_io, profile: profile.clone(), } } diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index 03e1ad0e1..e6ac7075e 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -152,12 +152,12 @@ impl MemorySimIO { latency_probability: usize, min_tick: u64, max_tick: u64, - ) -> Result { + ) -> Self { let fault = Cell::new(false); let files = RefCell::new(IndexMap::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); - Ok(Self { + Self { callbacks: Arc::new(Mutex::new(Vec::new())), timeouts: Arc::new(Mutex::new(Vec::new())), fault, @@ -172,7 +172,7 @@ impl MemorySimIO { min_tick, max_tick, )), - }) + } } } From ff51965c3ea119395fc9ec200dedc5c15fa9bf32 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 14:34:37 -0300 Subject: [PATCH 08/17] fix shrinking to not include some properties that are present in faulty query, but that are not relevant to the error --- simulator/shrink/plan.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index 7def800ce..43e2912fb 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -56,6 +56,12 @@ impl InteractionPlan { // Remove all properties that do not use the failing tables plan.plan.retain_mut(|interactions| { let retain = if idx == failing_execution.interaction_index { + if let Interactions::Property( + Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. }, + ) = interactions + { + tables.retain(|table| depending_tables.contains(table)); + } true } else { let mut has_table = interactions @@ -73,9 +79,13 @@ impl InteractionPlan { | Property::DropSelect { queries, .. } => { queries.clear(); } - Property::FsyncNoWait { tables, .. } - | Property::FaultyQuery { tables, .. } => { - tables.retain(|table| depending_tables.contains(table)); + Property::FsyncNoWait { tables, query } + | Property::FaultyQuery { tables, query } => { + if !query.uses().iter().any(|t| depending_tables.contains(t)) { + tables.clear(); + } else { + tables.retain(|table| depending_tables.contains(table)); + } } Property::SelectLimit { .. } | Property::SelectSelectOptimizer { .. } From cc3488bba021b3c7e3891efc3d81147894f64f7a Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 14:42:25 -0300 Subject: [PATCH 09/17] print path in stats for memory IO --- simulator/runner/memory/io.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index e6ac7075e..e2b9f038c 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -186,8 +186,12 @@ impl SimIO for MemorySimIO { fn print_stats(&self) { tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); - for file in self.files.borrow().values() { - tracing::info!("\n===========================\n{}", file.stats_table()); + for (path, file) in self.files.borrow().iter() { + tracing::info!( + "\n===========================\n\nPath: {}\n{}", + path, + file.stats_table() + ); } } From bd8cfe40f19322bee4725411687a239cd4aaafe9 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 20 Aug 2025 23:44:50 -0300 Subject: [PATCH 10/17] impl `remove_file` for MemoryIO + skip completion if finished --- simulator/runner/memory/io.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index e2b9f038c..900237e96 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -53,6 +53,16 @@ impl OperationType { | OperationType::Truncate { fd, .. } => fd, } } + + fn get_completion(&self) -> &Completion { + match self { + OperationType::Read { completion, .. } + | OperationType::Write { completion, .. } + | OperationType::WriteV { completion, .. } + | OperationType::Sync { completion, .. } + | OperationType::Truncate { completion, .. } => completion, + } + } } pub struct Operation { @@ -262,6 +272,10 @@ impl IO for MemorySimIO { callbacks.append(&mut timeouts); while let Some(callback) = callbacks.pop() { + let completion = callback.op.get_completion(); + if completion.finished() { + continue; + } if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { // TODO: check if we should inject fault in operation here callback.do_operation(&files); @@ -275,4 +289,9 @@ impl IO for MemorySimIO { fn generate_random_number(&self) -> i64 { self.rng.borrow_mut().next_u64() as i64 } + + fn remove_file(&self, path: &str) -> Result<()> { + self.files.borrow_mut().shift_remove(path); + Ok(()) + } } From be855a80591ba1b8e0ece647b83eb4afeb82128e Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 21 Aug 2025 23:02:57 -0300 Subject: [PATCH 11/17] IOCompletions: abort other remaining completions if previous one errors --- core/types.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/types.rs b/core/types.rs index cc9c78021..dca2f4bb1 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2480,8 +2480,15 @@ impl IOCompletions { match self { IOCompletions::Single(c) => io.wait_for_completion(c), IOCompletions::Many(completions) => { - for c in completions { - io.wait_for_completion(c)?; + let mut completions = completions.into_iter(); + while let Some(c) = completions.next() { + let res = io.wait_for_completion(c); + if res.is_err() { + for c in completions { + c.abort(); + } + return res; + } } Ok(()) } From c158db072b549c13b4ee78160b5733de7fd20dd8 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Fri, 22 Aug 2025 11:11:26 -0300 Subject: [PATCH 12/17] inject fault in the IO Operation in the MemorySim --- simulator/runner/memory/file.rs | 108 +++++++++++++++++++++++--------- simulator/runner/memory/io.rs | 26 ++++---- 2 files changed, 90 insertions(+), 44 deletions(-) diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index a5b956cc4..e648ef23d 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -13,6 +13,35 @@ use crate::runner::{ memory::io::{CallbackQueue, Fd, Operation, OperationType}, }; +/// Tracks IO calls and faults for each type of I/O operation +#[derive(Debug, Default)] +struct IOTracker { + pread_calls: usize, + pread_faults: usize, + + pwrite_calls: usize, + pwrite_faults: usize, + + pwritev_calls: usize, + pwritev_faults: usize, + + sync_calls: usize, + sync_faults: usize, + + truncate_calls: usize, + truncate_faults: usize, +} + +impl IOTracker { + fn total_calls(&self) -> usize { + self.pread_calls + + self.pwrite_calls + + self.pwritev_calls + + self.sync_calls + + self.truncate_calls + } +} + pub struct MemorySimFile { // TODO: maybe have a pending queue which is fast to append // and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue @@ -21,25 +50,11 @@ pub struct MemorySimFile { pub buffer: RefCell>, // TODO: add fault map later here pub closed: Cell, - - /// Number of `pread` function calls (both success and failures). - pub nr_pread_calls: Cell, - /// Number of `pwrite` function calls (both success and failures). - pub nr_pwrite_calls: Cell, - /// Number of `sync` function calls (both success and failures). - pub nr_sync_calls: Cell, - + io_tracker: RefCell, pub rng: RefCell, - pub latency_probability: usize, clock: Arc, -} - -type IoOperation = Box Result>>; - -pub struct DelayedIo { - pub time: turso_core::Instant, - pub op: IoOperation, + fault: Cell, } unsafe impl Send for MemorySimFile {} @@ -58,24 +73,44 @@ impl MemorySimFile { fd: Arc::new(fd), buffer: RefCell::new(Vec::new()), closed: Cell::new(false), - nr_pread_calls: Cell::new(0), - nr_pwrite_calls: Cell::new(0), - nr_sync_calls: Cell::new(0), + io_tracker: RefCell::new(IOTracker::default()), rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), latency_probability, clock, + fault: Cell::new(false), } } + pub fn inject_fault(&self, fault: bool) { + self.fault.set(fault); + } + pub fn stats_table(&self) -> String { - let sum_calls = - self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get(); + let io_tracker = self.io_tracker.borrow(); + let sum_calls = io_tracker.total_calls(); let stats_table = [ - "op calls ".to_string(), - "--------- --------".to_string(), - format!("pread {:8}", self.nr_pread_calls.get()), - format!("pwrite {:8}", self.nr_pwrite_calls.get()), - format!("sync {:8}", self.nr_sync_calls.get()), + "op calls faults ".to_string(), + "--------- -------- --------".to_string(), + format!( + "pread {:8} {:8}", + io_tracker.pread_calls, io_tracker.pread_faults + ), + format!( + "pwrite {:8} {:8}", + io_tracker.pwrite_calls, io_tracker.pwrite_faults + ), + format!( + "pwritev {:8} {:8}", + io_tracker.pwritev_calls, io_tracker.pwritev_faults + ), + format!( + "sync {:8} {:8}", + io_tracker.sync_calls, io_tracker.sync_faults + ), + format!( + "truncate {:8} {:8}", + io_tracker.truncate_calls, io_tracker.truncate_faults + ), "--------- -------- --------".to_string(), format!("total {sum_calls:8}"), ]; @@ -96,9 +131,22 @@ impl MemorySimFile { } fn insert_op(&self, op: OperationType) { + let fault = self.fault.get(); + if fault { + let mut io_tracker = self.io_tracker.borrow_mut(); + match &op { + OperationType::Read { .. } => io_tracker.pread_faults += 1, + OperationType::Write { .. } => io_tracker.pwrite_faults += 1, + OperationType::WriteV { .. } => io_tracker.pwritev_faults += 1, + OperationType::Sync { .. } => io_tracker.sync_faults += 1, + OperationType::Truncate { .. } => io_tracker.truncate_faults += 1, + } + } + self.callbacks.lock().push(Operation { time: self.generate_latency(), op, + fault, }); } @@ -131,7 +179,7 @@ impl File for MemorySimFile { } fn pread(&self, pos: usize, c: Completion) -> Result { - self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); + self.io_tracker.borrow_mut().pread_calls += 1; let op = OperationType::Read { fd: self.fd.clone(), @@ -148,7 +196,7 @@ impl File for MemorySimFile { buffer: Arc, c: Completion, ) -> Result { - self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); + self.io_tracker.borrow_mut().pwrite_calls += 1; let op = OperationType::Write { fd: self.fd.clone(), buffer, @@ -168,6 +216,7 @@ impl File for MemorySimFile { if buffers.len() == 1 { return self.pwrite(pos, buffers[0].clone(), c); } + self.io_tracker.borrow_mut().pwritev_calls += 1; let op = OperationType::WriteV { fd: self.fd.clone(), buffers, @@ -179,7 +228,7 @@ impl File for MemorySimFile { } fn sync(&self, c: Completion) -> Result { - self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); + self.io_tracker.borrow_mut().sync_calls += 1; let op = OperationType::Sync { fd: self.fd.clone(), completion: c.clone(), @@ -195,6 +244,7 @@ impl File for MemorySimFile { } fn truncate(&self, len: usize, c: Completion) -> Result { + self.io_tracker.borrow_mut().truncate_calls += 1; let op = OperationType::Truncate { fd: self.fd.clone(), completion: c.clone(), diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index 900237e96..8b07ba819 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -8,8 +8,8 @@ use rand_chacha::ChaCha8Rng; use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO}; use crate::runner::clock::SimulatorClock; +use crate::runner::memory::file::MemorySimFile; use crate::runner::SimIO; -use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile}; /// File descriptor pub type Fd = String; @@ -68,7 +68,7 @@ impl OperationType { pub struct Operation { pub time: Option, pub op: OperationType, - // TODO: add a fault field here to signal if an Operation should fault + pub fault: bool, } impl Operation { @@ -142,7 +142,6 @@ pub type CallbackQueue = Arc>>; pub struct MemorySimIO { callbacks: CallbackQueue, timeouts: CallbackQueue, - pub fault: Cell, pub files: RefCell>>, pub rng: RefCell, pub nr_run_once_faults: Cell, @@ -163,14 +162,12 @@ impl MemorySimIO { min_tick: u64, max_tick: u64, ) -> Self { - let fault = Cell::new(false); let files = RefCell::new(IndexMap::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); Self { callbacks: Arc::new(Mutex::new(Vec::new())), timeouts: Arc::new(Mutex::new(Vec::new())), - fault, files, rng, nr_run_once_faults, @@ -188,14 +185,15 @@ impl MemorySimIO { impl SimIO for MemorySimIO { fn inject_fault(&self, fault: bool) { - self.fault.replace(fault); + for file in self.files.borrow().values() { + file.inject_fault(fault); + } if fault { tracing::debug!("fault injected"); } } fn print_stats(&self) { - tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); for (path, file) in self.files.borrow().iter() { tracing::info!( "\n===========================\n\nPath: {}\n{}", @@ -259,13 +257,6 @@ impl IO for MemorySimIO { callbacks.len = callbacks.len(), timeouts.len = timeouts.len() ); - if self.fault.get() { - self.nr_run_once_faults - .replace(self.nr_run_once_faults.get() + 1); - return Err(turso_core::LimboError::InternalError( - FAULT_ERROR_MSG.into(), - )); - } let files = self.files.borrow_mut(); let now = self.now(); @@ -276,8 +267,13 @@ impl IO for MemorySimIO { if completion.finished() { continue; } + if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { - // TODO: check if we should inject fault in operation here + if callback.fault { + // Inject the fault by aborting the completion + completion.abort(); + continue; + } callback.do_operation(&files); } else { timeouts.push(callback); From 1eb1171f553b54a9adc1069f3b0e777165def8a0 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 26 Aug 2025 12:26:20 -0300 Subject: [PATCH 13/17] do not fault on Fsync until we correctly define the expected behaviour in the simulator --- simulator/runner/io.rs | 2 +- simulator/runner/memory/file.rs | 5 +++-- simulator/runner/memory/io.rs | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index d6cdcace7..dbb0b3953 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use turso_core::{Clock, IO, Instant, OpenFlags, PlatformIO, Result}; -use crate::runner::{clock::SimulatorClock, file::SimulatorFile, SimIO}; +use crate::runner::{SimIO, clock::SimulatorClock, file::SimulatorFile}; pub(crate) struct SimulatorIO { pub(crate) inner: Box, diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index e648ef23d..573a70820 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -5,7 +5,7 @@ use std::{ use rand::{Rng as _, SeedableRng}; use rand_chacha::ChaCha8Rng; -use tracing::{instrument, Level}; +use tracing::{Level, instrument}; use turso_core::{Completion, File, Result}; use crate::runner::{ @@ -131,7 +131,8 @@ impl MemorySimFile { } fn insert_op(&self, op: OperationType) { - let fault = self.fault.get(); + // FIXME: currently avoid any fsync faults until we correctly define the expected behaviour in the simulator + let fault = self.fault.get() && !matches!(op, OperationType::Sync { .. }); if fault { let mut io_tracker = self.io_tracker.borrow_mut(); match &op { diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index 8b07ba819..1a2ea49a8 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -5,11 +5,11 @@ use indexmap::IndexMap; use parking_lot::Mutex; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; -use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO}; +use turso_core::{Clock, Completion, IO, Instant, OpenFlags, Result}; +use crate::runner::SimIO; use crate::runner::clock::SimulatorClock; use crate::runner::memory::file::MemorySimFile; -use crate::runner::SimIO; /// File descriptor pub type Fd = String; From 51a54d3c33e86668d7b6cb0b298787b5e0f3b6e1 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Mon, 1 Sep 2025 16:13:21 -0300 Subject: [PATCH 14/17] Fd should be part of Operation struct --- simulator/runner/memory/file.rs | 6 +----- simulator/runner/memory/io.rs | 31 ++++--------------------------- 2 files changed, 5 insertions(+), 32 deletions(-) diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index 573a70820..14ccfd344 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -148,6 +148,7 @@ impl MemorySimFile { time: self.generate_latency(), op, fault, + fd: self.fd.clone(), }); } @@ -183,7 +184,6 @@ impl File for MemorySimFile { self.io_tracker.borrow_mut().pread_calls += 1; let op = OperationType::Read { - fd: self.fd.clone(), completion: c.clone(), offset: pos, }; @@ -199,7 +199,6 @@ impl File for MemorySimFile { ) -> Result { self.io_tracker.borrow_mut().pwrite_calls += 1; let op = OperationType::Write { - fd: self.fd.clone(), buffer, completion: c.clone(), offset: pos, @@ -219,7 +218,6 @@ impl File for MemorySimFile { } self.io_tracker.borrow_mut().pwritev_calls += 1; let op = OperationType::WriteV { - fd: self.fd.clone(), buffers, completion: c.clone(), offset: pos, @@ -231,7 +229,6 @@ impl File for MemorySimFile { fn sync(&self, c: Completion) -> Result { self.io_tracker.borrow_mut().sync_calls += 1; let op = OperationType::Sync { - fd: self.fd.clone(), completion: c.clone(), }; self.insert_op(op); @@ -247,7 +244,6 @@ impl File for MemorySimFile { fn truncate(&self, len: usize, c: Completion) -> Result { self.io_tracker.borrow_mut().truncate_calls += 1; let op = OperationType::Truncate { - fd: self.fd.clone(), completion: c.clone(), len, }; diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index 1a2ea49a8..46f272500 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -16,44 +16,29 @@ pub type Fd = String; pub enum OperationType { Read { - fd: Arc, completion: Completion, offset: usize, }, Write { - fd: Arc, buffer: Arc, completion: Completion, offset: usize, }, WriteV { - fd: Arc, buffers: Vec>, completion: Completion, offset: usize, }, Sync { - fd: Arc, completion: Completion, }, Truncate { - fd: Arc, completion: Completion, len: usize, }, } impl OperationType { - fn get_fd(&self) -> &Fd { - match self { - OperationType::Read { fd, .. } - | OperationType::Write { fd, .. } - | OperationType::WriteV { fd, .. } - | OperationType::Sync { fd, .. } - | OperationType::Truncate { fd, .. } => fd, - } - } - fn get_completion(&self) -> &Completion { match self { OperationType::Read { completion, .. } @@ -69,16 +54,14 @@ pub struct Operation { pub time: Option, pub op: OperationType, pub fault: bool, + pub fd: Arc, } impl Operation { fn do_operation(self, files: &IndexMap>) { + let fd = self.fd; match self.op { - OperationType::Read { - fd, - completion, - offset, - } => { + OperationType::Read { completion, offset } => { let file = files.get(fd.as_str()).unwrap(); let file_buf = file.buffer.borrow_mut(); let buffer = completion.as_read().buf.clone(); @@ -92,7 +75,6 @@ impl Operation { completion.complete(buf_size); } OperationType::Write { - fd, buffer, completion, offset, @@ -102,7 +84,6 @@ impl Operation { completion.complete(buf_size as i32); } OperationType::WriteV { - fd, buffers, completion, offset, @@ -123,11 +104,7 @@ impl Operation { // There is no Sync for in memory completion.complete(0); } - OperationType::Truncate { - fd, - completion, - len, - } => { + OperationType::Truncate { completion, len } => { let file = files.get(fd.as_str()).unwrap(); let mut file_buf = file.buffer.borrow_mut(); file_buf.truncate(len); From 07feacbc76ab126316b24927cab93d449a91ff12 Mon Sep 17 00:00:00 2001 From: TcMits Date: Tue, 2 Sep 2025 18:10:28 +0700 Subject: [PATCH 15/17] remove turso_sqlite3_parser from turso_parser --- Cargo.lock | 1 - parser/Cargo.toml | 1 - parser/benches/parser_benchmark.rs | 29 ----------------------------- 3 files changed, 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef9447be6..f6f66d107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4345,7 +4345,6 @@ dependencies = [ "strum", "strum_macros", "thiserror 1.0.69", - "turso_sqlite3_parser", ] [[package]] diff --git a/parser/Cargo.toml b/parser/Cargo.toml index c2b674a4d..41c6e559c 100644 --- a/parser/Cargo.toml +++ b/parser/Cargo.toml @@ -25,7 +25,6 @@ thiserror = "1.0.61" [dev-dependencies] fallible-iterator = "0.3" criterion = { version = "0.5", features = ["html_reports" ] } -turso_sqlite3_parser = { workspace = true } [target.'cfg(not(target_family = "windows"))'.dev-dependencies] pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] } diff --git a/parser/benches/parser_benchmark.rs b/parser/benches/parser_benchmark.rs index 0c6291553..a467ada4b 100644 --- a/parser/benches/parser_benchmark.rs +++ b/parser/benches/parser_benchmark.rs @@ -1,11 +1,6 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; -use fallible_iterator::FallibleIterator; use pprof::criterion::{Output, PProfProfiler}; use turso_parser::{lexer::Lexer, parser::Parser}; -use turso_sqlite3_parser::lexer::{ - sql::{Parser as OldParser, Tokenizer}, - Scanner, -}; fn bench_parser(criterion: &mut Criterion) { let queries = [ @@ -22,12 +17,6 @@ fn bench_parser(criterion: &mut Criterion) { b.iter(|| Parser::new(black_box(qb)).next().unwrap()); }); - group.bench_function(BenchmarkId::new("limbo_old_parser_query", ""), |b| { - b.iter(|| { - OldParser::new(black_box(qb)).next().unwrap().unwrap(); - }); - }); - group.finish(); } } @@ -49,12 +38,6 @@ fn bench_parser_insert_batch(criterion: &mut Criterion) { b.iter(|| Parser::new(black_box(qb)).next().unwrap()); }); - group.bench_function(BenchmarkId::new("limbo_old_parser_insert_batch", ""), |b| { - b.iter(|| { - OldParser::new(black_box(qb)).next().unwrap().unwrap(); - }); - }); - group.finish(); } } @@ -78,18 +61,6 @@ fn bench_lexer(criterion: &mut Criterion) { }); }); - group.bench_function(BenchmarkId::new("limbo_old_lexer_query", ""), |b| { - b.iter(|| { - let tokenizer = Tokenizer::new(); - let mut scanner = Scanner::new(black_box(tokenizer)); - loop { - if let (_, None, _) = scanner.scan(black_box(qb)).unwrap() { - break; - } - } - }); - }); - group.finish(); } } From 8204fbc8ec16c269a91c1fed6bf9e5150ff106c4 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 2 Sep 2025 14:14:04 +0300 Subject: [PATCH 16/17] simulator: Fix 64-bit offset build failures Fix brekage from first merging commit d959319b ("Merge 'Use u64 for file offsets in I/O and calculate such offsets in u64' from Preston Thorpe") and then commit 6591b66c ("Merge 'Simulate I/O in memory' from Pedro Muniz"), which was unaware of the changes. --- simulator/runner/memory/file.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index 14ccfd344..e40fdfbb3 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -180,12 +180,12 @@ impl File for MemorySimFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result { + fn pread(&self, pos: u64, c: Completion) -> Result { self.io_tracker.borrow_mut().pread_calls += 1; let op = OperationType::Read { completion: c.clone(), - offset: pos, + offset: pos as usize, }; self.insert_op(op); Ok(c) @@ -193,7 +193,7 @@ impl File for MemorySimFile { fn pwrite( &self, - pos: usize, + pos: u64, buffer: Arc, c: Completion, ) -> Result { @@ -201,7 +201,7 @@ impl File for MemorySimFile { let op = OperationType::Write { buffer, completion: c.clone(), - offset: pos, + offset: pos as usize, }; self.insert_op(op); Ok(c) @@ -209,7 +209,7 @@ impl File for MemorySimFile { fn pwritev( &self, - pos: usize, + pos: u64, buffers: Vec>, c: Completion, ) -> Result { @@ -220,7 +220,7 @@ impl File for MemorySimFile { let op = OperationType::WriteV { buffers, completion: c.clone(), - offset: pos, + offset: pos as usize, }; self.insert_op(op); Ok(c) @@ -241,11 +241,11 @@ impl File for MemorySimFile { Ok(self.buffer.borrow().len() as u64) } - fn truncate(&self, len: usize, c: Completion) -> Result { + fn truncate(&self, len: u64, c: Completion) -> Result { self.io_tracker.borrow_mut().truncate_calls += 1; let op = OperationType::Truncate { completion: c.clone(), - len, + len: len as usize, }; self.insert_op(op); Ok(c) From 8f7e43b32bb643a9a85e598142d3174551007586 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 2 Sep 2025 14:21:04 +0300 Subject: [PATCH 17/17] scripts/publish-crates.sh: Remove turso_sqlite3_parser package --- scripts/publish-crates.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/publish-crates.sh b/scripts/publish-crates.sh index 7d32697cf..4b379cc7b 100755 --- a/scripts/publish-crates.sh +++ b/scripts/publish-crates.sh @@ -2,7 +2,6 @@ cargo publish -p turso_macros cargo publish -p turso_ext -cargo publish -p turso_sqlite3_parser cargo publish -p turso_parser cargo publish -p turso_core cargo publish -p turso