diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 435a60997..1f0ad838e 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, }; -use limbo_core::{File, Result}; +use limbo_core::{CompletionType, File, Result}; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; @@ -71,14 +71,11 @@ impl SimulatorFile { } #[instrument(skip_all, level = Level::TRACE)] - fn generate_latency(&self) { + fn generate_latency_duration(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency - if rng.gen_bool(self.latency_probability as f64 / 100.0) { - let latency = std::time::Duration::from_millis(rng.gen_range(20..50)); - tracing::trace!(?latency); - std::thread::sleep(latency); - } + rng.gen_bool(self.latency_probability as f64 / 100.0) + .then(|| std::time::Duration::from_millis(rng.gen_range(20..50))) } } @@ -89,7 +86,6 @@ impl File for SimulatorFile { "Injected fault".into(), )); } - self.generate_latency(); self.inner.lock_file(exclusive) } @@ -99,11 +95,14 @@ impl File for SimulatorFile { "Injected fault".into(), )); } - self.generate_latency(); self.inner.unlock_file() } - fn pread(&self, pos: usize, c: Arc) -> Result<()> { + fn pread( + &self, + pos: usize, + mut c: limbo_core::Completion, + ) -> Result> { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { self.nr_pread_faults.set(self.nr_pread_faults.get() + 1); @@ -111,7 +110,24 @@ impl File for SimulatorFile { "Injected fault".into(), )); } - self.generate_latency(); + if let Some(latency) = self.generate_latency_duration() { + let CompletionType::Read(read_completion) = &mut c.completion_type else { + unreachable!(); + }; + let before = self.rng.borrow_mut().gen_bool(0.5); + let dummy_complete = Box::new(|_| {}); + let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete); + let new_complete = move |res| { + if before { + std::thread::sleep(latency); + } + (prev_complete)(res); + if !before { + std::thread::sleep(latency); + } + }; + read_completion.complete = Box::new(new_complete); + }; self.inner.pread(pos, c) } @@ -119,8 +135,8 @@ impl File for SimulatorFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result<()> { + mut c: limbo_core::Completion, + ) -> Result> { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1); @@ -128,13 +144,47 @@ impl File for SimulatorFile { "Injected fault".into(), )); } - self.generate_latency(); + if let Some(latency) = self.generate_latency_duration() { + let CompletionType::Write(write_completion) = &mut c.completion_type else { + unreachable!(); + }; + let before = self.rng.borrow_mut().gen_bool(0.5); + let dummy_complete = Box::new(|_| {}); + let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete); + let new_complete = move |res| { + if before { + std::thread::sleep(latency); + } + (prev_complete)(res); + if !before { + std::thread::sleep(latency); + } + }; + write_completion.complete = Box::new(new_complete); + }; self.inner.pwrite(pos, buffer, c) } - fn sync(&self, c: Arc) -> Result<()> { + fn sync(&self, mut c: limbo_core::Completion) -> Result> { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); - self.generate_latency(); + if let Some(latency) = self.generate_latency_duration() { + let CompletionType::Sync(sync_completion) = &mut c.completion_type else { + unreachable!(); + }; + let before = self.rng.borrow_mut().gen_bool(0.5); + let dummy_complete = Box::new(|_| {}); + let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete); + let new_complete = move |res| { + if before { + std::thread::sleep(latency); + } + (prev_complete)(res); + if !before { + std::thread::sleep(latency); + } + }; + sync_completion.complete = Box::new(new_complete); + }; self.inner.sync(c) } @@ -148,3 +198,5 @@ impl Drop for SimulatorFile { self.inner.unlock_file().expect("Failed to unlock file"); } } + +struct Latency {}