bubble completions in btree

This commit is contained in:
pedrocarlo
2025-08-12 13:35:32 -03:00
committed by Jussi Saurio
parent 82b75330bc
commit f95625a06c

View File

@@ -17,8 +17,8 @@ use crate::{
translate::plan::IterationDirection,
turso_assert,
types::{
find_compare, get_tie_breaker_from_seek_op, IndexInfo, RecordCompare, RecordCursor,
SeekResult,
find_compare, get_tie_breaker_from_seek_op, IOCompletions, IndexInfo, RecordCompare,
RecordCursor, SeekResult,
},
util::IOExt,
Completion, MvCursor,
@@ -692,9 +692,9 @@ impl BTreeCursor {
let mv_cursor = mv_cursor.borrow();
return Ok(IOResult::Done(mv_cursor.is_empty()));
}
let (page, _c) = self.pager.read_page(self.root_page)?;
let (page, c) = self.pager.read_page(self.root_page)?;
*self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page };
Ok(IOResult::IO)
Ok(IOResult::IO(IOCompletions::Single(c)))
}
EmptyTableState::ReadPage { page } => {
// TODO: Remove this line after we start awaiting for completions
@@ -729,9 +729,9 @@ impl BTreeCursor {
if let Some(rightmost_pointer) = rightmost_pointer {
let past_rightmost_pointer = cell_count as i32 + 1;
self.stack.set_cell_index(past_rightmost_pointer);
let (page, _c) = self.read_page(rightmost_pointer as usize)?;
let (page, c) = self.read_page(rightmost_pointer as usize)?;
self.stack.push_backwards(page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
if cell_idx >= cell_count as i32 {
@@ -793,9 +793,9 @@ impl BTreeCursor {
self.stack.retreat();
}
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push_backwards(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
@@ -809,14 +809,14 @@ impl BTreeCursor {
payload_size: u64,
) -> Result<IOResult<()>> {
if self.read_overflow_state.borrow().is_none() {
let (page, _c) = self.read_page(start_next_page as usize)?;
let (page, c) = self.read_page(start_next_page as usize)?;
*self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow {
payload: payload.to_vec(),
next_page: start_next_page,
remaining_to_read: payload_size as usize - payload.len(),
page,
});
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
let mut read_overflow_state = self.read_overflow_state.borrow_mut();
let ReadPayloadOverflow {
@@ -840,7 +840,7 @@ impl BTreeCursor {
*remaining_to_read -= to_read;
if *remaining_to_read != 0 && next != 0 {
let (new_page, _c) = self.pager.read_page(next as usize).map(|(page, c)| {
let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| {
(
Arc::new(BTreePageInner {
page: RefCell::new(page),
@@ -850,7 +850,7 @@ impl BTreeCursor {
})?;
*page_btree = new_page;
*next_page = next;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
turso_assert!(
*remaining_to_read == 0 && next == 0,
@@ -1013,7 +1013,7 @@ impl BTreeCursor {
let pages_to_skip = offset / overflow_size as u32;
let page_offset = offset % overflow_size as u32;
// Read page
let (page, _c) = self.read_page(first_overflow_page.unwrap() as usize)?;
let (page, c) = self.read_page(first_overflow_page.unwrap() as usize)?;
self.state =
CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages {
@@ -1025,7 +1025,7 @@ impl BTreeCursor {
is_write,
});
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
Ok(IOResult::Done(()))
}
@@ -1073,7 +1073,7 @@ impl BTreeCursor {
}
pages_left_to_skip -= 1;
let (page, _c) = self.read_page(next as usize)?;
let (page, c) = self.read_page(next as usize)?;
self.state = CursorState::ReadWritePayload(
PayloadOverflowWithOffset::SkipOverflowPages {
@@ -1086,7 +1086,7 @@ impl BTreeCursor {
},
);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage {
mut remaining_to_read,
@@ -1140,7 +1140,7 @@ impl BTreeCursor {
// Load next page
current_offset = 0; // Reset offset for new page
let (page, _c) = self.read_page(next as usize)?;
let (page, c) = self.read_page(next as usize)?;
page_btree = page;
self.state =
@@ -1152,7 +1152,7 @@ impl BTreeCursor {
is_write,
});
// Return IO to allow other operations
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
_ => {
return Err(LimboError::InternalError(
@@ -1260,9 +1260,9 @@ impl BTreeCursor {
(Some(right_most_pointer), false) => {
// do rightmost
self.stack.advance();
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
_ => {
if self.ancestor_pages_have_more_children() {
@@ -1298,9 +1298,9 @@ impl BTreeCursor {
}
let left_child_page = contents.cell_interior_read_left_child_page(cell_idx);
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
@@ -1352,9 +1352,9 @@ impl BTreeCursor {
}
}
let rightmost_page_id = *rightmost_page_id;
let _c = self.move_to_root()?;
let c = self.move_to_root()?;
self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
MoveToRightState::ProcessPage => {
let mem_page = self.stack.top();
@@ -1375,9 +1375,9 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
None => {
@@ -1442,22 +1442,22 @@ impl BTreeCursor {
let left_child_page =
contents.cell_interior_read_left_child_page(nearest_matching_cell);
self.stack.set_cell_index(nearest_matching_cell as i32);
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
self.stack.set_cell_index(cell_count as i32 + 1);
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
None => {
unreachable!("we shall not go back up! The only way is down the slope");
@@ -1581,12 +1581,12 @@ impl BTreeCursor {
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
None => {
unreachable!("we shall not go back up! The only way is down the slope");
@@ -1619,12 +1619,12 @@ impl BTreeCursor {
page.get().id
);
let (mem_page, _c) = self.read_page(*left_child_page as usize)?;
let (mem_page, c) = self.read_page(*left_child_page as usize)?;
self.stack.push(mem_page);
self.seek_state = CursorSeekState::MovingBetweenPages {
eq_seen: Cell::new(eq_seen.get()),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
let cur_cell_idx = (min + max) >> 1; // rustc generates extra insns for (min+max)/2 due to them being isize. we know min&max are >=0 here.
@@ -2124,8 +2124,8 @@ impl BTreeCursor {
MoveToState::Start => {
self.move_to_state = MoveToState::MoveToPage;
if matches!(self.seek_state, CursorSeekState::Start) {
let _c = self.move_to_root()?;
return Ok(IOResult::IO);
let c = self.move_to_root()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
MoveToState::MoveToPage => {
@@ -2303,9 +2303,8 @@ impl BTreeCursor {
// We insert the state back if overwriting returns IO.
let mut state = state.take().expect("state should be present");
let cell_idx = *cell_idx;
if self
.overwrite_cell(page.clone(), cell_idx, record, &mut state)?
.is_io()
if let IOResult::IO(io) =
self.overwrite_cell(page.clone(), cell_idx, record, &mut state)?
{
let CursorState::Write(write_state) = &mut self.state else {
panic!("expected write state");
@@ -2315,7 +2314,7 @@ impl BTreeCursor {
cell_idx,
state: Some(state),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(io));
}
let overflows = !page.get().get_contents().overflow_cells.is_empty();
let underflows = !overflows && {
@@ -2660,7 +2659,7 @@ impl BTreeCursor {
*sub_state = BalanceSubState::NonRootDoBalancing;
if !completions.is_empty() {
// TODO: when tracking IO return all the completions here
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Many(completions)));
}
}
BalanceSubState::NonRootDoBalancing => {
@@ -3167,22 +3166,10 @@ impl BTreeCursor {
pages_to_balance_new[i].replace(page.clone());
} else {
// FIXME: handle page cache is full
let mut page =
pager.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
// FIXME: add new state machine state instead of this sync IO hack
while matches!(page, IOResult::IO) {
pager.io.run_once()?;
page = pager.do_allocate_page(
page_type,
0,
BtreePageAllocMode::Any,
)?;
}
let IOResult::Done(page) = page else {
return Err(LimboError::InternalError(
"Failed to allocate page".into(),
));
};
let page = pager.io.block(|| {
pager.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)
})?;
pages_to_balance_new[i].replace(page);
// Since this page didn't exist before, we can set it to cells length as it
// marks them as empty since it is a prefix sum of cells.
@@ -4148,9 +4135,9 @@ impl BTreeCursor {
assert!(self.mv_cursor.is_none()); // unsure about this -_-
match self.seek_end_state {
SeekEndState::Start => {
let _c = self.move_to_root()?;
let c = self.move_to_root()?;
self.seek_end_state = SeekEndState::ProcessPage;
Ok(IOResult::IO)
Ok(IOResult::IO(IOCompletions::Single(c)))
}
SeekEndState::ProcessPage => {
let mem_page = self.stack.top();
@@ -4168,9 +4155,9 @@ impl BTreeCursor {
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior
let (child, _c) = self.read_page(right_most_pointer as usize)?;
let (child, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(child);
Ok(IOResult::IO)
Ok(IOResult::IO(IOCompletions::Single(c)))
}
None => unreachable!("interior page must have rightmost pointer"),
}
@@ -4221,8 +4208,8 @@ impl BTreeCursor {
let mut mv_cursor = mv_cursor.borrow_mut();
mv_cursor.rewind();
} else {
let _c = self.move_to_root()?;
return Ok(IOResult::IO);
let c = self.move_to_root()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
RewindState::NextRecord => {
@@ -4958,11 +4945,11 @@ impl BTreeCursor {
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next_page as usize)?;
let (page, c) = self.read_page(next_page as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
} else {
self.overflow_state = OverflowState::Done;
}
@@ -4990,11 +4977,11 @@ impl BTreeCursor {
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next as usize)?;
let (page, c) = self.read_page(next as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
} else {
self.overflow_state = OverflowState::Done;
}
@@ -5025,11 +5012,11 @@ impl BTreeCursor {
#[instrument(skip(self), level = Level::DEBUG)]
pub fn btree_destroy(&mut self) -> Result<IOResult<Option<usize>>> {
if let CursorState::None = &self.state {
let _c = self.move_to_root()?;
let c = self.move_to_root()?;
self.state = CursorState::Destroy(DestroyInfo {
state: DestroyState::Start,
});
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
loop {
@@ -5081,14 +5068,13 @@ impl BTreeCursor {
// Non-leaf page which has processed all children but not it's potential right child
(false, n) if n == contents.cell_count() as i32 => {
if let Some(rightmost) = contents.rightmost_pointer() {
let (rightmost_page, _c) =
self.read_page(rightmost as usize)?;
let (rightmost_page, c) = self.read_page(rightmost as usize)?;
self.stack.push(rightmost_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
} else {
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
@@ -5140,13 +5126,13 @@ impl BTreeCursor {
BTreeCell::IndexInteriorCell(cell) => cell.left_child_page,
_ => panic!("expected interior cell"),
};
let (child_page, _c) = self.read_page(child_page_id as usize)?;
let (child_page, c) = self.read_page(child_page_id as usize)?;
self.stack.push(child_page);
let destroy_info = self.state.mut_destroy_info().expect(
"unable to get a mut reference to destroy state in cursor",
);
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
},
}
@@ -5156,7 +5142,7 @@ impl BTreeCursor {
match cell {
// For an index interior cell, clear the left child page now that overflow pages have been cleared
BTreeCell::IndexInteriorCell(index_int_cell) => {
let (child_page, _c) =
let (child_page, c) =
self.read_page(index_int_cell.left_child_page as usize)?;
self.stack.push(child_page);
let destroy_info = self
@@ -5164,7 +5150,7 @@ impl BTreeCursor {
.mut_destroy_info()
.expect("unable to get a mut reference to destroy state in cursor");
destroy_info.state = DestroyState::LoadPage;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
// For any leaf cell, advance the index now that overflow pages have been cleared
BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => {
@@ -5345,9 +5331,9 @@ impl BTreeCursor {
let state = self.count_state;
match state {
CountState::Start => {
let _c = self.move_to_root()?;
let c = self.move_to_root()?;
self.count_state = CountState::Loop;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
CountState::Loop => {
mem_page_rc = self.stack.top();
@@ -5372,9 +5358,9 @@ impl BTreeCursor {
loop {
if !self.stack.has_parent() {
// All pages of the b-tree have been visited. Return successfully
let _c = self.move_to_root()?;
let c = self.move_to_root()?;
self.count_state = CountState::Finish;
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
// Move to parent
@@ -5404,9 +5390,9 @@ impl BTreeCursor {
// should be safe as contents is not a leaf page
let right_most_pointer = contents.rightmost_pointer().unwrap();
self.stack.advance();
let (mem_page, _c) = self.read_page(right_most_pointer as usize)?;
let (mem_page, c) = self.read_page(right_most_pointer as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
} else {
// Move to child left page
let cell = contents.cell_get(cell_idx, self.usable_space())?;
@@ -5419,9 +5405,9 @@ impl BTreeCursor {
left_child_page, ..
}) => {
self.stack.advance();
let (mem_page, _c) = self.read_page(left_child_page as usize)?;
let (mem_page, c) = self.read_page(left_child_page as usize)?;
self.stack.push(mem_page);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
_ => unreachable!(),
}
@@ -5469,14 +5455,14 @@ impl BTreeCursor {
self.valid_state =
CursorValidState::RequireAdvance(IterationDirection::Forwards);
self.context = Some(ctx);
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy())));
}
self.valid_state = CursorValidState::Valid;
Ok(IOResult::Done(()))
}
IOResult::IO => {
IOResult::IO(io) => {
self.context = Some(ctx);
Ok(IOResult::IO)
Ok(IOResult::IO(io))
}
}
}
@@ -5613,9 +5599,9 @@ pub fn integrity_check(
let page = match state.page.take() {
Some(page) => page,
None => {
let (page, _c) = btree_read_page(pager, page_idx)?;
let (page, c) = btree_read_page(pager, page_idx)?;
state.page = Some(page.get());
return Ok(IOResult::IO);
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
};
return_if_locked!(page);