Merge 'More State Machines in preparation for tracking IO Completions' from Pedro Muniz

More changes. I want to avoid big PRs, so I am doing these changes in small
increments. I think that about 2 PRs after this one, I will be able to make
the change effectively.

Closes #2400
This commit is contained in:
Jussi Saurio
2025-08-05 00:00:09 +03:00
committed by GitHub
4 changed files with 261 additions and 139 deletions

View File

@@ -10,7 +10,10 @@ use crate::{
TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES,
LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES,
},
state_machines::{EmptyTableState, MoveToRightState, SeekToLastState},
state_machines::{
AdvanceState, CountState, EmptyTableState, MoveToRightState, RewindState,
SeekToLastState,
},
},
translate::plan::IterationDirection,
turso_assert,
@@ -449,9 +452,10 @@ impl Debug for CursorState {
}
}
#[derive(Debug, Clone)]
enum OverflowState {
Start,
ProcessPage { next_page: u32 },
ProcessPage { next_page: PageRef },
Done,
}
@@ -536,7 +540,7 @@ pub struct BTreeCursor {
state: CursorState,
/// Information maintained while freeing overflow pages. Maintained separately from cursor state since
/// any method could require freeing overflow pages
overflow_state: Option<OverflowState>,
overflow_state: OverflowState,
/// Page stack used to traverse the btree.
/// Each cursor has a stack because each cursor traverses the btree independently.
stack: PageStack,
@@ -576,7 +580,14 @@ pub struct BTreeCursor {
/// State machine for [BTreeCursor::move_to_rightmost] and, optionally, the id of the rightmost page in the btree.
/// If we know the rightmost page id and are already on that page, we can skip a seek.
move_to_right_state: (MoveToRightState, Option<usize>),
/// State machine for [BTreeCursor::seek_to_last]
seek_to_last_state: SeekToLastState,
/// State machine for [BTreeCursor::rewind]
rewind_state: RewindState,
/// State machine for [BTreeCursor::next] and [BTreeCursor::prev]
advance_state: AdvanceState,
/// State machine for [BTreeCursor::count]
count_state: CountState,
}
/// We store the cell index and cell count for each page in the stack.
@@ -616,7 +627,7 @@ impl BTreeCursor {
null_flag: false,
going_upwards: false,
state: CursorState::None,
overflow_state: None,
overflow_state: OverflowState::Start,
stack: PageStack {
current_page: Cell::new(-1),
node_states: RefCell::new([BTreeNodeState::default(); BTCURSOR_MAX_DEPTH + 1]),
@@ -634,6 +645,9 @@ impl BTreeCursor {
is_empty_table_state: RefCell::new(EmptyTableState::Start),
move_to_right_state: (MoveToRightState::Start, None),
seek_to_last_state: SeekToLastState::Start,
rewind_state: RewindState::Start,
advance_state: AdvanceState::Start,
count_state: CountState::Start,
}
}
@@ -4241,22 +4255,27 @@ impl BTreeCursor {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn rewind(&mut self) -> Result<IOResult<()>> {
if let Some(mv_cursor) = &self.mv_cursor {
{
let mut mv_cursor = mv_cursor.borrow_mut();
mv_cursor.rewind();
loop {
match self.rewind_state {
RewindState::Start => {
self.rewind_state = RewindState::NextRecord;
if let Some(mv_cursor) = &self.mv_cursor {
let mut mv_cursor = mv_cursor.borrow_mut();
mv_cursor.rewind();
} else {
let _c = self.move_to_root()?;
return Ok(IOResult::IO);
}
}
RewindState::NextRecord => {
let cursor_has_record = return_if_io!(self.get_next_record());
self.invalidate_record();
self.has_record.replace(cursor_has_record);
self.rewind_state = RewindState::Start;
return Ok(IOResult::Done(()));
}
}
let cursor_has_record = return_if_io!(self.get_next_record());
self.invalidate_record();
self.has_record.replace(cursor_has_record);
} else {
let _c = self.move_to_root()?;
let cursor_has_record = return_if_io!(self.get_next_record());
self.invalidate_record();
self.has_record.replace(cursor_has_record);
}
Ok(IOResult::Done(()))
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -4270,11 +4289,21 @@ impl BTreeCursor {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn next(&mut self) -> Result<IOResult<bool>> {
return_if_io!(self.restore_context());
let cursor_has_record = return_if_io!(self.get_next_record());
self.has_record.replace(cursor_has_record);
self.invalidate_record();
Ok(IOResult::Done(cursor_has_record))
loop {
match self.advance_state {
AdvanceState::Start => {
return_if_io!(self.restore_context());
self.advance_state = AdvanceState::Advance;
}
AdvanceState::Advance => {
let cursor_has_record = return_if_io!(self.get_next_record());
self.has_record.replace(cursor_has_record);
self.invalidate_record();
self.advance_state = AdvanceState::Start;
return Ok(IOResult::Done(cursor_has_record));
}
}
}
}
fn invalidate_record(&mut self) {
@@ -4288,11 +4317,21 @@ impl BTreeCursor {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn prev(&mut self) -> Result<IOResult<bool>> {
assert!(self.mv_cursor.is_none());
return_if_io!(self.restore_context());
let cursor_has_record = return_if_io!(self.get_prev_record());
self.has_record.replace(cursor_has_record);
self.invalidate_record();
Ok(IOResult::Done(cursor_has_record))
loop {
match self.advance_state {
AdvanceState::Start => {
return_if_io!(self.restore_context());
self.advance_state = AdvanceState::Advance;
}
AdvanceState::Advance => {
let cursor_has_record = return_if_io!(self.get_prev_record());
self.has_record.replace(cursor_has_record);
self.invalidate_record();
self.advance_state = AdvanceState::Start;
return Ok(IOResult::Done(cursor_has_record));
}
}
}
}
#[instrument(skip(self), level = Level::DEBUG)]
@@ -4970,9 +5009,7 @@ impl BTreeCursor {
#[instrument(skip_all, level = Level::DEBUG)]
fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result<IOResult<()>> {
loop {
let state = self.overflow_state.take().unwrap_or(OverflowState::Start);
match state {
match self.overflow_state.clone() {
OverflowState::Start => {
let first_overflow_page = match cell {
BTreeCell::TableLeafCell(leaf_cell) => leaf_cell.first_overflow_page,
@@ -4983,42 +5020,63 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(_) => return Ok(IOResult::Done(())), // No overflow pages
};
if let Some(page) = first_overflow_page {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: page });
continue;
if let Some(next_page) = first_overflow_page {
if next_page < 2
|| next_page
> self
.pager
.io
.block(|| {
self.pager.with_header(|header| header.database_size)
})?
.get()
{
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next_page as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
} else {
self.overflow_state = Some(OverflowState::Done);
self.overflow_state = OverflowState::Done;
}
}
OverflowState::ProcessPage { next_page } => {
if next_page < 2
|| next_page as usize
> self
.pager
.io
.block(|| self.pager.with_header(|header| header.database_size))?
.get() as usize
{
self.overflow_state = None;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
OverflowState::ProcessPage { next_page: page } => {
return_if_locked!(page);
let page = page.get();
let contents = page.get().contents.as_ref().unwrap();
let contents = page.get_contents();
let next = contents.read_u32(0);
let next_page_id = page.get().id;
return_if_io!(self.pager.free_page(Some(page), next_page as usize));
return_if_io!(self.pager.free_page(Some(page), next_page_id));
if next != 0 {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: next });
if next < 2
|| next
> self
.pager
.io
.block(|| {
self.pager.with_header(|header| header.database_size)
})?
.get()
{
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
} else {
self.overflow_state = Some(OverflowState::Done);
self.overflow_state = OverflowState::Done;
}
}
OverflowState::Done => {
self.overflow_state = None;
self.overflow_state = OverflowState::Start;
return Ok(IOResult::Done(()));
}
};
@@ -5047,6 +5105,7 @@ impl BTreeCursor {
self.state = CursorState::Destroy(DestroyInfo {
state: DestroyState::Start,
});
return Ok(IOResult::IO);
}
loop {
@@ -5078,8 +5137,8 @@ impl BTreeCursor {
}
DestroyState::ProcessPage => {
let page = self.stack.top();
self.stack.advance();
assert!(page.get().is_loaded()); // page should be loaded at this time
self.stack.advance();
let page = page.get();
let contents = page.get().contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index();
@@ -5105,6 +5164,7 @@ impl BTreeCursor {
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO);
} else {
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5162,7 +5222,7 @@ impl BTreeCursor {
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
continue;
return Ok(IOResult::IO);
}
},
}
@@ -5179,7 +5239,7 @@ impl BTreeCursor {
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
continue;
return Ok(IOResult::IO);
}
// For any leaf cell, advance the index now that overflow pages have been cleared
BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => {
@@ -5352,10 +5412,6 @@ impl BTreeCursor {
/// Only supposed to be used in the context of a simple Count Select Statement
#[instrument(skip(self), level = Level::DEBUG)]
pub fn count(&mut self) -> Result<IOResult<usize>> {
if self.count == 0 {
let _c = self.move_to_root()?;
}
if let Some(_mv_cursor) = &self.mv_cursor {
todo!("Implement count for mvcc");
}
@@ -5364,78 +5420,93 @@ impl BTreeCursor {
let mut mem_page;
let mut contents;
loop {
mem_page_rc = self.stack.top();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
mem_page = mem_page_rc.get();
contents = mem_page.get().contents.as_ref().unwrap();
/* If this is a leaf page or the tree is not an int-key tree, then
** this page contains countable entries. Increment the entry counter
** accordingly.
*/
if !matches!(contents.page_type(), PageType::TableInterior) {
self.count += contents.cell_count();
let state = self.count_state;
match state {
CountState::Start => {
let _c = self.move_to_root()?;
self.count_state = CountState::Loop;
return Ok(IOResult::IO);
}
CountState::Loop => {
mem_page_rc = self.stack.top();
mem_page = mem_page_rc.get();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
turso_assert!(mem_page.is_loaded(), "page should be loaded");
contents = mem_page.get().contents.as_ref().unwrap();
self.stack.advance();
let cell_idx = self.stack.current_cell_index() as usize;
// Second condition is necessary in case we return if the page is locked in the loop below
if contents.is_leaf() || cell_idx > contents.cell_count() {
loop {
if !self.stack.has_parent() {
// All pages of the b-tree have been visited. Return successfully
let _c = self.move_to_root()?;
return Ok(IOResult::Done(self.count));
}
// Move to parent
self.stack.pop();
mem_page_rc = self.stack.top();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
mem_page = mem_page_rc.get();
contents = mem_page.get().contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index() as usize;
if cell_idx <= contents.cell_count() {
break;
}
/* If this is a leaf page or the tree is not an int-key tree, then
** this page contains countable entries. Increment the entry counter
** accordingly.
*/
if !matches!(contents.page_type(), PageType::TableInterior) {
self.count += contents.cell_count();
}
}
let cell_idx = self.stack.current_cell_index() as usize;
assert!(cell_idx <= contents.cell_count(),);
assert!(!contents.is_leaf());
if cell_idx == contents.cell_count() {
// Move to right child
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
} else {
// Move to child left page
let cell = contents.cell_get(cell_idx, self.usable_space())?;
let cell_idx = self.stack.current_cell_index() as usize;
match cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
})
| BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page, ..
}) => {
self.stack.advance();
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
// Second condition is necessary in case we return if the page is locked in the loop below
if contents.is_leaf() || cell_idx > contents.cell_count() {
loop {
if !self.stack.has_parent() {
// All pages of the b-tree have been visited. Return successfully
let _c = self.move_to_root()?;
self.count_state = CountState::Finish;
return Ok(IOResult::IO);
}
// Move to parent
self.stack.pop();
mem_page_rc = self.stack.top();
mem_page = mem_page_rc.get();
return_if_locked_maybe_load!(self.pager, mem_page_rc);
turso_assert!(mem_page.is_loaded(), "page should be loaded");
contents = mem_page.get().contents.as_ref().unwrap();
let cell_idx = self.stack.current_cell_index() as usize;
if cell_idx <= contents.cell_count() {
break;
}
}
_ => unreachable!(),
}
let cell_idx = self.stack.current_cell_index() as usize;
assert!(cell_idx <= contents.cell_count(),);
assert!(!contents.is_leaf());
if cell_idx == contents.cell_count() {
// Move to right child
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
} else {
// Move to child left page
let cell = contents.cell_get(cell_idx, self.usable_space())?;
match cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
left_child_page, ..
})
| BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page, ..
}) => {
self.stack.advance();
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
}
_ => unreachable!(),
}
}
}
CountState::Finish => {
return Ok(IOResult::Done(self.count));
}
}
}
@@ -5568,6 +5639,7 @@ pub struct IntegrityCheckState {
pub current_page: usize,
page_stack: Vec<IntegrityCheckPageEntry>,
first_leaf_level: Option<usize>,
page: Option<PageRef>,
}
impl IntegrityCheckState {
@@ -5580,6 +5652,7 @@ impl IntegrityCheckState {
max_intkey: i64::MAX,
}],
first_leaf_level: None,
page: None,
}
}
}
@@ -5615,11 +5688,17 @@ pub fn integrity_check(
else {
return Ok(IOResult::Done(()));
};
let (page, _c) = btree_read_page(pager, page_idx)?;
return_if_locked_maybe_load!(pager, page);
let page = match state.page.take() {
Some(page) => page,
None => {
let (page, _c) = btree_read_page(pager, page_idx)?;
state.page = Some(page.get());
return Ok(IOResult::IO);
}
};
return_if_locked!(page);
state.page_stack.pop();
let page = page.get();
let contents = page.get_contents();
let usable_space = pager.usable_space() as u16;
let mut coverage_checker = CoverageChecker::new(page.get().id);

View File

@@ -17,3 +17,22 @@ pub enum SeekToLastState {
Start,
IsEmpty,
}
/// Resumable state for `BTreeCursor::rewind`, so the call can be re-entered after it
/// yields for IO.
#[derive(Debug, Clone, Copy)]
pub enum RewindState {
/// Entry state: rewinds the MVCC cursor when one is present, otherwise moves to the
/// root page and yields for IO.
Start,
/// Fetches the next record, stores whether the cursor has a record, and resets the
/// state back to `Start` before returning.
NextRecord,
}
/// Resumable state shared by `BTreeCursor::next` and `BTreeCursor::prev`.
#[derive(Debug, Clone, Copy)]
pub enum AdvanceState {
/// Entry state: restores the cursor context, then transitions to `Advance`.
Start,
/// Fetches the next (for `next`) or previous (for `prev`) record, stores whether the
/// cursor has a record, and resets the state back to `Start` before returning.
Advance,
}
/// Resumable state for `BTreeCursor::count`.
#[derive(Debug, Clone, Copy)]
pub enum CountState {
/// Entry state: moves the cursor to the root page and yields for IO.
Start,
/// Traversal state: walks the b-tree pages, yielding for IO whenever a child page is
/// read, and moves to `Finish` once no parent page remains on the stack.
Loop,
/// Terminal state: returns the accumulated count.
Finish,
}

View File

@@ -6388,10 +6388,21 @@ pub fn op_noop(
Ok(InsnFunctionStepResult::Step)
}
#[derive(Default)]
pub enum OpOpenEphemeralState {
#[default]
Start,
StartingTxn { pager: Rc<Pager> },
CreateBtree { pager: Rc<Pager> },
StartingTxn {
pager: Rc<Pager>,
},
CreateBtree {
pager: Rc<Pager>,
},
// clippy complains this variant is too big when compared to the rest of the variants
// so it says we need to box it here
Rewind {
cursor: Box<BTreeCursor>,
},
}
pub fn op_open_ephemeral(
program: &Program,
@@ -6408,7 +6419,7 @@ pub fn op_open_ephemeral(
Insn::OpenAutoindex { cursor_id } => (*cursor_id, false),
_ => unreachable!("unexpected Insn {:?}", insn),
};
match &state.op_open_ephemeral_state {
match &mut state.op_open_ephemeral_state {
OpOpenEphemeralState::Start => {
tracing::trace!("Start");
let conn = program.connection.clone();
@@ -6491,7 +6502,7 @@ pub fn op_open_ephemeral(
_ => unreachable!("This should not have happened"),
};
let mut cursor = if let CursorType::BTreeIndex(index) = cursor_type {
let cursor = if let CursorType::BTreeIndex(index) = cursor_type {
BTreeCursor::new_index(
mv_cursor,
pager.clone(),
@@ -6502,24 +6513,37 @@ pub fn op_open_ephemeral(
} else {
BTreeCursor::new_table(mv_cursor, pager.clone(), root_page as usize, num_columns)
};
let res = cursor.rewind()?; // Will never return io
state.op_open_ephemeral_state = OpOpenEphemeralState::Rewind {
cursor: Box::new(cursor),
};
}
OpOpenEphemeralState::Rewind { cursor } => {
return_if_io!(cursor.rewind());
let mut cursors: std::cell::RefMut<'_, Vec<Option<Cursor>>> =
state.cursors.borrow_mut();
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let OpOpenEphemeralState::Rewind { cursor } =
std::mem::take(&mut state.op_open_ephemeral_state)
else {
unreachable!()
};
// Table content is erased if the cursor already exists
match cursor_type {
CursorType::BTreeTable(_) => {
cursors
.get_mut(cursor_id)
.unwrap()
.replace(Cursor::new_btree(cursor));
.replace(Cursor::new_btree(*cursor));
}
CursorType::BTreeIndex(_) => {
cursors
.get_mut(cursor_id)
.unwrap()
.replace(Cursor::new_btree(cursor));
.replace(Cursor::new_btree(*cursor));
}
CursorType::Pseudo(_) => {
panic!("OpenEphemeral on pseudo cursor");

View File

@@ -617,12 +617,12 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
row.get::<&Value>(2).unwrap(),
&Value::Integer(if i == 0 { 4 } else { 11 })
);
i += 1;
}
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
i += 1;
}
let mut ins = conn.prepare("update test set name = ? where id < ? AND age between ? and ?;")?;
ins.bind_at(1.try_into()?, Value::build_text("updated"));
@@ -648,12 +648,12 @@ fn test_bind_parameters_update_rowid_alias_seek_rowid() -> anyhow::Result<()> {
row.get::<&Value>(0).unwrap(),
&Value::build_text(if i == 0 { "updated" } else { "test" }),
);
i += 1;
}
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
i += 1;
}
assert_eq!(ins.parameters().count(), 4);
@@ -692,12 +692,12 @@ fn test_bind_parameters_delete_rowid_alias_seek_out_of_order() -> anyhow::Result
StepResult::Row => {
let row = sel.row().unwrap();
assert_eq!(row.get::<&Value>(0).unwrap(), &Value::build_text("correct"),);
i += 1;
}
StepResult::IO => sel.run_once()?,
StepResult::Done | StepResult::Interrupt => break,
StepResult::Busy => panic!("database busy"),
}
i += 1;
}
assert_eq!(i, 1);
assert_eq!(ins.parameters().count(), 4);