Mirror of https://github.com/aljazceru/turso.git
Merge 'IO More State Machine' from Pedro Muniz
I swear we just need one more state machine. Just more state machine until we achieve IO tracking. (PS: this is a meme)
Closes #2462
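The pattern this merge keeps adding is a small resumable state machine: each pager operation records where it stopped in an interior-mutability cell, returns IOResult::IO whenever it must wait on a page read, and is simply called again until it reports IOResult::Done. A minimal, self-contained sketch of that shape, using simplified stand-in types rather than the real turso_core API:

// Simplified stand-in for the real types: the operation stores where it
// stopped, yields IOResult::IO while a read is outstanding, and resumes on
// the next call. Field and variant names here are illustrative only.
use std::cell::RefCell;

enum IOResult<T> {
    Done(T),
    IO, // caller must run pending IO and call again
}

#[derive(Clone)]
enum HeaderState {
    Start,
    WaitForPage { page_id: usize },
}

struct Pager {
    header_state: RefCell<HeaderState>,
    page_ready: RefCell<bool>, // stands in for "the page read completed"
}

impl Pager {
    fn header_page(&self) -> IOResult<usize> {
        let state = self.header_state.borrow().clone();
        match state {
            HeaderState::Start => {
                // issue the read, remember where we are, and yield
                *self.header_state.borrow_mut() = HeaderState::WaitForPage { page_id: 1 };
                IOResult::IO
            }
            HeaderState::WaitForPage { page_id } => {
                if !*self.page_ready.borrow() {
                    return IOResult::IO; // still waiting on IO
                }
                // reset so the next caller starts from scratch
                *self.header_state.borrow_mut() = HeaderState::Start;
                IOResult::Done(page_id)
            }
        }
    }
}

fn main() {
    let pager = Pager {
        header_state: RefCell::new(HeaderState::Start),
        page_ready: RefCell::new(false),
    };
    assert!(matches!(pager.header_page(), IOResult::IO)); // first call yields
    *pager.page_ready.borrow_mut() = true; // pretend the IO completed
    assert!(matches!(pager.header_page(), IOResult::Done(1))); // resumed and finished
    println!("state machine completed");
}

The diff below applies this shape to HeaderRef/HeaderRefMut construction, to ptrmap_put, and to btree root-page creation under full auto-vacuum, storing each operation's progress in a new field on Pager; ptrmap_get, which already had a state machine, gains a ReadPage step and a driving loop.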
@@ -27,25 +27,36 @@ use super::wal::CheckpointMode;
 #[cfg(not(feature = "omit_autovacuum"))]
 use ptrmap::*;
 
 #[derive(Debug, Clone)]
 pub struct HeaderRef(PageRef);
 
 impl HeaderRef {
     pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
-        if !pager.db_state.is_initialized() {
-            return Err(LimboError::Page1NotAlloc);
-        }
-
-        let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
-        if page.is_locked() {
-            return Ok(IOResult::IO);
-        }
-
-        turso_assert!(
-            page.get().id == DatabaseHeader::PAGE_ID,
-            "incorrect header page id"
-        );
-
-        Ok(IOResult::Done(Self(page)))
+        let state = pager.header_ref_state.borrow().clone();
+        tracing::trace!(?state);
+        match state {
+            HeaderRefState::Start => {
+                if !pager.db_state.is_initialized() {
+                    return Err(LimboError::Page1NotAlloc);
+                }
+
+                let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
+                *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
+                Ok(IOResult::IO)
+            }
+            HeaderRefState::CreateHeader { page } => {
+                // TODO: will have to remove this when tracking IO completions
+                if page.is_locked() {
+                    return Ok(IOResult::IO);
+                }
+                turso_assert!(
+                    page.get().id == DatabaseHeader::PAGE_ID,
+                    "incorrect header page id"
+                );
+                *pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
+                Ok(IOResult::Done(Self(page)))
+            }
+        }
     }
 
     pub fn borrow(&self) -> &DatabaseHeader {
@@ -55,27 +66,38 @@ impl HeaderRef {
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct HeaderRefMut(PageRef);
 
 impl HeaderRefMut {
     pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
-        if !pager.db_state.is_initialized() {
-            return Err(LimboError::Page1NotAlloc);
-        }
-
-        let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
-        if page.is_locked() {
-            return Ok(IOResult::IO);
-        }
-
-        turso_assert!(
-            page.get().id == DatabaseHeader::PAGE_ID,
-            "incorrect header page id"
-        );
-
-        pager.add_dirty(&page);
-
-        Ok(IOResult::Done(Self(page)))
+        let state = pager.header_ref_state.borrow().clone();
+        tracing::trace!(?state);
+        match state {
+            HeaderRefState::Start => {
+                if !pager.db_state.is_initialized() {
+                    return Err(LimboError::Page1NotAlloc);
+                }
+
+                let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
+                *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
+                Ok(IOResult::IO)
+            }
+            HeaderRefState::CreateHeader { page } => {
+                // TODO: will have to remove this when tracking IO completions
+                if page.is_locked() {
+                    return Ok(IOResult::IO);
+                }
+                turso_assert!(
+                    page.get().id == DatabaseHeader::PAGE_ID,
+                    "incorrect header page id"
+                );
+
+                pager.add_dirty(&page);
+                *pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
+                Ok(IOResult::Done(Self(page)))
+            }
+        }
     }
 
     pub fn borrow_mut(&self) -> &mut DatabaseHeader {
@@ -359,15 +381,36 @@ impl AtomicDbState {
 #[cfg(not(feature = "omit_autovacuum"))]
 enum PtrMapGetState {
     Start,
+    ReadPage {
+        page_size: usize,
+    },
     Deserialize {
         ptrmap_page: PageRef,
         offset_in_ptrmap_page: usize,
     },
 }
 
+#[derive(Debug, Clone)]
+#[cfg(not(feature = "omit_autovacuum"))]
+enum PtrMapPutState {
+    Start,
+    Deserialize {
+        ptrmap_page: PageRef,
+        offset_in_ptrmap_page: usize,
+    },
+}
+
+#[derive(Debug, Clone)]
+enum HeaderRefState {
+    Start,
+    CreateHeader { page: PageRef },
+}
+
+#[cfg(not(feature = "omit_autovacuum"))]
+#[derive(Debug, Clone, Copy)]
+enum BtreeCreateVacuumFullState {
+    Start,
+    AllocatePage { root_page_num: u32 },
+    PtrMapPut { allocated_page_id: u32 },
+}
+
 /// The pager interface implements the persistence layer by providing access
 /// to pages of the database file, including caching, concurrency control, and
 /// transaction management.
@@ -410,6 +453,12 @@ pub struct Pager {
     #[cfg(not(feature = "omit_autovacuum"))]
     /// State machine for [Pager::ptrmap_get]
    ptrmap_get_state: RefCell<PtrMapGetState>,
+    #[cfg(not(feature = "omit_autovacuum"))]
+    /// State machine for [Pager::ptrmap_put]
+    ptrmap_put_state: RefCell<PtrMapPutState>,
+    header_ref_state: RefCell<HeaderRefState>,
+    #[cfg(not(feature = "omit_autovacuum"))]
+    btree_create_vacuum_full_state: Cell<BtreeCreateVacuumFullState>,
 }
 
 #[derive(Debug, Clone)]
@@ -515,6 +564,11 @@ impl Pager {
             allocate_page_state: RefCell::new(AllocatePageState::Start),
             #[cfg(not(feature = "omit_autovacuum"))]
             ptrmap_get_state: RefCell::new(PtrMapGetState::Start),
+            #[cfg(not(feature = "omit_autovacuum"))]
+            ptrmap_put_state: RefCell::new(PtrMapPutState::Start),
+            header_ref_state: RefCell::new(HeaderRefState::Start),
+            #[cfg(not(feature = "omit_autovacuum"))]
+            btree_create_vacuum_full_state: Cell::new(BtreeCreateVacuumFullState::Start),
         })
     }
 
@@ -535,98 +589,86 @@ impl Pager {
     /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself).
     #[cfg(not(feature = "omit_autovacuum"))]
     pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
+        loop {
             let ptrmap_get_state = self.ptrmap_get_state.borrow().clone();
             match ptrmap_get_state {
                 PtrMapGetState::Start => {
                     tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
                     let configured_page_size =
                         return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
 
                     if target_page_num < FIRST_PTRMAP_PAGE_NO
                         || is_ptrmap_page(target_page_num, configured_page_size)
                     {
                         return Ok(IOResult::Done(None));
                     }
 
-                    let ptrmap_pg_no =
-                        get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
-                    let offset_in_ptrmap_page =
-                        get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?;
-                    tracing::trace!(
-                        "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
-                        target_page_num,
-                        ptrmap_pg_no
-                    );
-
-                    let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?;
-                    self.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
-                        ptrmap_page,
-                        offset_in_ptrmap_page,
-                    });
-                    Ok(IOResult::IO)
+                    self.ptrmap_get_state.replace(PtrMapGetState::ReadPage {
+                        page_size: configured_page_size,
+                    });
                 }
+                PtrMapGetState::ReadPage {
+                    page_size: configured_page_size,
+                } => {
+                    let ptrmap_pg_no =
+                        get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
+                    let offset_in_ptrmap_page = get_ptrmap_offset_in_page(
+                        target_page_num,
+                        ptrmap_pg_no,
+                        configured_page_size,
+                    )?;
+                    tracing::trace!(
+                        "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
+                        target_page_num,
+                        ptrmap_pg_no
+                    );
+
+                    let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?;
+                    self.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
+                        ptrmap_page,
+                        offset_in_ptrmap_page,
+                    });
+                    return Ok(IOResult::IO);
+                }
                 PtrMapGetState::Deserialize {
                     ptrmap_page,
                     offset_in_ptrmap_page,
                 } => {
                     if ptrmap_page.is_locked() {
                         return Ok(IOResult::IO);
                     }
                     if !ptrmap_page.is_loaded() {
                         return Ok(IOResult::IO);
                     }
                     let ptrmap_page_inner = ptrmap_page.get();
                     let ptrmap_pg_no = ptrmap_page_inner.id;
 
                     let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
                         Some(content) => content,
                         None => {
                             return Err(LimboError::InternalError(format!(
                                 "Ptrmap page {ptrmap_pg_no} content not loaded"
                             )));
                         }
                     };
 
                     let full_buffer_slice: &[u8] = page_content.buffer.as_slice();
 
                     // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
                     // The actual page data starts at page_content.offset within the full_buffer_slice.
                     if ptrmap_pg_no != 1 && page_content.offset != 0 {
                         return Err(LimboError::Corrupt(format!(
                             "Ptrmap page {} has unexpected internal offset {}",
                             ptrmap_pg_no, page_content.offset
                         )));
                     }
                     let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
                     let actual_data_length = ptrmap_page_data_slice.len();
 
                     // Check if the calculated offset for the entry is within the bounds of the actual page data length.
                     if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
                         return Err(LimboError::InternalError(format!(
                             "Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})"
                         )));
                     }
 
                     let entry_slice = &ptrmap_page_data_slice
                         [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
                     self.ptrmap_get_state.replace(PtrMapGetState::Start);
-                    match PtrmapEntry::deserialize(entry_slice) {
+                    return match PtrmapEntry::deserialize(entry_slice) {
                         Some(entry) => Ok(IOResult::Done(Some(entry))),
                         None => Err(LimboError::Corrupt(format!(
                             "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
                         ))),
-                    }
+                    };
                 }
             }
+        }
     }
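ptrmap_get above threads the header read through return_if_io!, whose definition is not part of this diff. A plausible shape for such a helper, only to illustrate how one IOResult-returning state machine yields through another (standalone, hypothetical types):

// Hedged sketch of a return_if_io!-style helper; the real macro lives elsewhere
// in the crate. It unwraps IOResult::Done and propagates IOResult::IO upward so
// one state machine can yield through another.
#[derive(Debug)]
enum IOResult<T> {
    Done(T),
    IO,
}

macro_rules! return_if_io {
    ($expr:expr) => {
        match $expr? {
            IOResult::Done(v) => v,
            IOResult::IO => return Ok(IOResult::IO),
        }
    };
}

fn read_page_size(header_ready: bool) -> Result<IOResult<usize>, String> {
    // stand-in for with_header(|header| header.page_size)
    if header_ready {
        Ok(IOResult::Done(4096))
    } else {
        Ok(IOResult::IO)
    }
}

fn ptrmap_lookup(header_ready: bool) -> Result<IOResult<usize>, String> {
    // If the header read is still in flight, the IO is propagated to the caller.
    let page_size = return_if_io!(read_page_size(header_ready));
    Ok(IOResult::Done(page_size / 5)) // e.g. entries per pointer-map page at 5 bytes each
}

fn main() {
    println!("{:?}", ptrmap_lookup(false)); // Ok(IO)
    println!("{:?}", ptrmap_lookup(true)); // Ok(Done(819))
}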
@@ -648,74 +690,85 @@ impl Pager {
             entry_type,
             parent_page_no
         );
-        let page_size = return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
+        let ptrmap_put_state = self.ptrmap_put_state.borrow().clone();
+        match ptrmap_put_state {
+            PtrMapPutState::Start => {
+                let page_size =
+                    return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
+
                 if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
                     || is_ptrmap_page(db_page_no_to_update, page_size)
                 {
                     return Err(LimboError::InternalError(format!(
                         "Cannot set ptrmap entry for page {db_page_no_to_update}: it's a header/ptrmap page or invalid."
                     )));
                 }
                 let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size);
                 let offset_in_ptrmap_page =
                     get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?;
                 tracing::trace!(
                     "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}",
                     db_page_no_to_update,
                     entry_type,
                     parent_page_no,
                     ptrmap_pg_no,
                     offset_in_ptrmap_page
                 );
 
                 let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?;
-        if ptrmap_page.is_locked() {
-            return Ok(IOResult::IO);
-        }
-        if !ptrmap_page.is_loaded() {
-            return Ok(IOResult::IO);
-        }
-        let ptrmap_page_inner = ptrmap_page.get();
+                self.ptrmap_put_state.replace(PtrMapPutState::Deserialize {
+                    ptrmap_page,
+                    offset_in_ptrmap_page,
+                });
+                Ok(IOResult::IO)
+            }
+            PtrMapPutState::Deserialize {
+                ptrmap_page,
+                offset_in_ptrmap_page,
+            } => {
+                let ptrmap_page_inner = ptrmap_page.get();
+                let ptrmap_pg_no = ptrmap_page_inner.id;
 
                 let page_content = match ptrmap_page_inner.contents.as_ref() {
                     Some(content) => content,
                     None => {
                         return Err(LimboError::InternalError(format!(
                             "Ptrmap page {ptrmap_pg_no} content not loaded"
                         )))
                     }
                 };
 
                 let full_buffer_slice = page_content.buffer.as_mut_slice();
 
                 if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
                     return Err(LimboError::InternalError(format!(
                         "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
                         offset_in_ptrmap_page,
                         PTRMAP_ENTRY_SIZE,
                         ptrmap_pg_no,
                         full_buffer_slice.len()
                     )));
                 }
 
                 let entry = PtrmapEntry {
                     entry_type,
                     parent_page_no,
                 };
                 entry.serialize(
                     &mut full_buffer_slice
                         [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE],
                 )?;
 
                 turso_assert!(
                     ptrmap_page.get().id == ptrmap_pg_no as usize,
                     "ptrmap page has unexpected number"
                 );
                 self.add_dirty(&ptrmap_page);
+                self.ptrmap_put_state.replace(PtrMapPutState::Start);
                 Ok(IOResult::Done(()))
+            }
+        }
     }
 
     /// This method is used to allocate a new root page for a btree, both for tables and indexes
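Both ptrmap_get and ptrmap_put finish by calling PtrmapEntry::deserialize or entry.serialize on a PTRMAP_ENTRY_SIZE-byte slice. The entry layout itself is not shown in this diff; assuming the SQLite-style pointer-map format of one type byte followed by a 4-byte big-endian parent page number, a round-trip sketch looks like this (field names mirror the diff, the real implementation may differ):

// Hedged sketch of a 5-byte pointer-map entry (assumed SQLite-style layout:
// one type byte followed by a 4-byte big-endian parent page number).
const PTRMAP_ENTRY_SIZE: usize = 5;

#[derive(Debug, PartialEq)]
struct PtrmapEntry {
    entry_type: u8,
    parent_page_no: u32,
}

impl PtrmapEntry {
    fn serialize(&self, out: &mut [u8]) {
        assert!(out.len() >= PTRMAP_ENTRY_SIZE);
        out[0] = self.entry_type;
        out[1..5].copy_from_slice(&self.parent_page_no.to_be_bytes());
    }

    fn deserialize(buf: &[u8]) -> Option<Self> {
        if buf.len() < PTRMAP_ENTRY_SIZE {
            return None;
        }
        Some(Self {
            entry_type: buf[0],
            parent_page_no: u32::from_be_bytes(buf[1..5].try_into().ok()?),
        })
    }
}

fn main() {
    let entry = PtrmapEntry { entry_type: 1, parent_page_no: 0 }; // e.g. a root page
    let mut buf = [0u8; PTRMAP_ENTRY_SIZE];
    entry.serialize(&mut buf);
    assert_eq!(PtrmapEntry::deserialize(&buf), Some(entry));
    println!("round-trip ok: {buf:?}");
}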
@@ -744,40 +797,61 @@ impl Pager {
                 Ok(IOResult::Done(page.get().get().id as u32))
             }
             AutoVacuumMode::Full => {
-                let (mut root_page_num, page_size) =
-                    return_if_io!(self.with_header(|header| {
+                loop {
+                    match self.btree_create_vacuum_full_state.get() {
+                        BtreeCreateVacuumFullState::Start => {
+                            let (mut root_page_num, page_size) = return_if_io!(self
+                                .with_header(|header| {
                                     (
                                         header.vacuum_mode_largest_root_page.get(),
                                         header.page_size.get(),
                                     )
                                 }));
 
                             assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled
                             root_page_num += 1;
                             assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented
 
                             while is_ptrmap_page(root_page_num, page_size as usize) {
                                 root_page_num += 1;
                             }
                             assert!(root_page_num >= 3); // the very first root page is page 3
+                            self.btree_create_vacuum_full_state.set(
+                                BtreeCreateVacuumFullState::AllocatePage { root_page_num },
+                            );
+                        }
+                        BtreeCreateVacuumFullState::AllocatePage { root_page_num } => {
                             // root_page_num here is the desired root page
                             let page = return_if_io!(self.do_allocate_page(
                                 page_type,
                                 0,
                                 BtreePageAllocMode::Exact(root_page_num),
                             ));
                             let allocated_page_id = page.get().get().id as u32;
                             if allocated_page_id != root_page_num {
                                 // TODO(Zaid): Handle swapping the allocated page with the desired root page
                             }
 
                             // TODO(Zaid): Update the header metadata to reflect the new root page number
-                // For now map allocated_page_id since we are not swapping it with root_page_num
-                match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? {
-                    IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)),
-                    IOResult::IO => Ok(IOResult::IO),
-                }
+                            self.btree_create_vacuum_full_state.set(
+                                BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id },
+                            );
+                        }
+                        BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id } => {
+                            // For now map allocated_page_id since we are not swapping it with root_page_num
+                            let res = match self.ptrmap_put(
+                                allocated_page_id,
+                                PtrmapType::RootPage,
+                                0,
+                            )? {
+                                IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)),
+                                IOResult::IO => return Ok(IOResult::IO),
+                            };
+                            self.btree_create_vacuum_full_state
+                                .set(BtreeCreateVacuumFullState::Start);
+                            return res;
+                        }
+                    }
+                }
             }
             AutoVacuumMode::Incremental => {
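The allocation loop above advances root_page_num past pointer-map pages with is_ptrmap_page, and the test below expects the first btree root on page 3 (page 1 is the header, page 2 the first pointer-map page). Assuming the SQLite-style layout in which the first pointer-map page is page 2 and each one covers page_size / PTRMAP_ENTRY_SIZE database pages, the arithmetic behind those helpers is roughly:

// Sketch of pointer-map page arithmetic under the assumed SQLite-style layout.
// The real helpers (is_ptrmap_page, get_ptrmap_page_no_for_db_page) live in the
// ptrmap module; constants and logic here are illustrative.
const FIRST_PTRMAP_PAGE_NO: u32 = 2;
const PTRMAP_ENTRY_SIZE: usize = 5; // 1 type byte + 4-byte parent page number

fn entries_per_ptrmap_page(page_size: usize) -> u32 {
    (page_size / PTRMAP_ENTRY_SIZE) as u32
}

fn is_ptrmap_page(page_no: u32, page_size: usize) -> bool {
    if page_no < FIRST_PTRMAP_PAGE_NO {
        return false;
    }
    // Pointer-map pages sit at 2, 2 + (n + 1), 2 + 2 * (n + 1), ...
    (page_no - FIRST_PTRMAP_PAGE_NO) % (entries_per_ptrmap_page(page_size) + 1) == 0
}

fn main() {
    let page_size = 4096;
    assert!(is_ptrmap_page(2, page_size)); // first pointer-map page
    assert!(!is_ptrmap_page(3, page_size)); // first usable btree root page
    println!(
        "ptrmap page 2 covers pages 3..={}",
        FIRST_PTRMAP_PAGE_NO + entries_per_ptrmap_page(page_size)
    );
}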
@@ -2349,6 +2423,14 @@ mod ptrmap_tests {
         )
         .unwrap();
         run_until_done(|| pager.allocate_page1(), &pager).unwrap();
+        {
+            let page_cache = pager.page_cache.read();
+            println!(
+                "Cache Len: {} Cap: {}",
+                page_cache.len(),
+                page_cache.capacity()
+            );
+        }
         pager
             .io
             .block(|| {
@@ -2360,10 +2442,20 @@ mod ptrmap_tests {
         // Allocate all the pages as btree root pages
         const EXPECTED_FIRST_ROOT_PAGE_ID: u32 = 3; // page1 = 1, first ptrmap page = 2, root page = 3
         for i in 0..initial_db_pages {
-            match run_until_done(
+            let res = run_until_done(
                 || pager.btree_create(&CreateBTreeFlags::new_table()),
                 &pager,
-            ) {
+            );
+            {
+                let page_cache = pager.page_cache.read();
+                println!(
+                    "i: {} Cache Len: {} Cap: {}",
+                    i,
+                    page_cache.len(),
+                    page_cache.capacity()
+                );
+            }
+            match res {
                 Ok(root_page_id) => {
                     assert_eq!(root_page_id, EXPECTED_FIRST_ROOT_PAGE_ID + i);
                 }
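The tests drive each of these state machines through run_until_done plus the pager's IO backend. A hedged sketch of what such a driver amounts to (the real helper in turso_core runs the pager's IO between attempts; everything here is a simplified stand-in):

// Hedged sketch of a run_until_done-style driver: keep calling the
// state-machine closure and "run IO" whenever it yields.
use std::cell::Cell;

enum IOResult<T> {
    Done(T),
    IO,
}

struct Io {
    ticks: Cell<u32>,
}

impl Io {
    fn run_once(&self) {
        // stand-in for advancing the IO backend by one step
        self.ticks.set(self.ticks.get() + 1);
    }
}

fn run_until_done<T>(
    mut f: impl FnMut() -> Result<IOResult<T>, String>,
    io: &Io,
) -> Result<T, String> {
    loop {
        match f()? {
            IOResult::Done(v) => return Ok(v),
            IOResult::IO => io.run_once(), // pump IO, then retry the closure
        }
    }
}

fn main() {
    let io = Io { ticks: Cell::new(0) };
    // A toy state machine that needs two rounds of IO before it can finish.
    let result = run_until_done(
        || {
            if io.ticks.get() < 2 {
                Ok(IOResult::IO)
            } else {
                Ok(IOResult::Done(42u32))
            }
        },
        &io,
    );
    assert_eq!(result, Ok(42));
    println!("done after {} io ticks", io.ticks.get());
}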