mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-02 07:54:19 +01:00
sleep inside Io completion
This commit is contained in:
@@ -3,7 +3,7 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use limbo_core::{File, Result};
|
||||
use limbo_core::{CompletionType, File, Result};
|
||||
use rand::Rng as _;
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use tracing::{instrument, Level};
|
||||
@@ -71,14 +71,11 @@ impl SimulatorFile {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn generate_latency(&self) {
|
||||
fn generate_latency_duration(&self) -> Option<std::time::Duration> {
|
||||
let mut rng = self.rng.borrow_mut();
|
||||
// Chance to introduce some latency
|
||||
if rng.gen_bool(self.latency_probability as f64 / 100.0) {
|
||||
let latency = std::time::Duration::from_millis(rng.gen_range(20..50));
|
||||
tracing::trace!(?latency);
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
rng.gen_bool(self.latency_probability as f64 / 100.0)
|
||||
.then(|| std::time::Duration::from_millis(rng.gen_range(20..50)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +86,6 @@ impl File for SimulatorFile {
|
||||
"Injected fault".into(),
|
||||
));
|
||||
}
|
||||
self.generate_latency();
|
||||
self.inner.lock_file(exclusive)
|
||||
}
|
||||
|
||||
@@ -99,11 +95,14 @@ impl File for SimulatorFile {
|
||||
"Injected fault".into(),
|
||||
));
|
||||
}
|
||||
self.generate_latency();
|
||||
self.inner.unlock_file()
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Arc<limbo_core::Completion>) -> Result<()> {
|
||||
fn pread(
|
||||
&self,
|
||||
pos: usize,
|
||||
mut c: limbo_core::Completion,
|
||||
) -> Result<Arc<limbo_core::Completion>> {
|
||||
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
self.nr_pread_faults.set(self.nr_pread_faults.get() + 1);
|
||||
@@ -111,7 +110,24 @@ impl File for SimulatorFile {
|
||||
"Injected fault".into(),
|
||||
));
|
||||
}
|
||||
self.generate_latency();
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
let CompletionType::Read(read_completion) = &mut c.completion_type else {
|
||||
unreachable!();
|
||||
};
|
||||
let before = self.rng.borrow_mut().gen_bool(0.5);
|
||||
let dummy_complete = Box::new(|_| {});
|
||||
let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete);
|
||||
let new_complete = move |res| {
|
||||
if before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
(prev_complete)(res);
|
||||
if !before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
};
|
||||
read_completion.complete = Box::new(new_complete);
|
||||
};
|
||||
self.inner.pread(pos, c)
|
||||
}
|
||||
|
||||
@@ -119,8 +135,8 @@ impl File for SimulatorFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<limbo_core::Buffer>>,
|
||||
c: Arc<limbo_core::Completion>,
|
||||
) -> Result<()> {
|
||||
mut c: limbo_core::Completion,
|
||||
) -> Result<Arc<limbo_core::Completion>> {
|
||||
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1);
|
||||
@@ -128,13 +144,47 @@ impl File for SimulatorFile {
|
||||
"Injected fault".into(),
|
||||
));
|
||||
}
|
||||
self.generate_latency();
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
let CompletionType::Write(write_completion) = &mut c.completion_type else {
|
||||
unreachable!();
|
||||
};
|
||||
let before = self.rng.borrow_mut().gen_bool(0.5);
|
||||
let dummy_complete = Box::new(|_| {});
|
||||
let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete);
|
||||
let new_complete = move |res| {
|
||||
if before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
(prev_complete)(res);
|
||||
if !before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
};
|
||||
write_completion.complete = Box::new(new_complete);
|
||||
};
|
||||
self.inner.pwrite(pos, buffer, c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<limbo_core::Completion>) -> Result<()> {
|
||||
fn sync(&self, mut c: limbo_core::Completion) -> Result<Arc<limbo_core::Completion>> {
|
||||
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
|
||||
self.generate_latency();
|
||||
if let Some(latency) = self.generate_latency_duration() {
|
||||
let CompletionType::Sync(sync_completion) = &mut c.completion_type else {
|
||||
unreachable!();
|
||||
};
|
||||
let before = self.rng.borrow_mut().gen_bool(0.5);
|
||||
let dummy_complete = Box::new(|_| {});
|
||||
let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete);
|
||||
let new_complete = move |res| {
|
||||
if before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
(prev_complete)(res);
|
||||
if !before {
|
||||
std::thread::sleep(latency);
|
||||
}
|
||||
};
|
||||
sync_completion.complete = Box::new(new_complete);
|
||||
};
|
||||
self.inner.sync(c)
|
||||
}
|
||||
|
||||
@@ -148,3 +198,5 @@ impl Drop for SimulatorFile {
|
||||
self.inner.unlock_file().expect("Failed to unlock file");
|
||||
}
|
||||
}
|
||||
|
||||
struct Latency {}
|
||||
|
||||
Reference in New Issue
Block a user