remove state machine for count

This commit is contained in:
pedrocarlo
2025-05-03 23:11:35 -03:00
parent 655ceeca45
commit 342bf51c88

View File

@@ -200,20 +200,6 @@ enum ReadPayloadOverflow {
},
}
struct CountInfo {
state: CountState,
/// Maintain count of the number of records in the btree. Used for the `Count` opcode
count: usize,
}
/// State Machine of Count Operation
#[derive(Debug, Clone, Copy)]
enum CountState {
LoadPage,
MoveParent,
MoveChild,
}
#[derive(Clone, Debug)]
pub enum BTreeKey<'a> {
TableRowId((u64, Option<&'a ImmutableRecord>)),
@@ -303,7 +289,6 @@ enum CursorState {
Write(WriteInfo),
Destroy(DestroyInfo),
Delete(DeleteInfo),
Count(CountInfo),
}
impl CursorState {
@@ -346,20 +331,6 @@ impl CursorState {
_ => None,
}
}
fn count_info(&self) -> Option<&CountInfo> {
match self {
CursorState::Count(x) => Some(x),
_ => None,
}
}
fn mut_count_info(&mut self) -> Option<&mut CountInfo> {
match self {
CursorState::Count(x) => Some(x),
_ => None,
}
}
}
enum OverflowState {
@@ -394,6 +365,8 @@ pub struct BTreeCursor {
reusable_immutable_record: RefCell<Option<ImmutableRecord>>,
empty_record: Cell<bool>,
pub index_key_sort_order: IndexKeySortOrder,
/// Maintain count of the number of records in the btree. Used for the `Count` opcode
count: usize,
}
impl BTreeCursor {
@@ -419,6 +392,7 @@ impl BTreeCursor {
reusable_immutable_record: RefCell::new(None),
empty_record: Cell::new(true),
index_key_sort_order: IndexKeySortOrder::default(),
count: 0,
}
}
@@ -4262,138 +4236,104 @@ impl BTreeCursor {
///
/// Only supposed to be used in the context of a simple Count Select Statement
pub fn count(&mut self) -> Result<CursorResult<usize>> {
if self.count == 0 {
self.move_to_root();
}
if let Some(_mv_cursor) = &self.mv_cursor {
todo!("Implement count for mvcc");
}
if let CursorState::None = &self.state {
self.move_to_root();
self.state = CursorState::Count(CountInfo {
state: CountState::LoadPage,
count: 0,
});
}
let mut mem_page_rc;
let mut mem_page;
let mut contents;
loop {
let count_state = self
.state
.count_info()
.expect("count info should have been set")
.state;
mem_page_rc = self.stack.top();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
mem_page = mem_page_rc.get();
contents = mem_page.contents.as_ref().unwrap();
match count_state {
CountState::LoadPage => {
let mem_page_rc = self.stack.top();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
let mem_page = mem_page_rc.get();
let contents = mem_page.contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index() as usize;
/* If this is a leaf page or the tree is not an int-key tree, then
** this page contains countable entries. Increment the entry counter
** accordingly.
*/
// TODO: (pedrocarlo) does not take into account if "tree is not an int-key tree" condition
if contents.is_leaf() {
self.count += contents.cell_count();
}
let count_info = self
.state
.mut_count_info()
.expect("count info should have been set");
/* If this is a leaf page or the tree is not an int-key tree, then
** this page contains countable entries. Increment the entry counter
** accordingly.
*/
// TODO: (pedrocarlo) does not take into account if "tree is not an int-key tree" condition
if contents.is_leaf() {
count_info.count += contents.cell_count();
}
if contents.is_leaf() || cell_idx > contents.cell_count() {
count_info.state = CountState::MoveParent;
} else {
count_info.state = CountState::MoveChild;
}
}
CountState::MoveParent => {
let mem_page_rc = self.stack.top();
assert!(mem_page_rc.is_loaded());
let cell_idx = self.stack.current_cell_index() as usize;
// Second condition is necessary in case we return if the page is locked in the loop below
if contents.is_leaf() || cell_idx > contents.cell_count() {
loop {
if !self.stack.has_parent() {
// All pages of the b-tree have been visited. Return successfully
self.move_to_root();
// Borrow count_info here like this due to borrow checking
let count_info = self
.state
.mut_count_info()
.expect("count info should have been set");
return Ok(CursorResult::Ok(count_info.count));
}
let count_info = self
.state
.mut_count_info()
.expect("count info should have been set");
return Ok(CursorResult::Ok(self.count));
}
// Move to parent
self.going_upwards = true;
self.stack.pop();
count_info.state = CountState::LoadPage;
}
CountState::MoveChild => {
let mem_page_rc = self.stack.top();
assert!(mem_page_rc.is_loaded());
let contents = mem_page_rc.get().contents.as_ref().unwrap();
mem_page_rc = self.stack.top();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
mem_page = mem_page_rc.get();
contents = mem_page.contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index() as usize;
assert!(!contents.is_leaf());
assert!(
cell_idx <= contents.cell_count(),
"cell_idx={} cell_count={}",
cell_idx,
contents.cell_count()
);
if !(cell_idx > contents.cell_count()) {
break;
}
}
}
if cell_idx == contents.cell_count() {
// Move to right child
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
let cell_idx = self.stack.current_cell_index() as usize;
assert!(cell_idx <= contents.cell_count(),);
assert!(!contents.is_leaf());
if cell_idx == contents.cell_count() {
// Move to right child
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let mem_page = self.pager.read_page(right_most_pointer as usize)?;
self.going_upwards = false;
self.stack.push(mem_page);
} else {
// Move to child left page
let cell = contents.cell_get(
cell_idx,
payload_overflow_threshold_max(
contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)?;
match cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page: left_child_page,
..
})
| BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page, ..
}) => {
self.stack.advance();
let mem_page = self.pager.read_page(right_most_pointer as usize)?;
let mem_page = self.pager.read_page(left_child_page as usize)?;
self.going_upwards = false;
self.stack.push(mem_page);
} else {
// Move to child left page
let cell = contents.cell_get(
cell_idx,
payload_overflow_threshold_max(
contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)?;
match cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page: left_child_page,
..
})
| BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page,
..
}) => {
self.stack.advance();
let mem_page = self.pager.read_page(left_child_page as usize)?;
self.going_upwards = false;
self.stack.push(mem_page);
}
_ => unreachable!(),
}
}
let count_info = self
.state
.mut_count_info()
.expect("count info should have been set");
count_info.state = CountState::LoadPage;
_ => unreachable!(),
}
}
}