diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index cfa88b5f7..64e8ab7b4 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -672,7 +672,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -684,12 +684,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> turso_core::Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 3d0caf705..7962f0366 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -213,7 +213,11 @@ impl turso_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: turso_core::Completion) -> Result> { + fn pread( + &self, + pos: usize, + c: Arc, + ) -> Result> { let r = match c.completion_type { turso_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -225,14 +229,14 @@ impl turso_core::File for File { }; r.complete(nr); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: turso_core::Completion, + c: Arc, ) -> Result> { let w = match c.completion_type { turso_core::CompletionType::Write(ref w) => w, @@ -243,14 +247,14 @@ impl turso_core::File for File { self.vfs.pwrite(self.fd, buf, pos); w.complete(buf.len() as i32); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.vfs.sync(self.fd); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { @@ -351,7 +355,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -363,12 +367,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index 6e1faee73..ebbd67d17 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -86,7 +86,7 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -99,14 +99,14 @@ impl File for GenericFile { file.read_exact(buf)?; } c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -114,14 +114,14 @@ impl File for GenericFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buf.len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index e204e044c..b873d4c79 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -302,7 +302,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); @@ -317,7 +317,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); io.ring.submit_entry(&read_e, c.clone()); Ok(c) } @@ -326,7 +325,7 @@ impl File for UringFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut io = self.io.borrow_mut(); let write = { @@ -339,7 +338,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); let c_uring = c.clone(); io.ring.submit_entry( &write, @@ -354,7 +352,7 @@ impl File for UringFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = with_fd!(self, |fd| { @@ -362,7 +360,6 @@ impl File for UringFile { .build() .user_data(io.ring.get_key()) }); - let c = Arc::new(c); io.ring.submit_entry(&sync, c.clone()); Ok(c) } diff --git a/core/io/memory.rs b/core/io/memory.rs index b2f8bef5b..597696605 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -83,8 +83,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { - let c = Arc::new(c); + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -129,9 +128,8 @@ impl File for MemoryFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { - let c = Arc::new(c); let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -167,10 +165,10 @@ impl File for MemoryFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { // no-op c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/mod.rs b/core/io/mod.rs index 5ba5d2a27..d462b4071 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,14 +14,14 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Completion) -> Result>; + fn pread(&self, pos: usize, c: Arc) -> Result>; fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result>; - fn sync(&self, c: Completion) -> Result>; + fn sync(&self, c: Arc) -> Result>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 17cdb029c..26a63e6c6 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -335,14 +335,13 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let file = self.file.borrow(); let result = { let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pread n: {}", n); @@ -373,14 +372,13 @@ impl File for UnixFile<'_> { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pwrite n: {}", n); @@ -405,10 +403,9 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); - let c = Arc::new(c); match result { Ok(()) => { trace!("fsync"); diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 78b76ad6a..4df856eed 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -98,7 +98,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = match c.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -113,7 +113,7 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pread failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } @@ -121,7 +121,7 @@ impl File for VfsFileImpl { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let buf = buffer.borrow(); let count = buf.as_slice().len(); @@ -142,18 +142,18 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pwrite failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { Err(LimboError::ExtensionError("sync failed".to_string())) } else { c.complete(0); - Ok(Arc::new(c)) + Ok(c) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index cf2a60b38..3f7ea3a53 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -81,7 +81,7 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { @@ -92,14 +92,14 @@ impl File for WindowsFile { buf.len() as i32 }; c.complete(nr); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -107,14 +107,14 @@ impl File for WindowsFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buffer.borrow().len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/storage/database.rs b/core/storage/database.rs index c2ad2c57a..a55c819e3 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -42,7 +42,7 @@ impl DatabaseStorage for DatabaseFile { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -59,13 +59,13 @@ impl DatabaseStorage for DatabaseFile { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } @@ -102,7 +102,7 @@ impl DatabaseStorage for FileMemoryStorage { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -118,13 +118,13 @@ impl DatabaseStorage for FileMemoryStorage { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index abbdc637c..e8e496173 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1490,7 +1490,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - io.pwrite(0, buffer.clone(), c)?; + io.pwrite(0, buffer.clone(), c.into())?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f9929dc8e..b731e650d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -890,7 +890,7 @@ impl Wal for WalFile { }), })); let shared = self.get_shared(); - shared.file.sync(completion)?; + shared.file.sync(completion.into())?; self.sync_state.set(SyncState::Syncing); Ok(IOResult::IO) } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index b53f56af4..310d227d7 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -6,7 +6,7 @@ use std::{ use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; -use turso_core::{CompletionType, File, Result}; +use turso_core::{Completion, File, Result}; use crate::model::FAULT_ERROR_MSG; pub(crate) struct SimulatorFile { @@ -38,6 +38,13 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, pub sync_completion: RefCell>>, + pub queued_io: RefCell>, +} + +pub struct QueuedIo { + pub completion: Completion, + pub time: std::time::Instant, + pub op: Box Result>>, } unsafe impl Send for SimulatorFile {} @@ -78,11 +85,13 @@ impl SimulatorFile { } #[instrument(skip_all, level = Level::TRACE)] - fn generate_latency_duration(&self) -> Option { + fn generate_latency_duration(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) - .then(|| std::time::Duration::from_millis(rng.gen_range(20..50))) + .then(|| { + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) + }) } } @@ -108,7 +117,7 @@ impl File for SimulatorFile { fn pread( &self, pos: usize, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { @@ -119,31 +128,19 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Read(read_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_, _| {}); - let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete); - let new_complete = move |res, bytes_read| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res, bytes_read); - if !before { - std::thread::sleep(latency); - } - }; - read_completion.complete = Box::new(new_complete); - }; - self.inner.pread(pos, c) + let op = Box::new(|| self.inner.pread(pos, c.clone())); + + Ok(c) + } else { + self.inner.pread(pos, c) + } } fn pwrite( &self, pos: usize, buffer: Arc>, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { @@ -154,52 +151,27 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Write(write_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - write_completion.complete = Box::new(new_complete); - }; - self.inner.pwrite(pos, buffer, c) + let op = Box::new(|| self.inner.pwrite(pos, buffer, c.clone())); + + Ok(c) + } else { + self.inner.pwrite(pos, buffer, c) + } } - fn sync(&self, mut c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); if self.fault.get() { // TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed. tracing::debug!("ignoring sync fault because it causes false positives with current simulator design"); self.fault.set(false); } - if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Sync(sync_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - sync_completion.complete = Box::new(new_complete); - }; - let c = self.inner.sync(c)?; + let c = if let Some(latency) = self.generate_latency_duration() { + let op = Box::new(|| self.inner.sync(c.clone())); + Ok(c) + } else { + self.inner.sync(c) + }?; *self.sync_completion.borrow_mut() = Some(c.clone()); Ok(c) } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 5f1cc7d7c..bf416455c 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -59,9 +59,10 @@ impl SimulatorIO { impl Clock for SimulatorIO { fn now(&self) -> Instant { + let now = chrono::Local::now(); Instant { - secs: 1704067200, // 2024-01-01 00:00:00 UTC - micros: 0, + secs: now.timestamp(), + micros: now.timestamp_subsec_micros(), } } } @@ -87,6 +88,7 @@ impl IO for SimulatorIO { rng: RefCell::new(ChaCha8Rng::seed_from_u64(self.seed)), latency_probability: self.latency_probability, sync_completion: RefCell::new(None), + queued_io: RefCell::new(Vec::new()), }); self.files.borrow_mut().push(file.clone()); Ok(file)