From 6b0ed0846552530563cc9a4ce6bfa7be789b1860 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Thu, 21 Aug 2025 12:29:37 -0300 Subject: [PATCH] `read_page` should return No Completion when have a page cache hit --- core/storage/btree.rs | 898 +++++++++++++++++++++++------------------- core/storage/pager.rs | 356 +++++++++-------- 2 files changed, 688 insertions(+), 566 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0a3d67cd8..75ced6e49 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -680,21 +680,25 @@ impl BTreeCursor { /// This is done by checking if the root page has no cells. #[instrument(skip_all, level = Level::DEBUG)] fn is_empty_table(&self) -> Result> { - let state = self.is_empty_table_state.borrow().clone(); - match state { - EmptyTableState::Start => { - if let Some(mv_cursor) = &self.mv_cursor { - let mv_cursor = mv_cursor.borrow(); - return Ok(IOResult::Done(mv_cursor.is_empty())); + loop { + let state = self.is_empty_table_state.borrow().clone(); + match state { + EmptyTableState::Start => { + if let Some(mv_cursor) = &self.mv_cursor { + let mv_cursor = mv_cursor.borrow(); + return Ok(IOResult::Done(mv_cursor.is_empty())); + } + let (page, c) = self.pager.read_page(self.root_page)?; + *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; + if let Some(c) = c { + io_yield_one!(c); + } + } + EmptyTableState::ReadPage { page } => { + turso_assert!(page.is_loaded(), "page should be loaded"); + let cell_count = page.get().contents.as_ref().unwrap().cell_count(); + break Ok(IOResult::Done(cell_count == 0)); } - let (page, c) = self.pager.read_page(self.root_page)?; - *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; - io_yield_one!(c); - } - EmptyTableState::ReadPage { page } => { - turso_assert!(page.is_loaded(), "page should be loaded"); - let cell_count = page.get().contents.as_ref().unwrap().cell_count(); - Ok(IOResult::Done(cell_count == 0)) } } } @@ -723,7 +727,10 @@ impl BTreeCursor { self.stack.set_cell_index(past_rightmost_pointer); let (page, c) = self.read_page(rightmost_pointer as usize)?; self.stack.push_backwards(page); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } } if cell_idx >= cell_count as i32 { @@ -787,7 +794,9 @@ impl BTreeCursor { let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push_backwards(mem_page); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } } @@ -800,67 +809,75 @@ impl BTreeCursor { start_next_page: u32, payload_size: u64, ) -> Result> { - if self.read_overflow_state.borrow().is_none() { - let (page, c) = self.read_page(start_next_page as usize)?; - *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { - payload: payload.to_vec(), - next_page: start_next_page, - remaining_to_read: payload_size as usize - payload.len(), - page, - }); - io_yield_one!(c); + loop { + if self.read_overflow_state.borrow().is_none() { + let (page, c) = self.read_page(start_next_page as usize)?; + *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { + payload: payload.to_vec(), + next_page: start_next_page, + remaining_to_read: payload_size as usize - payload.len(), + page, + }); + if let Some(c) = c { + io_yield_one!(c); + } + continue; + } + let mut read_overflow_state = self.read_overflow_state.borrow_mut(); + let ReadPayloadOverflow { + payload, + next_page, + remaining_to_read, + page: page_btree, + } = read_overflow_state.as_mut().unwrap(); + + let page = 
page_btree.get(); + turso_assert!(page.is_loaded(), "page should be loaded"); + tracing::debug!(next_page, remaining_to_read, "reading overflow page"); + let contents = page.get_contents(); + // The first four bytes of each overflow page are a big-endian integer which is the page number of the next page in the chain, or zero for the final page in the chain. + let next = contents.read_u32_no_offset(0); + let buf = contents.as_ptr(); + let usable_space = self.pager.usable_space(); + let to_read = (*remaining_to_read).min(usable_space - 4); + payload.extend_from_slice(&buf[4..4 + to_read]); + *remaining_to_read -= to_read; + + if *remaining_to_read != 0 && next != 0 { + let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| { + ( + Arc::new(BTreePageInner { + page: RefCell::new(page), + }), + c, + ) + })?; + *page_btree = new_page; + *next_page = next; + if let Some(c) = c { + io_yield_one!(c); + } + continue; + } + turso_assert!( + *remaining_to_read == 0 && next == 0, + "we can't have more pages to read while also have read everything" + ); + let mut payload_swap = Vec::new(); + std::mem::swap(payload, &mut payload_swap); + + let mut reuse_immutable = self.get_immutable_record_or_create(); + reuse_immutable.as_mut().unwrap().invalidate(); + + reuse_immutable + .as_mut() + .unwrap() + .start_serialization(&payload_swap); + self.record_cursor.borrow_mut().invalidate(); + + let _ = read_overflow_state.take(); + break Ok(IOResult::Done(())); } - let mut read_overflow_state = self.read_overflow_state.borrow_mut(); - let ReadPayloadOverflow { - payload, - next_page, - remaining_to_read, - page: page_btree, - } = read_overflow_state.as_mut().unwrap(); - - let page = page_btree.get(); - turso_assert!(page.is_loaded(), "page should be loaded"); - tracing::debug!(next_page, remaining_to_read, "reading overflow page"); - let contents = page.get_contents(); - // The first four bytes of each overflow page are a big-endian integer which is the page number of the next page in the chain, or zero for the final page in the chain. - let next = contents.read_u32_no_offset(0); - let buf = contents.as_ptr(); - let usable_space = self.pager.usable_space(); - let to_read = (*remaining_to_read).min(usable_space - 4); - payload.extend_from_slice(&buf[4..4 + to_read]); - *remaining_to_read -= to_read; - - if *remaining_to_read != 0 && next != 0 { - let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| { - ( - Arc::new(BTreePageInner { - page: RefCell::new(page), - }), - c, - ) - })?; - *page_btree = new_page; - *next_page = next; - io_yield_one!(c); - } - turso_assert!( - *remaining_to_read == 0 && next == 0, - "we can't have more pages to read while also have read everything" - ); - let mut payload_swap = Vec::new(); - std::mem::swap(payload, &mut payload_swap); - - let mut reuse_immutable = self.get_immutable_record_or_create(); - reuse_immutable.as_mut().unwrap().invalidate(); - - reuse_immutable - .as_mut() - .unwrap() - .start_serialization(&payload_swap); - self.record_cursor.borrow_mut().invalidate(); - - let _ = read_overflow_state.take(); - Ok(IOResult::Done(())) } /// Calculates how much of a cell's payload should be stored locally vs in overflow pages @@ -923,101 +940,107 @@ impl BTreeCursor { mut amount: u32, is_write: bool, ) -> Result> { - if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { - .. - }) - | CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { .. 
}) = - &self.state - { - return self.continue_payload_overflow_with_offset(buffer, self.usable_space()); - } - - let page_btree = self.stack.top(); - - let page = page_btree.get(); - let contents = page.get().contents.as_ref().unwrap(); - let cell_idx = self.stack.current_cell_index() as usize - 1; - - if cell_idx >= contents.cell_count() { - return Err(LimboError::Corrupt("Invalid cell index".into())); - } - - let usable_size = self.usable_space(); - let cell = contents.cell_get(cell_idx, usable_size).unwrap(); - - let (payload, payload_size, first_overflow_page) = match cell { - BTreeCell::TableLeafCell(cell) => { - (cell.payload, cell.payload_size, cell.first_overflow_page) + loop { + if let CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { + .. + }) + | CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { + .. + }) = &self.state + { + return self.continue_payload_overflow_with_offset(buffer, self.usable_space()); } - BTreeCell::IndexLeafCell(cell) => { - (cell.payload, cell.payload_size, cell.first_overflow_page) - } - BTreeCell::IndexInteriorCell(cell) => { - (cell.payload, cell.payload_size, cell.first_overflow_page) - } - BTreeCell::TableInteriorCell(_) => { - return Err(LimboError::Corrupt( - "Cannot access payload of table interior cell".into(), - )); - } - }; - turso_assert!( - offset + amount <= payload_size as u32, - "offset + amount <= payload_size" - ); - let (local_size, _) = - self.parse_cell_info(payload_size as usize, contents.page_type(), usable_size)?; - let mut bytes_processed: u32 = 0; - if offset < local_size as u32 { - let mut local_amount: u32 = amount; - if local_amount + offset > local_size as u32 { - local_amount = local_size as u32 - offset; + let page_btree = self.stack.top(); + + let page = page_btree.get(); + let contents = page.get().contents.as_ref().unwrap(); + let cell_idx = self.stack.current_cell_index() as usize - 1; + + if cell_idx >= contents.cell_count() { + return Err(LimboError::Corrupt("Invalid cell index".into())); } - if is_write { - self.write_payload_to_page( - offset, - local_amount, - payload, - buffer, - page_btree.clone(), - ); + + let usable_size = self.usable_space(); + let cell = contents.cell_get(cell_idx, usable_size).unwrap(); + + let (payload, payload_size, first_overflow_page) = match cell { + BTreeCell::TableLeafCell(cell) => { + (cell.payload, cell.payload_size, cell.first_overflow_page) + } + BTreeCell::IndexLeafCell(cell) => { + (cell.payload, cell.payload_size, cell.first_overflow_page) + } + BTreeCell::IndexInteriorCell(cell) => { + (cell.payload, cell.payload_size, cell.first_overflow_page) + } + BTreeCell::TableInteriorCell(_) => { + return Err(LimboError::Corrupt( + "Cannot access payload of table interior cell".into(), + )); + } + }; + turso_assert!( + offset + amount <= payload_size as u32, + "offset + amount <= payload_size" + ); + + let (local_size, _) = + self.parse_cell_info(payload_size as usize, contents.page_type(), usable_size)?; + let mut bytes_processed: u32 = 0; + if offset < local_size as u32 { + let mut local_amount: u32 = amount; + if local_amount + offset > local_size as u32 { + local_amount = local_size as u32 - offset; + } + if is_write { + self.write_payload_to_page( + offset, + local_amount, + payload, + buffer, + page_btree.clone(), + ); + } else { + self.read_payload_from_page(offset, local_amount, payload, buffer); + } + offset = 0; + amount -= local_amount; + bytes_processed += local_amount; } else { - self.read_payload_from_page(offset, local_amount, 
payload, buffer); - } - offset = 0; - amount -= local_amount; - bytes_processed += local_amount; - } else { - offset -= local_size as u32; - } - - if amount > 0 { - if first_overflow_page.is_none() { - return Err(LimboError::Corrupt( - "Expected overflow page but none found".into(), - )); + offset -= local_size as u32; } - let overflow_size = usable_size - 4; - let pages_to_skip = offset / overflow_size as u32; - let page_offset = offset % overflow_size as u32; - // Read page - let (page, c) = self.read_page(first_overflow_page.unwrap() as usize)?; + if amount > 0 { + if first_overflow_page.is_none() { + return Err(LimboError::Corrupt( + "Expected overflow page but none found".into(), + )); + } - self.state = - CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { - next_page: page, - pages_left_to_skip: pages_to_skip, - page_offset, - amount, - buffer_offset: bytes_processed as usize, - is_write, - }); + let overflow_size = usable_size - 4; + let pages_to_skip = offset / overflow_size as u32; + let page_offset = offset % overflow_size as u32; + // Read page + let (page, c) = self.read_page(first_overflow_page.unwrap() as usize)?; - io_yield_one!(c); + self.state = + CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { + next_page: page, + pages_left_to_skip: pages_to_skip, + page_offset, + amount, + buffer_offset: bytes_processed as usize, + is_write, + }); + + if let Some(c) = c { + io_yield_one!(c); + } + continue; + } + break Ok(IOResult::Done(())); } - Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::DEBUG)] @@ -1075,7 +1098,9 @@ impl BTreeCursor { }, ); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { mut remaining_to_read, @@ -1140,7 +1165,9 @@ impl BTreeCursor { is_write, }); // Return IO to allow other operations - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } _ => { return Err(LimboError::InternalError( @@ -1249,7 +1276,10 @@ impl BTreeCursor { self.stack.advance(); let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } _ => { if self.ancestor_pages_have_more_children() { @@ -1287,7 +1317,9 @@ impl BTreeCursor { let left_child_page = contents.cell_interior_read_left_child_page(cell_idx); let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } } @@ -1310,7 +1342,7 @@ impl BTreeCursor { /// Move the cursor to the root page of the btree. #[instrument(skip_all, level = Level::DEBUG)] - fn move_to_root(&mut self) -> Result { + fn move_to_root(&mut self) -> Result> { self.seek_state = CursorSeekState::Start; self.going_upwards = false; tracing::trace!(root_page = self.root_page); @@ -1323,49 +1355,55 @@ impl BTreeCursor { /// Move the cursor to the rightmost record in the btree. #[instrument(skip(self), level = Level::DEBUG)] fn move_to_rightmost(&mut self) -> Result> { - let (move_to_right_state, rightmost_page_id) = &self.move_to_right_state; - match *move_to_right_state { - MoveToRightState::Start => { - if let Some(rightmost_page_id) = rightmost_page_id { - // If we know the rightmost page and are already on it, we can skip a seek. 
- let current_page = self.stack.top(); - let current_page = current_page.get(); - if current_page.get().id == *rightmost_page_id { - let contents = current_page.get_contents(); - let cell_count = contents.cell_count(); - self.stack.set_cell_index(cell_count as i32 - 1); - return Ok(IOResult::Done(cell_count > 0)); + loop { + let (move_to_right_state, rightmost_page_id) = &self.move_to_right_state; + match *move_to_right_state { + MoveToRightState::Start => { + if let Some(rightmost_page_id) = rightmost_page_id { + // If we know the rightmost page and are already on it, we can skip a seek. + let current_page = self.stack.top(); + let current_page = current_page.get(); + if current_page.get().id == *rightmost_page_id { + let contents = current_page.get_contents(); + let cell_count = contents.cell_count(); + self.stack.set_cell_index(cell_count as i32 - 1); + return Ok(IOResult::Done(cell_count > 0)); + } } - } - let rightmost_page_id = *rightmost_page_id; - let c = self.move_to_root()?; - self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id); - io_yield_one!(c); - } - MoveToRightState::ProcessPage => { - let mem_page = self.stack.top(); - let page = mem_page.get(); - let page_idx = page.get().id; - let contents = page.get().contents.as_ref().unwrap(); - if contents.is_leaf() { - self.move_to_right_state = (MoveToRightState::Start, Some(page_idx)); - if contents.cell_count() > 0 { - self.stack.set_cell_index(contents.cell_count() as i32 - 1); - return Ok(IOResult::Done(true)); - } - return Ok(IOResult::Done(false)); - } - - match contents.rightmost_pointer() { - Some(right_most_pointer) => { - self.stack.set_cell_index(contents.cell_count() as i32 + 1); - let (mem_page, c) = self.read_page(right_most_pointer as usize)?; - self.stack.push(mem_page); + let rightmost_page_id = *rightmost_page_id; + let c = self.move_to_root()?; + self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id); + if let Some(c) = c { io_yield_one!(c); } + } + MoveToRightState::ProcessPage => { + let mem_page = self.stack.top(); + let page = mem_page.get(); + let page_idx = page.get().id; + let contents = page.get().contents.as_ref().unwrap(); + if contents.is_leaf() { + self.move_to_right_state = (MoveToRightState::Start, Some(page_idx)); + if contents.cell_count() > 0 { + self.stack.set_cell_index(contents.cell_count() as i32 - 1); + return Ok(IOResult::Done(true)); + } + return Ok(IOResult::Done(false)); + } - None => { - unreachable!("interior page should have a rightmost pointer"); + match contents.rightmost_pointer() { + Some(right_most_pointer) => { + self.stack.set_cell_index(contents.cell_count() as i32 + 1); + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; + self.stack.push(mem_page); + if let Some(c) = c { + io_yield_one!(c); + } + } + + None => { + unreachable!("interior page should have a rightmost pointer"); + } } } } @@ -1375,49 +1413,49 @@ impl BTreeCursor { /// Specialized version of move_to() for table btrees. 
#[instrument(skip(self), level = Level::DEBUG)] fn tablebtree_move_to(&mut self, rowid: i64, seek_op: SeekOp) -> Result> { - let page = self.stack.top(); - let page = page.get(); - let contents = page.get().contents.as_ref().unwrap(); - if contents.is_leaf() { - self.seek_state = CursorSeekState::FoundLeaf { - eq_seen: Cell::new(false), - }; - return Ok(IOResult::Done(())); - } + loop { + let page = self.stack.top(); + let page = page.get(); + let contents = page.get().contents.as_ref().unwrap(); + if contents.is_leaf() { + self.seek_state = CursorSeekState::FoundLeaf { + eq_seen: Cell::new(false), + }; + return Ok(IOResult::Done(())); + } - let cell_count = contents.cell_count(); - if matches!( - self.seek_state, - CursorSeekState::Start | CursorSeekState::MovingBetweenPages { .. } - ) { - let eq_seen = match &self.seek_state { - CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), - _ => false, - }; - let min_cell_idx = Cell::new(0); - let max_cell_idx = Cell::new(cell_count as isize - 1); - let nearest_matching_cell = Cell::new(None); + let cell_count = contents.cell_count(); + if matches!( + self.seek_state, + CursorSeekState::Start | CursorSeekState::MovingBetweenPages { .. } + ) { + let eq_seen = match &self.seek_state { + CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), + _ => false, + }; + let min_cell_idx = Cell::new(0); + let max_cell_idx = Cell::new(cell_count as isize - 1); + let nearest_matching_cell = Cell::new(None); - self.seek_state = CursorSeekState::InteriorPageBinarySearch { + self.seek_state = CursorSeekState::InteriorPageBinarySearch { + min_cell_idx, + max_cell_idx, + nearest_matching_cell, + eq_seen: Cell::new(eq_seen), + }; + } + + let CursorSeekState::InteriorPageBinarySearch { min_cell_idx, max_cell_idx, nearest_matching_cell, - eq_seen: Cell::new(eq_seen), + eq_seen, + .. + } = &self.seek_state + else { + unreachable!("we must be in an interior binary search state"); }; - } - let CursorSeekState::InteriorPageBinarySearch { - min_cell_idx, - max_cell_idx, - nearest_matching_cell, - eq_seen, - .. - } = &self.seek_state - else { - unreachable!("we must be in an interior binary search state"); - }; - - loop { let min = min_cell_idx.get(); let max = max_cell_idx.get(); if min > max { @@ -1430,7 +1468,10 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } self.stack.set_cell_index(cell_count as i32 + 1); match contents.rightmost_pointer() { @@ -1440,7 +1481,10 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } None => { unreachable!("we shall not go back up! 
The only way is down the slope"); @@ -1507,55 +1551,55 @@ impl BTreeCursor { tracing::debug!("Using record comparison strategy: {:?}", record_comparer); let tie_breaker = get_tie_breaker_from_seek_op(cmp); - let page = self.stack.top(); - let page = page.get(); - let contents = page.get().contents.as_ref().unwrap(); - if contents.is_leaf() { - let eq_seen = match &self.seek_state { - CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), - _ => false, - }; - self.seek_state = CursorSeekState::FoundLeaf { - eq_seen: Cell::new(eq_seen), - }; - return Ok(IOResult::Done(())); - } + loop { + let page = self.stack.top(); + let page = page.get(); + let contents = page.get().contents.as_ref().unwrap(); + if contents.is_leaf() { + let eq_seen = match &self.seek_state { + CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), + _ => false, + }; + self.seek_state = CursorSeekState::FoundLeaf { + eq_seen: Cell::new(eq_seen), + }; + return Ok(IOResult::Done(())); + } - if matches!( - self.seek_state, - CursorSeekState::Start | CursorSeekState::MovingBetweenPages { .. } - ) { - let eq_seen = match &self.seek_state { - CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), - _ => false, - }; - let cell_count = contents.cell_count(); - let min_cell_idx = Cell::new(0); - let max_cell_idx = Cell::new(cell_count as isize - 1); - let nearest_matching_cell = Cell::new(None); + if matches!( + self.seek_state, + CursorSeekState::Start | CursorSeekState::MovingBetweenPages { .. } + ) { + let eq_seen = match &self.seek_state { + CursorSeekState::MovingBetweenPages { eq_seen } => eq_seen.get(), + _ => false, + }; + let cell_count = contents.cell_count(); + let min_cell_idx = Cell::new(0); + let max_cell_idx = Cell::new(cell_count as isize - 1); + let nearest_matching_cell = Cell::new(None); - self.seek_state = CursorSeekState::InteriorPageBinarySearch { + self.seek_state = CursorSeekState::InteriorPageBinarySearch { + min_cell_idx, + max_cell_idx, + nearest_matching_cell, + eq_seen: Cell::new(eq_seen), + }; + } + + let CursorSeekState::InteriorPageBinarySearch { min_cell_idx, max_cell_idx, nearest_matching_cell, - eq_seen: Cell::new(eq_seen), + eq_seen, + } = &self.seek_state + else { + unreachable!( + "we must be in an interior binary search state, got {:?}", + self.seek_state + ); }; - } - let CursorSeekState::InteriorPageBinarySearch { - min_cell_idx, - max_cell_idx, - nearest_matching_cell, - eq_seen, - } = &self.seek_state - else { - unreachable!( - "we must be in an interior binary search state, got {:?}", - self.seek_state - ); - }; - - loop { let min = min_cell_idx.get(); let max = max_cell_idx.get(); if min > max { @@ -1568,7 +1612,10 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } None => { unreachable!("we shall not go back up! The only way is down the slope"); @@ -1606,7 +1653,10 @@ impl BTreeCursor { self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } let cur_cell_idx = (min + max) >> 1; // rustc generates extra insns for (min+max)/2 due to them being isize. we know min&max are >=0 here. 
@@ -2103,7 +2153,9 @@ impl BTreeCursor { self.move_to_state = MoveToState::MoveToPage; if matches!(self.seek_state, CursorSeekState::Start) { let c = self.move_to_root()?; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } } MoveToState::MoveToPage => { @@ -2557,7 +2609,9 @@ impl BTreeCursor { let sibling_page = page.get(); self.pager.add_dirty(&sibling_page); } - completions.push(c); + if let Some(c) = c { + completions.push(c); + } pages_to_balance[i].replace(page); if i == 0 { break; @@ -2623,7 +2677,6 @@ impl BTreeCursor { }); *sub_state = BalanceSubState::NonRootDoBalancing; if !completions.is_empty() { - // TODO: when tracking IO return all the completions here io_yield_many!(completions); } } @@ -4097,31 +4150,37 @@ impl BTreeCursor { pub fn seek_end(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); // unsure about this -_- - match self.seek_end_state { - SeekEndState::Start => { - let c = self.move_to_root()?; - self.seek_end_state = SeekEndState::ProcessPage; - io_yield_one!(c); - } - SeekEndState::ProcessPage => { - let mem_page = self.stack.top(); - let page = mem_page.get(); - let contents = page.get().contents.as_ref().unwrap(); - if contents.is_leaf() { - // set cursor just past the last cell to append - self.stack.set_cell_index(contents.cell_count() as i32); - self.seek_end_state = SeekEndState::Start; - return Ok(IOResult::Done(())); - } - - match contents.rightmost_pointer() { - Some(right_most_pointer) => { - self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior - let (child, c) = self.read_page(right_most_pointer as usize)?; - self.stack.push(child); + loop { + match self.seek_end_state { + SeekEndState::Start => { + let c = self.move_to_root()?; + self.seek_end_state = SeekEndState::ProcessPage; + if let Some(c) = c { io_yield_one!(c); } - None => unreachable!("interior page must have rightmost pointer"), + } + SeekEndState::ProcessPage => { + let mem_page = self.stack.top(); + let page = mem_page.get(); + let contents = page.get().contents.as_ref().unwrap(); + if contents.is_leaf() { + // set cursor just past the last cell to append + self.stack.set_cell_index(contents.cell_count() as i32); + self.seek_end_state = SeekEndState::Start; + return Ok(IOResult::Done(())); + } + + match contents.rightmost_pointer() { + Some(right_most_pointer) => { + self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior + let (child, c) = self.read_page(right_most_pointer as usize)?; + self.stack.push(child); + if let Some(c) = c { + io_yield_one!(c); + } + } + None => unreachable!("interior page must have rightmost pointer"), + } } } } @@ -4174,7 +4233,9 @@ impl BTreeCursor { mv_cursor.rewind(); } else { let c = self.move_to_root()?; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } } RewindState::NextRecord => { @@ -4831,7 +4892,9 @@ impl BTreeCursor { self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } else { self.overflow_state = OverflowState::Done; } @@ -4863,7 +4926,9 @@ impl BTreeCursor { self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } else { self.overflow_state = OverflowState::Done; } @@ -4898,7 +4963,9 @@ impl BTreeCursor { self.state = CursorState::Destroy(DestroyInfo { state: DestroyState::Start, }); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } loop { @@ -4955,7 
+5022,9 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } else { let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5013,7 +5082,9 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } }, } @@ -5031,7 +5102,9 @@ impl BTreeCursor { .mut_destroy_info() .expect("unable to get a mut reference to destroy state in cursor"); destroy_info.state = DestroyState::LoadPage; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } // For any leaf cell, advance the index now that overflow pages have been cleared BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { @@ -5211,91 +5284,104 @@ impl BTreeCursor { let mut mem_page; let mut contents; - let state = self.count_state; - match state { - CountState::Start => { - let c = self.move_to_root()?; - self.count_state = CountState::Loop; - io_yield_one!(c); - } - CountState::Loop => { - mem_page_rc = self.stack.top(); - mem_page = mem_page_rc.get(); - turso_assert!(mem_page.is_loaded(), "page should be loaded"); - contents = mem_page.get().contents.as_ref().unwrap(); - - /* If this is a leaf page or the tree is not an int-key tree, then - ** this page contains countable entries. Increment the entry counter - ** accordingly. - */ - if !matches!(contents.page_type(), PageType::TableInterior) { - self.count += contents.cell_count(); - } - - self.stack.advance(); - let cell_idx = self.stack.current_cell_index() as usize; - - // Second condition is necessary in case we return if the page is locked in the loop below - if contents.is_leaf() || cell_idx > contents.cell_count() { - loop { - if !self.stack.has_parent() { - // All pages of the b-tree have been visited. Return successfully - let c = self.move_to_root()?; - self.count_state = CountState::Finish; - io_yield_one!(c); - } - - // Move to parent - self.stack.pop(); - - mem_page_rc = self.stack.top(); - mem_page = mem_page_rc.get(); - turso_assert!(mem_page.is_loaded(), "page should be loaded"); - contents = mem_page.get().contents.as_ref().unwrap(); - - let cell_idx = self.stack.current_cell_index() as usize; - - if cell_idx <= contents.cell_count() { - break; - } + 'outer: loop { + let state = self.count_state; + match state { + CountState::Start => { + let c = self.move_to_root()?; + self.count_state = CountState::Loop; + if let Some(c) = c { + io_yield_one!(c); } } + CountState::Loop => { + mem_page_rc = self.stack.top(); + mem_page = mem_page_rc.get(); + turso_assert!(mem_page.is_loaded(), "page should be loaded"); + contents = mem_page.get().contents.as_ref().unwrap(); - let cell_idx = self.stack.current_cell_index() as usize; + /* If this is a leaf page or the tree is not an int-key tree, then + ** this page contains countable entries. Increment the entry counter + ** accordingly. 
+ */ + if !matches!(contents.page_type(), PageType::TableInterior) { + self.count += contents.cell_count(); + } - assert!(cell_idx <= contents.cell_count(),); - assert!(!contents.is_leaf()); - - if cell_idx == contents.cell_count() { - // Move to right child - // should be safe as contents is not a leaf page - let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let (mem_page, c) = self.read_page(right_most_pointer as usize)?; - self.stack.push(mem_page); - io_yield_one!(c); - } else { - // Move to child left page - let cell = contents.cell_get(cell_idx, self.usable_space())?; + let cell_idx = self.stack.current_cell_index() as usize; - match cell { - BTreeCell::TableInteriorCell(TableInteriorCell { - left_child_page, .. - }) - | BTreeCell::IndexInteriorCell(IndexInteriorCell { - left_child_page, .. - }) => { - self.stack.advance(); - let (mem_page, c) = self.read_page(left_child_page as usize)?; - self.stack.push(mem_page); + // Second condition is necessary in case we return if the page is locked in the loop below + if contents.is_leaf() || cell_idx > contents.cell_count() { + loop { + if !self.stack.has_parent() { + // All pages of the b-tree have been visited. Return successfully + let c = self.move_to_root()?; + self.count_state = CountState::Finish; + if let Some(c) = c { + io_yield_one!(c); + } + continue 'outer; + } + + // Move to parent + self.stack.pop(); + + mem_page_rc = self.stack.top(); + mem_page = mem_page_rc.get(); + turso_assert!(mem_page.is_loaded(), "page should be loaded"); + contents = mem_page.get().contents.as_ref().unwrap(); + + let cell_idx = self.stack.current_cell_index() as usize; + + if cell_idx <= contents.cell_count() { + break; + } + } + } + + let cell_idx = self.stack.current_cell_index() as usize; + + assert!(cell_idx <= contents.cell_count(),); + assert!(!contents.is_leaf()); + + if cell_idx == contents.cell_count() { + // Move to right child + // should be safe as contents is not a leaf page + let right_most_pointer = contents.rightmost_pointer().unwrap(); + self.stack.advance(); + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; + self.stack.push(mem_page); + if let Some(c) = c { io_yield_one!(c); } - _ => unreachable!(), + } else { + // Move to child left page + let cell = contents.cell_get(cell_idx, self.usable_space())?; + + match cell { + BTreeCell::TableInteriorCell(TableInteriorCell { + left_child_page, + .. + }) + | BTreeCell::IndexInteriorCell(IndexInteriorCell { + left_child_page, + .. 
+ }) => { + self.stack.advance(); + let (mem_page, c) = self.read_page(left_child_page as usize)?; + self.stack.push(mem_page); + if let Some(c) = c { + io_yield_one!(c); + } + } + _ => unreachable!(), + } } } - } - CountState::Finish => { - return Ok(IOResult::Done(self.count)); + CountState::Finish => { + return Ok(IOResult::Done(self.count)); + } } } } @@ -5348,7 +5434,7 @@ impl BTreeCursor { } } - pub fn read_page(&self, page_idx: usize) -> Result<(BTreePage, Completion)> { + pub fn read_page(&self, page_idx: usize) -> Result<(BTreePage, Option)> { btree_read_page(&self.pager, page_idx) } @@ -5482,7 +5568,10 @@ pub fn integrity_check( None => { let (page, c) = btree_read_page(pager, page_idx)?; state.page = Some(page.get()); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + page.get() } }; turso_assert!(page.is_loaded(), "page should be loaded"); @@ -5627,7 +5716,10 @@ pub fn integrity_check( Ok(IOResult::Done(())) } -pub fn btree_read_page(pager: &Rc, page_idx: usize) -> Result<(BTreePage, Completion)> { +pub fn btree_read_page( + pager: &Rc, + page_idx: usize, +) -> Result<(BTreePage, Option)> { pager.read_page(page_idx).map(|(page, c)| { ( Arc::new(BTreePageInner { @@ -7913,7 +8005,9 @@ mod tests { ) .unwrap(); let c = cursor.move_to_root().unwrap(); - pager.io.wait_for_completion(c).unwrap(); + if let Some(c) = c { + pager.io.wait_for_completion(c).unwrap(); + } pager .io .block(|| pager.end_tx(false, &conn, false)) @@ -8120,7 +8214,9 @@ mod tests { } let c = cursor.move_to_root().unwrap(); - pager.io.wait_for_completion(c).unwrap(); + if let Some(c) = c { + pager.io.wait_for_completion(c).unwrap(); + } pager .io .block(|| pager.end_tx(false, &conn, false)) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index c1247449c..f942d2aaa 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -41,26 +41,30 @@ pub struct HeaderRef(PageRef); impl HeaderRef { pub fn from_pager(pager: &Pager) -> Result> { - let state = pager.header_ref_state.borrow().clone(); - tracing::trace!(?state); - match state { - HeaderRefState::Start => { - if !pager.db_state.is_initialized() { - return Err(LimboError::Page1NotAlloc); - } + loop { + let state = pager.header_ref_state.borrow().clone(); + tracing::trace!(?state); + match state { + HeaderRefState::Start => { + if !pager.db_state.is_initialized() { + return Err(LimboError::Page1NotAlloc); + } - let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; - *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; - io_yield_one!(c); - } - HeaderRefState::CreateHeader { page } => { - turso_assert!(page.is_loaded(), "page should be loaded"); - turso_assert!( - page.get().id == DatabaseHeader::PAGE_ID, - "incorrect header page id" - ); - *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; - Ok(IOResult::Done(Self(page))) + let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; + *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; + if let Some(c) = c { + io_yield_one!(c); + } + } + HeaderRefState::CreateHeader { page } => { + turso_assert!(page.is_loaded(), "page should be loaded"); + turso_assert!( + page.get().id == DatabaseHeader::PAGE_ID, + "incorrect header page id" + ); + *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; + break Ok(IOResult::Done(Self(page))); + } } } } @@ -77,28 +81,32 @@ pub struct HeaderRefMut(PageRef); impl HeaderRefMut { pub fn from_pager(pager: &Pager) -> Result> { - let state = pager.header_ref_state.borrow().clone(); - 
tracing::trace!(?state); - match state { - HeaderRefState::Start => { - if !pager.db_state.is_initialized() { - return Err(LimboError::Page1NotAlloc); + loop { + let state = pager.header_ref_state.borrow().clone(); + tracing::trace!(?state); + match state { + HeaderRefState::Start => { + if !pager.db_state.is_initialized() { + return Err(LimboError::Page1NotAlloc); + } + + let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; + *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; + if let Some(c) = c { + io_yield_one!(c); + } } + HeaderRefState::CreateHeader { page } => { + turso_assert!(page.is_loaded(), "page should be loaded"); + turso_assert!( + page.get().id == DatabaseHeader::PAGE_ID, + "incorrect header page id" + ); - let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?; - *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; - io_yield_one!(c); - } - HeaderRefState::CreateHeader { page } => { - turso_assert!(page.is_loaded(), "page should be loaded"); - turso_assert!( - page.get().id == DatabaseHeader::PAGE_ID, - "incorrect header page id" - ); - - pager.add_dirty(&page); - *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; - Ok(IOResult::Done(Self(page))) + pager.add_dirty(&page); + *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; + break Ok(IOResult::Done(Self(page))); + } } } } @@ -624,81 +632,88 @@ impl Pager { /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself). #[cfg(not(feature = "omit_autovacuum"))] pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { - let ptrmap_get_state = self.ptrmap_get_state.borrow().clone(); - match ptrmap_get_state { - PtrMapGetState::Start => { - tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); - let configured_page_size = - return_if_io!(self.with_header(|header| header.page_size)).get() as usize; + loop { + let ptrmap_get_state = self.ptrmap_get_state.borrow().clone(); + match ptrmap_get_state { + PtrMapGetState::Start => { + tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); + let configured_page_size = + return_if_io!(self.with_header(|header| header.page_size)).get() as usize; - if target_page_num < FIRST_PTRMAP_PAGE_NO - || is_ptrmap_page(target_page_num, configured_page_size) - { - return Ok(IOResult::Done(None)); + if target_page_num < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(target_page_num, configured_page_size) + { + return Ok(IOResult::Done(None)); + } + + let ptrmap_pg_no = + get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); + let offset_in_ptrmap_page = get_ptrmap_offset_in_page( + target_page_num, + ptrmap_pg_no, + configured_page_size, + )?; + tracing::trace!( + "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", + target_page_num, + ptrmap_pg_no + ); + + let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; + self.ptrmap_get_state.replace(PtrMapGetState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + }); + if let Some(c) = c { + io_yield_one!(c); + } } - - let ptrmap_pg_no = - get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); - let offset_in_ptrmap_page = - get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?; - tracing::trace!( - "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", - target_page_num, - ptrmap_pg_no - ); - - let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; - self.ptrmap_get_state.replace(PtrMapGetState::Deserialize { + PtrMapGetState::Deserialize 
{ ptrmap_page, offset_in_ptrmap_page, - }); - io_yield_one!(c); - } - PtrMapGetState::Deserialize { - ptrmap_page, - offset_in_ptrmap_page, - } => { - turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded"); - let ptrmap_page_inner = ptrmap_page.get(); - let ptrmap_pg_no = ptrmap_page_inner.id; + } => { + turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded"); + let ptrmap_page_inner = ptrmap_page.get(); + let ptrmap_pg_no = ptrmap_page_inner.id; - let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { - Some(content) => content, - None => { - return Err(LimboError::InternalError(format!( - "Ptrmap page {ptrmap_pg_no} content not loaded" + let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {ptrmap_pg_no} content not loaded" + ))); + } + }; + + let full_buffer_slice: &[u8] = page_content.buffer.as_slice(); + + // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. + // The actual page data starts at page_content.offset within the full_buffer_slice. + if ptrmap_pg_no != 1 && page_content.offset != 0 { + return Err(LimboError::Corrupt(format!( + "Ptrmap page {} has unexpected internal offset {}", + ptrmap_pg_no, page_content.offset ))); } - }; + let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; + let actual_data_length = ptrmap_page_data_slice.len(); - let full_buffer_slice: &[u8] = page_content.buffer.as_slice(); - - // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. - // The actual page data starts at page_content.offset within the full_buffer_slice. - if ptrmap_pg_no != 1 && page_content.offset != 0 { - return Err(LimboError::Corrupt(format!( - "Ptrmap page {} has unexpected internal offset {}", - ptrmap_pg_no, page_content.offset - ))); - } - let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; - let actual_data_length = ptrmap_page_data_slice.len(); - - // Check if the calculated offset for the entry is within the bounds of the actual page data length. - if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { - return Err(LimboError::InternalError(format!( + // Check if the calculated offset for the entry is within the bounds of the actual page data length. 
+ if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { + return Err(LimboError::InternalError(format!( "Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})" ))); - } + } - let entry_slice = &ptrmap_page_data_slice - [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; - self.ptrmap_get_state.replace(PtrMapGetState::Start); - match PtrmapEntry::deserialize(entry_slice) { - Some(entry) => Ok(IOResult::Done(Some(entry))), - None => Err(LimboError::Corrupt(format!( - "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" - ))), + let entry_slice = &ptrmap_page_data_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; + self.ptrmap_get_state.replace(PtrMapGetState::Start); + break match PtrmapEntry::deserialize(entry_slice) { + Some(entry) => Ok(IOResult::Done(Some(entry))), + None => Err(LimboError::Corrupt(format!( + "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" + ))), + }; } } } @@ -720,24 +735,26 @@ impl Pager { entry_type, parent_page_no ); - let ptrmap_put_state = self.ptrmap_put_state.borrow().clone(); - match ptrmap_put_state { - PtrMapPutState::Start => { - let page_size = - return_if_io!(self.with_header(|header| header.page_size)).get() as usize; + loop { + let ptrmap_put_state = self.ptrmap_put_state.borrow().clone(); + match ptrmap_put_state { + PtrMapPutState::Start => { + let page_size = + return_if_io!(self.with_header(|header| header.page_size)).get() as usize; - if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO - || is_ptrmap_page(db_page_no_to_update, page_size) - { - return Err(LimboError::InternalError(format!( + if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(db_page_no_to_update, page_size) + { + return Err(LimboError::InternalError(format!( "Cannot set ptrmap entry for page {db_page_no_to_update}: it's a header/ptrmap page or invalid." 
))); - } + } - let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size); - let offset_in_ptrmap_page = - get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?; - tracing::trace!( + let ptrmap_pg_no = + get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size); + let offset_in_ptrmap_page = + get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?; + tracing::trace!( "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}", db_page_no_to_update, entry_type, @@ -746,58 +763,61 @@ impl Pager { offset_in_ptrmap_page ); - let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; - self.ptrmap_put_state.replace(PtrMapPutState::Deserialize { + let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?; + self.ptrmap_put_state.replace(PtrMapPutState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + }); + if let Some(c) = c { + io_yield_one!(c); + } + } + PtrMapPutState::Deserialize { ptrmap_page, offset_in_ptrmap_page, - }); - io_yield_one!(c); - } - PtrMapPutState::Deserialize { - ptrmap_page, - offset_in_ptrmap_page, - } => { - turso_assert!(ptrmap_page.is_loaded(), "page should be loaded"); - let ptrmap_page_inner = ptrmap_page.get(); - let ptrmap_pg_no = ptrmap_page_inner.id; + } => { + turso_assert!(ptrmap_page.is_loaded(), "page should be loaded"); + let ptrmap_page_inner = ptrmap_page.get(); + let ptrmap_pg_no = ptrmap_page_inner.id; - let page_content = match ptrmap_page_inner.contents.as_ref() { - Some(content) => content, - None => { + let page_content = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {ptrmap_pg_no} content not loaded" + ))) + } + }; + + let full_buffer_slice = page_content.buffer.as_mut_slice(); + + if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { return Err(LimboError::InternalError(format!( - "Ptrmap page {ptrmap_pg_no} content not loaded" - ))) - } - }; - - let full_buffer_slice = page_content.buffer.as_mut_slice(); - - if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { - return Err(LimboError::InternalError(format!( "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})", offset_in_ptrmap_page, PTRMAP_ENTRY_SIZE, ptrmap_pg_no, full_buffer_slice.len() ))); + } + + let entry = PtrmapEntry { + entry_type, + parent_page_no, + }; + entry.serialize( + &mut full_buffer_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE], + )?; + + turso_assert!( + ptrmap_page.get().id == ptrmap_pg_no, + "ptrmap page has unexpected number" + ); + self.add_dirty(&ptrmap_page); + self.ptrmap_put_state.replace(PtrMapPutState::Start); + break Ok(IOResult::Done(())); } - - let entry = PtrmapEntry { - entry_type, - parent_page_no, - }; - entry.serialize( - &mut full_buffer_slice - [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE], - )?; - - turso_assert!( - ptrmap_page.get().id == ptrmap_pg_no, - "ptrmap page has unexpected number" - ); - self.add_dirty(&ptrmap_page); - self.ptrmap_put_state.replace(PtrMapPutState::Start); - Ok(IOResult::Done(())) } } } @@ -1097,18 +1117,17 @@ impl Pager { /// Reads a page from the database. 
#[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> { + pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Option)> { tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx); if let Some(page) = page_cache.get(&page_key) { tracing::trace!("read_page(page_idx = {}) = cached", page_idx); - // Dummy completion being passed, as we do not need to read from database or wal - return Ok((page.clone(), Completion::new_dummy())); + return Ok((page.clone(), None)); } let (page, c) = self.read_page_no_cache(page_idx, None, false)?; self.cache_insert(page_idx, page.clone(), &mut page_cache)?; - Ok((page, c)) + Ok((page, Some(c))) } fn begin_read_disk_page( @@ -1624,7 +1643,9 @@ impl Pager { // Add as leaf to current trunk let (page, c) = self.read_page(trunk_page_id as usize)?; trunk_page.replace(page); - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } let trunk_page = trunk_page.as_ref().unwrap(); turso_assert!(trunk_page.is_loaded(), "trunk_page should be loaded"); @@ -1821,7 +1842,9 @@ impl Pager { trunk_page, current_db_size: new_db_size, }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } } AllocatePageState::SearchAvailableFreeListLeaf { trunk_page, @@ -1857,7 +1880,10 @@ impl Pager { leaf_page, number_of_freelist_leaves, }; - io_yield_one!(c); + if let Some(c) = c { + io_yield_one!(c); + } + continue; } // No freelist leaves on this trunk page.
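
Note on the calling convention this patch introduces: Pager::read_page now returns (PageRef, Option<Completion>), handing back None on a page-cache hit instead of the old Completion::new_dummy(), and each caller is restructured into a loop that only yields when a completion actually exists (`if let Some(c) = c { io_yield_one!(c); }`) and otherwise falls straight through to its next state. The sketch below is a minimal, self-contained illustration of that shape only; Page, Completion, Step, and EmptyTableState are simplified stand-ins invented for the example, not the repository's real PageRef/Completion/IOResult types or its io_yield_one! macro and RefCell-held cursor state.

    // Simplified stand-ins for the real pager/cursor types (hypothetical).
    #[derive(Clone)]
    struct Page {
        cell_count: usize,
    }

    struct Completion; // represents an in-flight read the caller must wait on

    enum Step<T> {
        Done(T),
        Io(Completion), // caller yields, waits, then calls again to resume
    }

    #[derive(Clone)]
    enum EmptyTableState {
        Start,
        ReadPage { page: Page },
    }

    // After the patch: a cache hit returns (page, None) because the page is
    // already loaded and there is nothing to wait for; a miss returns the
    // completion for the pending read.
    fn read_page(cache_hit: bool) -> (Page, Option<Completion>) {
        let page = Page { cell_count: 0 };
        if cache_hit {
            (page, None)
        } else {
            (page, Some(Completion))
        }
    }

    // Caller-side pattern used throughout the patch: wrap the state machine
    // in a loop, yield only when a Completion exists, otherwise fall through
    // and handle the next state within the same call.
    fn is_empty_table(state: &mut EmptyTableState, cache_hit: bool) -> Step<bool> {
        loop {
            match state.clone() {
                EmptyTableState::Start => {
                    let (page, c) = read_page(cache_hit);
                    *state = EmptyTableState::ReadPage { page };
                    if let Some(c) = c {
                        return Step::Io(c); // I/O pending: yield, resume later
                    }
                    // cache hit: no yield, loop again and process ReadPage now
                }
                EmptyTableState::ReadPage { page } => {
                    *state = EmptyTableState::Start;
                    return Step::Done(page.cell_count == 0);
                }
            }
        }
    }

    fn main() {
        let mut state = EmptyTableState::Start;
        match is_empty_table(&mut state, true) {
            Step::Done(empty) => println!("cache hit, resolved in one call: empty = {empty}"),
            Step::Io(_) => println!("cache miss: wait for the completion, then call again"),
        }
    }

The practical effect is that a page-cache hit no longer forces an extra trip through the I/O loop: the state machine advances and resolves within the same invocation, and completions are only surfaced (or collected, as in the balancing path's io_yield_many!) when real reads were issued.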