diff --git a/Cargo.lock b/Cargo.lock index f30d5ffb4..3c7ce5329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2192,6 +2192,7 @@ dependencies = [ "env_logger 0.10.2", "garde", "hex", + "indexmap 2.10.0", "itertools 0.14.0", "json5", "log", diff --git a/simulator/Cargo.toml b/simulator/Cargo.toml index b0f372819..89ee9634b 100644 --- a/simulator/Cargo.toml +++ b/simulator/Cargo.toml @@ -43,3 +43,4 @@ garde = { workspace = true, features = ["derive", "serde"] } json5 = { version = "0.4.1" } strum = { workspace = true } parking_lot = { workspace = true } +indexmap = "2.10.0" diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs new file mode 100644 index 000000000..158f654b1 --- /dev/null +++ b/simulator/runner/memory/file.rs @@ -0,0 +1,163 @@ +use std::{ + cell::{Cell, RefCell}, + sync::Arc, +}; + +use rand::{Rng as _, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use tracing::{instrument, Level}; +use turso_core::{Completion, File, Result}; + +use crate::runner::{ + clock::SimulatorClock, + memory::io::{CallbackQueue, Fd, Operation, OperationType}, +}; + +pub struct MemorySimFile { + pub callbacks: CallbackQueue, + pub fd: Arc, + pub buffer: RefCell>, + // TODO: add fault map later here + pub closed: Cell, + + /// Number of `pread` function calls (both success and failures). + pub nr_pread_calls: Cell, + /// Number of `pwrite` function calls (both success and failures). + pub nr_pwrite_calls: Cell, + /// Number of `sync` function calls (both success and failures). + pub nr_sync_calls: Cell, + + pub rng: RefCell, + + pub latency_probability: usize, + clock: Arc, +} + +type IoOperation = Box Result>>; + +pub struct DelayedIo { + pub time: turso_core::Instant, + pub op: IoOperation, +} + +unsafe impl Send for MemorySimFile {} +unsafe impl Sync for MemorySimFile {} + +impl MemorySimFile { + pub fn new( + callbacks: CallbackQueue, + fd: Fd, + seed: u64, + latency_probability: usize, + clock: Arc, + ) -> Self { + Self { + callbacks, + fd: Arc::new(fd), + buffer: RefCell::new(Vec::new()), + closed: Cell::new(false), + nr_pread_calls: Cell::new(0), + nr_pwrite_calls: Cell::new(0), + nr_sync_calls: Cell::new(0), + rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), + latency_probability, + clock, + } + } + + pub fn stats_table(&self) -> String { + let sum_calls = + self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get(); + let stats_table = [ + "op calls ".to_string(), + "--------- --------".to_string(), + format!("pread {:8}", self.nr_pread_calls.get()), + format!("pwrite {:8}", self.nr_pwrite_calls.get()), + format!("sync {:8}", self.nr_sync_calls.get()), + "--------- -------- --------".to_string(), + format!("total {sum_calls:8}"), + ]; + + stats_table.join("\n") + } + + #[instrument(skip_all, level = Level::TRACE)] + fn generate_latency(&self) -> Option { + let mut rng = self.rng.borrow_mut(); + // Chance to introduce some latency + rng.random_bool(self.latency_probability as f64 / 100.0) + .then(|| { + let now = self.clock.now(); + let sum = now + std::time::Duration::from_millis(rng.random_range(5..20)); + sum.into() + }) + } +} + +impl File for MemorySimFile { + fn lock_file(&self, _exclusive: bool) -> Result<()> { + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + Ok(()) + } + + fn pread(&self, pos: usize, c: Completion) -> Result { + self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); + + let op = OperationType::Read { + fd: self.fd.clone(), + completion: c.clone(), + offset: pos, + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn pwrite( + &self, + pos: usize, + buffer: Arc, + c: Completion, + ) -> Result { + self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); + let op = OperationType::Write { + fd: self.fd.clone(), + buffer, + completion: c.clone(), + offset: pos, + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn sync(&self, c: Completion) -> Result { + self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); + let op = OperationType::Sync { + fd: self.fd.clone(), + completion: c.clone(), + }; + self.callbacks.lock().push(Operation { + time: self.generate_latency(), + op, + }); + Ok(c) + } + + fn size(&self) -> Result { + // TODO: size operation should also be scheduled. But this requires a change in how we + // Use this function internally in Turso + Ok(self.buffer.borrow().len() as u64) + } + + fn truncate(&self, len: usize, c: Completion) -> Result { + todo!() + } +} diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs new file mode 100644 index 000000000..ad614ba9c --- /dev/null +++ b/simulator/runner/memory/io.rs @@ -0,0 +1,253 @@ +use std::cell::{Cell, RefCell}; +use std::sync::Arc; + +use indexmap::IndexMap; +use parking_lot::Mutex; +use rand::{RngCore, SeedableRng}; +use rand_chacha::ChaCha8Rng; +use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO}; + +use crate::runner::clock::SimulatorClock; +use crate::runner::SimIO; +use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile}; + +/// File descriptor +pub type Fd = String; + +pub enum OperationType { + Read { + fd: Arc, + completion: Completion, + offset: usize, + }, + Write { + fd: Arc, + buffer: Arc, + completion: Completion, + offset: usize, + }, + Sync { + fd: Arc, + completion: Completion, + }, +} + +impl OperationType { + fn get_fd(&self) -> &Fd { + match self { + OperationType::Read { fd, .. } + | OperationType::Write { fd, .. } + | OperationType::Sync { fd, .. } => fd, + } + } +} + +pub struct Operation { + pub time: Option, + pub op: OperationType, +} + +impl Operation { + fn do_operation(self, files: &IndexMap>) { + match self.op { + OperationType::Read { + fd, + completion, + offset, + } => { + let file = files.get(fd.as_str()).unwrap(); + let file_buf = file.buffer.borrow_mut(); + let buffer = completion.as_read().buf.clone(); + let buf_size = { + let buf = buffer.as_mut_slice(); + // TODO: check for sector faults here + + buf.copy_from_slice(&file_buf[offset..][0..buf.len()]); + buf.len() as i32 + }; + completion.complete(buf_size); + } + OperationType::Write { + fd, + buffer, + completion, + offset, + } => { + let file = files.get(fd.as_str()).unwrap(); + let buf_size = { + let mut file_buf = file.buffer.borrow_mut(); + let buf = buffer.as_slice(); + let more_space = if file_buf.len() < offset { + (offset + buf.len()) - file_buf.len() + } else { + buf.len().saturating_sub(file_buf.len() - offset) + }; + if more_space > 0 { + file_buf.reserve(more_space); + for _ in 0..more_space { + file_buf.push(0); + } + } + + file_buf[offset..][0..buf.len()].copy_from_slice(buf); + buf.len() as i32 + }; + completion.complete(buf_size); + } + OperationType::Sync { completion, .. } => { + // There is no Sync for in memory + completion.complete(0); + } + } + } +} + +pub type CallbackQueue = Arc>>; + +pub struct MemorySimIO { + callbacks: CallbackQueue, + timeouts: CallbackQueue, + pub fault: Cell, + pub files: RefCell>>, + pub rng: RefCell, + pub nr_run_once_faults: Cell, + pub page_size: usize, + seed: u64, + latency_probability: usize, + clock: Arc, +} + +unsafe impl Send for MemorySimIO {} +unsafe impl Sync for MemorySimIO {} + +impl MemorySimIO { + pub fn new( + seed: u64, + page_size: usize, + latency_probability: usize, + min_tick: u64, + max_tick: u64, + ) -> Result { + let fault = Cell::new(false); + let files = RefCell::new(IndexMap::new()); + let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); + let nr_run_once_faults = Cell::new(0); + Ok(Self { + callbacks: Arc::new(Mutex::new(Vec::new())), + timeouts: Arc::new(Mutex::new(Vec::new())), + fault, + files, + rng, + nr_run_once_faults, + page_size, + seed, + latency_probability, + clock: Arc::new(SimulatorClock::new( + ChaCha8Rng::seed_from_u64(seed), + min_tick, + max_tick, + )), + }) + } +} + +impl SimIO for MemorySimIO { + fn inject_fault(&self, fault: bool) { + self.fault.replace(fault); + if fault { + tracing::debug!("fault injected"); + } + } + + fn print_stats(&self) { + tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); + for file in self.files.borrow().values() { + tracing::info!("\n===========================\n{}", file.stats_table()); + } + } + + fn syncing(&self) -> bool { + let callbacks = self.callbacks.try_lock().unwrap(); + callbacks + .iter() + .any(|operation| matches!(operation.op, OperationType::Sync { .. })) + } + + fn close_files(&self) { + for file in self.files.borrow().values() { + file.closed.set(true); + } + } +} + +impl Clock for MemorySimIO { + fn now(&self) -> Instant { + self.clock.now().into() + } +} + +impl IO for MemorySimIO { + fn open_file( + &self, + path: &str, + _flags: OpenFlags, // TODO: ignoring open flags for now as we don't test read only mode in the simulator yet + _direct: bool, + ) -> Result> { + let mut files = self.files.borrow_mut(); + let fd = path.to_string(); + let file = if let Some(file) = files.get(path) { + file.closed.set(false); + file.clone() + } else { + let file = Arc::new(MemorySimFile::new( + self.callbacks.clone(), + fd.clone(), + self.seed, + self.latency_probability, + self.clock.clone(), + )); + files.insert(fd, file.clone()); + file + }; + + Ok(file) + } + + fn run_once(&self) -> Result<()> { + let mut callbacks = self.callbacks.lock(); + let mut timeouts = self.timeouts.lock(); + tracing::trace!( + callbacks.len = callbacks.len(), + timeouts.len = timeouts.len() + ); + if self.fault.get() { + self.nr_run_once_faults + .replace(self.nr_run_once_faults.get() + 1); + // TODO: currently we only deal with single threaded execution in one file + // When we support multiple db files, we need to only remove callbacks not relevant to the current file + // and maybe connection + callbacks.clear(); + timeouts.clear(); + return Err(turso_core::LimboError::InternalError( + FAULT_ERROR_MSG.into(), + )); + } + let files = self.files.borrow_mut(); + let now = self.now(); + + callbacks.append(&mut timeouts); + + while let Some(callback) = callbacks.pop() { + if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { + callback.do_operation(&files); + } else { + timeouts.push(callback); + } + } + Ok(()) + } + + fn generate_random_number(&self) -> i64 { + self.rng.borrow_mut().next_u64() as i64 + } +} diff --git a/simulator/runner/memory/mod.rs b/simulator/runner/memory/mod.rs new file mode 100644 index 000000000..d8e462011 --- /dev/null +++ b/simulator/runner/memory/mod.rs @@ -0,0 +1,2 @@ +pub mod file; +pub mod io; diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index 13b5c4258..b729045f0 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -8,6 +8,7 @@ pub mod execution; #[allow(dead_code)] pub mod file; pub mod io; +pub mod memory; pub mod watch; pub const FAULT_ERROR_MSG: &str = "Injected Fault";