inject latency with queuing system

This commit is contained in:
pedrocarlo
2025-07-09 14:37:32 -03:00
parent dc5f73887e
commit b292e08d2b
2 changed files with 58 additions and 11 deletions

View File

@@ -1,12 +1,13 @@
use std::{
cell::{Cell, RefCell},
fmt::Debug,
sync::Arc,
};
use rand::Rng as _;
use rand_chacha::ChaCha8Rng;
use tracing::{instrument, Level};
use turso_core::{Completion, File, Result};
use turso_core::{File, Result};
use crate::model::FAULT_ERROR_MSG;
pub(crate) struct SimulatorFile {
@@ -38,13 +39,22 @@ pub(crate) struct SimulatorFile {
pub latency_probability: usize,
pub sync_completion: RefCell<Option<Arc<turso_core::Completion>>>,
pub queued_io: RefCell<Vec<QueuedIo>>,
pub queued_io: RefCell<Vec<DelayedIo>>,
}
pub struct QueuedIo {
pub completion: Completion,
type IoOperation = Box<dyn FnOnce(&SimulatorFile) -> Result<Arc<turso_core::Completion>>>;
pub struct DelayedIo {
pub time: std::time::Instant,
pub op: Box<dyn FnOnce() -> Result<Arc<turso_core::Completion>>>,
pub op: IoOperation,
}
impl Debug for DelayedIo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DelayedIo")
.field("time", &self.time)
.finish()
}
}
unsafe impl Send for SimulatorFile {}
@@ -90,9 +100,32 @@ impl SimulatorFile {
// Chance to introduce some latency
rng.gen_bool(self.latency_probability as f64 / 100.0)
.then(|| {
std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50))
std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(50..200))
})
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> {
let mut queued_io = self.queued_io.borrow_mut();
tracing::debug!(?queued_io);
// TODO: as we are not in version 1.87 we cannot use `extract_if`
// so we have to do something different to achieve the same thing
// This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if
let range = 0..queued_io.len();
let mut i = range.start;
let end_items = queued_io.len() - range.end;
while i < queued_io.len() - end_items {
if queued_io[i].time <= now {
let io = queued_io.remove(i);
// your code here
(io.op)(self)?;
} else {
i += 1;
}
}
Ok(())
}
}
impl File for SimulatorFile {
@@ -128,8 +161,11 @@ impl File for SimulatorFile {
));
}
if let Some(latency) = self.generate_latency_duration() {
let op = Box::new(|| self.inner.pread(pos, c.clone()));
let cloned_c = c.clone();
let op = Box::new(move |file: &SimulatorFile| file.inner.pread(pos, cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
Ok(c)
} else {
self.inner.pread(pos, c)
@@ -151,8 +187,11 @@ impl File for SimulatorFile {
));
}
if let Some(latency) = self.generate_latency_duration() {
let op = Box::new(|| self.inner.pwrite(pos, buffer, c.clone()));
let cloned_c = c.clone();
let op = Box::new(move |file: &SimulatorFile| file.inner.pwrite(pos, buffer, cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
Ok(c)
} else {
self.inner.pwrite(pos, buffer, c)
@@ -167,7 +206,11 @@ impl File for SimulatorFile {
self.fault.set(false);
}
let c = if let Some(latency) = self.generate_latency_duration() {
let op = Box::new(|| self.inner.sync(c.clone()));
let cloned_c = c.clone();
let op = Box::new(|file: &SimulatorFile| file.inner.sync(cloned_c));
self.queued_io
.borrow_mut()
.push(DelayedIo { time: latency, op });
Ok(c)
} else {
self.inner.sync(c)

View File

@@ -109,6 +109,10 @@ impl IO for SimulatorIO {
FAULT_ERROR_MSG.into(),
));
}
let now = std::time::Instant::now();
for file in self.files.borrow().iter() {
file.run_queued_io(now)?;
}
self.inner.run_once()?;
Ok(())
}