core/storage: Wrap Pager vacuum state in RwLock

This commit is contained in:
Pekka Enberg
2025-09-22 09:12:54 +03:00
parent f053b76518
commit 6f258b37d9

View File

@@ -527,17 +527,17 @@ pub struct Pager {
max_page_count: AtomicU32,
header_ref_state: RwLock<HeaderRefState>,
#[cfg(not(feature = "omit_autovacuum"))]
vacuum_state: VacuumState,
vacuum_state: RwLock<VacuumState>,
pub(crate) io_ctx: RefCell<IOContext>,
}
#[cfg(not(feature = "omit_autovacuum"))]
pub struct VacuumState {
/// State machine for [Pager::ptrmap_get]
ptrmap_get_state: RefCell<PtrMapGetState>,
ptrmap_get_state: PtrMapGetState,
/// State machine for [Pager::ptrmap_put]
ptrmap_put_state: RefCell<PtrMapPutState>,
btree_create_vacuum_full_state: Cell<BtreeCreateVacuumFullState>,
ptrmap_put_state: PtrMapPutState,
btree_create_vacuum_full_state: BtreeCreateVacuumFullState,
}
#[derive(Debug, Clone)]
@@ -633,11 +633,11 @@ impl Pager {
max_page_count: AtomicU32::new(DEFAULT_MAX_PAGE_COUNT),
header_ref_state: RwLock::new(HeaderRefState::Start),
#[cfg(not(feature = "omit_autovacuum"))]
vacuum_state: VacuumState {
ptrmap_get_state: RefCell::new(PtrMapGetState::Start),
ptrmap_put_state: RefCell::new(PtrMapPutState::Start),
btree_create_vacuum_full_state: Cell::new(BtreeCreateVacuumFullState::Start),
},
vacuum_state: RwLock::new(VacuumState {
ptrmap_get_state: PtrMapGetState::Start,
ptrmap_put_state: PtrMapPutState::Start,
btree_create_vacuum_full_state: BtreeCreateVacuumFullState::Start,
}),
io_ctx: RefCell::new(IOContext::default()),
})
}
@@ -678,7 +678,10 @@ impl Pager {
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
loop {
let ptrmap_get_state = self.vacuum_state.ptrmap_get_state.borrow().clone();
let ptrmap_get_state = {
let vacuum_state = self.vacuum_state.read();
vacuum_state.ptrmap_get_state.clone()
};
match ptrmap_get_state {
PtrMapGetState::Start => {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
@@ -705,10 +708,10 @@ impl Pager {
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.vacuum_state.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
};
if let Some(c) = c {
io_yield_one!(c);
}
@@ -752,7 +755,7 @@ impl Pager {
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
self.vacuum_state.ptrmap_get_state.replace(PtrMapGetState::Start);
self.vacuum_state.write().ptrmap_get_state = PtrMapGetState::Start;
break match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
@@ -781,7 +784,10 @@ impl Pager {
parent_page_no
);
loop {
let ptrmap_put_state = self.vacuum_state.ptrmap_put_state.borrow().clone();
let ptrmap_put_state = {
let vacuum_state = self.vacuum_state.read();
vacuum_state.ptrmap_put_state.clone()
};
match ptrmap_put_state {
PtrMapPutState::Start => {
let page_size =
@@ -809,10 +815,10 @@ impl Pager {
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.vacuum_state.ptrmap_put_state.replace(PtrMapPutState::Deserialize {
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
};
if let Some(c) = c {
io_yield_one!(c);
}
@@ -860,7 +866,7 @@ impl Pager {
"ptrmap page has unexpected number"
);
self.add_dirty(&ptrmap_page);
self.vacuum_state.ptrmap_put_state.replace(PtrMapPutState::Start);
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Start;
break Ok(IOResult::Done(()));
}
}
@@ -895,7 +901,11 @@ impl Pager {
}
AutoVacuumMode::Full => {
loop {
match self.vacuum_state.btree_create_vacuum_full_state.get() {
let btree_create_vacuum_full_state = {
let vacuum_state = self.vacuum_state.read();
vacuum_state.btree_create_vacuum_full_state
};
match btree_create_vacuum_full_state {
BtreeCreateVacuumFullState::Start => {
let (mut root_page_num, page_size) = return_if_io!(self
.with_header(|header| {
@@ -913,9 +923,8 @@ impl Pager {
root_page_num += 1;
}
assert!(root_page_num >= 3); // the very first root page is page 3
self.vacuum_state.btree_create_vacuum_full_state.set(
BtreeCreateVacuumFullState::AllocatePage { root_page_num },
);
self.vacuum_state.write().btree_create_vacuum_full_state =
BtreeCreateVacuumFullState::AllocatePage { root_page_num };
}
BtreeCreateVacuumFullState::AllocatePage { root_page_num } => {
// root_page_num here is the desired root page
@@ -930,9 +939,8 @@ impl Pager {
}
// TODO(Zaid): Update the header metadata to reflect the new root page number
self.vacuum_state.btree_create_vacuum_full_state.set(
BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id },
);
self.vacuum_state.write().btree_create_vacuum_full_state =
BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id };
}
BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id } => {
// For now map allocated_page_id since we are not swapping it with root_page_num
@@ -941,8 +949,8 @@ impl Pager {
PtrmapType::RootPage,
0,
));
self.vacuum_state.btree_create_vacuum_full_state
.set(BtreeCreateVacuumFullState::Start);
self.vacuum_state.write().btree_create_vacuum_full_state =
BtreeCreateVacuumFullState::Start;
return Ok(IOResult::Done(allocated_page_id));
}
}
@@ -2336,10 +2344,10 @@ impl Pager {
*self.free_page_state.write() = FreePageState::Start;
#[cfg(not(feature = "omit_autovacuum"))]
{
self.vacuum_state.ptrmap_get_state.replace(PtrMapGetState::Start);
self.vacuum_state.ptrmap_put_state.replace(PtrMapPutState::Start);
self.vacuum_state.btree_create_vacuum_full_state
.replace(BtreeCreateVacuumFullState::Start);
let mut vacuum_state = self.vacuum_state.write();
vacuum_state.ptrmap_get_state = PtrMapGetState::Start;
vacuum_state.ptrmap_put_state = PtrMapPutState::Start;
vacuum_state.btree_create_vacuum_full_state = BtreeCreateVacuumFullState::Start;
}
*self.header_ref_state.write() = HeaderRefState::Start;