From f95625a06cebef1e280694be8f4e7b0ed6cc28a4 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 12 Aug 2025 13:35:32 -0300 Subject: [PATCH] bubble completions in btree --- core/storage/btree.rs | 162 +++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 88 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 90e29fedf..5c97e7dcb 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -17,8 +17,8 @@ use crate::{ translate::plan::IterationDirection, turso_assert, types::{ - find_compare, get_tie_breaker_from_seek_op, IndexInfo, RecordCompare, RecordCursor, - SeekResult, + find_compare, get_tie_breaker_from_seek_op, IOCompletions, IndexInfo, RecordCompare, + RecordCursor, SeekResult, }, util::IOExt, Completion, MvCursor, @@ -692,9 +692,9 @@ impl BTreeCursor { let mv_cursor = mv_cursor.borrow(); return Ok(IOResult::Done(mv_cursor.is_empty())); } - let (page, _c) = self.pager.read_page(self.root_page)?; + let (page, c) = self.pager.read_page(self.root_page)?; *self.is_empty_table_state.borrow_mut() = EmptyTableState::ReadPage { page }; - Ok(IOResult::IO) + Ok(IOResult::IO(IOCompletions::Single(c))) } EmptyTableState::ReadPage { page } => { // TODO: Remove this line after we start awaiting for completions @@ -729,9 +729,9 @@ impl BTreeCursor { if let Some(rightmost_pointer) = rightmost_pointer { let past_rightmost_pointer = cell_count as i32 + 1; self.stack.set_cell_index(past_rightmost_pointer); - let (page, _c) = self.read_page(rightmost_pointer as usize)?; + let (page, c) = self.read_page(rightmost_pointer as usize)?; self.stack.push_backwards(page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } } if cell_idx >= cell_count as i32 { @@ -793,9 +793,9 @@ impl BTreeCursor { self.stack.retreat(); } - let (mem_page, _c) = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push_backwards(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } } @@ -809,14 +809,14 @@ impl BTreeCursor { payload_size: u64, ) -> Result> { if self.read_overflow_state.borrow().is_none() { - let (page, _c) = self.read_page(start_next_page as usize)?; + let (page, c) = self.read_page(start_next_page as usize)?; *self.read_overflow_state.borrow_mut() = Some(ReadPayloadOverflow { payload: payload.to_vec(), next_page: start_next_page, remaining_to_read: payload_size as usize - payload.len(), page, }); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } let mut read_overflow_state = self.read_overflow_state.borrow_mut(); let ReadPayloadOverflow { @@ -840,7 +840,7 @@ impl BTreeCursor { *remaining_to_read -= to_read; if *remaining_to_read != 0 && next != 0 { - let (new_page, _c) = self.pager.read_page(next as usize).map(|(page, c)| { + let (new_page, c) = self.pager.read_page(next as usize).map(|(page, c)| { ( Arc::new(BTreePageInner { page: RefCell::new(page), @@ -850,7 +850,7 @@ impl BTreeCursor { })?; *page_btree = new_page; *next_page = next; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } turso_assert!( *remaining_to_read == 0 && next == 0, @@ -1013,7 +1013,7 @@ impl BTreeCursor { let pages_to_skip = offset / overflow_size as u32; let page_offset = offset % overflow_size as u32; // Read page - let (page, _c) = self.read_page(first_overflow_page.unwrap() as usize)?; + let (page, c) = self.read_page(first_overflow_page.unwrap() as usize)?; self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::SkipOverflowPages { @@ -1025,7 +1025,7 @@ impl BTreeCursor { is_write, }); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } Ok(IOResult::Done(())) } @@ -1073,7 +1073,7 @@ impl BTreeCursor { } pages_left_to_skip -= 1; - let (page, _c) = self.read_page(next as usize)?; + let (page, c) = self.read_page(next as usize)?; self.state = CursorState::ReadWritePayload( PayloadOverflowWithOffset::SkipOverflowPages { @@ -1086,7 +1086,7 @@ impl BTreeCursor { }, ); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { mut remaining_to_read, @@ -1140,7 +1140,7 @@ impl BTreeCursor { // Load next page current_offset = 0; // Reset offset for new page - let (page, _c) = self.read_page(next as usize)?; + let (page, c) = self.read_page(next as usize)?; page_btree = page; self.state = @@ -1152,7 +1152,7 @@ impl BTreeCursor { is_write, }); // Return IO to allow other operations - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } _ => { return Err(LimboError::InternalError( @@ -1260,9 +1260,9 @@ impl BTreeCursor { (Some(right_most_pointer), false) => { // do rightmost self.stack.advance(); - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } _ => { if self.ancestor_pages_have_more_children() { @@ -1298,9 +1298,9 @@ impl BTreeCursor { } let left_child_page = contents.cell_interior_read_left_child_page(cell_idx); - let (mem_page, _c) = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } } @@ -1352,9 +1352,9 @@ impl BTreeCursor { } } let rightmost_page_id = *rightmost_page_id; - let _c = self.move_to_root()?; + let c = self.move_to_root()?; self.move_to_right_state = (MoveToRightState::ProcessPage, rightmost_page_id); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } MoveToRightState::ProcessPage => { let mem_page = self.stack.top(); @@ -1375,9 +1375,9 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } None => { @@ -1442,22 +1442,22 @@ impl BTreeCursor { let left_child_page = contents.cell_interior_read_left_child_page(nearest_matching_cell); self.stack.set_cell_index(nearest_matching_cell as i32); - let (mem_page, _c) = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } self.stack.set_cell_index(cell_count as i32 + 1); match contents.rightmost_pointer() { Some(right_most_pointer) => { - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } None => { unreachable!("we shall not go back up! The only way is down the slope"); @@ -1581,12 +1581,12 @@ impl BTreeCursor { self.stack.set_cell_index(contents.cell_count() as i32 + 1); match contents.rightmost_pointer() { Some(right_most_pointer) => { - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } None => { unreachable!("we shall not go back up! The only way is down the slope"); @@ -1619,12 +1619,12 @@ impl BTreeCursor { page.get().id ); - let (mem_page, _c) = self.read_page(*left_child_page as usize)?; + let (mem_page, c) = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); self.seek_state = CursorSeekState::MovingBetweenPages { eq_seen: Cell::new(eq_seen.get()), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } let cur_cell_idx = (min + max) >> 1; // rustc generates extra insns for (min+max)/2 due to them being isize. we know min&max are >=0 here. @@ -2124,8 +2124,8 @@ impl BTreeCursor { MoveToState::Start => { self.move_to_state = MoveToState::MoveToPage; if matches!(self.seek_state, CursorSeekState::Start) { - let _c = self.move_to_root()?; - return Ok(IOResult::IO); + let c = self.move_to_root()?; + return Ok(IOResult::IO(IOCompletions::Single(c))); } } MoveToState::MoveToPage => { @@ -2303,9 +2303,8 @@ impl BTreeCursor { // We insert the state back if overwriting returns IO. let mut state = state.take().expect("state should be present"); let cell_idx = *cell_idx; - if self - .overwrite_cell(page.clone(), cell_idx, record, &mut state)? - .is_io() + if let IOResult::IO(io) = + self.overwrite_cell(page.clone(), cell_idx, record, &mut state)? { let CursorState::Write(write_state) = &mut self.state else { panic!("expected write state"); @@ -2315,7 +2314,7 @@ impl BTreeCursor { cell_idx, state: Some(state), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(io)); } let overflows = !page.get().get_contents().overflow_cells.is_empty(); let underflows = !overflows && { @@ -2660,7 +2659,7 @@ impl BTreeCursor { *sub_state = BalanceSubState::NonRootDoBalancing; if !completions.is_empty() { // TODO: when tracking IO return all the completions here - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Many(completions))); } } BalanceSubState::NonRootDoBalancing => { @@ -3167,22 +3166,10 @@ impl BTreeCursor { pages_to_balance_new[i].replace(page.clone()); } else { // FIXME: handle page cache is full - let mut page = - pager.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?; // FIXME: add new state machine state instead of this sync IO hack - while matches!(page, IOResult::IO) { - pager.io.run_once()?; - page = pager.do_allocate_page( - page_type, - 0, - BtreePageAllocMode::Any, - )?; - } - let IOResult::Done(page) = page else { - return Err(LimboError::InternalError( - "Failed to allocate page".into(), - )); - }; + let page = pager.io.block(|| { + pager.do_allocate_page(page_type, 0, BtreePageAllocMode::Any) + })?; pages_to_balance_new[i].replace(page); // Since this page didn't exist before, we can set it to cells length as it // marks them as empty since it is a prefix sum of cells. @@ -4148,9 +4135,9 @@ impl BTreeCursor { assert!(self.mv_cursor.is_none()); // unsure about this -_- match self.seek_end_state { SeekEndState::Start => { - let _c = self.move_to_root()?; + let c = self.move_to_root()?; self.seek_end_state = SeekEndState::ProcessPage; - Ok(IOResult::IO) + Ok(IOResult::IO(IOCompletions::Single(c))) } SeekEndState::ProcessPage => { let mem_page = self.stack.top(); @@ -4168,9 +4155,9 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior - let (child, _c) = self.read_page(right_most_pointer as usize)?; + let (child, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(child); - Ok(IOResult::IO) + Ok(IOResult::IO(IOCompletions::Single(c))) } None => unreachable!("interior page must have rightmost pointer"), } @@ -4221,8 +4208,8 @@ impl BTreeCursor { let mut mv_cursor = mv_cursor.borrow_mut(); mv_cursor.rewind(); } else { - let _c = self.move_to_root()?; - return Ok(IOResult::IO); + let c = self.move_to_root()?; + return Ok(IOResult::IO(IOCompletions::Single(c))); } } RewindState::NextRecord => { @@ -4958,11 +4945,11 @@ impl BTreeCursor { self.overflow_state = OverflowState::Start; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let (page, _c) = self.read_page(next_page as usize)?; + let (page, c) = self.read_page(next_page as usize)?; self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } else { self.overflow_state = OverflowState::Done; } @@ -4990,11 +4977,11 @@ impl BTreeCursor { self.overflow_state = OverflowState::Start; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let (page, _c) = self.read_page(next as usize)?; + let (page, c) = self.read_page(next as usize)?; self.overflow_state = OverflowState::ProcessPage { next_page: page.get(), }; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } else { self.overflow_state = OverflowState::Done; } @@ -5025,11 +5012,11 @@ impl BTreeCursor { #[instrument(skip(self), level = Level::DEBUG)] pub fn btree_destroy(&mut self) -> Result>> { if let CursorState::None = &self.state { - let _c = self.move_to_root()?; + let c = self.move_to_root()?; self.state = CursorState::Destroy(DestroyInfo { state: DestroyState::Start, }); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } loop { @@ -5081,14 +5068,13 @@ impl BTreeCursor { // Non-leaf page which has processed all children but not it's potential right child (false, n) if n == contents.cell_count() as i32 => { if let Some(rightmost) = contents.rightmost_pointer() { - let (rightmost_page, _c) = - self.read_page(rightmost as usize)?; + let (rightmost_page, c) = self.read_page(rightmost as usize)?; self.stack.push(rightmost_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } else { let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", @@ -5140,13 +5126,13 @@ impl BTreeCursor { BTreeCell::IndexInteriorCell(cell) => cell.left_child_page, _ => panic!("expected interior cell"), }; - let (child_page, _c) = self.read_page(child_page_id as usize)?; + let (child_page, c) = self.read_page(child_page_id as usize)?; self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( "unable to get a mut reference to destroy state in cursor", ); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } }, } @@ -5156,7 +5142,7 @@ impl BTreeCursor { match cell { // For an index interior cell, clear the left child page now that overflow pages have been cleared BTreeCell::IndexInteriorCell(index_int_cell) => { - let (child_page, _c) = + let (child_page, c) = self.read_page(index_int_cell.left_child_page as usize)?; self.stack.push(child_page); let destroy_info = self @@ -5164,7 +5150,7 @@ impl BTreeCursor { .mut_destroy_info() .expect("unable to get a mut reference to destroy state in cursor"); destroy_info.state = DestroyState::LoadPage; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } // For any leaf cell, advance the index now that overflow pages have been cleared BTreeCell::TableLeafCell(_) | BTreeCell::IndexLeafCell(_) => { @@ -5345,9 +5331,9 @@ impl BTreeCursor { let state = self.count_state; match state { CountState::Start => { - let _c = self.move_to_root()?; + let c = self.move_to_root()?; self.count_state = CountState::Loop; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } CountState::Loop => { mem_page_rc = self.stack.top(); @@ -5372,9 +5358,9 @@ impl BTreeCursor { loop { if !self.stack.has_parent() { // All pages of the b-tree have been visited. Return successfully - let _c = self.move_to_root()?; + let c = self.move_to_root()?; self.count_state = CountState::Finish; - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } // Move to parent @@ -5404,9 +5390,9 @@ impl BTreeCursor { // should be safe as contents is not a leaf page let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let (mem_page, _c) = self.read_page(right_most_pointer as usize)?; + let (mem_page, c) = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } else { // Move to child left page let cell = contents.cell_get(cell_idx, self.usable_space())?; @@ -5419,9 +5405,9 @@ impl BTreeCursor { left_child_page, .. }) => { self.stack.advance(); - let (mem_page, _c) = self.read_page(left_child_page as usize)?; + let (mem_page, c) = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } _ => unreachable!(), } @@ -5469,14 +5455,14 @@ impl BTreeCursor { self.valid_state = CursorValidState::RequireAdvance(IterationDirection::Forwards); self.context = Some(ctx); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(Completion::new_dummy()))); } self.valid_state = CursorValidState::Valid; Ok(IOResult::Done(())) } - IOResult::IO => { + IOResult::IO(io) => { self.context = Some(ctx); - Ok(IOResult::IO) + Ok(IOResult::IO(io)) } } } @@ -5613,9 +5599,9 @@ pub fn integrity_check( let page = match state.page.take() { Some(page) => page, None => { - let (page, _c) = btree_read_page(pager, page_idx)?; + let (page, c) = btree_read_page(pager, page_idx)?; state.page = Some(page.get()); - return Ok(IOResult::IO); + return Ok(IOResult::IO(IOCompletions::Single(c))); } }; return_if_locked!(page);