diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 87f2f5dc0..37955e90e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -10,7 +10,10 @@ use crate::{ TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES, LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES, }, - state_machines::{EmptyTableState, MoveToRightState, SeekToLastState}, + state_machines::{ + AdvanceState, CountState, EmptyTableState, MoveToRightState, RewindState, + SeekToLastState, + }, }, translate::plan::IterationDirection, turso_assert, @@ -449,9 +452,10 @@ impl Debug for CursorState { } } +#[derive(Debug, Clone)] enum OverflowState { Start, - ProcessPage { next_page: u32 }, + ProcessPage { next_page: PageRef }, Done, } @@ -536,7 +540,7 @@ pub struct BTreeCursor { state: CursorState, /// Information maintained while freeing overflow pages. Maintained separately from cursor state since /// any method could require freeing overflow pages - overflow_state: Option, + overflow_state: OverflowState, /// Page stack used to traverse the btree. /// Each cursor has a stack because each cursor traverses the btree independently. stack: PageStack, @@ -576,7 +580,14 @@ pub struct BTreeCursor { /// State machine for [BTreeCursor::move_to_rightmost] and, optionally, the id of the rightmost page in the btree. /// If we know the rightmost page id and are already on that page, we can skip a seek. move_to_right_state: (MoveToRightState, Option), + /// State machine for [BTreeCursor::seek_to_last] seek_to_last_state: SeekToLastState, + /// State machine for [BTreeCursor::rewind] + rewind_state: RewindState, + /// State machine for [BTreeCursor::next] and [BTreeCursor::prev] + advance_state: AdvanceState, + /// State machine for [BTreeCursor::count] + count_state: CountState, } /// We store the cell index and cell count for each page in the stack. @@ -616,7 +627,7 @@ impl BTreeCursor { null_flag: false, going_upwards: false, state: CursorState::None, - overflow_state: None, + overflow_state: OverflowState::Start, stack: PageStack { current_page: Cell::new(-1), node_states: RefCell::new([BTreeNodeState::default(); BTCURSOR_MAX_DEPTH + 1]), @@ -634,6 +645,9 @@ impl BTreeCursor { is_empty_table_state: RefCell::new(EmptyTableState::Start), move_to_right_state: (MoveToRightState::Start, None), seek_to_last_state: SeekToLastState::Start, + rewind_state: RewindState::Start, + advance_state: AdvanceState::Start, + count_state: CountState::Start, } } @@ -4241,22 +4255,27 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] pub fn rewind(&mut self) -> Result> { - if let Some(mv_cursor) = &self.mv_cursor { - { - let mut mv_cursor = mv_cursor.borrow_mut(); - mv_cursor.rewind(); + loop { + match self.rewind_state { + RewindState::Start => { + self.rewind_state = RewindState::NextRecord; + if let Some(mv_cursor) = &self.mv_cursor { + let mut mv_cursor = mv_cursor.borrow_mut(); + mv_cursor.rewind(); + } else { + let _c = self.move_to_root()?; + return Ok(IOResult::IO); + } + } + RewindState::NextRecord => { + let cursor_has_record = return_if_io!(self.get_next_record()); + self.invalidate_record(); + self.has_record.replace(cursor_has_record); + self.rewind_state = RewindState::Start; + return Ok(IOResult::Done(())); + } } - let cursor_has_record = return_if_io!(self.get_next_record()); - self.invalidate_record(); - self.has_record.replace(cursor_has_record); - } else { - let _c = self.move_to_root()?; - - let cursor_has_record = return_if_io!(self.get_next_record()); - self.invalidate_record(); - self.has_record.replace(cursor_has_record); } - Ok(IOResult::Done(())) } #[instrument(skip_all, level = Level::DEBUG)] @@ -4270,11 +4289,21 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] pub fn next(&mut self) -> Result> { - return_if_io!(self.restore_context()); - let cursor_has_record = return_if_io!(self.get_next_record()); - self.has_record.replace(cursor_has_record); - self.invalidate_record(); - Ok(IOResult::Done(cursor_has_record)) + loop { + match self.advance_state { + AdvanceState::Start => { + return_if_io!(self.restore_context()); + self.advance_state = AdvanceState::Advance; + } + AdvanceState::Advance => { + let cursor_has_record = return_if_io!(self.get_next_record()); + self.has_record.replace(cursor_has_record); + self.invalidate_record(); + self.advance_state = AdvanceState::Start; + return Ok(IOResult::Done(cursor_has_record)); + } + } + } } fn invalidate_record(&mut self) { @@ -4288,11 +4317,21 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] pub fn prev(&mut self) -> Result> { assert!(self.mv_cursor.is_none()); - return_if_io!(self.restore_context()); - let cursor_has_record = return_if_io!(self.get_prev_record()); - self.has_record.replace(cursor_has_record); - self.invalidate_record(); - Ok(IOResult::Done(cursor_has_record)) + loop { + match self.advance_state { + AdvanceState::Start => { + return_if_io!(self.restore_context()); + self.advance_state = AdvanceState::Advance; + } + AdvanceState::Advance => { + let cursor_has_record = return_if_io!(self.get_prev_record()); + self.has_record.replace(cursor_has_record); + self.invalidate_record(); + self.advance_state = AdvanceState::Start; + return Ok(IOResult::Done(cursor_has_record)); + } + } + } } #[instrument(skip(self), level = Level::DEBUG)] @@ -4970,9 +5009,7 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result> { loop { - let state = self.overflow_state.take().unwrap_or(OverflowState::Start); - - match state { + match self.overflow_state.clone() { OverflowState::Start => { let first_overflow_page = match cell { BTreeCell::TableLeafCell(leaf_cell) => leaf_cell.first_overflow_page, @@ -4983,42 +5020,63 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(_) => return Ok(IOResult::Done(())), // No overflow pages }; - if let Some(page) = first_overflow_page { - self.overflow_state = Some(OverflowState::ProcessPage { next_page: page }); - continue; + if let Some(next_page) = first_overflow_page { + if next_page < 2 + || next_page + > self + .pager + .io + .block(|| { + self.pager.with_header(|header| header.database_size) + })? + .get() + { + self.overflow_state = OverflowState::Start; + return Err(LimboError::Corrupt("Invalid overflow page number".into())); + } + let (page, _c) = self.read_page(next_page as usize)?; + self.overflow_state = OverflowState::ProcessPage { + next_page: page.get(), + }; + return Ok(IOResult::IO); } else { - self.overflow_state = Some(OverflowState::Done); + self.overflow_state = OverflowState::Done; } } - OverflowState::ProcessPage { next_page } => { - if next_page < 2 - || next_page as usize - > self - .pager - .io - .block(|| self.pager.with_header(|header| header.database_size))? - .get() as usize - { - self.overflow_state = None; - return Err(LimboError::Corrupt("Invalid overflow page number".into())); - } - let (page, _c) = self.read_page(next_page as usize)?; - return_if_locked_maybe_load!(self.pager, page); + OverflowState::ProcessPage { next_page: page } => { + return_if_locked!(page); - let page = page.get(); - let contents = page.get().contents.as_ref().unwrap(); + let contents = page.get_contents(); let next = contents.read_u32(0); + let next_page_id = page.get().id; - return_if_io!(self.pager.free_page(Some(page), next_page as usize)); + return_if_io!(self.pager.free_page(Some(page), next_page_id)); if next != 0 { - self.overflow_state = Some(OverflowState::ProcessPage { next_page: next }); + if next < 2 + || next + > self + .pager + .io + .block(|| { + self.pager.with_header(|header| header.database_size) + })? + .get() + { + self.overflow_state = OverflowState::Start; + return Err(LimboError::Corrupt("Invalid overflow page number".into())); + } + let (page, _c) = self.read_page(next as usize)?; + self.overflow_state = OverflowState::ProcessPage { + next_page: page.get(), + }; + return Ok(IOResult::IO); } else { - self.overflow_state = Some(OverflowState::Done); + self.overflow_state = OverflowState::Done; } } OverflowState::Done => { - self.overflow_state = None; + self.overflow_state = OverflowState::Start; return Ok(IOResult::Done(())); } }; @@ -5047,6 +5105,7 @@ impl BTreeCursor { self.state = CursorState::Destroy(DestroyInfo { state: DestroyState::Start, }); + return Ok(IOResult::IO); } loop { @@ -5078,8 +5137,8 @@ impl BTreeCursor { } DestroyState::ProcessPage => { let page = self.stack.top(); - self.stack.advance(); assert!(page.get().is_loaded()); // page should be loaded at this time + self.stack.advance(); let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let cell_idx = self.stack.current_cell_index(); @@ -5105,6 +5164,7 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; + return Ok(IOResult::IO); } else { let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5162,7 +5222,7 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - continue; + return Ok(IOResult::IO); } }, } @@ -5179,7 +5239,7 @@ impl BTreeCursor { "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - continue; + return Ok(IOResult::IO); } // For any leaf cell, advance the index now that overflow pages have been cleared BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { @@ -5352,10 +5412,6 @@ impl BTreeCursor { /// Only supposed to be used in the context of a simple Count Select Statement #[instrument(skip(self), level = Level::DEBUG)] pub fn count(&mut self) -> Result> { - if self.count == 0 { - let _c = self.move_to_root()?; - } - if let Some(_mv_cursor) = &self.mv_cursor { todo!("Implement count for mvcc"); } @@ -5364,78 +5420,93 @@ impl BTreeCursor { let mut mem_page; let mut contents; - loop { - mem_page_rc = self.stack.top(); - return_if_locked_maybe_load!(self.pager, mem_page_rc); - mem_page = mem_page_rc.get(); - contents = mem_page.get().contents.as_ref().unwrap(); - - /* If this is a leaf page or the tree is not an int-key tree, then - ** this page contains countable entries. Increment the entry counter - ** accordingly. - */ - if !matches!(contents.page_type(), PageType::TableInterior) { - self.count += contents.cell_count(); + let state = self.count_state; + match state { + CountState::Start => { + let _c = self.move_to_root()?; + self.count_state = CountState::Loop; + return Ok(IOResult::IO); } + CountState::Loop => { + mem_page_rc = self.stack.top(); + mem_page = mem_page_rc.get(); + return_if_locked_maybe_load!(self.pager, mem_page_rc); + turso_assert!(mem_page.is_loaded(), "page should be loaded"); + contents = mem_page.get().contents.as_ref().unwrap(); - self.stack.advance(); - let cell_idx = self.stack.current_cell_index() as usize; - - // Second condition is necessary in case we return if the page is locked in the loop below - if contents.is_leaf() || cell_idx > contents.cell_count() { - loop { - if !self.stack.has_parent() { - // All pages of the b-tree have been visited. Return successfully - let _c = self.move_to_root()?; - - return Ok(IOResult::Done(self.count)); - } - - // Move to parent - self.stack.pop(); - - mem_page_rc = self.stack.top(); - return_if_locked_maybe_load!(self.pager, mem_page_rc); - mem_page = mem_page_rc.get(); - contents = mem_page.get().contents.as_ref().unwrap(); - - let cell_idx = self.stack.current_cell_index() as usize; - - if cell_idx <= contents.cell_count() { - break; - } + /* If this is a leaf page or the tree is not an int-key tree, then + ** this page contains countable entries. Increment the entry counter + ** accordingly. + */ + if !matches!(contents.page_type(), PageType::TableInterior) { + self.count += contents.cell_count(); } - } - let cell_idx = self.stack.current_cell_index() as usize; - - assert!(cell_idx <= contents.cell_count(),); - assert!(!contents.is_leaf()); - - if cell_idx == contents.cell_count() { - // Move to right child - // should be safe as contents is not a leaf page - let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; - self.stack.push(mem_page); - } else { - // Move to child left page - let cell = contents.cell_get(cell_idx, self.usable_space())?; + let cell_idx = self.stack.current_cell_index() as usize; - match cell { - BTreeCell::TableInteriorCell(TableInteriorCell { - left_child_page, .. - }) - | BTreeCell::IndexInteriorCell(IndexInteriorCell { - left_child_page, .. - }) => { - self.stack.advance(); - let (mem_page, _c) = self.read_page(left_child_page as usize)?; - self.stack.push(mem_page); + // Second condition is necessary in case we return if the page is locked in the loop below + if contents.is_leaf() || cell_idx > contents.cell_count() { + loop { + if !self.stack.has_parent() { + // All pages of the b-tree have been visited. Return successfully + let _c = self.move_to_root()?; + self.count_state = CountState::Finish; + return Ok(IOResult::IO); + } + + // Move to parent + self.stack.pop(); + + mem_page_rc = self.stack.top(); + mem_page = mem_page_rc.get(); + return_if_locked_maybe_load!(self.pager, mem_page_rc); + turso_assert!(mem_page.is_loaded(), "page should be loaded"); + contents = mem_page.get().contents.as_ref().unwrap(); + + let cell_idx = self.stack.current_cell_index() as usize; + + if cell_idx <= contents.cell_count() { + break; + } } - _ => unreachable!(), } + + let cell_idx = self.stack.current_cell_index() as usize; + + assert!(cell_idx <= contents.cell_count(),); + assert!(!contents.is_leaf()); + + if cell_idx == contents.cell_count() { + // Move to right child + // should be safe as contents is not a leaf page + let right_most_pointer = contents.rightmost_pointer().unwrap(); + self.stack.advance(); + let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + self.stack.push(mem_page); + return Ok(IOResult::IO); + } else { + // Move to child left page + let cell = contents.cell_get(cell_idx, self.usable_space())?; + + match cell { + BTreeCell::TableInteriorCell(TableInteriorCell { + left_child_page, .. + }) + | BTreeCell::IndexInteriorCell(IndexInteriorCell { + left_child_page, .. + }) => { + self.stack.advance(); + let (mem_page, _c) = self.read_page(left_child_page as usize)?; + self.stack.push(mem_page); + return Ok(IOResult::IO); + } + _ => unreachable!(), + } + } + } + CountState::Finish => { + return Ok(IOResult::Done(self.count)); } } } @@ -5568,6 +5639,7 @@ pub struct IntegrityCheckState { pub current_page: usize, page_stack: Vec, first_leaf_level: Option, + page: Option, } impl IntegrityCheckState { @@ -5580,6 +5652,7 @@ impl IntegrityCheckState { max_intkey: i64::MAX, }], first_leaf_level: None, + page: None, } } } @@ -5615,11 +5688,17 @@ pub fn integrity_check( else { return Ok(IOResult::Done(())); }; - let (page, _c) = btree_read_page(pager, page_idx)?; - return_if_locked_maybe_load!(pager, page); + let page = match state.page.take() { + Some(page) => page, + None => { + let (page, _c) = btree_read_page(pager, page_idx)?; + state.page = Some(page.get()); + return Ok(IOResult::IO); + } + }; + return_if_locked!(page); state.page_stack.pop(); - let page = page.get(); let contents = page.get_contents(); let usable_space = pager.usable_space() as u16; let mut coverage_checker = CoverageChecker::new(page.get().id); diff --git a/core/storage/state_machines.rs b/core/storage/state_machines.rs index ffce33498..d2e7190ef 100644 --- a/core/storage/state_machines.rs +++ b/core/storage/state_machines.rs @@ -17,3 +17,22 @@ pub enum SeekToLastState { Start, IsEmpty, } + +#[derive(Debug, Clone, Copy)] +pub enum RewindState { + Start, + NextRecord, +} + +#[derive(Debug, Clone, Copy)] +pub enum AdvanceState { + Start, + Advance, +} + +#[derive(Debug, Clone, Copy)] +pub enum CountState { + Start, + Loop, + Finish, +} diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 56bb30e23..d6cb09e99 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6388,10 +6388,21 @@ pub fn op_noop( Ok(InsnFunctionStepResult::Step) } +#[derive(Default)] pub enum OpOpenEphemeralState { + #[default] Start, - StartingTxn { pager: Rc }, - CreateBtree { pager: Rc }, + StartingTxn { + pager: Rc, + }, + CreateBtree { + pager: Rc, + }, + // clippy complains this variant is too big when compared to the rest of the variants + // so it says we need to box it here + Rewind { + cursor: Box, + }, } pub fn op_open_ephemeral( program: &Program, @@ -6408,7 +6419,7 @@ pub fn op_open_ephemeral( Insn::OpenAutoindex { cursor_id } => (*cursor_id, false), _ => unreachable!("unexpected Insn {:?}", insn), }; - match &state.op_open_ephemeral_state { + match &mut state.op_open_ephemeral_state { OpOpenEphemeralState::Start => { tracing::trace!("Start"); let conn = program.connection.clone(); @@ -6491,7 +6502,7 @@ pub fn op_open_ephemeral( _ => unreachable!("This should not have happened"), }; - let mut cursor = if let CursorType::BTreeIndex(index) = cursor_type { + let cursor = if let CursorType::BTreeIndex(index) = cursor_type { BTreeCursor::new_index( mv_cursor, pager.clone(), @@ -6502,24 +6513,37 @@ pub fn op_open_ephemeral( } else { BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize, num_columns) }; - let res = cursor.rewind()?; // Will never return io + state.op_open_ephemeral_state = OpOpenEphemeralState::Rewind { + cursor: Box::new(cursor), + }; + } + OpOpenEphemeralState::Rewind { cursor } => { + return_if_io!(cursor.rewind()); let mut cursors: std::cell::RefMut<'_, Vec>> = state.cursors.borrow_mut(); + let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); + + let OpOpenEphemeralState::Rewind { cursor } = + std::mem::take(&mut state.op_open_ephemeral_state) + else { + unreachable!() + }; + // Table content is erased if the cursor already exists match cursor_type { CursorType::BTreeTable(_) => { cursors .get_mut(cursor_id) .unwrap() - .replace(Cursor::new_btree(cursor)); + .replace(Cursor::new_btree(*cursor)); } CursorType::BTreeIndex(_) => { cursors .get_mut(cursor_id) .unwrap() - .replace(Cursor::new_btree(cursor)); + .replace(Cursor::new_btree(*cursor)); } CursorType::Pseudo(_) => { panic!("OpenEphemeral on pseudo cursor"); diff --git a/tests/integration/query_processing/test_read_path.rs b/tests/integration/query_processing/test_read_path.rs index 193396362..03d8b9ceb 100644 --- a/tests/integration/query_processing/test_read_path.rs +++ b/tests/integration/query_processing/test_read_path.rs @@ -617,12 +617,12 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> { row.get::<&Value>(2).unwrap(), &Value::Integer(if i == 0 { 4 } else { 11 }) ); + i += 1; } StepResult::IO => sel.run_once()?, StepResult::Done | StepResult::Interrupt => break, StepResult::Busy => panic!("database busy"), } - i += 1; } let mut ins = conn.prepare("update test set name = ? where id < ? AND age between ? and ?;")?; ins.bind_at(1.try_into()?, Value::build_text("updated")); @@ -648,12 +648,12 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> { row.get::<&Value>(0).unwrap(), &Value::build_text(if i == 0 { "updated" } else { "test" }), ); + i += 1; } StepResult::IO => sel.run_once()?, StepResult::Done | StepResult::Interrupt => break, StepResult::Busy => panic!("database busy"), } - i += 1; } assert_eq!(ins.parameters().count(), 4); @@ -692,12 +692,12 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result StepResult::Row => { let row = sel.row().unwrap(); assert_eq!(row.get::<&Value>(0).unwrap(), &Value::build_text("correct"),); + i += 1; } StepResult::IO => sel.run_once()?, StepResult::Done | StepResult::Interrupt => break, StepResult::Busy => panic!("database busy"), } - i += 1; } assert_eq!(i, 1); assert_eq!(ins.parameters().count(), 4);