diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 310d227d7..edcb44c31 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -1,12 +1,13 @@ use std::{ cell::{Cell, RefCell}, + fmt::Debug, sync::Arc, }; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; -use turso_core::{Completion, File, Result}; +use turso_core::{File, Result}; use crate::model::FAULT_ERROR_MSG; pub(crate) struct SimulatorFile { @@ -38,13 +39,22 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, pub sync_completion: RefCell>>, - pub queued_io: RefCell>, + pub queued_io: RefCell>, } -pub struct QueuedIo { - pub completion: Completion, +type IoOperation = Box Result>>; + +pub struct DelayedIo { pub time: std::time::Instant, - pub op: Box Result>>, + pub op: IoOperation, +} + +impl Debug for DelayedIo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DelayedIo") + .field("time", &self.time) + .finish() + } } unsafe impl Send for SimulatorFile {} @@ -90,9 +100,32 @@ impl SimulatorFile { // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(50..200)) }) } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> { + let mut queued_io = self.queued_io.borrow_mut(); + tracing::debug!(?queued_io); + // TODO: as we are not in version 1.87 we cannot use `extract_if` + // so we have to do something different to achieve the same thing + // This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if + let range = 0..queued_io.len(); + let mut i = range.start; + let end_items = queued_io.len() - range.end; + + while i < queued_io.len() - end_items { + if queued_io[i].time <= now { + let io = queued_io.remove(i); + // your code here + (io.op)(self)?; + } else { + i += 1; + } + } + Ok(()) + } } impl File for SimulatorFile { @@ -128,8 +161,11 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.pread(pos, c.clone())); - + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pread(pos, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.pread(pos, c) @@ -151,8 +187,11 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.pwrite(pos, buffer, c.clone())); - + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pwrite(pos, buffer, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.pwrite(pos, buffer, c) @@ -167,7 +206,11 @@ impl File for SimulatorFile { self.fault.set(false); } let c = if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.sync(c.clone())); + let cloned_c = c.clone(); + let op = Box::new(|file: &SimulatorFile| file.inner.sync(cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.sync(c) diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index bf416455c..cc4b04f4e 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -109,6 +109,10 @@ impl IO for SimulatorIO { FAULT_ERROR_MSG.into(), )); } + let now = std::time::Instant::now(); + for file in self.files.borrow().iter() { + file.run_queued_io(now)?; + } self.inner.run_once()?; Ok(()) }