inject fault in the IO Operation in the MemorySim

This commit is contained in:
pedrocarlo
2025-08-22 11:11:26 -03:00
parent be855a8059
commit c158db072b
2 changed files with 90 additions and 44 deletions

View File

@@ -13,6 +13,35 @@ use crate::runner::{
memory::io::{CallbackQueue, Fd, Operation, OperationType},
};
/// Tracks IO calls and faults for each type of I/O operation
#[derive(Debug, Default)]
struct IOTracker {
pread_calls: usize,
pread_faults: usize,
pwrite_calls: usize,
pwrite_faults: usize,
pwritev_calls: usize,
pwritev_faults: usize,
sync_calls: usize,
sync_faults: usize,
truncate_calls: usize,
truncate_faults: usize,
}
impl IOTracker {
fn total_calls(&self) -> usize {
self.pread_calls
+ self.pwrite_calls
+ self.pwritev_calls
+ self.sync_calls
+ self.truncate_calls
}
}
pub struct MemorySimFile {
// TODO: maybe have a pending queue which is fast to append
// and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue
@@ -21,25 +50,11 @@ pub struct MemorySimFile {
pub buffer: RefCell<Vec<u8>>,
// TODO: add fault map later here
pub closed: Cell<bool>,
/// Number of `pread` function calls (both success and failures).
pub nr_pread_calls: Cell<usize>,
/// Number of `pwrite` function calls (both success and failures).
pub nr_pwrite_calls: Cell<usize>,
/// Number of `sync` function calls (both success and failures).
pub nr_sync_calls: Cell<usize>,
io_tracker: RefCell<IOTracker>,
pub rng: RefCell<ChaCha8Rng>,
pub latency_probability: usize,
clock: Arc<SimulatorClock>,
}
type IoOperation = Box<dyn FnOnce(OperationType) -> Result<Arc<turso_core::Completion>>>;
pub struct DelayedIo {
pub time: turso_core::Instant,
pub op: IoOperation,
fault: Cell<bool>,
}
unsafe impl Send for MemorySimFile {}
@@ -58,24 +73,44 @@ impl MemorySimFile {
fd: Arc::new(fd),
buffer: RefCell::new(Vec::new()),
closed: Cell::new(false),
nr_pread_calls: Cell::new(0),
nr_pwrite_calls: Cell::new(0),
nr_sync_calls: Cell::new(0),
io_tracker: RefCell::new(IOTracker::default()),
rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)),
latency_probability,
clock,
fault: Cell::new(false),
}
}
pub fn inject_fault(&self, fault: bool) {
self.fault.set(fault);
}
pub fn stats_table(&self) -> String {
let sum_calls =
self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get();
let io_tracker = self.io_tracker.borrow();
let sum_calls = io_tracker.total_calls();
let stats_table = [
"op calls ".to_string(),
"--------- --------".to_string(),
format!("pread {:8}", self.nr_pread_calls.get()),
format!("pwrite {:8}", self.nr_pwrite_calls.get()),
format!("sync {:8}", self.nr_sync_calls.get()),
"op calls faults ".to_string(),
"--------- -------- --------".to_string(),
format!(
"pread {:8} {:8}",
io_tracker.pread_calls, io_tracker.pread_faults
),
format!(
"pwrite {:8} {:8}",
io_tracker.pwrite_calls, io_tracker.pwrite_faults
),
format!(
"pwritev {:8} {:8}",
io_tracker.pwritev_calls, io_tracker.pwritev_faults
),
format!(
"sync {:8} {:8}",
io_tracker.sync_calls, io_tracker.sync_faults
),
format!(
"truncate {:8} {:8}",
io_tracker.truncate_calls, io_tracker.truncate_faults
),
"--------- -------- --------".to_string(),
format!("total {sum_calls:8}"),
];
@@ -96,9 +131,22 @@ impl MemorySimFile {
}
fn insert_op(&self, op: OperationType) {
let fault = self.fault.get();
if fault {
let mut io_tracker = self.io_tracker.borrow_mut();
match &op {
OperationType::Read { .. } => io_tracker.pread_faults += 1,
OperationType::Write { .. } => io_tracker.pwrite_faults += 1,
OperationType::WriteV { .. } => io_tracker.pwritev_faults += 1,
OperationType::Sync { .. } => io_tracker.sync_faults += 1,
OperationType::Truncate { .. } => io_tracker.truncate_faults += 1,
}
}
self.callbacks.lock().push(Operation {
time: self.generate_latency(),
op,
fault,
});
}
@@ -131,7 +179,7 @@ impl File for MemorySimFile {
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
self.io_tracker.borrow_mut().pread_calls += 1;
let op = OperationType::Read {
fd: self.fd.clone(),
@@ -148,7 +196,7 @@ impl File for MemorySimFile {
buffer: Arc<turso_core::Buffer>,
c: Completion,
) -> Result<Completion> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
self.io_tracker.borrow_mut().pwrite_calls += 1;
let op = OperationType::Write {
fd: self.fd.clone(),
buffer,
@@ -168,6 +216,7 @@ impl File for MemorySimFile {
if buffers.len() == 1 {
return self.pwrite(pos, buffers[0].clone(), c);
}
self.io_tracker.borrow_mut().pwritev_calls += 1;
let op = OperationType::WriteV {
fd: self.fd.clone(),
buffers,
@@ -179,7 +228,7 @@ impl File for MemorySimFile {
}
fn sync(&self, c: Completion) -> Result<Completion> {
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
self.io_tracker.borrow_mut().sync_calls += 1;
let op = OperationType::Sync {
fd: self.fd.clone(),
completion: c.clone(),
@@ -195,6 +244,7 @@ impl File for MemorySimFile {
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
self.io_tracker.borrow_mut().truncate_calls += 1;
let op = OperationType::Truncate {
fd: self.fd.clone(),
completion: c.clone(),

View File

@@ -8,8 +8,8 @@ use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO};
use crate::runner::clock::SimulatorClock;
use crate::runner::memory::file::MemorySimFile;
use crate::runner::SimIO;
use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile};
/// File descriptor
pub type Fd = String;
@@ -68,7 +68,7 @@ impl OperationType {
pub struct Operation {
pub time: Option<turso_core::Instant>,
pub op: OperationType,
// TODO: add a fault field here to signal if an Operation should fault
pub fault: bool,
}
impl Operation {
@@ -142,7 +142,6 @@ pub type CallbackQueue = Arc<Mutex<Vec<Operation>>>;
pub struct MemorySimIO {
callbacks: CallbackQueue,
timeouts: CallbackQueue,
pub fault: Cell<bool>,
pub files: RefCell<IndexMap<Fd, Arc<MemorySimFile>>>,
pub rng: RefCell<ChaCha8Rng>,
pub nr_run_once_faults: Cell<usize>,
@@ -163,14 +162,12 @@ impl MemorySimIO {
min_tick: u64,
max_tick: u64,
) -> Self {
let fault = Cell::new(false);
let files = RefCell::new(IndexMap::new());
let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed));
let nr_run_once_faults = Cell::new(0);
Self {
callbacks: Arc::new(Mutex::new(Vec::new())),
timeouts: Arc::new(Mutex::new(Vec::new())),
fault,
files,
rng,
nr_run_once_faults,
@@ -188,14 +185,15 @@ impl MemorySimIO {
impl SimIO for MemorySimIO {
fn inject_fault(&self, fault: bool) {
self.fault.replace(fault);
for file in self.files.borrow().values() {
file.inject_fault(fault);
}
if fault {
tracing::debug!("fault injected");
}
}
fn print_stats(&self) {
tracing::info!("run_once faults: {}", self.nr_run_once_faults.get());
for (path, file) in self.files.borrow().iter() {
tracing::info!(
"\n===========================\n\nPath: {}\n{}",
@@ -259,13 +257,6 @@ impl IO for MemorySimIO {
callbacks.len = callbacks.len(),
timeouts.len = timeouts.len()
);
if self.fault.get() {
self.nr_run_once_faults
.replace(self.nr_run_once_faults.get() + 1);
return Err(turso_core::LimboError::InternalError(
FAULT_ERROR_MSG.into(),
));
}
let files = self.files.borrow_mut();
let now = self.now();
@@ -276,8 +267,13 @@ impl IO for MemorySimIO {
if completion.finished() {
continue;
}
if callback.time.is_none() || callback.time.is_some_and(|time| time < now) {
// TODO: check if we should inject fault in operation here
if callback.fault {
// Inject the fault by aborting the completion
completion.abort();
continue;
}
callback.do_operation(&files);
} else {
timeouts.push(callback);