diff --git a/Cargo.lock b/Cargo.lock index 70d4077d5..93f8cd35f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1800,9 +1800,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.8.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -1822,7 +1822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ "ahash", - "indexmap 2.8.0", + "indexmap 2.10.0", "is-terminal", "itoa", "log", @@ -2192,10 +2192,12 @@ dependencies = [ "env_logger 0.10.2", "garde", "hex", + "indexmap 2.10.0", "itertools 0.14.0", "json5", "log", "notify", + "parking_lot", "rand 0.9.2", "rand_chacha 0.9.0", "regex", @@ -2265,9 +2267,9 @@ checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" [[package]] name = "lock_api" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" dependencies = [ "autocfg", "scopeguard", @@ -2670,9 +2672,9 @@ dependencies = [ [[package]] name = "parking_lot" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", "parking_lot_core", @@ -2680,9 +2682,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.10" +version = "0.9.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", @@ -2772,7 +2774,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac26e981c03a6e53e0aee43c113e3202f5581d5360dae7bd2c70e800dd0451d" dependencies = [ "base64", - "indexmap 2.8.0", + "indexmap 2.10.0", "quick-xml 0.32.0", "serde", "time", @@ -4062,7 +4064,7 @@ version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ae329d1f08c4d17a59bed7ff5b5a769d062e64a62d34a3261b219e62cd5aae" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -4084,7 +4086,7 @@ version = "0.22.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" dependencies = [ - "indexmap 2.8.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", @@ -4346,7 +4348,6 @@ dependencies = [ "strum_macros", "thiserror 1.0.69", "turso_macros", - "turso_sqlite3_parser", ] [[package]] @@ -4370,7 +4371,7 @@ dependencies = [ "cc", "env_logger 0.11.7", "fallible-iterator", - "indexmap 2.8.0", + "indexmap 2.10.0", "log", "memchr", "miette", diff --git a/Cargo.toml b/Cargo.toml index dc42dd463..61027dd19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ rand = "0.9.2" tracing = "0.1.41" schemars = "1.0.4" garde = "0.22" +parking_lot = "0.12.4" [profile.release] debug = "line-tables-only" diff --git 
a/core/Cargo.toml b/core/Cargo.toml index 37c150524..b53fcfbb0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -61,7 +61,7 @@ libm = "0.2" turso_macros = { workspace = true } miette = "7.6.0" strum = { workspace = true } -parking_lot = "0.12.3" +parking_lot = { workspace = true } crossbeam-skiplist = "0.1.3" tracing = "0.1.41" ryu = "1.0.19" diff --git a/core/types.rs b/core/types.rs index ce29d89ba..26eb79c41 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2487,8 +2487,15 @@ impl IOCompletions { match self { IOCompletions::Single(c) => io.wait_for_completion(c), IOCompletions::Many(completions) => { - for c in completions { - io.wait_for_completion(c)?; + let mut completions = completions.into_iter(); + while let Some(c) = completions.next() { + let res = io.wait_for_completion(c); + if res.is_err() { + for c in completions { + c.abort(); + } + return res; + } } Ok(()) } diff --git a/parser/Cargo.toml b/parser/Cargo.toml index d3a81cb6a..d4768d2f6 100644 --- a/parser/Cargo.toml +++ b/parser/Cargo.toml @@ -26,7 +26,6 @@ turso_macros = { workspace = true } [dev-dependencies] fallible-iterator = "0.3" criterion = { version = "0.5", features = ["html_reports" ] } -turso_sqlite3_parser = { workspace = true } [target.'cfg(not(target_family = "windows"))'.dev-dependencies] pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] } diff --git a/parser/benches/parser_benchmark.rs b/parser/benches/parser_benchmark.rs index 0c6291553..a467ada4b 100644 --- a/parser/benches/parser_benchmark.rs +++ b/parser/benches/parser_benchmark.rs @@ -1,11 +1,6 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; -use fallible_iterator::FallibleIterator; use pprof::criterion::{Output, PProfProfiler}; use turso_parser::{lexer::Lexer, parser::Parser}; -use turso_sqlite3_parser::lexer::{ - sql::{Parser as OldParser, Tokenizer}, - Scanner, -}; fn bench_parser(criterion: &mut Criterion) { let queries = [ @@ -22,12 +17,6 @@ fn bench_parser(criterion: &mut Criterion) { b.iter(|| Parser::new(black_box(qb)).next().unwrap()); }); - group.bench_function(BenchmarkId::new("limbo_old_parser_query", ""), |b| { - b.iter(|| { - OldParser::new(black_box(qb)).next().unwrap().unwrap(); - }); - }); - group.finish(); } } @@ -49,12 +38,6 @@ fn bench_parser_insert_batch(criterion: &mut Criterion) { b.iter(|| Parser::new(black_box(qb)).next().unwrap()); }); - group.bench_function(BenchmarkId::new("limbo_old_parser_insert_batch", ""), |b| { - b.iter(|| { - OldParser::new(black_box(qb)).next().unwrap().unwrap(); - }); - }); - group.finish(); } } @@ -78,18 +61,6 @@ fn bench_lexer(criterion: &mut Criterion) { }); }); - group.bench_function(BenchmarkId::new("limbo_old_lexer_query", ""), |b| { - b.iter(|| { - let tokenizer = Tokenizer::new(); - let mut scanner = Scanner::new(black_box(tokenizer)); - loop { - if let (_, None, _) = scanner.scan(black_box(qb)).unwrap() { - break; - } - } - }); - }); - group.finish(); } } diff --git a/scripts/publish-crates.sh b/scripts/publish-crates.sh index 7d32697cf..4b379cc7b 100755 --- a/scripts/publish-crates.sh +++ b/scripts/publish-crates.sh @@ -2,7 +2,6 @@ cargo publish -p turso_macros cargo publish -p turso_ext -cargo publish -p turso_sqlite3_parser cargo publish -p turso_parser cargo publish -p turso_core cargo publish -p turso diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index a8f85ca58..89ee9634b 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -42,3 +42,5 @@ schemars = { workspace = true } garde = { workspace = true, 
features = ["derive", "serde"] } json5 = { version = "0.4.1" } strum = { workspace = true } +parking_lot = { workspace = true } +indexmap = "2.10.0" diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index 365b4cd3d..fcfa07ed4 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -21,10 +21,7 @@ use crate::{ SimulatorEnv, generation::Shadow, model::Query, - runner::{ - env::{SimConnection, SimulationType, SimulatorTables}, - io::SimulatorIO, - }, + runner::env::{SimConnection, SimulationType, SimulatorTables}, }; use super::property::{Property, remaining}; @@ -452,7 +449,7 @@ impl Shadow for Interaction { } } impl Interaction { - pub(crate) fn execute_query(&self, conn: &mut Arc, _io: &SimulatorIO) -> ResultSet { + pub(crate) fn execute_query(&self, conn: &mut Arc) -> ResultSet { if let Self::Query(query) = self { let query_str = query.to_string(); let rows = conn.query(&query_str); @@ -611,13 +608,7 @@ impl Interaction { out.push(r); } StepResult::IO => { - let syncing = { - let files = env.io.files.borrow(); - // TODO: currently assuming we only have 1 file that is syncing - files - .iter() - .any(|file| file.sync_completion.borrow().is_some()) - }; + let syncing = env.io.syncing(); if syncing { reopen_database(env); } else { @@ -666,12 +657,7 @@ impl Interaction { let mut current_prob = 0.05; let mut incr = 0.001; loop { - let syncing = { - let files = env.io.files.borrow(); - files - .iter() - .any(|file| file.sync_completion.borrow().is_some()) - }; + let syncing = env.io.syncing(); let inject_fault = env.rng.random_bool(current_prob); // TODO: avoid for now injecting faults when syncing if inject_fault && !syncing { @@ -722,7 +708,7 @@ fn reopen_database(env: &mut SimulatorEnv) { // Clear all open files // TODO: for correct reporting of faults we should get all the recorded numbers and transfer to the new file - env.io.files.borrow_mut().clear(); + env.io.close_files(); // 2. 
Re-open database match env.type_ { diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index daa00be38..1db597223 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -130,6 +130,12 @@ pub struct SimulatorCLI { default_value_t = false )] pub keep_files: bool, + #[clap( + long, + help = "Use memory IO for complex simulations", + default_value_t = false + )] + pub memory_io: bool, #[clap(long, default_value_t = ProfileType::Default)] /// Profile selector for Simulation run pub profile: ProfileType, diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 567f2bad9..f66cf967d 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -12,7 +12,9 @@ use sql_generation::model::table::Table; use turso_core::Database; use crate::profiles::Profile; +use crate::runner::SimIO; use crate::runner::io::SimulatorIO; +use crate::runner::memory::io::MemorySimIO; use super::cli::SimulatorCLI; @@ -63,13 +65,14 @@ pub(crate) struct SimulatorEnv { pub(crate) opts: SimulatorOpts, pub profile: Profile, pub(crate) connections: Vec, - pub(crate) io: Arc, + pub(crate) io: Arc, pub(crate) db: Option>, pub(crate) rng: ChaCha8Rng, pub(crate) paths: Paths, pub(crate) type_: SimulationType, pub(crate) phase: SimulationPhase, pub(crate) tables: SimulatorTables, + pub memory_io: bool, } impl UnwindSafe for SimulatorEnv {} @@ -88,6 +91,7 @@ impl SimulatorEnv { paths: self.paths.clone(), type_: self.type_, phase: self.phase, + memory_io: self.memory_io, profile: self.profile.clone(), } } @@ -99,16 +103,26 @@ impl SimulatorEnv { let latency_prof = &self.profile.io.latency; - let io = Arc::new( - SimulatorIO::new( + let io: Arc = if self.memory_io { + Arc::new(MemorySimIO::new( self.opts.seed, self.opts.page_size, latency_prof.latency_probability, latency_prof.min_tick, latency_prof.max_tick, + )) + } else { + Arc::new( + SimulatorIO::new( + self.opts.seed, + self.opts.page_size, + latency_prof.latency_probability, + latency_prof.min_tick, + latency_prof.max_tick, + ) + .unwrap(), ) - .unwrap(), - ); + }; // Remove existing database file let db_path = self.get_db_path(); @@ -282,16 +296,26 @@ impl SimulatorEnv { let latency_prof = &profile.io.latency; - let io = Arc::new( - SimulatorIO::new( + let io: Arc = if cli_opts.memory_io { + Arc::new(MemorySimIO::new( seed, opts.page_size, latency_prof.latency_probability, latency_prof.min_tick, latency_prof.max_tick, + )) + } else { + Arc::new( + SimulatorIO::new( + seed, + opts.page_size, + latency_prof.latency_probability, + latency_prof.min_tick, + latency_prof.max_tick, + ) + .unwrap(), ) - .unwrap(), - ); + }; let db = match Database::open_file( io.clone(), @@ -319,6 +343,7 @@ impl SimulatorEnv { db: Some(db), type_: simulation_type, phase: SimulationPhase::Test, + memory_io: cli_opts.memory_io, profile: profile.clone(), } } diff --git a/simulator/runner/execution.rs b/simulator/runner/execution.rs index a7d7aa3d6..c7f31ed88 100644 --- a/simulator/runner/execution.rs +++ b/simulator/runner/execution.rs @@ -191,7 +191,7 @@ pub(crate) fn execute_interaction( SimConnection::Disconnected => unreachable!(), }; tracing::debug!(?interaction); - let results = interaction.execute_query(conn, &env.io); + let results = interaction.execute_query(conn); if results.is_err() { tracing::error!(?results); } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index fcc23be75..dbb0b3953 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -7,7 +7,7 @@ use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use 
turso_core::{Clock, IO, Instant, OpenFlags, PlatformIO, Result}; -use crate::runner::{clock::SimulatorClock, file::SimulatorFile}; +use crate::runner::{SimIO, clock::SimulatorClock, file::SimulatorFile}; pub(crate) struct SimulatorIO { pub(crate) inner: Box, @@ -48,15 +48,17 @@ impl SimulatorIO { clock: Arc::new(clock), }) } +} - pub(crate) fn inject_fault(&self, fault: bool) { +impl SimIO for SimulatorIO { + fn inject_fault(&self, fault: bool) { self.fault.replace(fault); for file in self.files.borrow().iter() { file.inject_fault(fault); } } - pub(crate) fn print_stats(&self) { + fn print_stats(&self) { for file in self.files.borrow().iter() { tracing::info!( "\n===========================\n\nPath: {}\n{}", @@ -65,6 +67,18 @@ impl SimulatorIO { ); } } + + fn syncing(&self) -> bool { + let files = self.files.borrow(); + // TODO: currently assuming we only have 1 file that is syncing + files + .iter() + .any(|file| file.sync_completion.borrow().is_some()) + } + + fn close_files(&self) { + self.files.borrow_mut().clear() + } } impl Clock for SimulatorIO { diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs new file mode 100644 index 000000000..e40fdfbb3 --- /dev/null +++ b/simulator/runner/memory/file.rs @@ -0,0 +1,253 @@ +use std::{ + cell::{Cell, RefCell}, + sync::Arc, +}; + +use rand::{Rng as _, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use tracing::{Level, instrument}; +use turso_core::{Completion, File, Result}; + +use crate::runner::{ + clock::SimulatorClock, + memory::io::{CallbackQueue, Fd, Operation, OperationType}, +}; + +/// Tracks IO calls and faults for each type of I/O operation +#[derive(Debug, Default)] +struct IOTracker { + pread_calls: usize, + pread_faults: usize, + + pwrite_calls: usize, + pwrite_faults: usize, + + pwritev_calls: usize, + pwritev_faults: usize, + + sync_calls: usize, + sync_faults: usize, + + truncate_calls: usize, + truncate_faults: usize, +} + +impl IOTracker { + fn total_calls(&self) -> usize { + self.pread_calls + + self.pwrite_calls + + self.pwritev_calls + + self.sync_calls + + self.truncate_calls + } +} + +pub struct MemorySimFile { + // TODO: maybe have a pending queue which is fast to append + // and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue + pub callbacks: CallbackQueue, + pub fd: Arc, + pub buffer: RefCell>, + // TODO: add fault map later here + pub closed: Cell, + io_tracker: RefCell, + pub rng: RefCell, + pub latency_probability: usize, + clock: Arc, + fault: Cell, +} + +unsafe impl Send for MemorySimFile {} +unsafe impl Sync for MemorySimFile {} + +impl MemorySimFile { + pub fn new( + callbacks: CallbackQueue, + fd: Fd, + seed: u64, + latency_probability: usize, + clock: Arc, + ) -> Self { + Self { + callbacks, + fd: Arc::new(fd), + buffer: RefCell::new(Vec::new()), + closed: Cell::new(false), + io_tracker: RefCell::new(IOTracker::default()), + rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), + latency_probability, + clock, + fault: Cell::new(false), + } + } + + pub fn inject_fault(&self, fault: bool) { + self.fault.set(fault); + } + + pub fn stats_table(&self) -> String { + let io_tracker = self.io_tracker.borrow(); + let sum_calls = io_tracker.total_calls(); + let stats_table = [ + "op calls faults ".to_string(), + "--------- -------- --------".to_string(), + format!( + "pread {:8} {:8}", + io_tracker.pread_calls, io_tracker.pread_faults + ), + format!( + "pwrite {:8} {:8}", + io_tracker.pwrite_calls, io_tracker.pwrite_faults + ), + format!( + 
"pwritev {:8} {:8}", + io_tracker.pwritev_calls, io_tracker.pwritev_faults + ), + format!( + "sync {:8} {:8}", + io_tracker.sync_calls, io_tracker.sync_faults + ), + format!( + "truncate {:8} {:8}", + io_tracker.truncate_calls, io_tracker.truncate_faults + ), + "--------- -------- --------".to_string(), + format!("total {sum_calls:8}"), + ]; + + stats_table.join("\n") + } + + #[instrument(skip_all, level = Level::TRACE)] + fn generate_latency(&self) -> Option { + let mut rng = self.rng.borrow_mut(); + // Chance to introduce some latency + rng.random_bool(self.latency_probability as f64 / 100.0) + .then(|| { + let now = self.clock.now(); + let sum = now + std::time::Duration::from_millis(rng.random_range(5..20)); + sum.into() + }) + } + + fn insert_op(&self, op: OperationType) { + // FIXME: currently avoid any fsync faults until we correctly define the expected behaviour in the simulator + let fault = self.fault.get() && !matches!(op, OperationType::Sync { .. }); + if fault { + let mut io_tracker = self.io_tracker.borrow_mut(); + match &op { + OperationType::Read { .. } => io_tracker.pread_faults += 1, + OperationType::Write { .. } => io_tracker.pwrite_faults += 1, + OperationType::WriteV { .. } => io_tracker.pwritev_faults += 1, + OperationType::Sync { .. } => io_tracker.sync_faults += 1, + OperationType::Truncate { .. } => io_tracker.truncate_faults += 1, + } + } + + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + fault, + fd: self.fd.clone(), + }); + } + + pub fn write_buf(&self, buf: &[u8], offset: usize) -> usize { + let mut file_buf = self.buffer.borrow_mut(); + let more_space = if file_buf.len() < offset { + (offset + buf.len()) - file_buf.len() + } else { + buf.len().saturating_sub(file_buf.len() - offset) + }; + if more_space > 0 { + file_buf.reserve(more_space); + for _ in 0..more_space { + file_buf.push(0); + } + } + + file_buf[offset..][0..buf.len()].copy_from_slice(buf); + buf.len() + } +} + +impl File for MemorySimFile { + fn lock_file(&self, _exclusive: bool) -> Result<()> { + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + Ok(()) + } + + fn pread(&self, pos: u64, c: Completion) -> Result { + self.io_tracker.borrow_mut().pread_calls += 1; + + let op = OperationType::Read { + completion: c.clone(), + offset: pos as usize, + }; + self.insert_op(op); + Ok(c) + } + + fn pwrite( + &self, + pos: u64, + buffer: Arc, + c: Completion, + ) -> Result { + self.io_tracker.borrow_mut().pwrite_calls += 1; + let op = OperationType::Write { + buffer, + completion: c.clone(), + offset: pos as usize, + }; + self.insert_op(op); + Ok(c) + } + + fn pwritev( + &self, + pos: u64, + buffers: Vec>, + c: Completion, + ) -> Result { + if buffers.len() == 1 { + return self.pwrite(pos, buffers[0].clone(), c); + } + self.io_tracker.borrow_mut().pwritev_calls += 1; + let op = OperationType::WriteV { + buffers, + completion: c.clone(), + offset: pos as usize, + }; + self.insert_op(op); + Ok(c) + } + + fn sync(&self, c: Completion) -> Result { + self.io_tracker.borrow_mut().sync_calls += 1; + let op = OperationType::Sync { + completion: c.clone(), + }; + self.insert_op(op); + Ok(c) + } + + fn size(&self) -> Result { + // TODO: size operation should also be scheduled. 
But this requires a change in how we + // Use this function internally in Turso + Ok(self.buffer.borrow().len() as u64) + } + + fn truncate(&self, len: u64, c: Completion) -> Result { + self.io_tracker.borrow_mut().truncate_calls += 1; + let op = OperationType::Truncate { + completion: c.clone(), + len: len as usize, + }; + self.insert_op(op); + Ok(c) + } +} diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs new file mode 100644 index 000000000..46f272500 --- /dev/null +++ b/simulator/runner/memory/io.rs @@ -0,0 +1,270 @@ +use std::cell::{Cell, RefCell}; +use std::sync::Arc; + +use indexmap::IndexMap; +use parking_lot::Mutex; +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use turso_core::{Clock, Completion, IO, Instant, OpenFlags, Result}; + +use crate::runner::SimIO; +use crate::runner::clock::SimulatorClock; +use crate::runner::memory::file::MemorySimFile; + +/// File descriptor +pub type Fd = String; + +pub enum OperationType { + Read { + completion: Completion, + offset: usize, + }, + Write { + buffer: Arc, + completion: Completion, + offset: usize, + }, + WriteV { + buffers: Vec>, + completion: Completion, + offset: usize, + }, + Sync { + completion: Completion, + }, + Truncate { + completion: Completion, + len: usize, + }, +} + +impl OperationType { + fn get_completion(&self) -> &Completion { + match self { + OperationType::Read { completion, .. } + | OperationType::Write { completion, .. } + | OperationType::WriteV { completion, .. } + | OperationType::Sync { completion, .. } + | OperationType::Truncate { completion, .. } => completion, + } + } +} + +pub struct Operation { + pub time: Option, + pub op: OperationType, + pub fault: bool, + pub fd: Arc, +} + +impl Operation { + fn do_operation(self, files: &IndexMap>) { + let fd = self.fd; + match self.op { + OperationType::Read { completion, offset } => { + let file = files.get(fd.as_str()).unwrap(); + let file_buf = file.buffer.borrow_mut(); + let buffer = completion.as_read().buf.clone(); + let buf_size = { + let buf = buffer.as_mut_slice(); + // TODO: check for sector faults here + + buf.copy_from_slice(&file_buf[offset..][0..buf.len()]); + buf.len() as i32 + }; + completion.complete(buf_size); + } + OperationType::Write { + buffer, + completion, + offset, + } => { + let file = files.get(fd.as_str()).unwrap(); + let buf_size = file.write_buf(buffer.as_slice(), offset); + completion.complete(buf_size as i32); + } + OperationType::WriteV { + buffers, + completion, + offset, + } => { + if buffers.is_empty() { + return; + } + let file = files.get(fd.as_str()).unwrap(); + let mut pos = offset; + let written = buffers.into_iter().fold(0, |written, buffer| { + let buf_size = file.write_buf(buffer.as_slice(), pos); + pos += buf_size; + written + buf_size + }); + completion.complete(written as i32); + } + OperationType::Sync { completion, .. 
} => { + // There is no Sync for in memory + completion.complete(0); + } + OperationType::Truncate { completion, len } => { + let file = files.get(fd.as_str()).unwrap(); + let mut file_buf = file.buffer.borrow_mut(); + file_buf.truncate(len); + completion.complete(0); + } + } + } +} + +pub type CallbackQueue = Arc>>; + +pub struct MemorySimIO { + callbacks: CallbackQueue, + timeouts: CallbackQueue, + pub files: RefCell>>, + pub rng: RefCell, + pub nr_run_once_faults: Cell, + pub page_size: usize, + seed: u64, + latency_probability: usize, + clock: Arc, +} + +unsafe impl Send for MemorySimIO {} +unsafe impl Sync for MemorySimIO {} + +impl MemorySimIO { + pub fn new( + seed: u64, + page_size: usize, + latency_probability: usize, + min_tick: u64, + max_tick: u64, + ) -> Self { + let files = RefCell::new(IndexMap::new()); + let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); + let nr_run_once_faults = Cell::new(0); + Self { + callbacks: Arc::new(Mutex::new(Vec::new())), + timeouts: Arc::new(Mutex::new(Vec::new())), + files, + rng, + nr_run_once_faults, + page_size, + seed, + latency_probability, + clock: Arc::new(SimulatorClock::new( + ChaCha8Rng::seed_from_u64(seed), + min_tick, + max_tick, + )), + } + } +} + +impl SimIO for MemorySimIO { + fn inject_fault(&self, fault: bool) { + for file in self.files.borrow().values() { + file.inject_fault(fault); + } + if fault { + tracing::debug!("fault injected"); + } + } + + fn print_stats(&self) { + for (path, file) in self.files.borrow().iter() { + tracing::info!( + "\n===========================\n\nPath: {}\n{}", + path, + file.stats_table() + ); + } + } + + fn syncing(&self) -> bool { + let callbacks = self.callbacks.try_lock().unwrap(); + callbacks + .iter() + .any(|operation| matches!(operation.op, OperationType::Sync { .. 
})) + } + + fn close_files(&self) { + for file in self.files.borrow().values() { + file.closed.set(true); + } + } +} + +impl Clock for MemorySimIO { + fn now(&self) -> Instant { + self.clock.now().into() + } +} + +impl IO for MemorySimIO { + fn open_file( + &self, + path: &str, + _flags: OpenFlags, // TODO: ignoring open flags for now as we don't test read only mode in the simulator yet + _direct: bool, + ) -> Result> { + let mut files = self.files.borrow_mut(); + let fd = path.to_string(); + let file = if let Some(file) = files.get(path) { + file.closed.set(false); + file.clone() + } else { + let file = Arc::new(MemorySimFile::new( + self.callbacks.clone(), + fd.clone(), + self.seed, + self.latency_probability, + self.clock.clone(), + )); + files.insert(fd, file.clone()); + file + }; + + Ok(file) + } + + fn run_once(&self) -> Result<()> { + let mut callbacks = self.callbacks.lock(); + let mut timeouts = self.timeouts.lock(); + tracing::trace!( + callbacks.len = callbacks.len(), + timeouts.len = timeouts.len() + ); + let files = self.files.borrow_mut(); + let now = self.now(); + + callbacks.append(&mut timeouts); + + while let Some(callback) = callbacks.pop() { + let completion = callback.op.get_completion(); + if completion.finished() { + continue; + } + + if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { + if callback.fault { + // Inject the fault by aborting the completion + completion.abort(); + continue; + } + callback.do_operation(&files); + } else { + timeouts.push(callback); + } + } + Ok(()) + } + + fn generate_random_number(&self) -> i64 { + self.rng.borrow_mut().next_u64() as i64 + } + + fn remove_file(&self, path: &str) -> Result<()> { + self.files.borrow_mut().shift_remove(path); + Ok(()) + } +} diff --git a/simulator/runner/memory/mod.rs b/simulator/runner/memory/mod.rs new file mode 100644 index 000000000..d8e462011 --- /dev/null +++ b/simulator/runner/memory/mod.rs @@ -0,0 +1,2 @@ +pub mod file; +pub mod io; diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index 3eef78331..b729045f0 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -8,6 +8,17 @@ pub mod execution; #[allow(dead_code)] pub mod file; pub mod io; +pub mod memory; pub mod watch; pub const FAULT_ERROR_MSG: &str = "Injected Fault"; + +pub trait SimIO: turso_core::IO { + fn inject_fault(&self, fault: bool); + + fn print_stats(&self); + + fn syncing(&self) -> bool; + + fn close_files(&self); +} diff --git a/simulator/shrink/plan.rs b/simulator/shrink/plan.rs index 7def800ce..43e2912fb 100644 --- a/simulator/shrink/plan.rs +++ b/simulator/shrink/plan.rs @@ -56,6 +56,12 @@ impl InteractionPlan { // Remove all properties that do not use the failing tables plan.plan.retain_mut(|interactions| { let retain = if idx == failing_execution.interaction_index { + if let Interactions::Property( + Property::FsyncNoWait { tables, .. } | Property::FaultyQuery { tables, .. }, + ) = interactions + { + tables.retain(|table| depending_tables.contains(table)); + } true } else { let mut has_table = interactions @@ -73,9 +79,13 @@ impl InteractionPlan { | Property::DropSelect { queries, .. } => { queries.clear(); } - Property::FsyncNoWait { tables, .. } - | Property::FaultyQuery { tables, .. 
} => { - tables.retain(|table| depending_tables.contains(table)); + Property::FsyncNoWait { tables, query } + | Property::FaultyQuery { tables, query } => { + if !query.uses().iter().any(|t| depending_tables.contains(t)) { + tables.clear(); + } else { + tables.retain(|table| depending_tables.contains(table)); + } } Property::SelectLimit { .. } | Property::SelectSelectOptimizer { .. }
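
Note on the new memory-backed simulator I/O introduced under simulator/runner/memory/: the sketch below is a simplified, self-contained model of its design, not the turso_core API. In the patch, MemorySimFile queues every pread/pwrite/pwritev/sync/truncate as an Operation carrying an optional latency deadline and a fault flag, and MemorySimIO::run_once later drains that shared queue, completing each operation against an in-memory Vec<u8> buffer or aborting it when a fault was injected. The MemFile and Op types below are hypothetical stand-ins used only to illustrate that queue-and-drain shape.

// Simplified model of the deferred-operation queue used by MemorySimFile/MemorySimIO.
// MemFile and Op are illustrative stand-ins: the real backend completes or aborts
// turso_core::Completion values and shares a parking_lot::Mutex-protected queue
// between the open files and the IO object.
use std::cell::RefCell;
use std::collections::VecDeque;

enum Op {
    Write { offset: usize, data: Vec<u8>, fault: bool },
    Read { offset: usize, len: usize, fault: bool },
}

#[derive(Default)]
struct MemFile {
    buffer: RefCell<Vec<u8>>,   // in-memory file contents
    queue: RefCell<VecDeque<Op>>, // operations deferred until run_once
}

impl MemFile {
    // Mirrors MemorySimFile::write_buf: grow the backing buffer with zeroes when the
    // write lands past the current end, then copy the payload into place.
    fn write_buf(&self, data: &[u8], offset: usize) -> usize {
        let mut buf = self.buffer.borrow_mut();
        if buf.len() < offset + data.len() {
            buf.resize(offset + data.len(), 0);
        }
        buf[offset..offset + data.len()].copy_from_slice(data);
        data.len()
    }

    // Mirrors File::pread/pwrite in the patch: submitting only enqueues the operation.
    fn submit(&self, op: Op) {
        self.queue.borrow_mut().push_back(op);
    }

    // Mirrors the shape of MemorySimIO::run_once: drain the queue, applying writes and
    // reads against the buffer and failing any operation flagged as an injected fault.
    fn run_once(&self) -> Vec<Result<Vec<u8>, &'static str>> {
        let mut results = Vec::new();
        while let Some(op) = self.queue.borrow_mut().pop_front() {
            match op {
                Op::Write { fault: true, .. } | Op::Read { fault: true, .. } => {
                    results.push(Err("Injected Fault"));
                }
                Op::Write { offset, data, .. } => {
                    self.write_buf(&data, offset);
                    results.push(Ok(Vec::new()));
                }
                Op::Read { offset, len, .. } => {
                    let buf = self.buffer.borrow();
                    results.push(Ok(buf[offset..offset + len].to_vec()));
                }
            }
        }
        results
    }
}

fn main() {
    let file = MemFile::default();
    file.submit(Op::Write { offset: 4, data: b"turso".to_vec(), fault: false });
    file.submit(Op::Read { offset: 4, len: 5, fault: false });
    file.submit(Op::Read { offset: 0, len: 1, fault: true }); // injected fault fails
    for result in file.run_once() {
        println!("{result:?}");
    }
}

In the actual patch, a faulted operation aborts its turso_core::Completion (surfacing FAULT_ERROR_MSG to the caller), latency is modeled by re-queuing operations whose scheduled clock time has not yet been reached, and the file-backed SimulatorIO and the new MemorySimIO are both driven through the SimIO trait added in simulator/runner/mod.rs, selected at runtime by the new --memory-io flag.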