From 4ca5595f07399b2de10dc592e60f148e95c69016 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 3 Jul 2025 14:21:51 -0300 Subject: [PATCH 01/11] enable io latency + limit number of tests in simulator pull request CI check --- simulator/runner/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index 4c8bee186..b0776171f 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -110,7 +110,7 @@ pub struct SimulatorCLI { #[clap( long = "latency_prob", help = "added IO latency probability", - default_value_t = 0 + default_value_t = 1 )] pub latency_probability: usize, #[clap(long, help = "Enable experimental MVCC feature")] From dc5f73887e8c17445071bb44591e7c0abccab9e1 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 8 Jul 2025 21:07:36 -0300 Subject: [PATCH 02/11] refactor to require `Arc` in file traits so that we can delay IO calls correctly --- bindings/javascript/src/lib.rs | 6 +-- bindings/wasm/lib.rs | 22 ++++---- core/io/generic.rs | 12 ++--- core/io/io_uring.rs | 9 ++-- core/io/memory.rs | 10 ++-- core/io/mod.rs | 6 +-- core/io/unix.rs | 9 ++-- core/io/vfs.rs | 12 ++--- core/io/windows.rs | 12 ++--- core/storage/database.rs | 12 ++--- core/storage/sqlite3_ondisk.rs | 8 +-- core/storage/wal.rs | 2 +- simulator/runner/file.rs | 94 ++++++++++++---------------------- simulator/runner/io.rs | 6 ++- 14 files changed, 95 insertions(+), 125 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index cfa88b5f7..64e8ab7b4 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -672,7 +672,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -684,12 +684,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> turso_core::Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 3d0caf705..7962f0366 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -213,7 +213,11 @@ impl turso_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: turso_core::Completion) -> Result> { + fn pread( + &self, + pos: usize, + c: Arc, + ) -> Result> { let r = match c.completion_type { turso_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -225,14 +229,14 @@ impl turso_core::File for File { }; r.complete(nr); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: turso_core::Completion, + c: Arc, ) -> Result> { let w = match c.completion_type { turso_core::CompletionType::Write(ref w) => w, @@ -243,14 +247,14 @@ impl turso_core::File for File { self.vfs.pwrite(self.fd, buf, pos); w.complete(buf.len() as i32); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.vfs.sync(self.fd); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { @@ -351,7 +355,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -363,12 +367,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index 6e1faee73..ebbd67d17 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -86,7 +86,7 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -99,14 +99,14 @@ impl File for GenericFile { file.read_exact(buf)?; } c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -114,14 +114,14 @@ impl File for GenericFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buf.len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index e204e044c..b873d4c79 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -302,7 +302,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); @@ -317,7 +317,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); io.ring.submit_entry(&read_e, c.clone()); Ok(c) } @@ -326,7 +325,7 @@ impl File for UringFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut io = self.io.borrow_mut(); let write = { @@ -339,7 +338,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); let c_uring = c.clone(); io.ring.submit_entry( &write, @@ -354,7 +352,7 @@ impl File for UringFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = with_fd!(self, |fd| { @@ -362,7 +360,6 @@ impl File for UringFile { .build() .user_data(io.ring.get_key()) }); - let c = Arc::new(c); io.ring.submit_entry(&sync, c.clone()); Ok(c) } diff --git a/core/io/memory.rs b/core/io/memory.rs index b2f8bef5b..597696605 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -83,8 +83,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { - let c = Arc::new(c); + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -129,9 +128,8 @@ impl File for MemoryFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { - let c = Arc::new(c); let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -167,10 +165,10 @@ impl File for MemoryFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { // no-op c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/mod.rs b/core/io/mod.rs index 5ba5d2a27..d462b4071 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,14 +14,14 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Completion) -> Result>; + fn pread(&self, pos: usize, c: Arc) -> Result>; fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result>; - fn sync(&self, c: Completion) -> Result>; + fn sync(&self, c: Arc) -> Result>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 17cdb029c..26a63e6c6 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -335,14 +335,13 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let file = self.file.borrow(); let result = { let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pread n: {}", n); @@ -373,14 +372,13 @@ impl File for UnixFile<'_> { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pwrite n: {}", n); @@ -405,10 +403,9 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); - let c = Arc::new(c); match result { Ok(()) => { trace!("fsync"); diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 78b76ad6a..4df856eed 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -98,7 +98,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = match c.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -113,7 +113,7 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pread failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } @@ -121,7 +121,7 @@ impl File for VfsFileImpl { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let buf = buffer.borrow(); let count = buf.as_slice().len(); @@ -142,18 +142,18 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pwrite failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { Err(LimboError::ExtensionError("sync failed".to_string())) } else { c.complete(0); - Ok(Arc::new(c)) + Ok(c) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index cf2a60b38..3f7ea3a53 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -81,7 +81,7 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { @@ -92,14 +92,14 @@ impl File for WindowsFile { buf.len() as i32 }; c.complete(nr); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -107,14 +107,14 @@ impl File for WindowsFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buffer.borrow().len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/storage/database.rs b/core/storage/database.rs index c2ad2c57a..a55c819e3 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -42,7 +42,7 @@ impl DatabaseStorage for DatabaseFile { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -59,13 +59,13 @@ impl DatabaseStorage for DatabaseFile { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } @@ -102,7 +102,7 @@ impl DatabaseStorage for FileMemoryStorage { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -118,13 +118,13 @@ impl DatabaseStorage for FileMemoryStorage { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index abbdc637c..e8e496173 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1490,7 +1490,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - io.pwrite(0, buffer.clone(), c)?; + io.pwrite(0, buffer.clone(), c.into())?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f9929dc8e..b731e650d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -890,7 +890,7 @@ impl Wal for WalFile { }), })); let shared = self.get_shared(); - shared.file.sync(completion)?; + shared.file.sync(completion.into())?; self.sync_state.set(SyncState::Syncing); Ok(IOResult::IO) } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index b53f56af4..310d227d7 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -6,7 +6,7 @@ use std::{ use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; -use turso_core::{CompletionType, File, Result}; +use turso_core::{Completion, File, Result}; use crate::model::FAULT_ERROR_MSG; pub(crate) struct SimulatorFile { @@ -38,6 +38,13 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, pub sync_completion: RefCell>>, + pub queued_io: RefCell>, +} + +pub struct QueuedIo { + pub completion: Completion, + pub time: std::time::Instant, + pub op: Box Result>>, } unsafe impl Send for SimulatorFile {} @@ -78,11 +85,13 @@ impl SimulatorFile { } #[instrument(skip_all, level = Level::TRACE)] - fn generate_latency_duration(&self) -> Option { + fn generate_latency_duration(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) - .then(|| std::time::Duration::from_millis(rng.gen_range(20..50))) + .then(|| { + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) + }) } } @@ -108,7 +117,7 @@ impl File for SimulatorFile { fn pread( &self, pos: usize, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { @@ -119,31 +128,19 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Read(read_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_, _| {}); - let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete); - let new_complete = move |res, bytes_read| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res, bytes_read); - if !before { - std::thread::sleep(latency); - } - }; - read_completion.complete = Box::new(new_complete); - }; - self.inner.pread(pos, c) + let op = Box::new(|| self.inner.pread(pos, c.clone())); + + Ok(c) + } else { + self.inner.pread(pos, c) + } } fn pwrite( &self, pos: usize, buffer: Arc>, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { @@ -154,52 +151,27 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Write(write_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - write_completion.complete = Box::new(new_complete); - }; - self.inner.pwrite(pos, buffer, c) + let op = Box::new(|| self.inner.pwrite(pos, buffer, c.clone())); + + Ok(c) + } else { + self.inner.pwrite(pos, buffer, c) + } } - fn sync(&self, mut c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); if self.fault.get() { // TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed. tracing::debug!("ignoring sync fault because it causes false positives with current simulator design"); self.fault.set(false); } - if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Sync(sync_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - sync_completion.complete = Box::new(new_complete); - }; - let c = self.inner.sync(c)?; + let c = if let Some(latency) = self.generate_latency_duration() { + let op = Box::new(|| self.inner.sync(c.clone())); + Ok(c) + } else { + self.inner.sync(c) + }?; *self.sync_completion.borrow_mut() = Some(c.clone()); Ok(c) } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 5f1cc7d7c..bf416455c 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -59,9 +59,10 @@ impl SimulatorIO { impl Clock for SimulatorIO { fn now(&self) -> Instant { + let now = chrono::Local::now(); Instant { - secs: 1704067200, // 2024-01-01 00:00:00 UTC - micros: 0, + secs: now.timestamp(), + micros: now.timestamp_subsec_micros(), } } } @@ -87,6 +88,7 @@ impl IO for SimulatorIO { rng: RefCell::new(ChaCha8Rng::seed_from_u64(self.seed)), latency_probability: self.latency_probability, sync_completion: RefCell::new(None), + queued_io: RefCell::new(Vec::new()), }); self.files.borrow_mut().push(file.clone()); Ok(file) From b292e08d2b34fb5397ae24c5ae513b3d09cd6186 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 9 Jul 2025 14:37:32 -0300 Subject: [PATCH 03/11] inject latency with queuing system --- simulator/runner/file.rs | 65 +++++++++++++++++++++++++++++++++------- simulator/runner/io.rs | 4 +++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 310d227d7..edcb44c31 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -1,12 +1,13 @@ use std::{ cell::{Cell, RefCell}, + fmt::Debug, sync::Arc, }; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; -use turso_core::{Completion, File, Result}; +use turso_core::{File, Result}; use crate::model::FAULT_ERROR_MSG; pub(crate) struct SimulatorFile { @@ -38,13 +39,22 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, pub sync_completion: RefCell>>, - pub queued_io: RefCell>, + pub queued_io: RefCell>, } -pub struct QueuedIo { - pub completion: Completion, +type IoOperation = Box Result>>; + +pub struct DelayedIo { pub time: std::time::Instant, - pub op: Box Result>>, + pub op: IoOperation, +} + +impl Debug for DelayedIo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DelayedIo") + .field("time", &self.time) + .finish() + } } unsafe impl Send for SimulatorFile {} @@ -90,9 +100,32 @@ impl SimulatorFile { // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(50..200)) }) } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> { + let mut queued_io = self.queued_io.borrow_mut(); + tracing::debug!(?queued_io); + // TODO: as we are not in version 1.87 we cannot use `extract_if` + // so we have to do something different to achieve the same thing + // This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if + let range = 0..queued_io.len(); + let mut i = range.start; + let end_items = queued_io.len() - range.end; + + while i < queued_io.len() - end_items { + if queued_io[i].time <= now { + let io = queued_io.remove(i); + // your code here + (io.op)(self)?; + } else { + i += 1; + } + } + Ok(()) + } } impl File for SimulatorFile { @@ -128,8 +161,11 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.pread(pos, c.clone())); - + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pread(pos, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.pread(pos, c) @@ -151,8 +187,11 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.pwrite(pos, buffer, c.clone())); - + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pwrite(pos, buffer, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.pwrite(pos, buffer, c) @@ -167,7 +206,11 @@ impl File for SimulatorFile { self.fault.set(false); } let c = if let Some(latency) = self.generate_latency_duration() { - let op = Box::new(|| self.inner.sync(c.clone())); + let cloned_c = c.clone(); + let op = Box::new(|file: &SimulatorFile| file.inner.sync(cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); Ok(c) } else { self.inner.sync(c) diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index bf416455c..cc4b04f4e 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -109,6 +109,10 @@ impl IO for SimulatorIO { FAULT_ERROR_MSG.into(), )); } + let now = std::time::Instant::now(); + for file in self.files.borrow().iter() { + file.run_queued_io(now)?; + } self.inner.run_once()?; Ok(()) } From 5771d1a00e09833857352d3dcdf17ef8b26b66d7 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 9 Jul 2025 16:26:13 -0300 Subject: [PATCH 04/11] disable wal sync timeout on checkpoint --- core/storage/pager.rs | 17 ++++++++++------- simulator/runner/file.rs | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 09cd09b90..fa20b1f9d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1016,18 +1016,21 @@ impl Pager { } pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> { - let mut attempts = 0; + let mut _attempts = 0; { let mut wal = self.wal.borrow_mut(); // fsync the wal syncronously before beginning checkpoint while let Ok(IOResult::IO) = wal.sync() { - if attempts >= 10 { - return Err(LimboError::InternalError( - "Failed to fsync WAL before final checkpoint, fd likely closed".into(), - )); - } + // TODO: for now forget about timeouts as they fail regularly in SIM + // need to think of a better way to do this + + // if attempts >= 1000 { + // return Err(LimboError::InternalError( + // "Failed to fsync WAL before final checkpoint, fd likely closed".into(), + // )); + // } self.io.run_once()?; - attempts += 1; + _attempts += 1; } } self.wal_checkpoint(wal_checkpoint_disabled)?; diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index edcb44c31..81bc411b2 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -100,7 +100,7 @@ impl SimulatorFile { // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(50..200)) + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) }) } From 7b8eec90bd2c450af04d92dfef971eb6408e9529 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 9 Jul 2025 16:46:42 -0300 Subject: [PATCH 05/11] edit state machine in Btree for freeing pages + Pager state machine for free_page --- core/storage/btree.rs | 50 +++++++++-- core/storage/pager.rs | 149 ++++++++++++++++++++++----------- core/storage/sqlite3_ondisk.rs | 3 +- simulator/runner/file.rs | 1 - 4 files changed, 142 insertions(+), 61 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 871d49d78..ad936555b 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -223,6 +223,10 @@ struct DeleteInfo { enum WriteState { Start, BalanceStart, + BalanceFreePages { + curr_page: usize, + sibling_count_new: usize, + }, /// Choose which sibling pages to balance (max 3). /// Generally, the siblings involved will be the page that triggered the balancing and its left and right siblings. /// The exceptions are: @@ -2255,6 +2259,7 @@ impl BTreeCursor { } } WriteState::BalanceStart + | WriteState::BalanceFreePages { .. } | WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { return_if_io!(self.balance()); @@ -2333,7 +2338,9 @@ impl BTreeCursor { self.stack.pop(); return_if_io!(self.balance_non_root()); } - WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { + WriteState::BalanceNonRootPickSiblings + | WriteState::BalanceNonRootDoBalancing + | WriteState::BalanceFreePages { .. } => { return_if_io!(self.balance_non_root()); } WriteState::Finish => return Ok(IOResult::Done(())), @@ -2350,7 +2357,7 @@ impl BTreeCursor { "Cursor must be in balancing state" ); let state = self.state.write_info().expect("must be balancing").state; - tracing::debug!("balance_non_root(state={:?})", state); + tracing::debug!(?state); let (next_write_state, result) = match state { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), @@ -3322,13 +3329,38 @@ impl BTreeCursor { right_page_id, ); + ( + WriteState::BalanceFreePages { + curr_page: sibling_count_new, + sibling_count_new, + }, + Ok(CursorResult::Ok(())), + ) + } + WriteState::BalanceFreePages { + curr_page, + sibling_count_new, + } => { + let write_info = self.state.write_info().unwrap(); + let mut balance_info: std::cell::RefMut<'_, Option> = + write_info.balance_info.borrow_mut(); + let balance_info = balance_info.as_mut().unwrap(); // We have to free pages that are not used anymore - for i in sibling_count_new..balance_info.sibling_count { - let page = balance_info.pages_to_balance[i].as_ref().unwrap(); - self.pager - .free_page(Some(page.get().clone()), page.get().get().id)?; + if !((sibling_count_new..balance_info.sibling_count).contains(&curr_page)) { + (WriteState::BalanceStart, Ok(IOResult::Done(()))) + } else { + let page = balance_info.pages_to_balance[curr_page].as_ref().unwrap(); + return_if_io!(self + .pager + .free_page(Some(page.get().clone()), page.get().get().id)); + ( + WriteState::BalanceFreePages { + curr_page: curr_page + 1, + sibling_count_new, + }, + Ok(CursorResult::Ok(())), + ) } - (WriteState::BalanceStart, Ok(IOResult::Done(()))) } WriteState::Finish => todo!(), }; @@ -4679,7 +4711,7 @@ impl BTreeCursor { let contents = page.get().contents.as_ref().unwrap(); let next = contents.read_u32(0); - self.pager.free_page(Some(page), next_page as usize)?; + return_if_io!(self.pager.free_page(Some(page), next_page as usize)); if next != 0 { self.overflow_state = Some(OverflowState::ProcessPage { next_page: next }); @@ -4866,7 +4898,7 @@ impl BTreeCursor { let page = self.stack.top(); let page_id = page.get().get().id; - self.pager.free_page(Some(page.get()), page_id)?; + return_if_io!(self.pager.free_page(Some(page.get()), page_id)); if self.stack.has_parent() { self.stack.pop(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index fa20b1f9d..7bc3ec894 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -280,6 +280,7 @@ pub struct Pager { /// to change it. page_size: Cell>, reserved_space: OnceCell, + free_page_state: RefCell, } #[derive(Debug, Copy, Clone)] @@ -303,6 +304,18 @@ enum AllocatePage1State { Done, } +#[derive(Debug, Clone)] +enum FreePageState { + Start, + AddToTrunk { + page: Arc, + trunk_page: Option>, + }, + NewTrunk { + page: Arc, + }, +} + impl Pager { pub fn new( db_file: Arc, @@ -342,6 +355,7 @@ impl Pager { state: CacheFlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), }), + free_page_state: RefCell::new(FreePageState::Start), }) } @@ -1073,7 +1087,7 @@ impl Pager { // Providing a page is optional, if provided it will be used to avoid reading the page from disk. // This is implemented in accordance with sqlite freepage2() function. #[instrument(skip_all, level = Level::INFO)] - pub fn free_page(&self, page: Option, page_id: usize) -> Result<()> { + pub fn free_page(&self, page: Option, page_id: usize) -> Result> { tracing::trace!("free_page(page_id={})", page_id); const TRUNK_PAGE_HEADER_SIZE: usize = 8; const LEAF_ENTRY_SIZE: usize = 4; @@ -1082,65 +1096,100 @@ impl Pager { const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count - if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { - return Err(LimboError::Corrupt(format!( - "Invalid page number {page_id} for free operation" - ))); - } + let mut state = self.free_page_state.borrow_mut(); + tracing::debug!(?state); + loop { + match &mut *state { + FreePageState::Start => { + if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { + return Err(LimboError::Corrupt(format!( + "Invalid page number {page_id} for free operation" + ))); + } - let page = match page { - Some(page) => { - assert_eq!(page.get().id, page_id, "Page id mismatch"); - page - } - None => self.read_page(page_id)?, - }; + let page = match page.clone() { + Some(page) => { + assert_eq!(page.get().id, page_id, "Page id mismatch"); + page + } + None => self.read_page(page_id)?, + }; + header_accessor::set_freelist_pages( + self, + header_accessor::get_freelist_pages(self)? + 1, + )?; - header_accessor::set_freelist_pages(self, header_accessor::get_freelist_pages(self)? + 1)?; + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; - let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page_id != 0 { + *state = FreePageState::AddToTrunk { + page, + trunk_page: None, + }; + } else { + *state = FreePageState::NewTrunk { page }; + } + } + FreePageState::AddToTrunk { page, trunk_page } => { + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page.is_none() { + // Add as leaf to current trunk + trunk_page.replace(self.read_page(trunk_page_id as usize)?); + } + let trunk_page = trunk_page.as_ref().unwrap(); + if trunk_page.is_locked() || !trunk_page.is_loaded() { + return Ok(CursorResult::IO); + } - if trunk_page_id != 0 { - // Add as leaf to current trunk - let trunk_page = self.read_page(trunk_page_id as usize)?; - let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); - let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); + let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); + let number_of_leaf_pages = + trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); - // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE - let max_free_list_entries = (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; + // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE + let max_free_list_entries = + (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; - if number_of_leaf_pages < max_free_list_entries as u32 { - trunk_page.set_dirty(); - self.add_dirty(trunk_page_id as usize); + if number_of_leaf_pages < max_free_list_entries as u32 { + trunk_page.set_dirty(); + self.add_dirty(trunk_page_id as usize); - trunk_page_contents - .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); - trunk_page_contents.write_u32( - TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), - page_id as u32, - ); - page.clear_uptodate(); - page.clear_loaded(); + trunk_page_contents + .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); + trunk_page_contents.write_u32( + TRUNK_PAGE_HEADER_SIZE + + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), + page_id as u32, + ); + page.clear_uptodate(); - return Ok(()); + break; + } + } + FreePageState::NewTrunk { page } => { + if page.is_locked() || !page.is_loaded() { + return Ok(CursorResult::IO); + } + // If we get here, need to make this page a new trunk + page.set_dirty(); + self.add_dirty(page_id); + + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + + let contents = page.get().contents.as_mut().unwrap(); + // Point to previous trunk + contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); + // Zero leaf count + contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); + // Update page 1 to point to new trunk + header_accessor::set_freelist_trunk_page(self, page_id as u32)?; + // Clear flags + page.clear_uptodate(); + break; + } } } - - // If we get here, need to make this page a new trunk - page.set_dirty(); - self.add_dirty(page_id); - - let contents = page.get().contents.as_mut().unwrap(); - // Point to previous trunk - contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); - // Zero leaf count - contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); - // Update page 1 to point to new trunk - header_accessor::set_freelist_trunk_page(self, page_id as u32)?; - // Clear flags - page.clear_uptodate(); - page.clear_loaded(); - Ok(()) + *state = FreePageState::Start; + Ok(CursorResult::Ok(())) } #[instrument(skip_all, level = Level::INFO)] diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index e8e496173..12a96b742 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -752,12 +752,13 @@ pub fn begin_read_page( Ok(()) } +#[instrument(skip_all, level = Level::INFO)] pub fn finish_read_page( page_idx: usize, buffer_ref: Arc>, page: PageRef, ) -> Result<()> { - tracing::trace!("finish_read_btree_page(page_idx = {})", page_idx); + tracing::trace!(page_idx); let pos = if page_idx == DATABASE_HEADER_PAGE_ID { DATABASE_HEADER_SIZE } else { diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 81bc411b2..5eff245b5 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -107,7 +107,6 @@ impl SimulatorFile { #[instrument(skip_all, level = Level::DEBUG)] pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> { let mut queued_io = self.queued_io.borrow_mut(); - tracing::debug!(?queued_io); // TODO: as we are not in version 1.87 we cannot use `extract_if` // so we have to do something different to achieve the same thing // This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if From d72a7f5d8e729f9125ae9d5cf517a4b0a575a5ad Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 9 Jul 2025 23:09:08 -0300 Subject: [PATCH 06/11] decrease IO latency time as Turso becomes super slow for some reason --- simulator/runner/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 5eff245b5..c7d615c0c 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -100,7 +100,7 @@ impl SimulatorFile { // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(20..50)) + std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(5..15)) }) } From 4a13286d62dd2559ea766dcd586df273410f5211 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 10 Jul 2025 11:32:01 -0300 Subject: [PATCH 07/11] modify clock to use simulated time instead --- core/io/clock.rs | 11 +++++++++++ simulator/runner/clock.rs | 34 ++++++++++++++++++++++++++++++++++ simulator/runner/file.rs | 14 +++++++++----- simulator/runner/io.rs | 18 +++++++++++------- simulator/runner/mod.rs | 1 + 5 files changed, 66 insertions(+), 12 deletions(-) create mode 100644 simulator/runner/clock.rs diff --git a/core/io/clock.rs b/core/io/clock.rs index 3a38ad955..8c12aa4e5 100644 --- a/core/io/clock.rs +++ b/core/io/clock.rs @@ -4,6 +4,17 @@ pub struct Instant { pub micros: u32, } +impl From> for Instant { + fn from(value: chrono::DateTime) -> Self { + Instant { + secs: value.timestamp(), + micros: value.timestamp_subsec_micros(), + } + } +} + + + pub trait Clock { fn now(&self) -> Instant; } diff --git a/simulator/runner/clock.rs b/simulator/runner/clock.rs new file mode 100644 index 000000000..56ef72f1f --- /dev/null +++ b/simulator/runner/clock.rs @@ -0,0 +1,34 @@ +use std::cell::RefCell; + +use chrono::{DateTime, Utc}; +use rand::Rng; +use rand_chacha::ChaCha8Rng; + +#[derive(Debug)] +pub struct SimulatorClock { + curr_time: RefCell>, + rng: RefCell, +} + +impl SimulatorClock { + const MIN_TICK: u64 = 1; + const MAX_TICK: u64 = 50; + + pub fn new(rng: ChaCha8Rng) -> Self { + Self { + curr_time: RefCell::new(Utc::now()), + rng: RefCell::new(rng), + } + } + + pub fn now(&self) -> DateTime { + let mut time = self.curr_time.borrow_mut(); + let nanos = self + .rng + .borrow_mut() + .gen_range(Self::MIN_TICK..Self::MAX_TICK); + let nanos = std::time::Duration::from_micros(nanos); + *time = *time + nanos; + *time + } +} diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index c7d615c0c..6bebbc5e1 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -4,12 +4,13 @@ use std::{ sync::Arc, }; +use chrono::{DateTime, Utc}; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; use turso_core::{File, Result}; -use crate::model::FAULT_ERROR_MSG; +use crate::{model::FAULT_ERROR_MSG, runner::clock::SimulatorClock}; pub(crate) struct SimulatorFile { pub(crate) inner: Arc, pub(crate) fault: Cell, @@ -40,12 +41,13 @@ pub(crate) struct SimulatorFile { pub sync_completion: RefCell>>, pub queued_io: RefCell>, + pub clock: Arc, } type IoOperation = Box Result>>; pub struct DelayedIo { - pub time: std::time::Instant, + pub time: turso_core::Instant, pub op: IoOperation, } @@ -95,17 +97,19 @@ impl SimulatorFile { } #[instrument(skip_all, level = Level::TRACE)] - fn generate_latency_duration(&self) -> Option { + fn generate_latency_duration(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - std::time::Instant::now() + std::time::Duration::from_millis(rng.gen_range(5..15)) + let now: DateTime = self.clock.now().into(); + let sum = now + std::time::Duration::from_millis(rng.gen_range(5..20)); + sum.into() }) } #[instrument(skip_all, level = Level::DEBUG)] - pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> { + pub fn run_queued_io(&self, now: turso_core::Instant) -> Result<()> { let mut queued_io = self.queued_io.borrow_mut(); // TODO: as we are not in version 1.87 we cannot use `extract_if` // so we have to do something different to achieve the same thing diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index cc4b04f4e..b56000965 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -7,7 +7,10 @@ use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use turso_core::{Clock, Instant, MemoryIO, OpenFlags, PlatformIO, Result, IO}; -use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile}; +use crate::{ + model::FAULT_ERROR_MSG, + runner::{clock::SimulatorClock, file::SimulatorFile}, +}; pub(crate) struct SimulatorIO { pub(crate) inner: Box, @@ -18,6 +21,7 @@ pub(crate) struct SimulatorIO { pub(crate) page_size: usize, seed: u64, latency_probability: usize, + clock: Arc, } unsafe impl Send for SimulatorIO {} @@ -30,6 +34,8 @@ impl SimulatorIO { let files = RefCell::new(Vec::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); + let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed)); + Ok(Self { inner, fault, @@ -39,6 +45,7 @@ impl SimulatorIO { page_size, seed, latency_probability, + clock: Arc::new(clock), }) } @@ -59,11 +66,7 @@ impl SimulatorIO { impl Clock for SimulatorIO { fn now(&self) -> Instant { - let now = chrono::Local::now(); - Instant { - secs: now.timestamp(), - micros: now.timestamp_subsec_micros(), - } + self.clock.now().into() } } @@ -89,6 +92,7 @@ impl IO for SimulatorIO { latency_probability: self.latency_probability, sync_completion: RefCell::new(None), queued_io: RefCell::new(Vec::new()), + clock: self.clock.clone(), }); self.files.borrow_mut().push(file.clone()); Ok(file) @@ -109,7 +113,7 @@ impl IO for SimulatorIO { FAULT_ERROR_MSG.into(), )); } - let now = std::time::Instant::now(); + let now = self.now(); for file in self.files.borrow().iter() { file.run_queued_io(now)?; } diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index ccb0563ab..b56335da5 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -1,5 +1,6 @@ pub mod bugbase; pub mod cli; +pub mod clock; pub mod differential; pub mod doublecheck; pub mod env; From 46a7d20c125f235010daa806dcce29bc5efe47bb Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 10 Jul 2025 12:48:09 -0300 Subject: [PATCH 08/11] clippy --- core/io/clock.rs | 2 -- simulator/runner/clock.rs | 4 ++-- simulator/runner/file.rs | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/io/clock.rs b/core/io/clock.rs index 8c12aa4e5..aae1a7633 100644 --- a/core/io/clock.rs +++ b/core/io/clock.rs @@ -13,8 +13,6 @@ impl From> for Instant { } } - - pub trait Clock { fn now(&self) -> Instant; } diff --git a/simulator/runner/clock.rs b/simulator/runner/clock.rs index 56ef72f1f..2f89e93e6 100644 --- a/simulator/runner/clock.rs +++ b/simulator/runner/clock.rs @@ -12,7 +12,7 @@ pub struct SimulatorClock { impl SimulatorClock { const MIN_TICK: u64 = 1; - const MAX_TICK: u64 = 50; + const MAX_TICK: u64 = 30; pub fn new(rng: ChaCha8Rng) -> Self { Self { @@ -28,7 +28,7 @@ impl SimulatorClock { .borrow_mut() .gen_range(Self::MIN_TICK..Self::MAX_TICK); let nanos = std::time::Duration::from_micros(nanos); - *time = *time + nanos; + *time += nanos; *time } } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 6bebbc5e1..6df14c89d 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -4,7 +4,6 @@ use std::{ sync::Arc, }; -use chrono::{DateTime, Utc}; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; @@ -102,7 +101,7 @@ impl SimulatorFile { // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) .then(|| { - let now: DateTime = self.clock.now().into(); + let now = self.clock.now(); let sum = now + std::time::Duration::from_millis(rng.gen_range(5..20)); sum.into() }) From 0ab2f2b951a0a19a822c2cd75a89f19bcbb6ffef Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 10 Jul 2025 15:08:27 -0300 Subject: [PATCH 09/11] cli options for max and min tick + adjust github action to run with faster clock so no timeouts happen --- .github/workflows/rust.yml | 2 +- simulator/runner/cli.rs | 14 +++++++++++++- simulator/runner/clock.rs | 11 ++++++----- simulator/runner/env.rs | 12 ++++++++++-- simulator/runner/io.rs | 10 ++++++++-- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f20bbe06a..527b3d5b8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -75,7 +75,7 @@ jobs: with: prefix-key: "v1-rust" # can be updated if we need to reset caches due to non-trivial change in the dependencies (for example, custom env var were set for single workspace project) - name: Install the project - run: ./scripts/run-sim --maximum-tests 1000 loop -n 10 -s + run: ./scripts/run-sim --maximum-tests 1000 --min-tick 10 --max-tick 50 loop -n 10 -s test-limbo: runs-on: blacksmith-4vcpu-ubuntu-2404 diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index b0776171f..f35555aa1 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -108,11 +108,23 @@ pub struct SimulatorCLI { #[clap(long, help = "disable Reopen-Database fault", default_value_t = false)] pub disable_reopen_database: bool, #[clap( - long = "latency_prob", + long = "latency-prob", help = "added IO latency probability", default_value_t = 1 )] pub latency_probability: usize, + #[clap( + long, + help = "Minimum tick time in microseconds for simulated time", + default_value_t = 1 + )] + pub min_tick: u64, + #[clap( + long, + help = "Maximum tick time in microseconds for simulated time", + default_value_t = 30 + )] + pub max_tick: u64, #[clap(long, help = "Enable experimental MVCC feature")] pub experimental_mvcc: bool, #[clap(long, help = "Enable experimental indexing feature")] diff --git a/simulator/runner/clock.rs b/simulator/runner/clock.rs index 2f89e93e6..ef687c5c1 100644 --- a/simulator/runner/clock.rs +++ b/simulator/runner/clock.rs @@ -8,16 +8,17 @@ use rand_chacha::ChaCha8Rng; pub struct SimulatorClock { curr_time: RefCell>, rng: RefCell, + min_tick: u64, + max_tick: u64, } impl SimulatorClock { - const MIN_TICK: u64 = 1; - const MAX_TICK: u64 = 30; - - pub fn new(rng: ChaCha8Rng) -> Self { + pub fn new(rng: ChaCha8Rng, min_tick: u64, max_tick: u64) -> Self { Self { curr_time: RefCell::new(Utc::now()), rng: RefCell::new(rng), + min_tick, + max_tick, } } @@ -26,7 +27,7 @@ impl SimulatorClock { let nanos = self .rng .borrow_mut() - .gen_range(Self::MIN_TICK..Self::MAX_TICK); + .gen_range(self.min_tick..self.max_tick); let nanos = std::time::Duration::from_micros(nanos); *time += nanos; *time diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 675b2be08..f2de5aba1 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -247,8 +247,16 @@ impl SimulatorEnv { experimental_indexes: cli_opts.experimental_indexes, }; - let io = - Arc::new(SimulatorIO::new(seed, opts.page_size, cli_opts.latency_probability).unwrap()); + let io = Arc::new( + SimulatorIO::new( + seed, + opts.page_size, + cli_opts.latency_probability, + cli_opts.min_tick, + cli_opts.max_tick, + ) + .unwrap(), + ); // Remove existing database file if it exists let db_path = paths.db(&simulation_type, &SimulationPhase::Test); diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index b56000965..7c888cc7f 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -28,13 +28,19 @@ unsafe impl Send for SimulatorIO {} unsafe impl Sync for SimulatorIO {} impl SimulatorIO { - pub(crate) fn new(seed: u64, page_size: usize, latency_probability: usize) -> Result { + pub(crate) fn new( + seed: u64, + page_size: usize, + latency_probability: usize, + min_tick: u64, + max_tick: u64, + ) -> Result { let inner = Box::new(PlatformIO::new()?); let fault = Cell::new(false); let files = RefCell::new(Vec::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); - let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed)); + let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed), min_tick, max_tick); Ok(Self { inner, From 6088aa34c2b482904588d299e6bdb4797ed92acc Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 10 Jul 2025 15:55:08 -0300 Subject: [PATCH 10/11] only add the sync completion after we call sync in the closure --- simulator/runner/file.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 6df14c89d..d1682c353 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -209,15 +209,20 @@ impl File for SimulatorFile { } let c = if let Some(latency) = self.generate_latency_duration() { let cloned_c = c.clone(); - let op = Box::new(|file: &SimulatorFile| file.inner.sync(cloned_c)); + let op = Box::new(|file: &SimulatorFile| -> Result<_> { + let c = file.inner.sync(cloned_c)?; + *file.sync_completion.borrow_mut() = Some(c.clone()); + Ok(c) + }); self.queued_io .borrow_mut() .push(DelayedIo { time: latency, op }); - Ok(c) + c } else { - self.inner.sync(c) - }?; - *self.sync_completion.borrow_mut() = Some(c.clone()); + let c = self.inner.sync(c)?; + *self.sync_completion.borrow_mut() = Some(c.clone()); + c + }; Ok(c) } From b80218324d4380d21c4b03e6cd728d689e3d9fd6 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 16 Jul 2025 11:08:23 -0300 Subject: [PATCH 11/11] fix merge conflicts --- core/storage/btree.rs | 4 ++-- core/storage/pager.rs | 8 ++++---- simulator/runner/env.rs | 6 ++++++ tests/integration/query_processing/test_btree.rs | 2 +- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index ad936555b..7180368bb 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -3334,7 +3334,7 @@ impl BTreeCursor { curr_page: sibling_count_new, sibling_count_new, }, - Ok(CursorResult::Ok(())), + Ok(IOResult::Done(())), ) } WriteState::BalanceFreePages { @@ -3358,7 +3358,7 @@ impl BTreeCursor { curr_page: curr_page + 1, sibling_count_new, }, - Ok(CursorResult::Ok(())), + Ok(IOResult::Done(())), ) } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 7bc3ec894..0ad3d1839 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1087,7 +1087,7 @@ impl Pager { // Providing a page is optional, if provided it will be used to avoid reading the page from disk. // This is implemented in accordance with sqlite freepage2() function. #[instrument(skip_all, level = Level::INFO)] - pub fn free_page(&self, page: Option, page_id: usize) -> Result> { + pub fn free_page(&self, page: Option, page_id: usize) -> Result> { tracing::trace!("free_page(page_id={})", page_id); const TRUNK_PAGE_HEADER_SIZE: usize = 8; const LEAF_ENTRY_SIZE: usize = 4; @@ -1138,7 +1138,7 @@ impl Pager { } let trunk_page = trunk_page.as_ref().unwrap(); if trunk_page.is_locked() || !trunk_page.is_loaded() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); @@ -1167,7 +1167,7 @@ impl Pager { } FreePageState::NewTrunk { page } => { if page.is_locked() || !page.is_loaded() { - return Ok(CursorResult::IO); + return Ok(IOResult::IO); } // If we get here, need to make this page a new trunk page.set_dirty(); @@ -1189,7 +1189,7 @@ impl Pager { } } *state = FreePageState::Start; - Ok(CursorResult::Ok(())) + Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::INFO)] diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index f2de5aba1..fa5859af3 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -99,6 +99,8 @@ impl SimulatorEnv { self.opts.seed, self.opts.page_size, self.opts.latency_probability, + self.opts.min_tick, + self.opts.max_tick, ) .unwrap(), ); @@ -245,6 +247,8 @@ impl SimulatorEnv { latency_probability: cli_opts.latency_probability, experimental_mvcc: cli_opts.experimental_mvcc, experimental_indexes: cli_opts.experimental_indexes, + min_tick: cli_opts.min_tick, + max_tick: cli_opts.max_tick, }; let io = Arc::new( @@ -415,6 +419,8 @@ pub(crate) struct SimulatorOpts { pub(crate) latency_probability: usize, pub(crate) experimental_mvcc: bool, pub(crate) experimental_indexes: bool, + pub min_tick: u64, + pub max_tick: u64, } #[derive(Debug, Clone)] diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 705557687..70f02e2b1 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -437,7 +437,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn))); - let result = file.pwrite(offset, buffer, completion).unwrap(); + let result = file.pwrite(offset, buffer, completion.into()).unwrap(); while !result.is_completed() { io.run_once().unwrap(); }