diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f20bbe06a..527b3d5b8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -75,7 +75,7 @@ jobs: with: prefix-key: "v1-rust" # can be updated if we need to reset caches due to non-trivial change in the dependencies (for example, custom env var were set for single workspace project) - name: Install the project - run: ./scripts/run-sim --maximum-tests 1000 loop -n 10 -s + run: ./scripts/run-sim --maximum-tests 1000 --min-tick 10 --max-tick 50 loop -n 10 -s test-limbo: runs-on: blacksmith-4vcpu-ubuntu-2404 diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index cfa88b5f7..64e8ab7b4 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -672,7 +672,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -684,12 +684,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> turso_core::Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 3d0caf705..7962f0366 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -213,7 +213,11 @@ impl turso_core::File for File { Ok(()) } - fn pread(&self, pos: usize, c: turso_core::Completion) -> Result> { + fn pread( + &self, + pos: usize, + c: Arc, + ) -> Result> { let r = match c.completion_type { turso_core::CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -225,14 +229,14 @@ impl turso_core::File for File { }; r.complete(nr); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: turso_core::Completion, + c: Arc, ) -> Result> { let w = match c.completion_type { turso_core::CompletionType::Write(ref w) => w, @@ -243,14 +247,14 @@ impl turso_core::File for File { self.vfs.pwrite(self.fd, buf, pos); w.complete(buf.len() as i32); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.vfs.sync(self.fd); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { @@ -351,7 +355,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { return Err(turso_core::LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -363,12 +367,12 @@ impl turso_core::DatabaseStorage for DatabaseFile { ) -> Result<()> { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } fn sync(&self, c: turso_core::Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/io/clock.rs b/core/io/clock.rs index 3a38ad955..aae1a7633 100644 --- a/core/io/clock.rs +++ b/core/io/clock.rs @@ -4,6 +4,15 @@ pub struct Instant { pub micros: u32, } +impl From> for Instant { + fn from(value: chrono::DateTime) -> Self { + Instant { + secs: value.timestamp(), + micros: value.timestamp_subsec_micros(), + } + } +} + pub trait Clock { fn now(&self) -> Instant; } diff --git a/core/io/generic.rs b/core/io/generic.rs index 6e1faee73..ebbd67d17 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -86,7 +86,7 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -99,14 +99,14 @@ impl File for GenericFile { file.read_exact(buf)?; } c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -114,14 +114,14 @@ impl File for GenericFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buf.len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index e204e044c..b873d4c79 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -302,7 +302,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); @@ -317,7 +317,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); io.ring.submit_entry(&read_e, c.clone()); Ok(c) } @@ -326,7 +325,7 @@ impl File for UringFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut io = self.io.borrow_mut(); let write = { @@ -339,7 +338,6 @@ impl File for UringFile { .user_data(io.ring.get_key()) }) }; - let c = Arc::new(c); let c_uring = c.clone(); io.ring.submit_entry( &write, @@ -354,7 +352,7 @@ impl File for UringFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = with_fd!(self, |fd| { @@ -362,7 +360,6 @@ impl File for UringFile { .build() .user_data(io.ring.get_key()) }); - let c = Arc::new(c); io.ring.submit_entry(&sync, c.clone()); Ok(c) } diff --git a/core/io/memory.rs b/core/io/memory.rs index b2f8bef5b..597696605 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -83,8 +83,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { - let c = Arc::new(c); + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -129,9 +128,8 @@ impl File for MemoryFile { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { - let c = Arc::new(c); let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -167,10 +165,10 @@ impl File for MemoryFile { Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { // no-op c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/io/mod.rs b/core/io/mod.rs index 5ba5d2a27..d462b4071 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,14 +14,14 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Completion) -> Result>; + fn pread(&self, pos: usize, c: Arc) -> Result>; fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result>; - fn sync(&self, c: Completion) -> Result>; + fn sync(&self, c: Arc) -> Result>; fn size(&self) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 17cdb029c..26a63e6c6 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -335,14 +335,13 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let file = self.file.borrow(); let result = { let r = c.as_read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pread n: {}", n); @@ -373,14 +372,13 @@ impl File for UnixFile<'_> { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let file = self.file.borrow(); let result = { let buf = buffer.borrow(); rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64) }; - let c = Arc::new(c); match result { Ok(n) => { trace!("pwrite n: {}", n); @@ -405,10 +403,9 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::INFO)] - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow(); let result = fs::fsync(file.as_fd()); - let c = Arc::new(c); match result { Ok(()) => { trace!("fsync"); diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 78b76ad6a..4df856eed 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -98,7 +98,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let r = match c.completion_type { CompletionType::Read(ref r) => r, _ => unreachable!(), @@ -113,7 +113,7 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pread failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } @@ -121,7 +121,7 @@ impl File for VfsFileImpl { &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let buf = buffer.borrow(); let count = buf.as_slice().len(); @@ -142,18 +142,18 @@ impl File for VfsFileImpl { Err(LimboError::ExtensionError("pwrite failed".to_string())) } else { c.complete(result); - Ok(Arc::new(c)) + Ok(c) } } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { Err(LimboError::ExtensionError("sync failed".to_string())) } else { c.complete(0); - Ok(Arc::new(c)) + Ok(c) } } diff --git a/core/io/windows.rs b/core/io/windows.rs index cf2a60b38..3f7ea3a53 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -81,7 +81,7 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Completion) -> Result> { + fn pread(&self, pos: usize, c: Arc) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { @@ -92,14 +92,14 @@ impl File for WindowsFile { buf.len() as i32 }; c.complete(nr); - Ok(Arc::new(c)) + Ok(c) } fn pwrite( &self, pos: usize, buffer: Arc>, - c: Completion, + c: Arc, ) -> Result> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -107,14 +107,14 @@ impl File for WindowsFile { let buf = buf.as_slice(); file.write_all(buf)?; c.complete(buffer.borrow().len() as i32); - Ok(Arc::new(c)) + Ok(c) } - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); - Ok(Arc::new(c)) + Ok(c) } fn size(&self) -> Result { diff --git a/core/storage/btree.rs b/core/storage/btree.rs index bcda6159d..af05eca35 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -223,6 +223,10 @@ struct DeleteInfo { enum WriteState { Start, BalanceStart, + BalanceFreePages { + curr_page: usize, + sibling_count_new: usize, + }, /// Choose which sibling pages to balance (max 3). /// Generally, the siblings involved will be the page that triggered the balancing and its left and right siblings. /// The exceptions are: @@ -2255,6 +2259,7 @@ impl BTreeCursor { } } WriteState::BalanceStart + | WriteState::BalanceFreePages { .. } | WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { return_if_io!(self.balance()); @@ -2333,7 +2338,9 @@ impl BTreeCursor { self.stack.pop(); return_if_io!(self.balance_non_root()); } - WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { + WriteState::BalanceNonRootPickSiblings + | WriteState::BalanceNonRootDoBalancing + | WriteState::BalanceFreePages { .. } => { return_if_io!(self.balance_non_root()); } WriteState::Finish => return Ok(IOResult::Done(())), @@ -2350,7 +2357,7 @@ impl BTreeCursor { "Cursor must be in balancing state" ); let state = self.state.write_info().expect("must be balancing").state; - tracing::debug!("balance_non_root(state={:?})", state); + tracing::debug!(?state); let (next_write_state, result) = match state { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), @@ -3326,13 +3333,38 @@ impl BTreeCursor { right_page_id, ); + ( + WriteState::BalanceFreePages { + curr_page: sibling_count_new, + sibling_count_new, + }, + Ok(IOResult::Done(())), + ) + } + WriteState::BalanceFreePages { + curr_page, + sibling_count_new, + } => { + let write_info = self.state.write_info().unwrap(); + let mut balance_info: std::cell::RefMut<'_, Option> = + write_info.balance_info.borrow_mut(); + let balance_info = balance_info.as_mut().unwrap(); // We have to free pages that are not used anymore - for i in sibling_count_new..balance_info.sibling_count { - let page = balance_info.pages_to_balance[i].as_ref().unwrap(); - self.pager - .free_page(Some(page.get().clone()), page.get().get().id)?; + if !((sibling_count_new..balance_info.sibling_count).contains(&curr_page)) { + (WriteState::BalanceStart, Ok(IOResult::Done(()))) + } else { + let page = balance_info.pages_to_balance[curr_page].as_ref().unwrap(); + return_if_io!(self + .pager + .free_page(Some(page.get().clone()), page.get().get().id)); + ( + WriteState::BalanceFreePages { + curr_page: curr_page + 1, + sibling_count_new, + }, + Ok(IOResult::Done(())), + ) } - (WriteState::BalanceStart, Ok(IOResult::Done(()))) } WriteState::Finish => todo!(), }; @@ -4686,7 +4718,7 @@ impl BTreeCursor { let contents = page.get().contents.as_ref().unwrap(); let next = contents.read_u32(0); - self.pager.free_page(Some(page), next_page as usize)?; + return_if_io!(self.pager.free_page(Some(page), next_page as usize)); if next != 0 { self.overflow_state = Some(OverflowState::ProcessPage { next_page: next }); @@ -4873,7 +4905,7 @@ impl BTreeCursor { let page = self.stack.top(); let page_id = page.get().get().id; - self.pager.free_page(Some(page.get()), page_id)?; + return_if_io!(self.pager.free_page(Some(page.get()), page_id)); if self.stack.has_parent() { self.stack.pop(); diff --git a/core/storage/database.rs b/core/storage/database.rs index c2ad2c57a..a55c819e3 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -42,7 +42,7 @@ impl DatabaseStorage for DatabaseFile { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -59,13 +59,13 @@ impl DatabaseStorage for DatabaseFile { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } @@ -102,7 +102,7 @@ impl DatabaseStorage for FileMemoryStorage { return Err(LimboError::NotADB); } let pos = (page_idx - 1) * size; - self.file.pread(pos, c)?; + self.file.pread(pos, c.into())?; Ok(()) } @@ -118,13 +118,13 @@ impl DatabaseStorage for FileMemoryStorage { assert!(buffer_size <= 65536); assert_eq!(buffer_size & (buffer_size - 1), 0); let pos = (page_idx - 1) * buffer_size; - self.file.pwrite(pos, buffer, c)?; + self.file.pwrite(pos, buffer, c.into())?; Ok(()) } #[instrument(skip_all, level = Level::INFO)] fn sync(&self, c: Completion) -> Result<()> { - let _ = self.file.sync(c)?; + let _ = self.file.sync(c.into())?; Ok(()) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 09cd09b90..0ad3d1839 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -280,6 +280,7 @@ pub struct Pager { /// to change it. page_size: Cell>, reserved_space: OnceCell, + free_page_state: RefCell, } #[derive(Debug, Copy, Clone)] @@ -303,6 +304,18 @@ enum AllocatePage1State { Done, } +#[derive(Debug, Clone)] +enum FreePageState { + Start, + AddToTrunk { + page: Arc, + trunk_page: Option>, + }, + NewTrunk { + page: Arc, + }, +} + impl Pager { pub fn new( db_file: Arc, @@ -342,6 +355,7 @@ impl Pager { state: CacheFlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), }), + free_page_state: RefCell::new(FreePageState::Start), }) } @@ -1016,18 +1030,21 @@ impl Pager { } pub fn checkpoint_shutdown(&self, wal_checkpoint_disabled: bool) -> Result<()> { - let mut attempts = 0; + let mut _attempts = 0; { let mut wal = self.wal.borrow_mut(); // fsync the wal syncronously before beginning checkpoint while let Ok(IOResult::IO) = wal.sync() { - if attempts >= 10 { - return Err(LimboError::InternalError( - "Failed to fsync WAL before final checkpoint, fd likely closed".into(), - )); - } + // TODO: for now forget about timeouts as they fail regularly in SIM + // need to think of a better way to do this + + // if attempts >= 1000 { + // return Err(LimboError::InternalError( + // "Failed to fsync WAL before final checkpoint, fd likely closed".into(), + // )); + // } self.io.run_once()?; - attempts += 1; + _attempts += 1; } } self.wal_checkpoint(wal_checkpoint_disabled)?; @@ -1070,7 +1087,7 @@ impl Pager { // Providing a page is optional, if provided it will be used to avoid reading the page from disk. // This is implemented in accordance with sqlite freepage2() function. #[instrument(skip_all, level = Level::INFO)] - pub fn free_page(&self, page: Option, page_id: usize) -> Result<()> { + pub fn free_page(&self, page: Option, page_id: usize) -> Result> { tracing::trace!("free_page(page_id={})", page_id); const TRUNK_PAGE_HEADER_SIZE: usize = 8; const LEAF_ENTRY_SIZE: usize = 4; @@ -1079,65 +1096,100 @@ impl Pager { const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count - if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { - return Err(LimboError::Corrupt(format!( - "Invalid page number {page_id} for free operation" - ))); - } + let mut state = self.free_page_state.borrow_mut(); + tracing::debug!(?state); + loop { + match &mut *state { + FreePageState::Start => { + if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { + return Err(LimboError::Corrupt(format!( + "Invalid page number {page_id} for free operation" + ))); + } - let page = match page { - Some(page) => { - assert_eq!(page.get().id, page_id, "Page id mismatch"); - page - } - None => self.read_page(page_id)?, - }; + let page = match page.clone() { + Some(page) => { + assert_eq!(page.get().id, page_id, "Page id mismatch"); + page + } + None => self.read_page(page_id)?, + }; + header_accessor::set_freelist_pages( + self, + header_accessor::get_freelist_pages(self)? + 1, + )?; - header_accessor::set_freelist_pages(self, header_accessor::get_freelist_pages(self)? + 1)?; + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; - let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page_id != 0 { + *state = FreePageState::AddToTrunk { + page, + trunk_page: None, + }; + } else { + *state = FreePageState::NewTrunk { page }; + } + } + FreePageState::AddToTrunk { page, trunk_page } => { + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page.is_none() { + // Add as leaf to current trunk + trunk_page.replace(self.read_page(trunk_page_id as usize)?); + } + let trunk_page = trunk_page.as_ref().unwrap(); + if trunk_page.is_locked() || !trunk_page.is_loaded() { + return Ok(IOResult::IO); + } - if trunk_page_id != 0 { - // Add as leaf to current trunk - let trunk_page = self.read_page(trunk_page_id as usize)?; - let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); - let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); + let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); + let number_of_leaf_pages = + trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); - // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE - let max_free_list_entries = (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; + // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE + let max_free_list_entries = + (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; - if number_of_leaf_pages < max_free_list_entries as u32 { - trunk_page.set_dirty(); - self.add_dirty(trunk_page_id as usize); + if number_of_leaf_pages < max_free_list_entries as u32 { + trunk_page.set_dirty(); + self.add_dirty(trunk_page_id as usize); - trunk_page_contents - .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); - trunk_page_contents.write_u32( - TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), - page_id as u32, - ); - page.clear_uptodate(); - page.clear_loaded(); + trunk_page_contents + .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); + trunk_page_contents.write_u32( + TRUNK_PAGE_HEADER_SIZE + + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), + page_id as u32, + ); + page.clear_uptodate(); - return Ok(()); + break; + } + } + FreePageState::NewTrunk { page } => { + if page.is_locked() || !page.is_loaded() { + return Ok(IOResult::IO); + } + // If we get here, need to make this page a new trunk + page.set_dirty(); + self.add_dirty(page_id); + + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + + let contents = page.get().contents.as_mut().unwrap(); + // Point to previous trunk + contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); + // Zero leaf count + contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); + // Update page 1 to point to new trunk + header_accessor::set_freelist_trunk_page(self, page_id as u32)?; + // Clear flags + page.clear_uptodate(); + break; + } } } - - // If we get here, need to make this page a new trunk - page.set_dirty(); - self.add_dirty(page_id); - - let contents = page.get().contents.as_mut().unwrap(); - // Point to previous trunk - contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); - // Zero leaf count - contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); - // Update page 1 to point to new trunk - header_accessor::set_freelist_trunk_page(self, page_id as u32)?; - // Clear flags - page.clear_uptodate(); - page.clear_loaded(); - Ok(()) + *state = FreePageState::Start; + Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::INFO)] diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index abbdc637c..12a96b742 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -752,12 +752,13 @@ pub fn begin_read_page( Ok(()) } +#[instrument(skip_all, level = Level::INFO)] pub fn finish_read_page( page_idx: usize, buffer_ref: Arc>, page: PageRef, ) -> Result<()> { - tracing::trace!("finish_read_btree_page(page_idx = {})", page_idx); + tracing::trace!(page_idx); let pos = if page_idx == DATABASE_HEADER_PAGE_ID { DATABASE_HEADER_SIZE } else { @@ -1490,7 +1491,7 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, header: &WalHeader) -> Result< }; #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - io.pwrite(0, buffer.clone(), c)?; + io.pwrite(0, buffer.clone(), c.into())?; Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f9929dc8e..b731e650d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -890,7 +890,7 @@ impl Wal for WalFile { }), })); let shared = self.get_shared(); - shared.file.sync(completion)?; + shared.file.sync(completion.into())?; self.sync_state.set(SyncState::Syncing); Ok(IOResult::IO) } diff --git a/simulator/runner/cli.rs b/simulator/runner/cli.rs index 4c8bee186..f35555aa1 100644 --- a/simulator/runner/cli.rs +++ b/simulator/runner/cli.rs @@ -108,11 +108,23 @@ pub struct SimulatorCLI { #[clap(long, help = "disable Reopen-Database fault", default_value_t = false)] pub disable_reopen_database: bool, #[clap( - long = "latency_prob", + long = "latency-prob", help = "added IO latency probability", - default_value_t = 0 + default_value_t = 1 )] pub latency_probability: usize, + #[clap( + long, + help = "Minimum tick time in microseconds for simulated time", + default_value_t = 1 + )] + pub min_tick: u64, + #[clap( + long, + help = "Maximum tick time in microseconds for simulated time", + default_value_t = 30 + )] + pub max_tick: u64, #[clap(long, help = "Enable experimental MVCC feature")] pub experimental_mvcc: bool, #[clap(long, help = "Enable experimental indexing feature")] diff --git a/simulator/runner/clock.rs b/simulator/runner/clock.rs new file mode 100644 index 000000000..ef687c5c1 --- /dev/null +++ b/simulator/runner/clock.rs @@ -0,0 +1,35 @@ +use std::cell::RefCell; + +use chrono::{DateTime, Utc}; +use rand::Rng; +use rand_chacha::ChaCha8Rng; + +#[derive(Debug)] +pub struct SimulatorClock { + curr_time: RefCell>, + rng: RefCell, + min_tick: u64, + max_tick: u64, +} + +impl SimulatorClock { + pub fn new(rng: ChaCha8Rng, min_tick: u64, max_tick: u64) -> Self { + Self { + curr_time: RefCell::new(Utc::now()), + rng: RefCell::new(rng), + min_tick, + max_tick, + } + } + + pub fn now(&self) -> DateTime { + let mut time = self.curr_time.borrow_mut(); + let nanos = self + .rng + .borrow_mut() + .gen_range(self.min_tick..self.max_tick); + let nanos = std::time::Duration::from_micros(nanos); + *time += nanos; + *time + } +} diff --git a/simulator/runner/env.rs b/simulator/runner/env.rs index 675b2be08..fa5859af3 100644 --- a/simulator/runner/env.rs +++ b/simulator/runner/env.rs @@ -99,6 +99,8 @@ impl SimulatorEnv { self.opts.seed, self.opts.page_size, self.opts.latency_probability, + self.opts.min_tick, + self.opts.max_tick, ) .unwrap(), ); @@ -245,10 +247,20 @@ impl SimulatorEnv { latency_probability: cli_opts.latency_probability, experimental_mvcc: cli_opts.experimental_mvcc, experimental_indexes: cli_opts.experimental_indexes, + min_tick: cli_opts.min_tick, + max_tick: cli_opts.max_tick, }; - let io = - Arc::new(SimulatorIO::new(seed, opts.page_size, cli_opts.latency_probability).unwrap()); + let io = Arc::new( + SimulatorIO::new( + seed, + opts.page_size, + cli_opts.latency_probability, + cli_opts.min_tick, + cli_opts.max_tick, + ) + .unwrap(), + ); // Remove existing database file if it exists let db_path = paths.db(&simulation_type, &SimulationPhase::Test); @@ -407,6 +419,8 @@ pub(crate) struct SimulatorOpts { pub(crate) latency_probability: usize, pub(crate) experimental_mvcc: bool, pub(crate) experimental_indexes: bool, + pub min_tick: u64, + pub max_tick: u64, } #[derive(Debug, Clone)] diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index b53f56af4..d1682c353 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -1,14 +1,15 @@ use std::{ cell::{Cell, RefCell}, + fmt::Debug, sync::Arc, }; use rand::Rng as _; use rand_chacha::ChaCha8Rng; use tracing::{instrument, Level}; -use turso_core::{CompletionType, File, Result}; +use turso_core::{File, Result}; -use crate::model::FAULT_ERROR_MSG; +use crate::{model::FAULT_ERROR_MSG, runner::clock::SimulatorClock}; pub(crate) struct SimulatorFile { pub(crate) inner: Arc, pub(crate) fault: Cell, @@ -38,6 +39,23 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, pub sync_completion: RefCell>>, + pub queued_io: RefCell>, + pub clock: Arc, +} + +type IoOperation = Box Result>>; + +pub struct DelayedIo { + pub time: turso_core::Instant, + pub op: IoOperation, +} + +impl Debug for DelayedIo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DelayedIo") + .field("time", &self.time) + .finish() + } } unsafe impl Send for SimulatorFile {} @@ -78,11 +96,37 @@ impl SimulatorFile { } #[instrument(skip_all, level = Level::TRACE)] - fn generate_latency_duration(&self) -> Option { + fn generate_latency_duration(&self) -> Option { let mut rng = self.rng.borrow_mut(); // Chance to introduce some latency rng.gen_bool(self.latency_probability as f64 / 100.0) - .then(|| std::time::Duration::from_millis(rng.gen_range(20..50))) + .then(|| { + let now = self.clock.now(); + let sum = now + std::time::Duration::from_millis(rng.gen_range(5..20)); + sum.into() + }) + } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn run_queued_io(&self, now: turso_core::Instant) -> Result<()> { + let mut queued_io = self.queued_io.borrow_mut(); + // TODO: as we are not in version 1.87 we cannot use `extract_if` + // so we have to do something different to achieve the same thing + // This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if + let range = 0..queued_io.len(); + let mut i = range.start; + let end_items = queued_io.len() - range.end; + + while i < queued_io.len() - end_items { + if queued_io[i].time <= now { + let io = queued_io.remove(i); + // your code here + (io.op)(self)?; + } else { + i += 1; + } + } + Ok(()) } } @@ -108,7 +152,7 @@ impl File for SimulatorFile { fn pread( &self, pos: usize, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { @@ -119,31 +163,22 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Read(read_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_, _| {}); - let prev_complete = std::mem::replace(&mut read_completion.complete, dummy_complete); - let new_complete = move |res, bytes_read| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res, bytes_read); - if !before { - std::thread::sleep(latency); - } - }; - read_completion.complete = Box::new(new_complete); - }; - self.inner.pread(pos, c) + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pread(pos, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); + Ok(c) + } else { + self.inner.pread(pos, c) + } } fn pwrite( &self, pos: usize, buffer: Arc>, - mut c: turso_core::Completion, + c: Arc, ) -> Result> { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { @@ -154,53 +189,40 @@ impl File for SimulatorFile { )); } if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Write(write_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut write_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - write_completion.complete = Box::new(new_complete); - }; - self.inner.pwrite(pos, buffer, c) + let cloned_c = c.clone(); + let op = Box::new(move |file: &SimulatorFile| file.inner.pwrite(pos, buffer, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); + Ok(c) + } else { + self.inner.pwrite(pos, buffer, c) + } } - fn sync(&self, mut c: turso_core::Completion) -> Result> { + fn sync(&self, c: Arc) -> Result> { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); if self.fault.get() { // TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed. tracing::debug!("ignoring sync fault because it causes false positives with current simulator design"); self.fault.set(false); } - if let Some(latency) = self.generate_latency_duration() { - let CompletionType::Sync(sync_completion) = &mut c.completion_type else { - unreachable!(); - }; - let before = self.rng.borrow_mut().gen_bool(0.5); - let dummy_complete = Box::new(|_| {}); - let prev_complete = std::mem::replace(&mut sync_completion.complete, dummy_complete); - let new_complete = move |res| { - if before { - std::thread::sleep(latency); - } - (prev_complete)(res); - if !before { - std::thread::sleep(latency); - } - }; - sync_completion.complete = Box::new(new_complete); + let c = if let Some(latency) = self.generate_latency_duration() { + let cloned_c = c.clone(); + let op = Box::new(|file: &SimulatorFile| -> Result<_> { + let c = file.inner.sync(cloned_c)?; + *file.sync_completion.borrow_mut() = Some(c.clone()); + Ok(c) + }); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); + c + } else { + let c = self.inner.sync(c)?; + *self.sync_completion.borrow_mut() = Some(c.clone()); + c }; - let c = self.inner.sync(c)?; - *self.sync_completion.borrow_mut() = Some(c.clone()); Ok(c) } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 5f1cc7d7c..7c888cc7f 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -7,7 +7,10 @@ use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha8Rng; use turso_core::{Clock, Instant, MemoryIO, OpenFlags, PlatformIO, Result, IO}; -use crate::{model::FAULT_ERROR_MSG, runner::file::SimulatorFile}; +use crate::{ + model::FAULT_ERROR_MSG, + runner::{clock::SimulatorClock, file::SimulatorFile}, +}; pub(crate) struct SimulatorIO { pub(crate) inner: Box, @@ -18,18 +21,27 @@ pub(crate) struct SimulatorIO { pub(crate) page_size: usize, seed: u64, latency_probability: usize, + clock: Arc, } unsafe impl Send for SimulatorIO {} unsafe impl Sync for SimulatorIO {} impl SimulatorIO { - pub(crate) fn new(seed: u64, page_size: usize, latency_probability: usize) -> Result { + pub(crate) fn new( + seed: u64, + page_size: usize, + latency_probability: usize, + min_tick: u64, + max_tick: u64, + ) -> Result { let inner = Box::new(PlatformIO::new()?); let fault = Cell::new(false); let files = RefCell::new(Vec::new()); let rng = RefCell::new(ChaCha8Rng::seed_from_u64(seed)); let nr_run_once_faults = Cell::new(0); + let clock = SimulatorClock::new(ChaCha8Rng::seed_from_u64(seed), min_tick, max_tick); + Ok(Self { inner, fault, @@ -39,6 +51,7 @@ impl SimulatorIO { page_size, seed, latency_probability, + clock: Arc::new(clock), }) } @@ -59,10 +72,7 @@ impl SimulatorIO { impl Clock for SimulatorIO { fn now(&self) -> Instant { - Instant { - secs: 1704067200, // 2024-01-01 00:00:00 UTC - micros: 0, - } + self.clock.now().into() } } @@ -87,6 +97,8 @@ impl IO for SimulatorIO { rng: RefCell::new(ChaCha8Rng::seed_from_u64(self.seed)), latency_probability: self.latency_probability, sync_completion: RefCell::new(None), + queued_io: RefCell::new(Vec::new()), + clock: self.clock.clone(), }); self.files.borrow_mut().push(file.clone()); Ok(file) @@ -107,6 +119,10 @@ impl IO for SimulatorIO { FAULT_ERROR_MSG.into(), )); } + let now = self.now(); + for file in self.files.borrow().iter() { + file.run_queued_io(now)?; + } self.inner.run_once()?; Ok(()) } diff --git a/simulator/runner/mod.rs b/simulator/runner/mod.rs index ccb0563ab..b56335da5 100644 --- a/simulator/runner/mod.rs +++ b/simulator/runner/mod.rs @@ -1,5 +1,6 @@ pub mod bugbase; pub mod cli; +pub mod clock; pub mod differential; pub mod doublecheck; pub mod env; diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 705557687..70f02e2b1 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -437,7 +437,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(RefCell::new(Buffer::new(Pin::new(data.to_vec()), drop_fn))); - let result = file.pwrite(offset, buffer, completion).unwrap(); + let result = file.pwrite(offset, buffer, completion.into()).unwrap(); while !result.is_completed() { io.run_once().unwrap(); }