initial impl for MemorySim

This commit is contained in:
pedrocarlo
2025-08-20 11:32:00 -03:00
parent c01449e71b
commit 40de4c0606
6 changed files with 421 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -2192,6 +2192,7 @@ dependencies = [
"env_logger 0.10.2",
"garde",
"hex",
"indexmap 2.10.0",
"itertools 0.14.0",
"json5",
"log",

View File

@@ -43,3 +43,4 @@ garde = { workspace = true, features = ["derive", "serde"] }
json5 = { version = "0.4.1" }
strum = { workspace = true }
parking_lot = { workspace = true }
indexmap = "2.10.0"

View File

@@ -0,0 +1,163 @@
use std::{
cell::{Cell, RefCell},
sync::Arc,
};
use rand::{Rng as _, SeedableRng};
use rand_chacha::ChaCha8Rng;
use tracing::{instrument, Level};
use turso_core::{Completion, File, Result};
use crate::runner::{
clock::SimulatorClock,
memory::io::{CallbackQueue, Fd, Operation, OperationType},
};
/// In-memory file used by the simulator's memory-backed I/O.
///
/// `pread`, `pwrite` and `sync` never touch `buffer` themselves: each one
/// pushes an `Operation` onto `callbacks`, and the bytes are only moved when
/// that operation is later executed by `Operation::do_operation`.
pub struct MemorySimFile {
/// Shared queue that `pread`/`pwrite`/`sync` push their operations onto.
pub callbacks: CallbackQueue,
/// Identifier of this file; scheduled operations carry a clone of it.
pub fd: Arc<Fd>,
/// Current contents of the file. Its length is what `size` reports.
pub buffer: RefCell<Vec<u8>>,
// TODO: add fault map later here
/// Closed flag; starts as `false`. Nothing in this type reads it.
pub closed: Cell<bool>,
/// Number of `pread` function calls (both success and failures).
pub nr_pread_calls: Cell<usize>,
/// Number of `pwrite` function calls (both success and failures).
pub nr_pwrite_calls: Cell<usize>,
/// Number of `sync` function calls (both success and failures).
pub nr_sync_calls: Cell<usize>,
/// Random source used to decide whether, and by how much, to delay an operation.
pub rng: RefCell<ChaCha8Rng>,
/// Chance of delaying an operation, as a percentage (it is divided by 100
/// before being used as a probability).
pub latency_probability: usize,
/// Simulated clock used as the base time when a delay is generated.
clock: Arc<SimulatorClock>,
}
/// Boxed one-shot closure that consumes an `OperationType` and yields a completion.
// NOTE(review): only referenced by `DelayedIo` in the code visible here.
type IoOperation = Box<dyn FnOnce(OperationType) -> Result<Arc<turso_core::Completion>>>;
/// Pairs an I/O closure with an instant.
// NOTE(review): nothing in the visible code constructs or consumes this type;
// presumably `time` is when `op` is meant to run — confirm before relying on it.
pub struct DelayedIo {
/// Instant associated with the operation.
pub time: turso_core::Instant,
/// The closure to invoke.
pub op: IoOperation,
}
// SAFETY(review): `MemorySimFile` stores `Cell`/`RefCell` fields, which provide
// no synchronisation of their own. These impls are only sound if an instance is
// never accessed from more than one thread at a time — TODO confirm that the
// simulator guarantees this.
unsafe impl Send for MemorySimFile {}
unsafe impl Sync for MemorySimFile {}
impl MemorySimFile {
    /// Creates an empty, open in-memory file with all call counters at zero.
    ///
    /// * `callbacks` - shared queue that scheduled operations are pushed onto.
    /// * `fd` - identifier carried by every operation scheduled on this file.
    /// * `seed` - seed for the file's private latency RNG.
    /// * `latency_probability` - chance, in percent, that an operation is delayed.
    /// * `clock` - simulated clock used as the base time for delays.
    pub fn new(
        callbacks: CallbackQueue,
        fd: Fd,
        seed: u64,
        latency_probability: usize,
        clock: Arc<SimulatorClock>,
    ) -> Self {
        Self {
            callbacks,
            fd: Arc::new(fd),
            buffer: RefCell::new(Vec::new()),
            closed: Cell::new(false),
            nr_pread_calls: Cell::new(0),
            nr_pwrite_calls: Cell::new(0),
            nr_sync_calls: Cell::new(0),
            rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)),
            latency_probability,
            clock,
        }
    }

    /// Renders the per-operation call counters as a two-column text table.
    ///
    /// Every row uses the same column widths (9 and 8 characters) so that the
    /// header, the separators and the values line up.
    pub fn stats_table(&self) -> String {
        let pread_calls = self.nr_pread_calls.get();
        let pwrite_calls = self.nr_pwrite_calls.get();
        let sync_calls = self.nr_sync_calls.get();
        let sum_calls = pread_calls + pwrite_calls + sync_calls;

        let separator = "--------- --------";
        let row = |name: &str, calls: usize| format!("{name:<9} {calls:>8}");

        [
            format!("{:<9} {:>8}", "op", "calls"),
            separator.to_string(),
            row("pread", pread_calls),
            row("pwrite", pwrite_calls),
            row("sync", sync_calls),
            separator.to_string(),
            row("total", sum_calls),
        ]
        .join("\n")
    }

    /// Randomly decides whether the next operation should be delayed.
    ///
    /// Returns `None` when no latency is injected, otherwise the instant
    /// (current simulated time plus 5..20 ms) before which the operation
    /// must not run.
    #[instrument(skip_all, level = Level::TRACE)]
    fn generate_latency(&self) -> Option<turso_core::Instant> {
        let mut rng = self.rng.borrow_mut();
        // `random_bool` panics for probabilities outside [0, 1]; treat any
        // configured percentage above 100 as "always delay" instead.
        let probability = (self.latency_probability as f64 / 100.0).min(1.0);
        // Chance to introduce some latency
        if !rng.random_bool(probability) {
            return None;
        }
        let now = self.clock.now();
        let delay = std::time::Duration::from_millis(rng.random_range(5..20));
        Some((now + delay).into())
    }
}
impl File for MemorySimFile {
fn lock_file(&self, _exclusive: bool) -> Result<()> {
Ok(())
}
fn unlock_file(&self) -> Result<()> {
Ok(())
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
let op = OperationType::Read {
fd: self.fd.clone(),
completion: c.clone(),
offset: pos,
};
self.callbacks.lock().push(Operation {
time: self.generate_latency(),
op,
});
Ok(c)
}
fn pwrite(
&self,
pos: usize,
buffer: Arc<turso_core::Buffer>,
c: Completion,
) -> Result<Completion> {
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
let op = OperationType::Write {
fd: self.fd.clone(),
buffer,
completion: c.clone(),
offset: pos,
};
self.callbacks.lock().push(Operation {
time: self.generate_latency(),
op,
});
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Completion> {
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
let op = OperationType::Sync {
fd: self.fd.clone(),
completion: c.clone(),
};
self.callbacks.lock().push(Operation {
time: self.generate_latency(),
op,
});
Ok(c)
}
fn size(&self) -> Result<u64> {
// TODO: size operation should also be scheduled. But this requires a change in how we
// Use this function internally in Turso
Ok(self.buffer.borrow().len() as u64)
}
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
todo!()
}
}

View File

@@ -0,0 +1,253 @@
use std::cell::{Cell, RefCell};
use std::sync::Arc;
use indexmap::IndexMap;
use parking_lot::Mutex;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{Clock, Completion, Instant, OpenFlags, Result, IO};
use crate::runner::clock::SimulatorClock;
use crate::runner::SimIO;
use crate::{model::FAULT_ERROR_MSG, runner::memory::file::MemorySimFile};
/// File descriptor.
///
/// `MemorySimIO::open_file` uses the path string it is given as the descriptor,
/// and the same string is the key into the map of open files.
pub type Fd = String;
/// An I/O request that has been scheduled but not yet executed.
///
/// Every variant carries the descriptor of the target file and the completion
/// to signal once the request has been carried out.
pub enum OperationType {
/// Copy bytes from the file, starting at `offset`, into the completion's read buffer.
Read {
fd: Arc<Fd>,
completion: Completion,
offset: usize,
},
/// Copy `buffer` into the file starting at `offset`.
Write {
fd: Arc<Fd>,
buffer: Arc<turso_core::Buffer>,
completion: Completion,
offset: usize,
},
/// Sync request; for the in-memory backend this only signals the completion.
Sync {
fd: Arc<Fd>,
completion: Completion,
},
}
impl OperationType {
    /// Returns the descriptor of the file this operation targets.
    fn get_fd(&self) -> &Fd {
        let descriptor = match self {
            OperationType::Read { fd, .. } => fd,
            OperationType::Write { fd, .. } => fd,
            OperationType::Sync { fd, .. } => fd,
        };
        descriptor
    }
}
/// A scheduled I/O request together with its optional latency deadline.
pub struct Operation {
/// `None` means the operation runs on the next `run_once`; `Some(t)` holds it
/// back until the simulated clock has moved past `t`.
pub time: Option<turso_core::Instant>,
/// The request to execute.
pub op: OperationType,
}
impl Operation {
    /// Executes the operation against the in-memory files and signals its completion.
    ///
    /// * Read: copies as many bytes as the file holds at `offset` (up to the
    ///   size of the read buffer) and completes with the number of bytes
    ///   actually copied. A read at or past the end of the file completes with 0
    ///   instead of panicking.
    /// * Write: zero-extends the file when needed, copies the buffer in and
    ///   completes with the number of bytes written.
    /// * Sync: completes with 0.
    ///
    /// The borrow on the file contents is always released before the completion
    /// is signalled, so a completion callback may inspect the file again.
    ///
    /// # Panics
    ///
    /// Panics if the operation refers to a file that is not present in `files`.
    fn do_operation(self, files: &IndexMap<Fd, Arc<MemorySimFile>>) {
        match self.op {
            OperationType::Read {
                fd,
                completion,
                offset,
            } => {
                let file = files
                    .get(fd.as_str())
                    .expect("read scheduled for a file that was never opened");
                let buffer = completion.as_read().buf.clone();
                let bytes_read = {
                    // A shared borrow is enough: reading never modifies the file.
                    let file_buf = file.buffer.borrow();
                    let buf = buffer.as_mut_slice();
                    // TODO: check for sector faults here
                    // Empty when `offset` lies at or beyond the end of the file.
                    let available = file_buf.get(offset..).unwrap_or_default();
                    let len = buf.len().min(available.len());
                    buf[..len].copy_from_slice(&available[..len]);
                    len
                };
                // The completion API takes the byte count as an `i32`.
                completion.complete(bytes_read as i32);
            }
            OperationType::Write {
                fd,
                buffer,
                completion,
                offset,
            } => {
                let file = files
                    .get(fd.as_str())
                    .expect("write scheduled for a file that was never opened");
                let bytes_written = {
                    let mut file_buf = file.buffer.borrow_mut();
                    let buf = buffer.as_slice();
                    let end = offset + buf.len();
                    // Zero-fill any gap before `offset` and make room for the data.
                    if file_buf.len() < end {
                        file_buf.resize(end, 0);
                    }
                    file_buf[offset..end].copy_from_slice(buf);
                    buf.len()
                };
                // The completion API takes the byte count as an `i32`.
                completion.complete(bytes_written as i32);
            }
            OperationType::Sync { completion, .. } => {
                // There is nothing to flush for an in-memory file.
                completion.complete(0);
            }
        }
    }
}
/// Queue of scheduled operations, shared between `MemorySimIO` and the files it opens.
pub type CallbackQueue = Arc<Mutex<Vec<Operation>>>;
/// Memory-backed I/O implementation for the simulator.
///
/// Files push their operations onto `callbacks`; `run_once` executes the ones
/// that are due and parks the rest in `timeouts`.
pub struct MemorySimIO {
/// Operations scheduled by the opened files (each file holds a clone of this queue).
callbacks: CallbackQueue,
/// Operations whose latency deadline had not passed at the last `run_once`.
timeouts: CallbackQueue,
/// When set, `run_once` drops all pending operations and returns an error.
pub fault: Cell<bool>,
/// Files opened so far, keyed by the path they were opened with.
pub files: RefCell<IndexMap<Fd, Arc<MemorySimFile>>>,
/// Random source backing `generate_random_number`.
pub rng: RefCell<ChaCha8Rng>,
/// Number of `run_once` calls that returned the injected fault error.
pub nr_run_once_faults: Cell<usize>,
/// Page size given at construction; not read by the code in this file.
pub page_size: usize,
/// Seed given at construction; also passed to every file that gets created.
seed: u64,
/// Latency percentage passed to every file that gets created.
latency_probability: usize,
/// Simulated clock shared with every file that gets created.
clock: Arc<SimulatorClock>,
}
// SAFETY(review): `MemorySimIO` stores `Cell`/`RefCell` fields, which provide no
// synchronisation of their own. These impls are only sound if an instance is
// never accessed from more than one thread at a time — TODO confirm that the
// simulator guarantees this.
unsafe impl Send for MemorySimIO {}
unsafe impl Sync for MemorySimIO {}
impl MemorySimIO {
    /// Builds a memory-backed simulator I/O with empty queues and no open files.
    ///
    /// `seed` seeds both the random-number generator and the simulated clock;
    /// `min_tick` and `max_tick` are handed to the clock unchanged. The fault
    /// flag starts cleared and the fault counter at zero.
    pub fn new(
        seed: u64,
        page_size: usize,
        latency_probability: usize,
        min_tick: u64,
        max_tick: u64,
    ) -> Result<Self> {
        let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed), min_tick, max_tick);
        let io = Self {
            callbacks: Arc::new(Mutex::new(Vec::new())),
            timeouts: Arc::new(Mutex::new(Vec::new())),
            fault: Cell::new(false),
            files: RefCell::new(IndexMap::new()),
            rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)),
            nr_run_once_faults: Cell::new(0),
            page_size,
            seed,
            latency_probability,
            clock: Arc::new(clock),
        };
        Ok(io)
    }
}
impl SimIO for MemorySimIO {
    /// Sets or clears the fault flag checked by `run_once`.
    fn inject_fault(&self, fault: bool) {
        self.fault.set(fault);
        if !fault {
            return;
        }
        tracing::debug!("fault injected");
    }

    /// Logs the number of faulted `run_once` calls and each file's call table.
    fn print_stats(&self) {
        tracing::info!("run_once faults: {}", self.nr_run_once_faults.get());
        let files = self.files.borrow();
        for file in files.values() {
            tracing::info!("\n===========================\n{}", file.stats_table());
        }
    }

    /// Reports whether a sync operation is waiting in the callback queue.
    ///
    /// Panics if the callback queue is locked at the time of the call.
    fn syncing(&self) -> bool {
        let queue = self.callbacks.try_lock().unwrap();
        for operation in queue.iter() {
            if let OperationType::Sync { .. } = operation.op {
                return true;
            }
        }
        false
    }

    /// Marks every known file as closed.
    fn close_files(&self) {
        self.files
            .borrow()
            .values()
            .for_each(|file| file.closed.set(true));
    }
}
impl Clock for MemorySimIO {
    /// Reads the simulated clock and converts the reading to an `Instant`.
    fn now(&self) -> Instant {
        let simulated_now = self.clock.now();
        simulated_now.into()
    }
}
impl IO for MemorySimIO {
    /// Opens the in-memory file registered under `path`, creating it if needed.
    ///
    /// Re-opening a known path returns the same file object with its `closed`
    /// flag cleared; its contents are kept.
    fn open_file(
        &self,
        path: &str,
        _flags: OpenFlags, // TODO: ignoring open flags for now as we don't test read only mode in the simulator yet
        _direct: bool,
    ) -> Result<Arc<dyn turso_core::File>> {
        let mut files = self.files.borrow_mut();
        let file = if let Some(existing) = files.get(path) {
            existing.closed.set(false);
            existing.clone()
        } else {
            // Only allocate the descriptor string when a new file is created.
            let fd = path.to_string();
            let created = Arc::new(MemorySimFile::new(
                self.callbacks.clone(),
                fd.clone(),
                self.seed,
                self.latency_probability,
                self.clock.clone(),
            ));
            files.insert(fd, created.clone());
            created
        };
        Ok(file)
    }

    /// Runs every pending operation whose latency deadline has passed.
    ///
    /// Operations are executed newest-first; those that are not yet due are
    /// kept in `timeouts` for a later call. When a fault has been injected, all
    /// pending operations are dropped and an error carrying `FAULT_ERROR_MSG`
    /// is returned instead.
    ///
    /// The queue locks are released before any completion is signalled, so a
    /// completion callback may schedule further I/O without dead-locking on
    /// the (non-reentrant) queue mutex; such operations run on the next call.
    fn run_once(&self) -> Result<()> {
        let pending = {
            let mut callbacks = self.callbacks.lock();
            let mut timeouts = self.timeouts.lock();
            tracing::trace!(
                callbacks.len = callbacks.len(),
                timeouts.len = timeouts.len()
            );
            if self.fault.get() {
                self.nr_run_once_faults
                    .set(self.nr_run_once_faults.get() + 1);
                // TODO: currently we only deal with single threaded execution in one file
                // When we support multiple db files, we need to only remove callbacks not relevant to the current file
                // and maybe connection
                callbacks.clear();
                timeouts.clear();
                return Err(turso_core::LimboError::InternalError(
                    FAULT_ERROR_MSG.into(),
                ));
            }
            // Previously delayed operations get another chance on this tick.
            callbacks.append(&mut timeouts);
            std::mem::take(&mut *callbacks)
        };

        // Executing operations only needs shared access to the file map.
        let files = self.files.borrow();
        let now = self.now();
        let mut not_due = Vec::new();
        // Walk from the back to keep the newest-first execution order.
        for operation in pending.into_iter().rev() {
            let due = match operation.time {
                None => true,
                Some(time) => time < now,
            };
            if due {
                operation.do_operation(&files);
            } else {
                not_due.push(operation);
            }
        }
        self.timeouts.lock().append(&mut not_due);
        Ok(())
    }

    /// Returns the next value from the simulator's RNG, reinterpreted as `i64`.
    fn generate_random_number(&self) -> i64 {
        self.rng.borrow_mut().next_u64() as i64
    }
}

View File

@@ -0,0 +1,2 @@
pub mod file;
pub mod io;

View File

@@ -8,6 +8,7 @@ pub mod execution;
#[allow(dead_code)]
pub mod file;
pub mod io;
pub mod memory;
pub mod watch;
pub const FAULT_ERROR_MSG: &str = "Injected Fault";