diff --git a/simulator/runner/memory/file.rs b/simulator/runner/memory/file.rs index a5b956cc4..e648ef23d 100644 --- a/simulator/runner/memory/file.rs +++ b/simulator/runner/memory/file.rs @@ -13,6 +13,35 @@ use crate::runner::{ memory::io::{CallbackQueue, Fd, Operation, OperationType}, }; +/// Tracks IO calls and faults for each type of I/O operation +#[derive(Debug, Default)] +struct IOTracker { + pread_calls: usize, + pread_faults: usize, + + pwrite_calls: usize, + pwrite_faults: usize, + + pwritev_calls: usize, + pwritev_faults: usize, + + sync_calls: usize, + sync_faults: usize, + + truncate_calls: usize, + truncate_faults: usize, +} + +impl IOTracker { + fn total_calls(&self) -> usize { + self.pread_calls + + self.pwrite_calls + + self.pwritev_calls + + self.sync_calls + + self.truncate_calls + } +} + pub struct MemorySimFile { // TODO: maybe have a pending queue which is fast to append // and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue @@ -21,25 +50,11 @@ pub struct MemorySimFile { pub buffer: RefCell>, // TODO: add fault map later here pub closed: Cell, - - /// Number of `pread` function calls (both success and failures). - pub nr_pread_calls: Cell, - /// Number of `pwrite` function calls (both success and failures). - pub nr_pwrite_calls: Cell, - /// Number of `sync` function calls (both success and failures). - pub nr_sync_calls: Cell, - + io_tracker: RefCell, pub rng: RefCell, - pub latency_probability: usize, clock: Arc, -} - -type IoOperation = Box Result>>; - -pub struct DelayedIo { - pub time: turso_core::Instant, - pub op: IoOperation, + fault: Cell, } unsafe impl Send for MemorySimFile {} @@ -58,24 +73,44 @@ impl MemorySimFile { fd: Arc::new(fd), buffer: RefCell::new(Vec::new()), closed: Cell::new(false), - nr_pread_calls: Cell::new(0), - nr_pwrite_calls: Cell::new(0), - nr_sync_calls: Cell::new(0), + io_tracker: RefCell::new(IOTracker::default()), rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), latency_probability, clock, + fault: Cell::new(false), } } + pub fn inject_fault(&self, fault: bool) { + self.fault.set(fault); + } + pub fn stats_table(&self) -> String { - let sum_calls = - self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get(); + let io_tracker = self.io_tracker.borrow(); + let sum_calls = io_tracker.total_calls(); let stats_table = [ - "op calls ".to_string(), - "--------- --------".to_string(), - format!("pread {:8}", self.nr_pread_calls.get()), - format!("pwrite {:8}", self.nr_pwrite_calls.get()), - format!("sync {:8}", self.nr_sync_calls.get()), + "op calls faults ".to_string(), + "--------- -------- --------".to_string(), + format!( + "pread {:8} {:8}", + io_tracker.pread_calls, io_tracker.pread_faults + ), + format!( + "pwrite {:8} {:8}", + io_tracker.pwrite_calls, io_tracker.pwrite_faults + ), + format!( + "pwritev {:8} {:8}", + io_tracker.pwritev_calls, io_tracker.pwritev_faults + ), + format!( + "sync {:8} {:8}", + io_tracker.sync_calls, io_tracker.sync_faults + ), + format!( + "truncate {:8} {:8}", + io_tracker.truncate_calls, io_tracker.truncate_faults + ), "--------- -------- --------".to_string(), format!("total {sum_calls:8}"), ]; @@ -96,9 +131,22 @@ impl MemorySimFile { } fn insert_op(&self, op: OperationType) { + let fault = self.fault.get(); + if fault { + let mut io_tracker = self.io_tracker.borrow_mut(); + match &op { + OperationType::Read { .. } => io_tracker.pread_faults += 1, + OperationType::Write { .. } => io_tracker.pwrite_faults += 1, + OperationType::WriteV { .. } => io_tracker.pwritev_faults += 1, + OperationType::Sync { .. } => io_tracker.sync_faults += 1, + OperationType::Truncate { .. } => io_tracker.truncate_faults += 1, + } + } + self.callbacks.lock().push(Operation { time: self.generate_latency(), op, + fault, }); } @@ -131,7 +179,7 @@ impl File for MemorySimFile { } fn pread(&self, pos: usize, c: Completion) -> Result { - self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); + self.io_tracker.borrow_mut().pread_calls += 1; let op = OperationType::Read { fd: self.fd.clone(), @@ -148,7 +196,7 @@ impl File for MemorySimFile { buffer: Arc, c: Completion, ) -> Result { - self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); + self.io_tracker.borrow_mut().pwrite_calls += 1; let op = OperationType::Write { fd: self.fd.clone(), buffer, @@ -168,6 +216,7 @@ impl File for MemorySimFile { if buffers.len() == 1 { return self.pwrite(pos, buffers[0].clone(), c); } + self.io_tracker.borrow_mut().pwritev_calls += 1; let op = OperationType::WriteV { fd: self.fd.clone(), buffers, @@ -179,7 +228,7 @@ impl File for MemorySimFile { } fn sync(&self, c: Completion) -> Result { - self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); + self.io_tracker.borrow_mut().sync_calls += 1; let op = OperationType::Sync { fd: self.fd.clone(), completion: c.clone(), @@ -195,6 +244,7 @@ impl File for MemorySimFile { } fn truncate(&self, len: usize, c: Completion) -> Result { + self.io_tracker.borrow_mut().truncate_calls += 1; let op = OperationType::Truncate { fd: self.fd.clone(), completion: c.clone(), diff --git a/simulator/runner/memory/io.rs b/simulator/runner/memory/io.rs index 900237e96..8b07ba819 100644 --- a/simulator/runner/memory/io.rs +++ b/simulator/runner/memory/io.rs @@ -8,8 +8,8 @@ use rand_chacha::ChaCha8Rng; use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO}; use crate::runner::clock::SimulatorClock; +use crate::runner::memory::file::MemorySimFile; use crate::runner::SimIO; -use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile}; /// File descriptor pub type Fd = String; @@ -68,7 +68,7 @@ impl OperationType { pub struct Operation { pub time: Option, pub op: OperationType, - // TODO: add a fault field here to signal if an Operation should fault + pub fault: bool, } impl Operation { @@ -142,7 +142,6 @@ pub type CallbackQueue = Arc>>; pub struct MemorySimIO { callbacks: CallbackQueue, timeouts: CallbackQueue, - pub fault: Cell, pub files: RefCell>>, pub rng: RefCell, pub nr_run_once_faults: Cell, @@ -163,14 +162,12 @@ impl MemorySimIO { min_tick: u64, max_tick: u64, ) -> Self { - let fault = Cell::new(false); let files = RefCell::new(IndexMap::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); Self { callbacks: Arc::new(Mutex::new(Vec::new())), timeouts: Arc::new(Mutex::new(Vec::new())), - fault, files, rng, nr_run_once_faults, @@ -188,14 +185,15 @@ impl MemorySimIO { impl SimIO for MemorySimIO { fn inject_fault(&self, fault: bool) { - self.fault.replace(fault); + for file in self.files.borrow().values() { + file.inject_fault(fault); + } if fault { tracing::debug!("fault injected"); } } fn print_stats(&self) { - tracing::info!("run_once faults: {}", self.nr_run_once_faults.get()); for (path, file) in self.files.borrow().iter() { tracing::info!( "\n===========================\n\nPath: {}\n{}", @@ -259,13 +257,6 @@ impl IO for MemorySimIO { callbacks.len = callbacks.len(), timeouts.len = timeouts.len() ); - if self.fault.get() { - self.nr_run_once_faults - .replace(self.nr_run_once_faults.get() + 1); - return Err(turso_core::LimboError::InternalError( - FAULT_ERROR_MSG.into(), - )); - } let files = self.files.borrow_mut(); let now = self.now(); @@ -276,8 +267,13 @@ impl IO for MemorySimIO { if completion.finished() { continue; } + if callback.time.is_none() || callback.time.is_some_and(|time| time < now) { - // TODO: check if we should inject fault in operation here + if callback.fault { + // Inject the fault by aborting the completion + completion.abort(); + continue; + } callback.do_operation(&files); } else { timeouts.push(callback);