diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 871d49d78..ad936555b 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -223,6 +223,10 @@ struct DeleteInfo { enum WriteState { Start, BalanceStart, + BalanceFreePages { + curr_page: usize, + sibling_count_new: usize, + }, /// Choose which sibling pages to balance (max 3). /// Generally, the siblings involved will be the page that triggered the balancing and its left and right siblings. /// The exceptions are: @@ -2255,6 +2259,7 @@ impl BTreeCursor { } } WriteState::BalanceStart + | WriteState::BalanceFreePages { .. } | WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { return_if_io!(self.balance()); @@ -2333,7 +2338,9 @@ impl BTreeCursor { self.stack.pop(); return_if_io!(self.balance_non_root()); } - WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => { + WriteState::BalanceNonRootPickSiblings + | WriteState::BalanceNonRootDoBalancing + | WriteState::BalanceFreePages { .. } => { return_if_io!(self.balance_non_root()); } WriteState::Finish => return Ok(IOResult::Done(())), @@ -2350,7 +2357,7 @@ impl BTreeCursor { "Cursor must be in balancing state" ); let state = self.state.write_info().expect("must be balancing").state; - tracing::debug!("balance_non_root(state={:?})", state); + tracing::debug!(?state); let (next_write_state, result) = match state { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), @@ -3322,13 +3329,38 @@ impl BTreeCursor { right_page_id, ); + ( + WriteState::BalanceFreePages { + curr_page: sibling_count_new, + sibling_count_new, + }, + Ok(CursorResult::Ok(())), + ) + } + WriteState::BalanceFreePages { + curr_page, + sibling_count_new, + } => { + let write_info = self.state.write_info().unwrap(); + let mut balance_info: std::cell::RefMut<'_, Option> = + write_info.balance_info.borrow_mut(); + let balance_info = balance_info.as_mut().unwrap(); // We have to free pages that are not used anymore - for i in sibling_count_new..balance_info.sibling_count { - let page = balance_info.pages_to_balance[i].as_ref().unwrap(); - self.pager - .free_page(Some(page.get().clone()), page.get().get().id)?; + if !((sibling_count_new..balance_info.sibling_count).contains(&curr_page)) { + (WriteState::BalanceStart, Ok(IOResult::Done(()))) + } else { + let page = balance_info.pages_to_balance[curr_page].as_ref().unwrap(); + return_if_io!(self + .pager + .free_page(Some(page.get().clone()), page.get().get().id)); + ( + WriteState::BalanceFreePages { + curr_page: curr_page + 1, + sibling_count_new, + }, + Ok(CursorResult::Ok(())), + ) } - (WriteState::BalanceStart, Ok(IOResult::Done(()))) } WriteState::Finish => todo!(), }; @@ -4679,7 +4711,7 @@ impl BTreeCursor { let contents = page.get().contents.as_ref().unwrap(); let next = contents.read_u32(0); - self.pager.free_page(Some(page), next_page as usize)?; + return_if_io!(self.pager.free_page(Some(page), next_page as usize)); if next != 0 { self.overflow_state = Some(OverflowState::ProcessPage { next_page: next }); @@ -4866,7 +4898,7 @@ impl BTreeCursor { let page = self.stack.top(); let page_id = page.get().get().id; - self.pager.free_page(Some(page.get()), page_id)?; + return_if_io!(self.pager.free_page(Some(page.get()), page_id)); if self.stack.has_parent() { self.stack.pop(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index fa20b1f9d..7bc3ec894 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -280,6 +280,7 @@ pub struct Pager { /// to change it. page_size: Cell>, reserved_space: OnceCell, + free_page_state: RefCell, } #[derive(Debug, Copy, Clone)] @@ -303,6 +304,18 @@ enum AllocatePage1State { Done, } +#[derive(Debug, Clone)] +enum FreePageState { + Start, + AddToTrunk { + page: Arc, + trunk_page: Option>, + }, + NewTrunk { + page: Arc, + }, +} + impl Pager { pub fn new( db_file: Arc, @@ -342,6 +355,7 @@ impl Pager { state: CacheFlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), }), + free_page_state: RefCell::new(FreePageState::Start), }) } @@ -1073,7 +1087,7 @@ impl Pager { // Providing a page is optional, if provided it will be used to avoid reading the page from disk. // This is implemented in accordance with sqlite freepage2() function. #[instrument(skip_all, level = Level::INFO)] - pub fn free_page(&self, page: Option, page_id: usize) -> Result<()> { + pub fn free_page(&self, page: Option, page_id: usize) -> Result> { tracing::trace!("free_page(page_id={})", page_id); const TRUNK_PAGE_HEADER_SIZE: usize = 8; const LEAF_ENTRY_SIZE: usize = 4; @@ -1082,65 +1096,100 @@ impl Pager { const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count - if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { - return Err(LimboError::Corrupt(format!( - "Invalid page number {page_id} for free operation" - ))); - } + let mut state = self.free_page_state.borrow_mut(); + tracing::debug!(?state); + loop { + match &mut *state { + FreePageState::Start => { + if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize { + return Err(LimboError::Corrupt(format!( + "Invalid page number {page_id} for free operation" + ))); + } - let page = match page { - Some(page) => { - assert_eq!(page.get().id, page_id, "Page id mismatch"); - page - } - None => self.read_page(page_id)?, - }; + let page = match page.clone() { + Some(page) => { + assert_eq!(page.get().id, page_id, "Page id mismatch"); + page + } + None => self.read_page(page_id)?, + }; + header_accessor::set_freelist_pages( + self, + header_accessor::get_freelist_pages(self)? + 1, + )?; - header_accessor::set_freelist_pages(self, header_accessor::get_freelist_pages(self)? + 1)?; + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; - let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page_id != 0 { + *state = FreePageState::AddToTrunk { + page, + trunk_page: None, + }; + } else { + *state = FreePageState::NewTrunk { page }; + } + } + FreePageState::AddToTrunk { page, trunk_page } => { + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + if trunk_page.is_none() { + // Add as leaf to current trunk + trunk_page.replace(self.read_page(trunk_page_id as usize)?); + } + let trunk_page = trunk_page.as_ref().unwrap(); + if trunk_page.is_locked() || !trunk_page.is_loaded() { + return Ok(CursorResult::IO); + } - if trunk_page_id != 0 { - // Add as leaf to current trunk - let trunk_page = self.read_page(trunk_page_id as usize)?; - let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); - let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); + let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap(); + let number_of_leaf_pages = + trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET); - // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE - let max_free_list_entries = (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; + // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE + let max_free_list_entries = + (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS; - if number_of_leaf_pages < max_free_list_entries as u32 { - trunk_page.set_dirty(); - self.add_dirty(trunk_page_id as usize); + if number_of_leaf_pages < max_free_list_entries as u32 { + trunk_page.set_dirty(); + self.add_dirty(trunk_page_id as usize); - trunk_page_contents - .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); - trunk_page_contents.write_u32( - TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), - page_id as u32, - ); - page.clear_uptodate(); - page.clear_loaded(); + trunk_page_contents + .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1); + trunk_page_contents.write_u32( + TRUNK_PAGE_HEADER_SIZE + + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE), + page_id as u32, + ); + page.clear_uptodate(); - return Ok(()); + break; + } + } + FreePageState::NewTrunk { page } => { + if page.is_locked() || !page.is_loaded() { + return Ok(CursorResult::IO); + } + // If we get here, need to make this page a new trunk + page.set_dirty(); + self.add_dirty(page_id); + + let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?; + + let contents = page.get().contents.as_mut().unwrap(); + // Point to previous trunk + contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); + // Zero leaf count + contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); + // Update page 1 to point to new trunk + header_accessor::set_freelist_trunk_page(self, page_id as u32)?; + // Clear flags + page.clear_uptodate(); + break; + } } } - - // If we get here, need to make this page a new trunk - page.set_dirty(); - self.add_dirty(page_id); - - let contents = page.get().contents.as_mut().unwrap(); - // Point to previous trunk - contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id); - // Zero leaf count - contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0); - // Update page 1 to point to new trunk - header_accessor::set_freelist_trunk_page(self, page_id as u32)?; - // Clear flags - page.clear_uptodate(); - page.clear_loaded(); - Ok(()) + *state = FreePageState::Start; + Ok(CursorResult::Ok(())) } #[instrument(skip_all, level = Level::INFO)] diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index e8e496173..12a96b742 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -752,12 +752,13 @@ pub fn begin_read_page( Ok(()) } +#[instrument(skip_all, level = Level::INFO)] pub fn finish_read_page( page_idx: usize, buffer_ref: Arc>, page: PageRef, ) -> Result<()> { - tracing::trace!("finish_read_btree_page(page_idx = {})", page_idx); + tracing::trace!(page_idx); let pos = if page_idx == DATABASE_HEADER_PAGE_ID { DATABASE_HEADER_SIZE } else { diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index 81bc411b2..5eff245b5 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -107,7 +107,6 @@ impl SimulatorFile { #[instrument(skip_all, level = Level::DEBUG)] pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> { let mut queued_io = self.queued_io.borrow_mut(); - tracing::debug!(?queued_io); // TODO: as we are not in version 1.87 we cannot use `extract_if` // so we have to do something different to achieve the same thing // This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if