diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 210af9076..c264e0caa 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -27,25 +27,36 @@ use super::wal::CheckpointMode; #[cfg(not(feature = "omit_autovacuum"))] use ptrmap::*; +#[derive(Debug, Clone)] pub struct HeaderRef(PageRef); impl HeaderRef { pub fn from_pager(pager: &Pager) -> Result> { - if !pager.db_state.is_initialized() { - return Err(LimboError::Page1NotAlloc); + let state = pager.header_ref_state.borrow().clone(); + tracing::trace!(?state); + match state { + HeaderRefState::Start => { + if !pager.db_state.is_initialized() { + return Err(LimboError::Page1NotAlloc); + } + + let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?; + *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; + Ok(IOResult::IO) + } + HeaderRefState::CreateHeader { page } => { + // TODO: will have to remove this when tracking IO completions + if page.is_locked() { + return Ok(IOResult::IO); + } + turso_assert!( + page.get().id == DatabaseHeader::PAGE_ID, + "incorrect header page id" + ); + *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; + Ok(IOResult::Done(Self(page))) + } } - - let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?; - if page.is_locked() { - return Ok(IOResult::IO); - } - - turso_assert!( - page.get().id == DatabaseHeader::PAGE_ID, - "incorrect header page id" - ); - - Ok(IOResult::Done(Self(page))) } pub fn borrow(&self) -> &DatabaseHeader { @@ -55,27 +66,38 @@ impl HeaderRef { } } +#[derive(Debug, Clone)] pub struct HeaderRefMut(PageRef); impl HeaderRefMut { pub fn from_pager(pager: &Pager) -> Result> { - if !pager.db_state.is_initialized() { - return Err(LimboError::Page1NotAlloc); + let state = pager.header_ref_state.borrow().clone(); + tracing::trace!(?state); + match state { + HeaderRefState::Start => { + if !pager.db_state.is_initialized() { + return Err(LimboError::Page1NotAlloc); + } + + let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?; + *pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page }; + Ok(IOResult::IO) + } + HeaderRefState::CreateHeader { page } => { + // TODO: will have to remove this when tracking IO completions + if page.is_locked() { + return Ok(IOResult::IO); + } + turso_assert!( + page.get().id == DatabaseHeader::PAGE_ID, + "incorrect header page id" + ); + + pager.add_dirty(&page); + *pager.header_ref_state.borrow_mut() = HeaderRefState::Start; + Ok(IOResult::Done(Self(page))) + } } - - let (page, _c) = pager.read_page(DatabaseHeader::PAGE_ID)?; - if page.is_locked() { - return Ok(IOResult::IO); - } - - turso_assert!( - page.get().id == DatabaseHeader::PAGE_ID, - "incorrect header page id" - ); - - pager.add_dirty(&page); - - Ok(IOResult::Done(Self(page))) } pub fn borrow_mut(&self) -> &mut DatabaseHeader { @@ -359,15 +381,36 @@ impl AtomicDbState { #[cfg(not(feature = "omit_autovacuum"))] enum PtrMapGetState { Start, - ReadPage { - page_size: usize, - }, Deserialize { ptrmap_page: PageRef, offset_in_ptrmap_page: usize, }, } +#[derive(Debug, Clone)] +#[cfg(not(feature = "omit_autovacuum"))] +enum PtrMapPutState { + Start, + Deserialize { + ptrmap_page: PageRef, + offset_in_ptrmap_page: usize, + }, +} + +#[derive(Debug, Clone)] +enum HeaderRefState { + Start, + CreateHeader { page: PageRef }, +} + +#[cfg(not(feature = "omit_autovacuum"))] +#[derive(Debug, Clone, Copy)] +enum BtreeCreateVacuumFullState { + Start, + AllocatePage { root_page_num: u32 }, + PtrMapPut { allocated_page_id: u32 }, +} + /// The pager interface implements the persistence layer by providing access /// to pages of the database file, including caching, concurrency control, and /// transaction management. @@ -410,6 +453,12 @@ pub struct Pager { #[cfg(not(feature = "omit_autovacuum"))] /// State machine for [Pager::ptrmap_get] ptrmap_get_state: RefCell, + #[cfg(not(feature = "omit_autovacuum"))] + /// State machine for [Pager::ptrmap_put] + ptrmap_put_state: RefCell, + header_ref_state: RefCell, + #[cfg(not(feature = "omit_autovacuum"))] + btree_create_vacuum_full_state: Cell, } #[derive(Debug, Clone)] @@ -515,6 +564,11 @@ impl Pager { allocate_page_state: RefCell::new(AllocatePageState::Start), #[cfg(not(feature = "omit_autovacuum"))] ptrmap_get_state: RefCell::new(PtrMapGetState::Start), + #[cfg(not(feature = "omit_autovacuum"))] + ptrmap_put_state: RefCell::new(PtrMapPutState::Start), + header_ref_state: RefCell::new(HeaderRefState::Start), + #[cfg(not(feature = "omit_autovacuum"))] + btree_create_vacuum_full_state: Cell::new(BtreeCreateVacuumFullState::Start), }) } @@ -535,98 +589,86 @@ impl Pager { /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself). #[cfg(not(feature = "omit_autovacuum"))] pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { - loop { - let ptrmap_get_state = self.ptrmap_get_state.borrow().clone(); - match ptrmap_get_state { - PtrMapGetState::Start => { - tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); - let configured_page_size = - return_if_io!(self.with_header(|header| header.page_size)).get() as usize; + let ptrmap_get_state = self.ptrmap_get_state.borrow().clone(); + match ptrmap_get_state { + PtrMapGetState::Start => { + tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); + let configured_page_size = + return_if_io!(self.with_header(|header| header.page_size)).get() as usize; - if target_page_num < FIRST_PTRMAP_PAGE_NO - || is_ptrmap_page(target_page_num, configured_page_size) - { - return Ok(IOResult::Done(None)); - } - - self.ptrmap_get_state.replace(PtrMapGetState::ReadPage { - page_size: configured_page_size, - }); + if target_page_num < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(target_page_num, configured_page_size) + { + return Ok(IOResult::Done(None)); } - PtrMapGetState::ReadPage { - page_size: configured_page_size, - } => { - let ptrmap_pg_no = - get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); - let offset_in_ptrmap_page = get_ptrmap_offset_in_page( - target_page_num, - ptrmap_pg_no, - configured_page_size, - )?; - tracing::trace!( - "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", - target_page_num, - ptrmap_pg_no - ); - let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; - self.ptrmap_get_state.replace(PtrMapGetState::Deserialize { - ptrmap_page, - offset_in_ptrmap_page, - }); - return Ok(IOResult::IO); - } - PtrMapGetState::Deserialize { + let ptrmap_pg_no = + get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); + let offset_in_ptrmap_page = + get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?; + tracing::trace!( + "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", + target_page_num, + ptrmap_pg_no + ); + + let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; + self.ptrmap_get_state.replace(PtrMapGetState::Deserialize { ptrmap_page, offset_in_ptrmap_page, - } => { - if ptrmap_page.is_locked() { - return Ok(IOResult::IO); - } - if !ptrmap_page.is_loaded() { - return Ok(IOResult::IO); - } - let ptrmap_page_inner = ptrmap_page.get(); - let ptrmap_pg_no = ptrmap_page_inner.id; + }); + Ok(IOResult::IO) + } + PtrMapGetState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + } => { + if ptrmap_page.is_locked() { + return Ok(IOResult::IO); + } + if !ptrmap_page.is_loaded() { + return Ok(IOResult::IO); + } + let ptrmap_page_inner = ptrmap_page.get(); + let ptrmap_pg_no = ptrmap_page_inner.id; - let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { - Some(content) => content, - None => { - return Err(LimboError::InternalError(format!( - "Ptrmap page {ptrmap_pg_no} content not loaded" - ))); - } - }; - - let full_buffer_slice: &[u8] = page_content.buffer.as_slice(); - - // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. - // The actual page data starts at page_content.offset within the full_buffer_slice. - if ptrmap_pg_no != 1 && page_content.offset != 0 { - return Err(LimboError::Corrupt(format!( - "Ptrmap page {} has unexpected internal offset {}", - ptrmap_pg_no, page_content.offset + let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {ptrmap_pg_no} content not loaded" ))); } - let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; - let actual_data_length = ptrmap_page_data_slice.len(); + }; - // Check if the calculated offset for the entry is within the bounds of the actual page data length. - if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { - return Err(LimboError::InternalError(format!( + let full_buffer_slice: &[u8] = page_content.buffer.as_slice(); + + // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. + // The actual page data starts at page_content.offset within the full_buffer_slice. + if ptrmap_pg_no != 1 && page_content.offset != 0 { + return Err(LimboError::Corrupt(format!( + "Ptrmap page {} has unexpected internal offset {}", + ptrmap_pg_no, page_content.offset + ))); + } + let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; + let actual_data_length = ptrmap_page_data_slice.len(); + + // Check if the calculated offset for the entry is within the bounds of the actual page data length. + if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { + return Err(LimboError::InternalError(format!( "Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})" ))); - } + } - let entry_slice = &ptrmap_page_data_slice - [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; - self.ptrmap_get_state.replace(PtrMapGetState::Start); - return match PtrmapEntry::deserialize(entry_slice) { - Some(entry) => Ok(IOResult::Done(Some(entry))), - None => Err(LimboError::Corrupt(format!( - "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" - ))), - }; + let entry_slice = &ptrmap_page_data_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; + self.ptrmap_get_state.replace(PtrMapGetState::Start); + match PtrmapEntry::deserialize(entry_slice) { + Some(entry) => Ok(IOResult::Done(Some(entry))), + None => Err(LimboError::Corrupt(format!( + "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" + ))), } } } @@ -648,74 +690,85 @@ impl Pager { entry_type, parent_page_no ); + let ptrmap_put_state = self.ptrmap_put_state.borrow().clone(); + match ptrmap_put_state { + PtrMapPutState::Start => { + let page_size = + return_if_io!(self.with_header(|header| header.page_size)).get() as usize; - let page_size = return_if_io!(self.with_header(|header| header.page_size)).get() as usize; + if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(db_page_no_to_update, page_size) + { + return Err(LimboError::InternalError(format!( + "Cannot set ptrmap entry for page {db_page_no_to_update}: it's a header/ptrmap page or invalid." + ))); + } - if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO - || is_ptrmap_page(db_page_no_to_update, page_size) - { - return Err(LimboError::InternalError(format!( - "Cannot set ptrmap entry for page {db_page_no_to_update}: it's a header/ptrmap page or invalid." - ))); - } + let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size); + let offset_in_ptrmap_page = + get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?; + tracing::trace!( + "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}", + db_page_no_to_update, + entry_type, + parent_page_no, + ptrmap_pg_no, + offset_in_ptrmap_page + ); - let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size); - let offset_in_ptrmap_page = - get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?; - tracing::trace!( - "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}", - db_page_no_to_update, - entry_type, - parent_page_no, - ptrmap_pg_no, - offset_in_ptrmap_page - ); - - let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; - if ptrmap_page.is_locked() { - return Ok(IOResult::IO); - } - if !ptrmap_page.is_loaded() { - return Ok(IOResult::IO); - } - let ptrmap_page_inner = ptrmap_page.get(); - - let page_content = match ptrmap_page_inner.contents.as_ref() { - Some(content) => content, - None => { - return Err(LimboError::InternalError(format!( - "Ptrmap page {ptrmap_pg_no} content not loaded" - ))) + let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; + self.ptrmap_put_state.replace(PtrMapPutState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + }); + Ok(IOResult::IO) } - }; - - let full_buffer_slice = page_content.buffer.as_mut_slice(); - - if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { - return Err(LimboError::InternalError(format!( - "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})", + PtrMapPutState::Deserialize { + ptrmap_page, offset_in_ptrmap_page, - PTRMAP_ENTRY_SIZE, - ptrmap_pg_no, - full_buffer_slice.len() - ))); + } => { + let ptrmap_page_inner = ptrmap_page.get(); + let ptrmap_pg_no = ptrmap_page_inner.id; + + let page_content = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {ptrmap_pg_no} content not loaded" + ))) + } + }; + + let full_buffer_slice = page_content.buffer.as_mut_slice(); + + if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { + return Err(LimboError::InternalError(format!( + "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})", + offset_in_ptrmap_page, + PTRMAP_ENTRY_SIZE, + ptrmap_pg_no, + full_buffer_slice.len() + ))); + } + + let entry = PtrmapEntry { + entry_type, + parent_page_no, + }; + entry.serialize( + &mut full_buffer_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE], + )?; + + turso_assert!( + ptrmap_page.get().id == ptrmap_pg_no as usize, + "ptrmap page has unexpected number" + ); + self.add_dirty(&ptrmap_page); + self.ptrmap_put_state.replace(PtrMapPutState::Start); + Ok(IOResult::Done(())) + } } - - let entry = PtrmapEntry { - entry_type, - parent_page_no, - }; - entry.serialize( - &mut full_buffer_slice - [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE], - )?; - - turso_assert!( - ptrmap_page.get().id == ptrmap_pg_no as usize, - "ptrmap page has unexpected number" - ); - self.add_dirty(&ptrmap_page); - Ok(IOResult::Done(())) } /// This method is used to allocate a new root page for a btree, both for tables and indexes @@ -744,40 +797,61 @@ impl Pager { Ok(IOResult::Done(page.get().get().id as u32)) } AutoVacuumMode::Full => { - let (mut root_page_num, page_size) = - return_if_io!(self.with_header(|header| { - ( - header.vacuum_mode_largest_root_page.get(), - header.page_size.get(), - ) - })); + loop { + match self.btree_create_vacuum_full_state.get() { + BtreeCreateVacuumFullState::Start => { + let (mut root_page_num, page_size) = return_if_io!(self + .with_header(|header| { + ( + header.vacuum_mode_largest_root_page.get(), + header.page_size.get(), + ) + })); - assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled - root_page_num += 1; - assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented + assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled + root_page_num += 1; + assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented - while is_ptrmap_page(root_page_num, page_size as usize) { - root_page_num += 1; - } - assert!(root_page_num >= 3); // the very first root page is page 3 + while is_ptrmap_page(root_page_num, page_size as usize) { + root_page_num += 1; + } + assert!(root_page_num >= 3); // the very first root page is page 3 + self.btree_create_vacuum_full_state.set( + BtreeCreateVacuumFullState::AllocatePage { root_page_num }, + ); + } + BtreeCreateVacuumFullState::AllocatePage { root_page_num } => { + // root_page_num here is the desired root page + let page = return_if_io!(self.do_allocate_page( + page_type, + 0, + BtreePageAllocMode::Exact(root_page_num), + )); + let allocated_page_id = page.get().get().id as u32; + if allocated_page_id != root_page_num { + // TODO(Zaid): Handle swapping the allocated page with the desired root page + } - // root_page_num here is the desired root page - let page = return_if_io!(self.do_allocate_page( - page_type, - 0, - BtreePageAllocMode::Exact(root_page_num), - )); - let allocated_page_id = page.get().get().id as u32; - if allocated_page_id != root_page_num { - // TODO(Zaid): Handle swapping the allocated page with the desired root page - } - - // TODO(Zaid): Update the header metadata to reflect the new root page number - - // For now map allocated_page_id since we are not swapping it with root_page_num - match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? { - IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)), - IOResult::IO => Ok(IOResult::IO), + // TODO(Zaid): Update the header metadata to reflect the new root page number + self.btree_create_vacuum_full_state.set( + BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id }, + ); + } + BtreeCreateVacuumFullState::PtrMapPut { allocated_page_id } => { + // For now map allocated_page_id since we are not swapping it with root_page_num + let res = match self.ptrmap_put( + allocated_page_id, + PtrmapType::RootPage, + 0, + )? { + IOResult::Done(_) => Ok(IOResult::Done(allocated_page_id)), + IOResult::IO => return Ok(IOResult::IO), + }; + self.btree_create_vacuum_full_state + .set(BtreeCreateVacuumFullState::Start); + return res; + } + } } } AutoVacuumMode::Incremental => { @@ -2349,6 +2423,14 @@ mod ptrmap_tests { ) .unwrap(); run_until_done(|| pager.allocate_page1(), &pager).unwrap(); + { + let page_cache = pager.page_cache.read(); + println!( + "Cache Len: {} Cap: {}", + page_cache.len(), + page_cache.capacity() + ); + } pager .io .block(|| { @@ -2360,10 +2442,20 @@ mod ptrmap_tests { // Allocate all the pages as btree root pages const EXPECTED_FIRST_ROOT_PAGE_ID: u32 = 3; // page1 = 1, first ptrmap page = 2, root page = 3 for i in 0..initial_db_pages { - match run_until_done( + let res = run_until_done( || pager.btree_create(&CreateBTreeFlags::new_table()), &pager, - ) { + ); + { + let page_cache = pager.page_cache.read(); + println!( + "i: {} Cache Len: {} Cap: {}", + i, + page_cache.len(), + page_cache.capacity() + ); + } + match res { Ok(root_page_id) => { assert_eq!(root_page_id, EXPECTED_FIRST_ROOT_PAGE_ID + i); }