edit state machine in Btree for freeing pages + Pager state machine for free_page

This commit is contained in:
pedrocarlo
2025-07-09 16:46:42 -03:00
parent 5771d1a00e
commit 7b8eec90bd
4 changed files with 142 additions and 61 deletions

View File

@@ -223,6 +223,10 @@ struct DeleteInfo {
enum WriteState {
Start,
BalanceStart,
BalanceFreePages {
curr_page: usize,
sibling_count_new: usize,
},
/// Choose which sibling pages to balance (max 3).
/// Generally, the siblings involved will be the page that triggered the balancing and its left and right siblings.
/// The exceptions are:
@@ -2255,6 +2259,7 @@ impl BTreeCursor {
}
}
WriteState::BalanceStart
| WriteState::BalanceFreePages { .. }
| WriteState::BalanceNonRootPickSiblings
| WriteState::BalanceNonRootDoBalancing => {
return_if_io!(self.balance());
@@ -2333,7 +2338,9 @@ impl BTreeCursor {
self.stack.pop();
return_if_io!(self.balance_non_root());
}
WriteState::BalanceNonRootPickSiblings | WriteState::BalanceNonRootDoBalancing => {
WriteState::BalanceNonRootPickSiblings
| WriteState::BalanceNonRootDoBalancing
| WriteState::BalanceFreePages { .. } => {
return_if_io!(self.balance_non_root());
}
WriteState::Finish => return Ok(IOResult::Done(())),
@@ -2350,7 +2357,7 @@ impl BTreeCursor {
"Cursor must be in balancing state"
);
let state = self.state.write_info().expect("must be balancing").state;
tracing::debug!("balance_non_root(state={:?})", state);
tracing::debug!(?state);
let (next_write_state, result) = match state {
WriteState::Start => todo!(),
WriteState::BalanceStart => todo!(),
@@ -3322,13 +3329,38 @@ impl BTreeCursor {
right_page_id,
);
(
WriteState::BalanceFreePages {
curr_page: sibling_count_new,
sibling_count_new,
},
Ok(CursorResult::Ok(())),
)
}
WriteState::BalanceFreePages {
curr_page,
sibling_count_new,
} => {
let write_info = self.state.write_info().unwrap();
let mut balance_info: std::cell::RefMut<'_, Option<BalanceInfo>> =
write_info.balance_info.borrow_mut();
let balance_info = balance_info.as_mut().unwrap();
// We have to free pages that are not used anymore
for i in sibling_count_new..balance_info.sibling_count {
let page = balance_info.pages_to_balance[i].as_ref().unwrap();
self.pager
.free_page(Some(page.get().clone()), page.get().get().id)?;
if !((sibling_count_new..balance_info.sibling_count).contains(&curr_page)) {
(WriteState::BalanceStart, Ok(IOResult::Done(())))
} else {
let page = balance_info.pages_to_balance[curr_page].as_ref().unwrap();
return_if_io!(self
.pager
.free_page(Some(page.get().clone()), page.get().get().id));
(
WriteState::BalanceFreePages {
curr_page: curr_page + 1,
sibling_count_new,
},
Ok(CursorResult::Ok(())),
)
}
(WriteState::BalanceStart, Ok(IOResult::Done(())))
}
WriteState::Finish => todo!(),
};
@@ -4679,7 +4711,7 @@ impl BTreeCursor {
let contents = page.get().contents.as_ref().unwrap();
let next = contents.read_u32(0);
self.pager.free_page(Some(page), next_page as usize)?;
return_if_io!(self.pager.free_page(Some(page), next_page as usize));
if next != 0 {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: next });
@@ -4866,7 +4898,7 @@ impl BTreeCursor {
let page = self.stack.top();
let page_id = page.get().get().id;
self.pager.free_page(Some(page.get()), page_id)?;
return_if_io!(self.pager.free_page(Some(page.get()), page_id));
if self.stack.has_parent() {
self.stack.pop();

View File

@@ -280,6 +280,7 @@ pub struct Pager {
/// to change it.
page_size: Cell<Option<u32>>,
reserved_space: OnceCell<u8>,
free_page_state: RefCell<FreePageState>,
}
#[derive(Debug, Copy, Clone)]
@@ -303,6 +304,18 @@ enum AllocatePage1State {
Done,
}
#[derive(Debug, Clone)]
enum FreePageState {
Start,
AddToTrunk {
page: Arc<Page>,
trunk_page: Option<Arc<Page>>,
},
NewTrunk {
page: Arc<Page>,
},
}
impl Pager {
pub fn new(
db_file: Arc<dyn DatabaseStorage>,
@@ -342,6 +355,7 @@ impl Pager {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
}),
free_page_state: RefCell::new(FreePageState::Start),
})
}
@@ -1073,7 +1087,7 @@ impl Pager {
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
// This is implemented in accordance with sqlite freepage2() function.
#[instrument(skip_all, level = Level::INFO)]
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<CursorResult<()>> {
tracing::trace!("free_page(page_id={})", page_id);
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
const LEAF_ENTRY_SIZE: usize = 4;
@@ -1082,65 +1096,100 @@ impl Pager {
const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count
if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
}
let mut state = self.free_page_state.borrow_mut();
tracing::debug!(?state);
loop {
match &mut *state {
FreePageState::Start => {
if page_id < 2 || page_id > header_accessor::get_database_size(self)? as usize {
return Err(LimboError::Corrupt(format!(
"Invalid page number {page_id} for free operation"
)));
}
let page = match page {
Some(page) => {
assert_eq!(page.get().id, page_id, "Page id mismatch");
page
}
None => self.read_page(page_id)?,
};
let page = match page.clone() {
Some(page) => {
assert_eq!(page.get().id, page_id, "Page id mismatch");
page
}
None => self.read_page(page_id)?,
};
header_accessor::set_freelist_pages(
self,
header_accessor::get_freelist_pages(self)? + 1,
)?;
header_accessor::set_freelist_pages(self, header_accessor::get_freelist_pages(self)? + 1)?;
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
if trunk_page_id != 0 {
*state = FreePageState::AddToTrunk {
page,
trunk_page: None,
};
} else {
*state = FreePageState::NewTrunk { page };
}
}
FreePageState::AddToTrunk { page, trunk_page } => {
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
if trunk_page.is_none() {
// Add as leaf to current trunk
trunk_page.replace(self.read_page(trunk_page_id as usize)?);
}
let trunk_page = trunk_page.as_ref().unwrap();
if trunk_page.is_locked() || !trunk_page.is_loaded() {
return Ok(CursorResult::IO);
}
if trunk_page_id != 0 {
// Add as leaf to current trunk
let trunk_page = self.read_page(trunk_page_id as usize)?;
let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
let number_of_leaf_pages =
trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries = (self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries =
(self.usable_space() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;
if number_of_leaf_pages < max_free_list_entries as u32 {
trunk_page.set_dirty();
self.add_dirty(trunk_page_id as usize);
if number_of_leaf_pages < max_free_list_entries as u32 {
trunk_page.set_dirty();
self.add_dirty(trunk_page_id as usize);
trunk_page_contents
.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
trunk_page_contents.write_u32(
TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
page_id as u32,
);
page.clear_uptodate();
page.clear_loaded();
trunk_page_contents
.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
trunk_page_contents.write_u32(
TRUNK_PAGE_HEADER_SIZE
+ (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
page_id as u32,
);
page.clear_uptodate();
return Ok(());
break;
}
}
FreePageState::NewTrunk { page } => {
if page.is_locked() || !page.is_loaded() {
return Ok(CursorResult::IO);
}
// If we get here, need to make this page a new trunk
page.set_dirty();
self.add_dirty(page_id);
let trunk_page_id = header_accessor::get_freelist_trunk_page(self)?;
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header_accessor::set_freelist_trunk_page(self, page_id as u32)?;
// Clear flags
page.clear_uptodate();
break;
}
}
}
// If we get here, need to make this page a new trunk
page.set_dirty();
self.add_dirty(page_id);
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header_accessor::set_freelist_trunk_page(self, page_id as u32)?;
// Clear flags
page.clear_uptodate();
page.clear_loaded();
Ok(())
*state = FreePageState::Start;
Ok(CursorResult::Ok(()))
}
#[instrument(skip_all, level = Level::INFO)]

View File

@@ -752,12 +752,13 @@ pub fn begin_read_page(
Ok(())
}
#[instrument(skip_all, level = Level::INFO)]
pub fn finish_read_page(
page_idx: usize,
buffer_ref: Arc<RefCell<Buffer>>,
page: PageRef,
) -> Result<()> {
tracing::trace!("finish_read_btree_page(page_idx = {})", page_idx);
tracing::trace!(page_idx);
let pos = if page_idx == DATABASE_HEADER_PAGE_ID {
DATABASE_HEADER_SIZE
} else {

View File

@@ -107,7 +107,6 @@ impl SimulatorFile {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn run_queued_io(&self, now: std::time::Instant) -> Result<()> {
let mut queued_io = self.queued_io.borrow_mut();
tracing::debug!(?queued_io);
// TODO: as we are not in version 1.87 we cannot use `extract_if`
// so we have to do something different to achieve the same thing
// This code was acquired from: https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.extract_if