use std::{ cell::{Cell, RefCell}, sync::Arc, }; use rand::{Rng as _, SeedableRng}; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; use turso_core::{Completion, File, Result}; use crate::runner::{ clock::SimulatorClock, memory::io::{CallbackQueue, Fd, Operation, OperationType}, }; pub struct MemorySimFile { // TODO: maybe have a pending queue which is fast to append // and then we just do a mem swap the pending with the callback to minimize lock contention on callback queue pub callbacks: CallbackQueue, pub fd: Arc, pub buffer: RefCell>, // TODO: add fault map later here pub closed: Cell, /// Number of `pread` function calls (both success and failures). pub nr_pread_calls: Cell, /// Number of `pwrite` function calls (both success and failures). pub nr_pwrite_calls: Cell, /// Number of `sync` function calls (both success and failures). pub nr_sync_calls: Cell, pub rng: RefCell, pub latency_probability: usize, clock: Arc, } type IoOperation = Box Result>>; pub struct DelayedIo { pub time: turso_core::Instant, pub op: IoOperation, } unsafe impl Send for MemorySimFile {} unsafe impl Sync for MemorySimFile {} impl MemorySimFile { pub fn new( callbacks: CallbackQueue, fd: Fd, seed: u64, latency_probability: usize, clock: Arc, ) -> Self { Self { callbacks, fd: Arc::new(fd), buffer: RefCell::new(Vec::new()), closed: Cell::new(false), nr_pread_calls: Cell::new(0), nr_pwrite_calls: Cell::new(0), nr_sync_calls: Cell::new(0), rng: RefCell::new(ChaCha8Rng::seed_from_u64(seed)), latency_probability, clock, } } pub fn stats_table(&self) -> String { let sum_calls = self.nr_pread_calls.get() + self.nr_pwrite_calls.get() + self.nr_sync_calls.get(); let stats_table = [ "op calls ".to_string(), "--------- --------".to_string(), format!("pread {:8}", self.nr_pread_calls.get()), format!("pwrite {:8}", self.nr_pwrite_calls.get()), format!("sync {:8}", self.nr_sync_calls.get()), "--------- -------- --------".to_string(), format!("total {sum_calls:8}"), ]; stats_table.join("\n") } #[instrument(skip_all, level = Level::TRACE)] fn generate_latency(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency rng.random_bool(self.latency_probability as f64 / 100.0) .then(|| { let now = self.clock.now(); let sum = now + std::time::Duration::from_millis(rng.random_range(5..20)); sum.into() }) } fn insert_op(&self, op: OperationType) { self.callbacks.lock().push(Operation { time: self.generate_latency(), op, }); } pub fn write_buf(&self, buf: &[u8], offset: usize) -> usize { let mut file_buf = self.buffer.borrow_mut(); let more_space = if file_buf.len() < offset { (offset + buf.len()) - file_buf.len() } else { buf.len().saturating_sub(file_buf.len() - offset) }; if more_space > 0 { file_buf.reserve(more_space); for _ in 0..more_space { file_buf.push(0); } } file_buf[offset..][0..buf.len()].copy_from_slice(buf); buf.len() } } impl File for MemorySimFile { fn lock_file(&self, _exclusive: bool) -> Result<()> { Ok(()) } fn unlock_file(&self) -> Result<()> { Ok(()) } fn pread(&self, pos: usize, c: Completion) -> Result { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); let op = OperationType::Read { fd: self.fd.clone(), completion: c.clone(), offset: pos, }; self.insert_op(op); Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc, c: Completion, ) -> Result { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); let op = OperationType::Write { fd: self.fd.clone(), buffer, completion: c.clone(), offset: pos, }; self.insert_op(op); Ok(c) } fn pwritev( &self, pos: usize, buffers: Vec>, c: Completion, ) -> Result { if buffers.len() == 1 { return self.pwrite(pos, buffers[0].clone(), c); } let op = OperationType::WriteV { fd: self.fd.clone(), buffers, completion: c.clone(), offset: pos, }; self.insert_op(op); Ok(c) } fn sync(&self, c: Completion) -> Result { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); let op = OperationType::Sync { fd: self.fd.clone(), completion: c.clone(), }; self.insert_op(op); Ok(c) } fn size(&self) -> Result { // TODO: size operation should also be scheduled. But this requires a change in how we // Use this function internally in Turso Ok(self.buffer.borrow().len() as u64) } fn truncate(&self, len: usize, c: Completion) -> Result { let op = OperationType::Truncate { fd: self.fd.clone(), completion: c.clone(), len, }; self.insert_op(op); Ok(c) } }