diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0777bc951..92e7ddd7d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -976,6 +976,9 @@ impl BTreeCursor { /// We don't include the rowid in the comparison and that's why the last value from the record is not included. fn do_seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result>> { let cell_iter_dir = op.iteration_direction(); + if let SeekKey::TableRowId(rowid) = key { + return self.tablebtree_seek(rowid, op, cell_iter_dir); + } return_if_io!(self.move_to(key.clone(), op.clone())); { @@ -1280,6 +1283,159 @@ impl BTreeCursor { } } + /// Specialized version of do_seek() for table btrees that uses binary search instead + /// of iterating cells in order. + fn tablebtree_seek( + &mut self, + rowid: u64, + seek_op: SeekOp, + iter_dir: IterationDirection, + ) -> Result>> { + assert!(self.mv_cursor.is_none()); + self.move_to_root(); + return_if_io!(self.tablebtree_move_to_binsearch(rowid, seek_op, iter_dir)); + let page = self.stack.top(); + return_if_locked!(page); + let contents = page.get().contents.as_ref().unwrap(); + assert!( + contents.is_leaf(), + "tablebtree_seek_binsearch() called on non-leaf page" + ); + + let cell_count = contents.cell_count(); + let mut min: isize = 0; + let mut max: isize = cell_count as isize - 1; + + // If iter dir is forwards, we want the first cell that matches; + // If iter dir is backwards, we want the last cell that matches. + let mut nearest_matching_cell = None; + loop { + if min > max { + let Some(nearest_matching_cell) = nearest_matching_cell else { + return Ok(CursorResult::Ok(None)); + }; + self.stack.set_cell_index(nearest_matching_cell as i32); + let matching_cell = contents.cell_get( + nearest_matching_cell, + payload_overflow_threshold_max( + contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + contents.page_type(), + self.usable_space() as u16, + ), + self.usable_space(), + )?; + let BTreeCell::TableLeafCell(TableLeafCell { + _rowid: cell_rowid, + _payload, + first_overflow_page, + payload_size, + .. + }) = matching_cell + else { + unreachable!("unexpected cell type: {:?}", matching_cell); + }; + + return_if_io!(self.read_record_w_possible_overflow( + _payload, + first_overflow_page, + payload_size + )); + self.stack.next_cell_in_direction(iter_dir); + + return Ok(CursorResult::Ok(Some(cell_rowid))); + } + + let cur_cell_idx = (min + max) / 2; + self.stack.set_cell_index(cur_cell_idx as i32); + let cur_cell = contents.cell_get( + cur_cell_idx as usize, + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), + self.usable_space(), + )?; + + let BTreeCell::TableLeafCell(TableLeafCell { + _rowid: cell_rowid, + _payload, + first_overflow_page, + payload_size, + .. + }) = cur_cell + else { + unreachable!("unexpected cell type: {:?}", cur_cell); + }; + + let cmp = cell_rowid.cmp(&rowid); + + let found = match seek_op { + SeekOp::GT => cmp.is_gt(), + SeekOp::GE => cmp.is_ge(), + SeekOp::EQ => cmp.is_eq(), + SeekOp::LE => cmp.is_le(), + SeekOp::LT => cmp.is_lt(), + }; + + // rowids are unique, so we can return the rowid immediately + if found && SeekOp::EQ == seek_op { + return_if_io!(self.read_record_w_possible_overflow( + _payload, + first_overflow_page, + payload_size + )); + self.stack.next_cell_in_direction(iter_dir); + return Ok(CursorResult::Ok(Some(cell_rowid))); + } + + if found { + match iter_dir { + IterationDirection::Forwards => { + nearest_matching_cell = Some(cur_cell_idx as usize); + max = cur_cell_idx - 1; + } + IterationDirection::Backwards => { + nearest_matching_cell = Some(cur_cell_idx as usize); + min = cur_cell_idx + 1; + } + } + } else { + if cmp.is_gt() { + max = cur_cell_idx - 1; + } else if cmp.is_lt() { + min = cur_cell_idx + 1; + } else { + match iter_dir { + IterationDirection::Forwards => { + min = cur_cell_idx + 1; + } + IterationDirection::Backwards => { + max = cur_cell_idx - 1; + } + } + } + } + } + } + + fn read_record_w_possible_overflow( + &mut self, + payload: &'static [u8], + next_page: Option, + payload_size: u64, + ) -> Result> { + if let Some(next_page) = next_page { + self.process_overflow_read(payload, next_page, payload_size) + } else { + crate::storage::sqlite3_ondisk::read_record( + payload, + self.get_immutable_record_or_create().as_mut().unwrap(), + )?; + Ok(CursorResult::Ok(())) + } + } + pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result> { assert!(self.mv_cursor.is_none()); tracing::trace!("move_to(key={:?} cmp={:?})", key, cmp); @@ -1310,10 +1466,6 @@ impl BTreeCursor { let iter_dir = cmp.iteration_direction(); - if let SeekKey::TableRowId(rowid) = key { - return self.tablebtree_move_to_binsearch(rowid, cmp, iter_dir); - } - loop { let page = self.stack.top(); return_if_locked!(page);