diff --git a/core/Cargo.toml b/core/Cargo.toml index 393c70cfe..898eb337d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,6 +30,7 @@ testvfs = ["limbo_ext_tests/static"] static = ["limbo_ext/static"] fuzz = [] csv = ["limbo_csv/static"] +omit_autovacuum = [] [target.'cfg(target_os = "linux")'.dependencies] io-uring = { version = "0.7.5", optional = true } @@ -77,7 +78,7 @@ crossbeam-skiplist = "0.1.3" tracing = "0.1.41" ryu = "1.0.19" uncased = "0.9.10" -strum_macros = {workspace = true } +strum_macros = { workspace = true } bitflags = "2.9.0" [build-dependencies] diff --git a/core/pragma.rs b/core/pragma.rs index b7a457b76..2d820a77f 100644 --- a/core/pragma.rs +++ b/core/pragma.rs @@ -69,6 +69,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma { &["user_version"], ), WalCheckpoint => Pragma::new(PragmaFlags::NeedSchema, &["busy", "log", "checkpointed"]), + AutoVacuum => Pragma::new( + PragmaFlags::NoColumns1 | PragmaFlags::Result0, + &["auto_vacuum"], + ), } } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index c2cca064a..75a9a7e36 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -3,7 +3,7 @@ use tracing::{instrument, Level}; use crate::{ schema::Index, storage::{ - pager::Pager, + pager::{BtreePageAllocMode, Pager}, sqlite3_ondisk::{ read_u32, read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell, @@ -4029,7 +4029,9 @@ impl BTreeCursor { let root = root_btree.get(); let root_contents = root.get_contents(); // FIXME: handle page cache is full - let child_btree = self.pager.do_allocate_page(root_contents.page_type(), 0); + let child_btree = + self.pager + .do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any); tracing::debug!( "balance_root(root={}, rightmost={}, page_type={:?})", @@ -5216,7 +5218,8 @@ impl BTreeCursor { } pub fn allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage { - self.pager.do_allocate_page(page_type, offset) + self.pager + 
.do_allocate_page(page_type, offset, BtreePageAllocMode::Any) } } @@ -6871,8 +6874,14 @@ mod tests { tracing::info!("super seed: {}", seed); for _ in 0..attempts { let (pager, _) = empty_btree(); - let index_root_page = pager.btree_create(&CreateBTreeFlags::new_index()); - let index_root_page = index_root_page as usize; + let index_root_page_result = + pager.btree_create(&CreateBTreeFlags::new_index()).unwrap(); + let index_root_page = match index_root_page_result { + crate::types::CursorResult::Ok(id) => id as usize, + crate::types::CursorResult::IO => { + panic!("btree_create returned IO in test, unexpected") + } + }; let mut cursor = BTreeCursor::new_table(None, pager.clone(), index_root_page); let mut keys = SortedVec::new(); tracing::info!("seed: {}", seed); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 8fe71647c..9a5fd30b1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -7,6 +7,7 @@ use crate::storage::sqlite3_ondisk::{ self, DatabaseHeader, PageContent, PageType, DATABASE_HEADER_PAGE_ID, }; use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; +use crate::types::CursorResult; use crate::Completion; use crate::{Buffer, LimboError, Result}; use parking_lot::RwLock; @@ -21,6 +22,9 @@ use super::btree::BTreePage; use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey}; use super::wal::{CheckpointMode, CheckpointStatus}; +#[cfg(not(feature = "omit_autovacuum"))] +use {crate::io::Buffer as IoBuffer, ptrmap::*}; + pub struct PageInner { pub flags: AtomicUsize, pub contents: Option, @@ -163,6 +167,16 @@ enum CheckpointState { CheckpointDone, } +/// The mode of allocating a btree page. 
+pub enum BtreePageAllocMode { + /// Allocate any btree page + Any, + /// Allocate a specific page number, typically used for root page allocation + Exact(u32), + /// Allocate a page number less than or equal to the parameter + Le(u32), +} + /// This will keep track of the state of current cache flush in order to not repeat work struct FlushInfo { state: FlushState, @@ -170,6 +184,14 @@ struct FlushInfo { in_flight_writes: Rc>, } +/// Track the state of the auto-vacuum mode. +#[derive(Clone, Copy, Debug)] +pub enum AutoVacuumMode { + None, + Full, + Incremental, +} + /// The pager interface implements the persistence layer by providing access /// to pages of the database file, including caching, concurrency control, and /// transaction management. @@ -191,6 +213,7 @@ pub struct Pager { checkpoint_state: RefCell, checkpoint_inflight: Rc>, syncing: Rc>, + auto_vacuum_mode: RefCell, } #[derive(Debug, Copy, Clone)] @@ -241,19 +264,242 @@ impl Pager { checkpoint_state: RefCell::new(CheckpointState::Checkpoint), checkpoint_inflight: Rc::new(RefCell::new(0)), buffer_pool, + auto_vacuum_mode: RefCell::new(AutoVacuumMode::None), }) } - // FIXME: handle no room in page cache - pub fn btree_create(&self, flags: &CreateBTreeFlags) -> u32 { + pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode { + *self.auto_vacuum_mode.borrow() + } + + pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) { + *self.auto_vacuum_mode.borrow_mut() = mode; + } + + /// Retrieves the pointer map entry for a given database page. + /// `target_page_num` (1-indexed) is the page whose entry is sought. + /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself). 
+ #[cfg(not(feature = "omit_autovacuum"))] + pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { + tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); + let configured_page_size = self.db_header.lock().get_page_size() as usize; + + if target_page_num < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(target_page_num, configured_page_size) + { + return Ok(CursorResult::Ok(None)); + } + + let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); + let offset_in_ptrmap_page = + get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?; + tracing::trace!( + "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", + target_page_num, + ptrmap_pg_no + ); + + let ptrmap_page = self.read_page(ptrmap_pg_no as usize)?; + if ptrmap_page.is_locked() { + return Ok(CursorResult::IO); + } + if !ptrmap_page.is_loaded() { + return Ok(CursorResult::IO); + } + let ptrmap_page_inner = ptrmap_page.get(); + + let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {} content not loaded", + ptrmap_pg_no + ))) + } + }; + + let page_buffer_guard: std::cell::Ref = page_content.buffer.borrow(); + let full_buffer_slice: &[u8] = page_buffer_guard.as_slice(); + + // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. + // The actual page data starts at page_content.offset within the full_buffer_slice. + if ptrmap_pg_no != 1 && page_content.offset != 0 { + return Err(LimboError::Corrupt(format!( + "Ptrmap page {} has unexpected internal offset {}", + ptrmap_pg_no, page_content.offset + ))); + } + let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; + let actual_data_length = ptrmap_page_data_slice.len(); + + // Check if the calculated offset for the entry is within the bounds of the actual page data length. 
+ if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { + return Err(LimboError::InternalError(format!( + "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})", + offset_in_ptrmap_page, PTRMAP_ENTRY_SIZE, ptrmap_pg_no, actual_data_length + ))); + } + + let entry_slice = &ptrmap_page_data_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; + match PtrmapEntry::deserialize(entry_slice) { + Some(entry) => Ok(CursorResult::Ok(Some(entry))), + None => Err(LimboError::Corrupt(format!( + "Failed to deserialize ptrmap entry for page {} from ptrmap page {}", + target_page_num, ptrmap_pg_no + ))), + } + } + + /// Writes or updates the pointer map entry for a given database page. + /// `db_page_no_to_update` (1-indexed) is the page whose entry is to be set. + /// `entry_type` and `parent_page_no` define the new entry. + #[cfg(not(feature = "omit_autovacuum"))] + pub fn ptrmap_put( + &self, + db_page_no_to_update: u32, + entry_type: PtrmapType, + parent_page_no: u32, + ) -> Result> { + tracing::trace!( + "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {})", + db_page_no_to_update, + entry_type, + parent_page_no + ); + + let page_size = self.db_header.lock().get_page_size() as usize; + + if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(db_page_no_to_update, page_size) + { + return Err(LimboError::InternalError(format!( + "Cannot set ptrmap entry for page {}: it's a header/ptrmap page or invalid.", + db_page_no_to_update + ))); + } + + let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size); + let offset_in_ptrmap_page = + get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?; + tracing::trace!( + "ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}", + db_page_no_to_update, + entry_type, + parent_page_no, + ptrmap_pg_no, + offset_in_ptrmap_page + ); + + let ptrmap_page = 
self.read_page(ptrmap_pg_no as usize)?; + if ptrmap_page.is_locked() { + return Ok(CursorResult::IO); + } + if !ptrmap_page.is_loaded() { + return Ok(CursorResult::IO); + } + let ptrmap_page_inner = ptrmap_page.get(); + + let page_content = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {} content not loaded", + ptrmap_pg_no + ))) + } + }; + + let mut page_buffer_guard = page_content.buffer.borrow_mut(); + let full_buffer_slice = page_buffer_guard.as_mut_slice(); + + if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() { + return Err(LimboError::InternalError(format!( + "Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})", + offset_in_ptrmap_page, + PTRMAP_ENTRY_SIZE, + ptrmap_pg_no, + full_buffer_slice.len() + ))); + } + + let entry = PtrmapEntry { + entry_type, + parent_page_no, + }; + entry.serialize( + &mut full_buffer_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE], + )?; + + ptrmap_page.set_dirty(); + self.add_dirty(ptrmap_pg_no as usize); + Ok(CursorResult::Ok(())) + } + + /// This method is used to allocate a new root page for a btree, both for tables and indexes + /// FIXME: handle no room in page cache + pub fn btree_create(&self, flags: &CreateBTreeFlags) -> Result> { let page_type = match flags { _ if flags.is_table() => PageType::TableLeaf, _ if flags.is_index() => PageType::IndexLeaf, _ => unreachable!("Invalid flags state"), }; - let page = self.do_allocate_page(page_type, 0); - let id = page.get().get().id; - id as u32 + #[cfg(feature = "omit_autovacuum")] + { + let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any); + let page_id = page.get().get().id; + return Ok(CursorResult::Ok(page_id as u32)); + } + + // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number + #[cfg(not(feature = 
"omit_autovacuum"))] + { + let auto_vacuum_mode = self.auto_vacuum_mode.borrow(); + match *auto_vacuum_mode { + AutoVacuumMode::None => { + let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any); + let page_id = page.get().get().id; + return Ok(CursorResult::Ok(page_id as u32)); + } + AutoVacuumMode::Full => { + let mut root_page_num = self.db_header.lock().vacuum_mode_largest_root_page; + assert!(root_page_num > 0); // Largest root page number cannot be 0 because that is set to 1 when creating the database with autovacuum enabled + root_page_num += 1; + assert!(root_page_num >= FIRST_PTRMAP_PAGE_NO); // can never be less than 2 because we have already incremented + + while is_ptrmap_page( + root_page_num, + self.db_header.lock().get_page_size() as usize, + ) { + root_page_num += 1; + } + assert!(root_page_num >= 3); // the very first root page is page 3 + + // root_page_num here is the desired root page + let page = self.do_allocate_page( + page_type, + 0, + BtreePageAllocMode::Exact(root_page_num), + ); + let allocated_page_id = page.get().get().id as u32; + if allocated_page_id != root_page_num { + // TODO(Zaid): Handle swapping the allocated page with the desired root page + } + + // TODO(Zaid): Update the header metadata to reflect the new root page number + + // For now map allocated_page_id since we are not swapping it with root_page_num + match self.ptrmap_put(allocated_page_id, PtrmapType::RootPage, 0)? { + CursorResult::Ok(_) => Ok(CursorResult::Ok(allocated_page_id as u32)), + CursorResult::IO => Ok(CursorResult::IO), + } + } + AutoVacuumMode::Incremental => { + unimplemented!() + } + } + } } /// Allocate a new overflow page. @@ -274,7 +520,12 @@ impl Pager { /// Allocate a new page to the btree via the pager. /// This marks the page as dirty and writes the page header. 
// FIXME: handle no room in page cache - pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage { + pub fn do_allocate_page( + &self, + page_type: PageType, + offset: usize, + _alloc_mode: BtreePageAllocMode, + ) -> BTreePage { let page = self.allocate_page().unwrap(); let page = Arc::new(BTreePageInner { page: RefCell::new(page), @@ -678,6 +929,34 @@ impl Pager { let header = &self.db_header; let mut header = header.lock(); header.database_size += 1; + + #[cfg(not(feature = "omit_autovacuum"))] + { + // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size + // - autovacuum is enabled + // - the last page is a pointer map page + if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full) + && is_ptrmap_page(header.database_size, header.get_page_size() as usize) + { + let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0); + page.set_dirty(); + self.add_dirty(page.get().id); + + let page_key = PageCacheKey::new(page.get().id); + let mut cache = self.page_cache.write(); + match cache.insert(page_key, page.clone()) { + Ok(_) => (), + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(_) => { + return Err(LimboError::InternalError( + "Unknown error inserting page to cache".into(), + )) + } + } + header.database_size += 1; + } + } + // update database size self.write_database_header(&mut header)?; @@ -770,6 +1049,181 @@ impl CreateBTreeFlags { } } +/* +** The pointer map is a lookup table that identifies the parent page for +** each child page in the database file. The parent page is the page that +** contains a pointer to the child. Every page in the database contains +** 0 or 1 parent pages. Each pointer map entry consists of a single byte 'type' +** and a 4 byte parent page number. +** +** The PTRMAP_XXX identifiers below are the valid types. 
+** +** The purpose of the pointer map is to facilitate moving pages from one +** position in the file to another as part of autovacuum. When a page +** is moved, the pointer in its parent must be updated to point to the +** new location. The pointer map is used to locate the parent page quickly. +** +** PTRMAP_ROOTPAGE: The database page is a root-page. The page-number is not +** used in this case. +** +** PTRMAP_FREEPAGE: The database page is an unused (free) page. The page-number +** is not used in this case. +** +** PTRMAP_OVERFLOW1: The database page is the first page in a list of +** overflow pages. The page number identifies the page that +** contains the cell with a pointer to this overflow page. +** +** PTRMAP_OVERFLOW2: The database page is the second or later page in a list of +** overflow pages. The page-number identifies the previous +** page in the overflow page list. +** +** PTRMAP_BTREE: The database page is a non-root btree page. The page number +** identifies the parent page in the btree. +*/ +#[cfg(not(feature = "omit_autovacuum"))] +mod ptrmap { + use crate::{storage::sqlite3_ondisk::MIN_PAGE_SIZE, LimboError, Result}; + + // Constants + pub const PTRMAP_ENTRY_SIZE: usize = 5; + /// Page 1 is the schema page which contains the database header. + /// Page 2 is the first pointer map page if the database has any pointer map pages. 
+ pub const FIRST_PTRMAP_PAGE_NO: u32 = 2; + + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + #[repr(u8)] + pub enum PtrmapType { + RootPage = 1, + FreePage = 2, + Overflow1 = 3, + Overflow2 = 4, + BTreeNode = 5, + } + + impl PtrmapType { + pub fn from_u8(value: u8) -> Option { + match value { + 1 => Some(PtrmapType::RootPage), + 2 => Some(PtrmapType::FreePage), + 3 => Some(PtrmapType::Overflow1), + 4 => Some(PtrmapType::Overflow2), + 5 => Some(PtrmapType::BTreeNode), + _ => None, + } + } + } + + #[derive(Debug, Clone, Copy)] + pub struct PtrmapEntry { + pub entry_type: PtrmapType, + pub parent_page_no: u32, + } + + impl PtrmapEntry { + pub fn serialize(&self, buffer: &mut [u8]) -> Result<()> { + if buffer.len() < PTRMAP_ENTRY_SIZE { + return Err(LimboError::InternalError(format!( + "Buffer too small to serialize ptrmap entry. Expected at least {} bytes, got {}", + PTRMAP_ENTRY_SIZE, + buffer.len() + ))); + } + buffer[0] = self.entry_type as u8; + buffer[1..5].copy_from_slice(&self.parent_page_no.to_be_bytes()); + Ok(()) + } + + pub fn deserialize(buffer: &[u8]) -> Option { + if buffer.len() < PTRMAP_ENTRY_SIZE { + return None; + } + let entry_type_u8 = buffer[0]; + let parent_bytes_slice = buffer.get(1..5)?; + let parent_page_no = u32::from_be_bytes(parent_bytes_slice.try_into().ok()?); + PtrmapType::from_u8(entry_type_u8).map(|entry_type| PtrmapEntry { + entry_type, + parent_page_no, + }) + } + } + + /// Calculates how many database pages are mapped by a single pointer map page. + /// This is based on the total page size, as ptrmap pages are filled with entries. + pub fn entries_per_ptrmap_page(page_size: usize) -> usize { + assert!(page_size >= MIN_PAGE_SIZE as usize); + page_size / PTRMAP_ENTRY_SIZE + } + + /// Calculates the cycle length of pointer map pages + /// The cycle length is the number of database pages that are mapped by a single pointer map page, plus one for the pointer map page itself.
+ pub fn ptrmap_page_cycle_length(page_size: usize) -> usize { + assert!(page_size >= MIN_PAGE_SIZE as usize); + (page_size / PTRMAP_ENTRY_SIZE) + 1 + } + + /// Determines if a given page number `db_page_no` (1-indexed) is a pointer map page in a database with autovacuum enabled + pub fn is_ptrmap_page(db_page_no: u32, page_size: usize) -> bool { + // The first page cannot be a ptrmap page because it's for the schema + if db_page_no == 1 { + return false; + } + if db_page_no == FIRST_PTRMAP_PAGE_NO { + return true; + } + return get_ptrmap_page_no_for_db_page(db_page_no, page_size) == db_page_no; + } + + /// Calculates which pointer map page (1-indexed) contains the entry for `db_page_no_to_query` (1-indexed). + /// `db_page_no_to_query` is the page whose ptrmap entry we are interested in. + pub fn get_ptrmap_page_no_for_db_page(db_page_no_to_query: u32, page_size: usize) -> u32 { + let group_size = ptrmap_page_cycle_length(page_size) as u32; + if group_size == 0 { + panic!("Page size too small, a ptrmap page cannot map any db pages."); + } + + let effective_page_index = db_page_no_to_query - FIRST_PTRMAP_PAGE_NO; + let group_idx = effective_page_index / group_size; + + (group_idx * group_size) + FIRST_PTRMAP_PAGE_NO + } + + /// Calculates the byte offset of the entry for `db_page_no_to_query` (1-indexed) + /// within its pointer map page (`ptrmap_page_no`, 1-indexed). + pub fn get_ptrmap_offset_in_page( + db_page_no_to_query: u32, + ptrmap_page_no: u32, + page_size: usize, + ) -> Result { + // The data pages mapped by `ptrmap_page_no` are: + // `ptrmap_page_no + 1`, `ptrmap_page_no + 2`, ..., up to `ptrmap_page_no + n_data_pages_per_group`. + // `db_page_no_to_query` must be one of these. + // The 0-indexed position of `db_page_no_to_query` within this sequence of data pages is: + // `db_page_no_to_query - (ptrmap_page_no + 1)`.
+ + let n_data_pages_per_group = entries_per_ptrmap_page(page_size); + let first_data_page_mapped = ptrmap_page_no + 1; + let last_data_page_mapped = ptrmap_page_no + n_data_pages_per_group as u32; + + if db_page_no_to_query < first_data_page_mapped + || db_page_no_to_query > last_data_page_mapped + { + return Err(LimboError::InternalError(format!( + "Page {} is not mapped by the data page range [{}, {}] of ptrmap page {}", + db_page_no_to_query, first_data_page_mapped, last_data_page_mapped, ptrmap_page_no + ))); + } + if is_ptrmap_page(db_page_no_to_query, page_size) { + return Err(LimboError::InternalError(format!( + "Page {} is a pointer map page and should not have an entry calculated this way.", + db_page_no_to_query + ))); + } + + let entry_index_on_page = (db_page_no_to_query - first_data_page_mapped) as usize; + Ok(entry_index_on_page * PTRMAP_ENTRY_SIZE) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -800,3 +1254,164 @@ mod tests { assert_eq!(page.unwrap().get().id, 1); } } + +#[cfg(test)] +#[cfg(not(feature = "omit_autovacuum"))] +mod ptrmap_tests { + use std::cell::RefCell; + use std::rc::Rc; + use std::sync::Arc; + + use super::ptrmap::*; + use super::*; + use crate::fast_lock::SpinLock; + use crate::io::{MemoryIO, OpenFlags, IO}; + use crate::storage::buffer_pool::BufferPool; + use crate::storage::database::{DatabaseFile, DatabaseStorage}; + use crate::storage::page_cache::DumbLruPageCache; + use crate::storage::pager::Pager; + use crate::storage::sqlite3_ondisk::DatabaseHeader; + use crate::storage::sqlite3_ondisk::MIN_PAGE_SIZE; + use crate::storage::wal::{WalFile, WalFileShared}; + + // Helper to create a Pager for testing + fn test_pager_setup(page_size: u32, initial_db_pages: u32) -> Pager { + let io: Arc = Arc::new(MemoryIO::new()); + let db_file_raw = io.open_file("test.db", OpenFlags::Create, true).unwrap(); + let db_storage: Arc = Arc::new(DatabaseFile::new(db_file_raw)); + + // Initialize a minimal header in autovacuum mode + let 
mut header_data = DatabaseHeader::default(); + header_data.update_page_size(page_size); + let db_header_arc = Arc::new(SpinLock::new(header_data)); + db_header_arc.lock().vacuum_mode_largest_root_page = 1; + + // Construct interfaces for the pager + let buffer_pool = Rc::new(BufferPool::new(page_size as usize)); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new( + (initial_db_pages + 10) as usize, + ))); + + let wal = Rc::new(RefCell::new(WalFile::new( + io.clone(), + page_size, + WalFileShared::open_shared(&io, "test.db-wal", page_size).unwrap(), + buffer_pool.clone(), + ))); + + let pager = Pager::finish_open(db_header_arc, db_storage, wal, io, page_cache, buffer_pool) + .unwrap(); + pager.set_auto_vacuum_mode(AutoVacuumMode::Full); + + // Allocate all the pages as btree root pages + for _ in 0..initial_db_pages { + match pager.btree_create(&CreateBTreeFlags::new_table()) { + Ok(CursorResult::Ok(_root_page_id)) => (), + Ok(CursorResult::IO) => { + panic!("test_pager_setup: btree_create returned CursorResult::IO unexpectedly"); + } + Err(e) => { + panic!("test_pager_setup: btree_create failed: {:?}", e); + } + } + } + + return pager; + } + + #[test] + fn test_ptrmap_page_allocation() { + let page_size = 4096; + let initial_db_pages = 10; + let pager = test_pager_setup(page_size, initial_db_pages); + + // Page 5 should be mapped by ptrmap page 2. 
+ let db_page_to_update: u32 = 5; + let expected_ptrmap_pg_no = + get_ptrmap_page_no_for_db_page(db_page_to_update, page_size as usize); + assert_eq!(expected_ptrmap_pg_no, FIRST_PTRMAP_PAGE_NO); + + // Ensure the pointer map page ref is created and loadable via the pager + let ptrmap_page_ref = pager.read_page(expected_ptrmap_pg_no as usize); + assert!(ptrmap_page_ref.is_ok()); + + // Ensure that the database header size is correctly reflected + assert_eq!(pager.db_header.lock().database_size, initial_db_pages + 2); // (1+1) -> (header + ptrmap) + + // Read the entry from the ptrmap page and verify it + let entry = pager.ptrmap_get(db_page_to_update).unwrap(); + assert!(matches!(entry, CursorResult::Ok(Some(_)))); + let CursorResult::Ok(Some(entry)) = entry else { + panic!("entry is not Some"); + }; + assert_eq!(entry.entry_type, PtrmapType::RootPage); + assert_eq!(entry.parent_page_no, 0); + } + + #[test] + fn test_is_ptrmap_page_logic() { + let page_size = MIN_PAGE_SIZE as usize; + let n_data_pages = entries_per_ptrmap_page(page_size); + assert_eq!(n_data_pages, 102); // 512/5 = 102 + + assert!(!is_ptrmap_page(1, page_size)); // Header + assert!(is_ptrmap_page(2, page_size)); // P0 + assert!(!is_ptrmap_page(3, page_size)); // D0_1 + assert!(!is_ptrmap_page(4, page_size)); // D0_2 + assert!(!is_ptrmap_page(5, page_size)); // D0_3 + assert!(is_ptrmap_page(105, page_size)); // P1 + assert!(!is_ptrmap_page(106, page_size)); // D1_1 + assert!(!is_ptrmap_page(107, page_size)); // D1_2 + assert!(!is_ptrmap_page(108, page_size)); // D1_3 + assert!(is_ptrmap_page(208, page_size)); // P2 + } + + #[test] + fn test_get_ptrmap_page_no() { + let page_size = MIN_PAGE_SIZE as usize; // Maps 102 data pages + + // Test pages mapped by P0 (page 2) + assert_eq!(get_ptrmap_page_no_for_db_page(3, page_size), 2); // D(3) -> P0(2) + assert_eq!(get_ptrmap_page_no_for_db_page(4, page_size), 2); // D(4) -> P0(2) + assert_eq!(get_ptrmap_page_no_for_db_page(5, page_size), 2); // D(5) ->
P0(2) + assert_eq!(get_ptrmap_page_no_for_db_page(104, page_size), 2); // D(104) -> P0(2) + + assert_eq!(get_ptrmap_page_no_for_db_page(105, page_size), 105); // Page 105 is a pointer map page. + + // Test pages mapped by P1 (page 105) + assert_eq!(get_ptrmap_page_no_for_db_page(106, page_size), 105); // D(106) -> P1(105) + assert_eq!(get_ptrmap_page_no_for_db_page(107, page_size), 105); // D(107) -> P1(105) + assert_eq!(get_ptrmap_page_no_for_db_page(108, page_size), 105); // D(108) -> P1(105) + + assert_eq!(get_ptrmap_page_no_for_db_page(208, page_size), 208); // Page 208 is a pointer map page. + } + + #[test] + fn test_get_ptrmap_offset() { + let page_size = MIN_PAGE_SIZE as usize; // Maps 102 data pages + + assert_eq!(get_ptrmap_offset_in_page(3, 2, page_size).unwrap(), 0); + assert_eq!( + get_ptrmap_offset_in_page(4, 2, page_size).unwrap(), + 1 * PTRMAP_ENTRY_SIZE + ); + assert_eq!( + get_ptrmap_offset_in_page(5, 2, page_size).unwrap(), + 2 * PTRMAP_ENTRY_SIZE + ); + + // P1 (page 105) maps D(106)...D(207) + // D(106) is index 0 on P1. Offset 0. + // D(107) is index 1 on P1. Offset 5. + // D(108) is index 2 on P1. Offset 10. + assert_eq!(get_ptrmap_offset_in_page(106, 105, page_size).unwrap(), 0); + assert_eq!( + get_ptrmap_offset_in_page(107, 105, page_size).unwrap(), + 1 * PTRMAP_ENTRY_SIZE + ); + assert_eq!( + get_ptrmap_offset_in_page(108, 105, page_size).unwrap(), + 2 * PTRMAP_ENTRY_SIZE + ); + } +} diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 97b008ee6..bc39cdfdf 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -75,7 +75,7 @@ const DEFAULT_CACHE_SIZE: i32 = -2000; pub const MIN_PAGE_CACHE_SIZE: usize = 10; /// The minimum page size in bytes. -const MIN_PAGE_SIZE: u32 = 512; +pub const MIN_PAGE_SIZE: u32 = 512; /// The maximum page size in bytes.
const MAX_PAGE_SIZE: u32 = 65536; @@ -142,7 +142,7 @@ pub struct DatabaseHeader { /// The page number of the largest root b-tree page when in auto-vacuum or /// incremental-vacuum modes, or zero otherwise. - vacuum_mode_largest_root_page: u32, + pub vacuum_mode_largest_root_page: u32, /// The database text encoding. 1=UTF-8, 2=UTF-16le, 3=UTF-16be. text_encoding: u32, @@ -151,7 +151,7 @@ pub struct DatabaseHeader { pub user_version: i32, /// True (non-zero) for incremental-vacuum mode. False (zero) otherwise. - incremental_vacuum_enabled: u32, + pub incremental_vacuum_enabled: u32, /// The "Application ID" set by PRAGMA application_id. application_id: u32, diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index e1b2c13c0..ba670c1a9 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -1,19 +1,20 @@ //! VDBE bytecode generation for pragma statements. //! More info: https://www.sqlite.org/pragma.html. -use limbo_sqlite3_parser::ast; use limbo_sqlite3_parser::ast::PragmaName; +use limbo_sqlite3_parser::ast::{self, Expr}; use std::rc::{Rc, Weak}; use std::sync::Arc; use crate::fast_lock::SpinLock; use crate::schema::Schema; +use crate::storage::pager::AutoVacuumMode; use crate::storage::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE}; use crate::storage::wal::CheckpointMode; use crate::util::{normalize_ident, parse_signed_number}; use crate::vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode}; use crate::vdbe::insn::{Cookie, Insn}; -use crate::{bail_parse_error, Pager, Value}; +use crate::{bail_parse_error, LimboError, Pager, Value}; use std::str::FromStr; use strum::IntoEnumIterator; @@ -62,6 +63,7 @@ pub fn translate_pragma( schema, None, database_header.clone(), + pager, connection, &mut program, )?; @@ -73,6 +75,7 @@ pub fn translate_pragma( schema, Some(value), database_header.clone(), + pager, connection, &mut program, )?; @@ -97,6 +100,7 @@ pub fn translate_pragma( schema, Some(value), database_header.clone(), + 
pager, connection, &mut program, )?; @@ -139,6 +143,7 @@ fn update_pragma( schema, None, header, + pager, connection, program, )?; @@ -151,6 +156,7 @@ fn update_pragma( schema, None, header, + pager, connection, program, )?; @@ -162,6 +168,7 @@ fn update_pragma( schema, None, header, + pager, connection, program, )?; @@ -196,6 +203,62 @@ fn update_pragma( PragmaName::PageSize => { todo!("updating page_size is not yet implemented") } + PragmaName::AutoVacuum => { + let auto_vacuum_mode = match value { + Expr::Name(name) => { + let name = name.0.to_lowercase(); + match name.as_str() { + "none" => 0, + "full" => 1, + "incremental" => 2, + _ => { + return Err(LimboError::InvalidArgument( + "invalid auto vacuum mode".to_string(), + )); + } + } + } + _ => { + return Err(LimboError::InvalidArgument( + "invalid auto vacuum mode".to_string(), + )) + } + }; + match auto_vacuum_mode { + 0 => update_auto_vacuum_mode(AutoVacuumMode::None, 0, header, pager)?, + 1 => update_auto_vacuum_mode(AutoVacuumMode::Full, 1, header, pager)?, + 2 => update_auto_vacuum_mode(AutoVacuumMode::Incremental, 1, header, pager)?, + _ => { + return Err(LimboError::InvalidArgument( + "invalid auto vacuum mode".to_string(), + )) + } + } + let largest_root_page_number_reg = program.alloc_register(); + program.emit_insn(Insn::ReadCookie { + db: 0, + dest: largest_root_page_number_reg, + cookie: Cookie::LargestRootPageNumber, + }); + let set_cookie_label = program.allocate_label(); + program.emit_insn(Insn::If { + reg: largest_root_page_number_reg, + target_pc: set_cookie_label, + jump_if_null: false, + }); + program.emit_insn(Insn::Halt { + err_code: 0, + description: "Early halt because auto vacuum mode is not enabled".to_string(), + }); + program.resolve_label(set_cookie_label, program.offset()); + program.emit_insn(Insn::SetCookie { + db: 0, + cookie: Cookie::IncrementalVacuum, + value: auto_vacuum_mode - 1, + p5: 0, + }); + Ok(()) + } } } @@ -204,6 +267,7 @@ fn query_pragma( schema: &Schema, value: 
Option, database_header: Arc>, + pager: Rc, connection: Weak, program: &mut ProgramBuilder, ) -> crate::Result<()> { @@ -305,11 +369,40 @@ fn query_pragma( program.emit_int(database_header.lock().get_page_size().into(), register); program.emit_result_row(register, 1); } + PragmaName::AutoVacuum => { + let auto_vacuum_mode = pager.get_auto_vacuum_mode(); + let auto_vacuum_mode_i64: i64 = match auto_vacuum_mode { + AutoVacuumMode::None => 0, + AutoVacuumMode::Full => 1, + AutoVacuumMode::Incremental => 2, + }; + let register = program.alloc_register(); + program.emit_insn(Insn::Int64 { + _p1: 0, + out_reg: register, + _p3: 0, + value: auto_vacuum_mode_i64, + }); + program.emit_result_row(register, 1); + } } Ok(()) } +fn update_auto_vacuum_mode( + auto_vacuum_mode: AutoVacuumMode, + largest_root_page_number: u32, + header: Arc>, + pager: Rc, +) -> crate::Result<()> { + let mut header_guard = header.lock(); + header_guard.vacuum_mode_largest_root_page = largest_root_page_number; + pager.set_auto_vacuum_mode(auto_vacuum_mode); + pager.write_database_header(&header_guard)?; + Ok(()) +} + fn update_cache_size( value: i64, header: Arc>, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 0db530c5e..28ec562eb 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3845,6 +3845,27 @@ pub fn op_insert( Ok(InsnFunctionStepResult::Step) } +pub fn op_int_64( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + let Insn::Int64 { + _p1, + out_reg, + _p3, + value, + } = insn + else { + unreachable!("unexpected Insn {:?}", insn) + }; + state.registers[*out_reg] = Register::Value(Value::Integer(*value)); + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + pub fn op_delete( program: &Program, state: &mut ProgramState, @@ -4321,7 +4342,7 @@ pub fn op_create_btree( todo!("temp databases not implemented yet"); } // FIXME: handle page cache is full - let root_page = pager.btree_create(flags); + let 
root_page = return_if_io!(pager.btree_create(flags)); state.registers[*root] = Register::Value(Value::Integer(root_page as i64)); state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -4506,6 +4527,9 @@ pub fn op_read_cookie( let cookie_value = match cookie { Cookie::UserVersion => pager.db_header.lock().user_version.into(), Cookie::SchemaVersion => pager.db_header.lock().schema_cookie.into(), + Cookie::LargestRootPageNumber => { + pager.db_header.lock().vacuum_mode_largest_root_page.into() + } cookie => todo!("{cookie:?} is not yet implement for ReadCookie"), }; state.registers[*dest] = Register::Value(Value::Integer(cookie_value)); @@ -4538,6 +4562,16 @@ pub fn op_set_cookie( header_guard.user_version = *value; pager.write_database_header(&*header_guard)?; } + Cookie::LargestRootPageNumber => { + let mut header_guard = pager.db_header.lock(); + header_guard.vacuum_mode_largest_root_page = *value as u32; + pager.write_database_header(&*header_guard)?; + } + Cookie::IncrementalVacuum => { + let mut header_guard = pager.db_header.lock(); + header_guard.incremental_vacuum_enabled = *value as u32; + pager.write_database_header(&*header_guard)?; + } cookie => todo!("{cookie:?} is not yet implement for SetCookie"), } state.pc += 1; @@ -4742,7 +4776,7 @@ pub fn op_open_ephemeral( }; // FIXME: handle page cache is full - let root_page = pager.btree_create(flag); + let root_page = return_if_io!(pager.btree_create(flag)); let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); let mv_cursor = match state.mv_tx_id { diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 69bb886c1..9aa6588c6 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1560,6 +1560,20 @@ pub fn insn_to_str( 0, "".to_string(), ), + Insn::Int64 { + _p1, + out_reg, + _p3, + value, + } => ( + "Int64", + 0, + *out_reg as i32, + 0, + Value::Integer(*value), + 0, + format!("r[{}]={}", *out_reg, *value), + ), }; format!( "{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}", diff --git 
a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 976f6aba4..6eea43318 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -669,6 +669,13 @@ pub enum Insn { table_name: String, }, + Int64 { + _p1: usize, // unused + out_reg: usize, // the output register + _p3: usize, // unused + value: i64, // the value being written into the output register + }, + Delete { cursor_id: CursorID, }, @@ -993,6 +1000,7 @@ impl Insn { Insn::EndCoroutine { .. } => execute::op_end_coroutine, Insn::Yield { .. } => execute::op_yield, Insn::Insert { .. } => execute::op_insert, + Insn::Int64 { .. } => execute::op_int_64, Insn::IdxInsert { .. } => execute::op_idx_insert, Insn::Delete { .. } => execute::op_delete, Insn::NewRowid { .. } => execute::op_new_rowid, @@ -1047,4 +1055,6 @@ pub enum Cookie { DatabaseTextEncoding = 5, /// The "user version" as read and set by the user_version pragma. UserVersion = 6, + /// The incremental-vacuum flag: non-zero for incremental-vacuum mode, zero otherwise. + IncrementalVacuum = 7, } diff --git a/limbostress.log b/limbostress.log new file mode 100644 index 000000000..e69de29bb diff --git a/vendored/sqlite3-parser/src/parser/ast/mod.rs b/vendored/sqlite3-parser/src/parser/ast/mod.rs index 390bc9609..7512b75d5 100644 --- a/vendored/sqlite3-parser/src/parser/ast/mod.rs +++ b/vendored/sqlite3-parser/src/parser/ast/mod.rs @@ -1655,6 +1655,8 @@ pub type PragmaValue = Expr; // TODO #[derive(Clone, Debug, PartialEq, Eq, EnumIter, EnumString, strum::Display)] #[strum(serialize_all = "snake_case")] pub enum PragmaName { + /// `auto_vacuum` pragma + AutoVacuum, /// `cache_size` pragma CacheSize, /// `journal_mode` pragma