diff --git a/core/error.rs b/core/error.rs index 695b52ee7..a56dfbb10 100644 --- a/core/error.rs +++ b/core/error.rs @@ -8,6 +8,8 @@ pub enum LimboError { NotADB, #[error("Internal error: {0}")] InternalError(String), + #[error("Page cache is full")] + CacheFull, #[error("Parse error: {0}")] ParseError(String), #[error(transparent)] diff --git a/core/lib.rs b/core/lib.rs index 1db7001a6..15102bdea 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -57,7 +57,7 @@ use std::{ rc::Rc, sync::{Arc, OnceLock}, }; -use storage::btree::btree_init_page; +use storage::btree::{btree_init_page, BTreePageInner}; #[cfg(feature = "fs")] use storage::database::DatabaseFile; pub use storage::{ @@ -80,8 +80,6 @@ use vdbe::{builder::QueryMode, VTabOpaqueCursor}; pub type Result = std::result::Result; pub static DATABASE_VERSION: OnceLock = OnceLock::new(); -const DEFAULT_PAGE_CACHE_SIZE_IN_PAGES: usize = 2000; - #[derive(Clone, Copy, PartialEq, Eq)] enum TransactionState { Write, @@ -170,9 +168,7 @@ impl Database { None }; - let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new( - DEFAULT_PAGE_CACHE_SIZE_IN_PAGES, - ))); + let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); let schema = Arc::new(RwLock::new(Schema::new())); let db = Database { mv_store, @@ -276,6 +272,9 @@ pub fn maybe_init_database_file(file: &Arc, io: &Arc) -> Resul &Rc::new(BufferPool::new(db_header.get_page_size() as usize)), DATABASE_HEADER_SIZE, ); + let page1 = Arc::new(BTreePageInner { + page: RefCell::new(page1), + }); { // Create the sqlite_schema table, for this we just need to create the btree page // for the first page of the database which is basically like any other btree page @@ -288,6 +287,7 @@ pub fn maybe_init_database_file(file: &Arc, io: &Arc) -> Resul (db_header.get_page_size() - db_header.reserved_space as u32) as u16, ); + let page1 = page1.get(); let contents = page1.get().contents.as_mut().unwrap(); contents.write_database_header(&db_header); // write the first page to disk synchronously diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 6172b6a08..017ce21ee 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -25,6 +25,7 @@ use std::{ cmp::Ordering, pin::Pin, rc::Rc, + sync::Arc, }; use super::{ @@ -117,17 +118,27 @@ macro_rules! debug_validate_cells { } /// Check if the page is unlocked, if not return IO. If the page is not locked but not loaded, then try to load it. macro_rules! return_if_locked_maybe_load { - ($pager:expr, $expr:expr) => {{ - if $expr.is_locked() { + ($pager:expr, $btree_page:expr) => {{ + if $btree_page.get().is_locked() { return Ok(CursorResult::IO); } - if !$expr.is_loaded() { - $pager.load_page($expr.clone())?; + if !$btree_page.get().is_loaded() { + let page = $pager.read_page($btree_page.get().get().id)?; + $btree_page.page.replace(page); return Ok(CursorResult::IO); } }}; } +/// Wrapper around a page reference used in order to update the reference in case page was unloaded +/// and we need to update the reference. 
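That doc comment is the core idea of this change: a page can be evicted and reloaded, so cursors hold an `Arc<BTreePageInner>` whose inner `RefCell<PageRef>` can be swapped in place without invalidating anyone else's handle. The struct itself follows; here is a self-contained sketch of the pattern with `Page` reduced to a toy type (only `BTreePageInner`, `BTreePage` and `get()` mirror the diff, the rest is illustrative):

```rust
use std::{cell::RefCell, sync::Arc};

// Toy stand-ins for the real Page/PageRef types in core/storage.
pub struct Page {
    pub id: usize,
    pub loaded: bool,
}
pub type PageRef = Arc<Page>;

// Same shape as the wrapper introduced by this diff.
pub struct BTreePageInner {
    pub page: RefCell<PageRef>,
}
pub type BTreePage = Arc<BTreePageInner>;

impl BTreePageInner {
    // Cloning the Arc hands out a snapshot of the current reference.
    pub fn get(&self) -> PageRef {
        self.page.borrow().clone()
    }
}

// Mirrors what `return_if_locked_maybe_load!` now does: if the page was
// unloaded, read it again and replace only the inner reference, so every
// holder of the same BTreePage sees the fresh PageRef.
fn reload_if_unloaded(btree_page: &BTreePage, read_page: impl Fn(usize) -> PageRef) {
    if !btree_page.get().loaded {
        let fresh = read_page(btree_page.get().id);
        btree_page.page.replace(fresh);
    }
}

fn main() {
    let evicted = Arc::new(Page { id: 7, loaded: false });
    let wrapper: BTreePage = Arc::new(BTreePageInner {
        page: RefCell::new(evicted),
    });
    reload_if_unloaded(&wrapper, |id| Arc::new(Page { id, loaded: true }));
    assert!(wrapper.get().loaded);
}
```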
+pub struct BTreePageInner { + pub page: RefCell, +} + +pub type BTreePage = Arc; +unsafe impl Send for BTreePageInner {} +unsafe impl Sync for BTreePageInner {} /// State machine of destroy operations /// Keep track of traversal so that it can be resumed when IO is encountered #[derive(Debug, Clone)] @@ -194,7 +205,7 @@ enum ReadPayloadOverflow { payload: Vec, next_page: u32, remaining_to_read: usize, - page: PageRef, + page: BTreePage, }, } @@ -210,7 +221,7 @@ enum PayloadOverflowWithOffset { ProcessPage { next_page: u32, remaining_to_read: u32, - page: PageRef, + page: BTreePage, current_offset: usize, buffer_offset: usize, is_write: bool, @@ -271,7 +282,7 @@ impl BTreeKey<'_> { #[derive(Clone)] struct BalanceInfo { /// Old pages being balanced. We can have maximum 3 pages being balanced at the same time. - pages_to_balance: [Option; 3], + pages_to_balance: [Option; 3], /// Bookkeeping of the rightmost pointer so the offset::BTREE_RIGHTMOST_PTR can be updated. rightmost_pointer: *mut u8, /// Divider cells of old pages. We can have maximum 2 divider cells because of 3 pages. @@ -542,11 +553,8 @@ impl BTreeCursor { } let cell_idx = cell_idx as usize; - return_if_locked!(page); - if !page.is_loaded() { - self.pager.load_page(page.clone())?; - return Ok(CursorResult::IO); - } + return_if_locked_maybe_load!(self.pager, page); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); @@ -557,7 +565,7 @@ impl BTreeCursor { let rightmost_pointer = contents.rightmost_pointer(); if let Some(rightmost_pointer) = rightmost_pointer { self.stack - .push_backwards(self.pager.read_page(rightmost_pointer as usize)?); + .push_backwards(self.read_page(rightmost_pointer as usize)?); continue; } } @@ -580,7 +588,7 @@ impl BTreeCursor { _left_child_page, _rowid, }) => { - let mem_page = self.pager.read_page(_left_child_page as usize)?; + let mem_page = self.read_page(_left_child_page as usize)?; self.stack.push_backwards(mem_page); continue; } @@ -618,7 +626,7 @@ impl BTreeCursor { // left child has: key 663, key 664, key 665 // we need to move to the previous parent (with e.g. key 662) when iterating backwards. self.stack.retreat(); - let mem_page = self.pager.read_page(left_child_page as usize)?; + let mem_page = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); // use cell_index = i32::MAX to tell next loop to go to the end of the current page self.stack.set_cell_index(i32::MAX); @@ -743,7 +751,7 @@ impl BTreeCursor { let res = match &mut self.state { CursorState::None => { tracing::debug!("start reading overflow page payload_size={}", payload_size); - let page = self.pager.read_page(start_next_page as usize)?; + let page = self.read_page(start_next_page as usize)?; self.state = CursorState::Read(ReadPayloadOverflow::ProcessPage { payload: payload.to_vec(), next_page: start_next_page, @@ -756,12 +764,13 @@ impl BTreeCursor { payload, next_page, remaining_to_read, - page, + page: page_btree, }) => { - if page.is_locked() { + if page_btree.get().is_locked() { return Ok(CursorResult::IO); } tracing::debug!("reading overflow page {} {}", next_page, remaining_to_read); + let page = page_btree.get(); let contents = page.get_contents(); // The first four bytes of each overflow page are a big-endian integer which is the page number of the next page in the chain, or zero for the final page in the chain. 
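The comment above is the whole on-disk contract for overflow pages, and the state machine below resumes this walk across IO boundaries. As a plain, synchronous sketch of the same traversal (the `load` closure and helper name are illustrative, not the pager API):

```rust
// Sketch only: follow an overflow chain and collect the spilled payload.
// Each page contributes `usable_space - 4` payload bytes after the 4-byte
// big-endian "next page" pointer; a next pointer of 0 ends the chain.
fn walk_overflow_chain(
    mut next_page: u32,
    mut remaining: usize,
    usable_space: usize,
    mut load: impl FnMut(u32) -> Vec<u8>, // returns the raw bytes of one page
) -> Vec<u8> {
    let mut payload = Vec::with_capacity(remaining);
    while next_page != 0 && remaining > 0 {
        let buf = load(next_page);
        next_page = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
        let to_read = remaining.min(usable_space - 4);
        payload.extend_from_slice(&buf[4..4 + to_read]);
        remaining -= to_read;
    }
    payload
}
```

The cursor version cannot block on `load`; it has to return `CursorResult::IO` instead, which is why the page handle and the remaining byte count are carried in `ReadPayloadOverflow::ProcessPage` between calls.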
let next = contents.read_u32_no_offset(0); @@ -779,8 +788,12 @@ impl BTreeCursor { std::mem::swap(payload, &mut payload_swap); CursorResult::Ok(payload_swap) } else { - let new_page = self.pager.read_page(next as usize)?; - *page = new_page; + let new_page = self.pager.read_page(next as usize).map(|page| { + Arc::new(BTreePageInner { + page: RefCell::new(page), + }) + })?; + *page_btree = new_page; *next_page = next; CursorResult::IO } @@ -871,9 +884,10 @@ impl BTreeCursor { return self.continue_payload_overflow_with_offset(buffer, self.usable_space()); } - let page = self.stack.top(); - return_if_locked_maybe_load!(self.pager, page); + let page_btree = self.stack.top(); + return_if_locked_maybe_load!(self.pager, page_btree); + let page = page_btree.get(); let contents = page.get().contents.as_ref().unwrap(); let cell_idx = self.stack.current_cell_index() as usize - 1; @@ -918,7 +932,13 @@ impl BTreeCursor { local_amount = local_size as u32 - offset; } if is_write { - self.write_payload_to_page(offset, local_amount, payload, buffer, page.clone()); + self.write_payload_to_page( + offset, + local_amount, + payload, + buffer, + page_btree.clone(), + ); } else { self.read_payload_from_page(offset, local_amount, payload, buffer); } @@ -973,7 +993,7 @@ impl BTreeCursor { is_write, }) => { if *pages_left_to_skip == 0 { - let page = self.pager.read_page(*next_page as usize)?; + let page = self.read_page(*next_page as usize)?; return_if_locked_maybe_load!(self.pager, page); self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { @@ -988,8 +1008,9 @@ impl BTreeCursor { continue; } - let page = self.pager.read_page(*next_page as usize)?; + let page = self.read_page(*next_page as usize)?; return_if_locked_maybe_load!(self.pager, page); + let page = page.get(); let contents = page.get_contents(); let next = contents.read_u32_no_offset(0); @@ -1018,17 +1039,17 @@ impl BTreeCursor { CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { next_page, remaining_to_read, - page, + page: page_btree, current_offset, buffer_offset, is_write, }) => { - if page.is_locked() { + if page_btree.get().is_locked() { self.state = CursorState::ReadWritePayload(PayloadOverflowWithOffset::ProcessPage { next_page: *next_page, remaining_to_read: *remaining_to_read, - page: page.clone(), + page: page_btree.clone(), current_offset: *current_offset, buffer_offset: *buffer_offset, is_write: *is_write, @@ -1037,6 +1058,7 @@ impl BTreeCursor { return Ok(CursorResult::IO); } + let page = page_btree.get(); let contents = page.get_contents(); let overflow_size = usable_space - 4; @@ -1054,7 +1076,7 @@ impl BTreeCursor { bytes_to_process, page_payload, buffer, - page.clone(), + page_btree.clone(), ); } else { self.read_payload_from_page( @@ -1081,7 +1103,7 @@ impl BTreeCursor { // Load next page *next_page = next; *current_offset = 0; // Reset offset for new page - *page = self.pager.read_page(next as usize)?; + *page_btree = self.read_page(next as usize)?; // Return IO to allow other operations return Ok(CursorResult::IO); @@ -1116,10 +1138,10 @@ impl BTreeCursor { num_bytes: u32, payload: &[u8], buffer: &mut Vec, - page: PageRef, + page: BTreePage, ) { - page.set_dirty(); - self.pager.add_dirty(page.get().id); + page.get().set_dirty(); + self.pager.add_dirty(page.get().get().id); // SAFETY: This is safe as long as the page is not evicted from the cache. 
let payload_mut = unsafe { std::slice::from_raw_parts_mut(payload.as_ptr() as *mut u8, payload.len()) }; @@ -1155,15 +1177,15 @@ impl BTreeCursor { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_cell_index() as usize; - tracing::trace!("current id={} cell={}", mem_page_rc.get().id, cell_idx); - return_if_locked!(mem_page_rc); - if !mem_page_rc.is_loaded() { - self.pager.load_page(mem_page_rc.clone())?; - return Ok(CursorResult::IO); - } + tracing::trace!( + "current id={} cell={}", + mem_page_rc.get().get().id, + cell_idx + ); + return_if_locked_maybe_load!(self.pager, mem_page_rc); let mem_page = mem_page_rc.get(); - let contents = mem_page.contents.as_ref().unwrap(); + let contents = mem_page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); if cell_count == 0 || cell_idx == cell_count { @@ -1172,7 +1194,7 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.advance(); - let mem_page = self.pager.read_page(right_most_pointer as usize)?; + let mem_page = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue; } @@ -1216,7 +1238,7 @@ impl BTreeCursor { }) => { assert!(predicate.is_none()); self.stack.advance(); - let mem_page = self.pager.read_page(*_left_child_page as usize)?; + let mem_page = self.read_page(*_left_child_page as usize)?; self.stack.push(mem_page); continue; } @@ -1251,7 +1273,7 @@ impl BTreeCursor { payload_size, }) => { if !self.going_upwards { - let mem_page = self.pager.read_page(*left_child_page as usize)?; + let mem_page = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); continue; } @@ -1387,7 +1409,7 @@ impl BTreeCursor { /// Move the cursor to the root page of the btree. fn move_to_root(&mut self) { tracing::trace!("move_to_root({})", self.root_page); - let mem_page = self.pager.read_page(self.root_page).unwrap(); + let mem_page = self.read_page(self.root_page).unwrap(); self.stack.clear(); self.stack.push(mem_page); } @@ -1398,9 +1420,10 @@ impl BTreeCursor { loop { let mem_page = self.stack.top(); - let page_idx = mem_page.get().id; - let page = self.pager.read_page(page_idx)?; - return_if_locked!(page); + let page_idx = mem_page.get().get().id; + let page = self.read_page(page_idx)?; + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { if contents.cell_count() > 0 { @@ -1412,7 +1435,7 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); - let mem_page = self.pager.read_page(right_most_pointer as usize)?; + let mem_page = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue; } @@ -1429,7 +1452,8 @@ impl BTreeCursor { let iter_dir = seek_op.iteration_direction(); 'outer: loop { let page = self.stack.top(); - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { return Ok(CursorResult::Ok(())); @@ -1457,14 +1481,14 @@ impl BTreeCursor { -1 + (iter_dir == IterationDirection::Forwards) as i32 * 2; self.stack .set_cell_index(leftmost_matching_cell as i32 + index_change); - let mem_page = self.pager.read_page(left_child_page as usize)?; + let mem_page = self.read_page(left_child_page as usize)?; self.stack.push(mem_page); continue 'outer; } self.stack.set_cell_index(cell_count as i32 + 1); match 
contents.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = self.pager.read_page(right_most_pointer as usize)?; + let mem_page = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue 'outer; } @@ -1524,7 +1548,8 @@ impl BTreeCursor { let iter_dir = cmp.iteration_direction(); 'outer: loop { let page = self.stack.top(); - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { return Ok(CursorResult::Ok(())); @@ -1540,7 +1565,7 @@ impl BTreeCursor { self.stack.set_cell_index(contents.cell_count() as i32 + 1); match contents.rightmost_pointer() { Some(right_most_pointer) => { - let mem_page = self.pager.read_page(right_most_pointer as usize)?; + let mem_page = self.read_page(right_most_pointer as usize)?; self.stack.push(mem_page); continue 'outer; } @@ -1580,7 +1605,7 @@ impl BTreeCursor { unreachable!("unexpected cell type: {:?}", matching_cell); }; - let mem_page = self.pager.read_page(*left_child_page as usize)?; + let mem_page = self.read_page(*left_child_page as usize)?; self.stack.push(mem_page); continue 'outer; } @@ -1686,7 +1711,8 @@ impl BTreeCursor { self.move_to_root(); return_if_io!(self.tablebtree_move_to(rowid, seek_op)); let page = self.stack.top(); - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); assert!( contents.is_leaf(), @@ -1837,8 +1863,9 @@ impl BTreeCursor { return_if_io!(self.indexbtree_move_to(key, seek_op)); let page = self.stack.top(); - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); @@ -2070,7 +2097,8 @@ impl BTreeCursor { // get page and find cell let (cell_idx, page_type) = { - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); page.set_dirty(); self.pager.add_dirty(page.get().id); @@ -2088,8 +2116,8 @@ impl BTreeCursor { // if the cell index is less than the total cells, check: if its an existing // rowid, we are going to update / overwrite the cell - if cell_idx < page.get_contents().cell_count() { - match page.get_contents().cell_get( + if cell_idx < page.get().get_contents().cell_count() { + match page.get().get_contents().cell_get( cell_idx, payload_overflow_threshold_max(page_type, self.usable_space() as u16), payload_overflow_threshold_min(page_type, self.usable_space() as u16), @@ -2147,6 +2175,7 @@ impl BTreeCursor { // insert let overflow = { + let page = page.get(); let contents = page.get().contents.as_mut().unwrap(); tracing::debug!( "insert_into_page(overflow, cell_count={})", @@ -2220,6 +2249,7 @@ impl BTreeCursor { // is less than 2/3rds of the total usable space on the page // // https://github.com/sqlite/sqlite/blob/0aa95099f5003dc99f599ab77ac0004950b281ef/src/btree.c#L9064-L9071 + let current_page = current_page.get(); let page = current_page.get().contents.as_mut().unwrap(); let usable_space = self.usable_space(); let free_space = compute_free_space(page, usable_space as u16); @@ -2265,6 +2295,7 @@ impl BTreeCursor { WriteState::BalanceNonRoot => { let parent_page = self.stack.top(); return_if_locked_maybe_load!(self.pager, parent_page); + let parent_page = parent_page.get(); // If `move_to` moved to rightmost page, cell index will be out of bounds. Meaning cell_count+1. // In any other case, `move_to` will stay in the correct index. 
if self.stack.current_cell_index() as usize @@ -2287,7 +2318,7 @@ impl BTreeCursor { PageType::IndexInterior | PageType::TableInterior )); // Part 1: Find the sibling pages to balance - let mut pages_to_balance: [Option; 3] = [const { None }; 3]; + let mut pages_to_balance: [Option; 3] = [const { None }; 3]; let number_of_cells_in_parent = parent_contents.cell_count() + parent_contents.overflow_cells.len(); @@ -2354,11 +2385,20 @@ impl BTreeCursor { let mut pgno: u32 = unsafe { right_pointer.cast::().read().swap_bytes() }; let current_sibling = sibling_pointer; for i in (0..=current_sibling).rev() { - let page = self.pager.read_page(pgno as usize)?; + let page = self.read_page(pgno as usize)?; + { + // mark as dirty + let sibling_page = page.get(); + sibling_page.set_dirty(); + self.pager.add_dirty(sibling_page.get().id); + } #[cfg(debug_assertions)] { - return_if_locked!(page); - debug_validate_cells!(&page.get_contents(), self.usable_space() as u16); + return_if_locked!(page.get()); + debug_validate_cells!( + &page.get().get_contents(), + self.usable_space() as u16 + ); } pages_to_balance[i].replace(page); assert_eq!( @@ -2399,10 +2439,13 @@ impl BTreeCursor { let page_type_of_siblings = pages_to_balance[0] .as_ref() .unwrap() + .get() .get_contents() .page_type(); for page in pages_to_balance.iter().take(sibling_count) { - let contents = page.as_ref().unwrap().get_contents(); + return_if_locked_maybe_load!(self.pager, page.as_ref().unwrap()); + let page = page.as_ref().unwrap().get(); + let contents = page.get_contents(); debug_validate_cells!(&contents, self.usable_space() as u16); assert_eq!(contents.page_type(), page_type_of_siblings); } @@ -2427,16 +2470,18 @@ impl BTreeCursor { let write_info = self.state.write_info().unwrap(); let mut balance_info = write_info.balance_info.borrow_mut(); let balance_info = balance_info.as_mut().unwrap(); - let all_loaded = balance_info + for page in balance_info .pages_to_balance .iter() .take(balance_info.sibling_count) - .all(|page| !page.as_ref().unwrap().is_locked()); - if !all_loaded { - return Ok(CursorResult::IO); + { + let page = page.as_ref().unwrap(); + return_if_locked_maybe_load!(self.pager, page); } // Now do real balancing - let parent_page = self.stack.top(); + let parent_page_btree = self.stack.top(); + let parent_page = parent_page_btree.get(); + let parent_contents = parent_page.get_contents(); let parent_is_root = !self.stack.has_parent(); @@ -2448,12 +2493,12 @@ impl BTreeCursor { /* 1. 
Get divider cells and max_cells */ let mut max_cells = 0; // we only need maximum 5 pages to balance 3 pages - let mut pages_to_balance_new: [Option; 5] = [const { None }; 5]; + let mut pages_to_balance_new: [Option; 5] = [const { None }; 5]; for i in (0..balance_info.sibling_count).rev() { let sibling_page = balance_info.pages_to_balance[i].as_ref().unwrap(); + let sibling_page = sibling_page.get(); + assert!(sibling_page.is_loaded()); let sibling_contents = sibling_page.get_contents(); - sibling_page.set_dirty(); - self.pager.add_dirty(sibling_page.get().id); max_cells += sibling_contents.cell_count(); max_cells += sibling_contents.overflow_cells.len(); @@ -2511,6 +2556,7 @@ impl BTreeCursor { let page_type = balance_info.pages_to_balance[0] .as_ref() .unwrap() + .get() .get_contents() .page_type(); tracing::debug!("balance_non_root(page_type={:?})", page_type); @@ -2522,7 +2568,8 @@ impl BTreeCursor { .take(balance_info.sibling_count) .enumerate() { - let old_page_contents = old_page.as_ref().unwrap().get_contents(); + let old_page = old_page.as_ref().unwrap().get(); + let old_page_contents = old_page.get_contents(); debug_validate_cells!(&old_page_contents, self.usable_space() as u16); for cell_idx in 0..old_page_contents.cell_count() { let (cell_start, cell_len) = old_page_contents.cell_get_raw_region( @@ -2612,6 +2659,7 @@ impl BTreeCursor { for i in 0..balance_info.sibling_count { cell_array.number_of_cells_per_page[i] = count_cells_in_old_pages[i]; let page = &balance_info.pages_to_balance[i].as_ref().unwrap(); + let page = page.get(); let page_contents = page.get_contents(); let free_space = compute_free_space(page_contents, self.usable_space() as u16); @@ -2782,10 +2830,11 @@ impl BTreeCursor { for i in 0..sibling_count_new { if i < balance_info.sibling_count { let page = balance_info.pages_to_balance[i].as_ref().unwrap(); - page.set_dirty(); + page.get().set_dirty(); pages_to_balance_new[i].replace(page.clone()); } else { - let page = self.pager.do_allocate_page(page_type, 0); + // FIXME: handle page cache is full + let page = self.allocate_page(page_type, 0); pages_to_balance_new[i].replace(page); // Since this page didn't exist before, we can set it to cells length as it // marks them as empty since it is a prefix sum of cells. 
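A quick illustration of the prefix-sum bookkeeping mentioned in the comment above: `number_of_cells_per_page` stores cumulative cell counts, so initialising a brand-new sibling to the current running total means "this page owns zero cells so far". A toy conversion back to per-page counts (names and numbers are illustrative, not the diff's):

```rust
// Prefix sums [10, 25, 40] mean sibling 0 owns cells 0..10,
// sibling 1 owns cells 10..25 and sibling 2 owns cells 25..40.
fn cells_per_page(prefix_sums: &[usize]) -> Vec<usize> {
    prefix_sums
        .iter()
        .scan(0usize, |prev, &cumulative| {
            let count = cumulative - *prev;
            *prev = cumulative;
            Some(count)
        })
        .collect()
}

fn main() {
    assert_eq!(cells_per_page(&[10, 25, 40]), vec![10, 15, 15]);
    // A freshly allocated sibling appended with the running total (40)
    // owns no cells yet, exactly what the comment above relies on.
    assert_eq!(cells_per_page(&[10, 25, 40, 40]), vec![10, 15, 15, 0]);
}
```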
@@ -2801,7 +2850,7 @@ impl BTreeCursor { .take(sibling_count_new) .enumerate() { - page_numbers[i] = page.as_ref().unwrap().get().id; + page_numbers[i] = page.as_ref().unwrap().get().get().id; } page_numbers.sort(); for (page, new_id) in pages_to_balance_new @@ -2811,9 +2860,10 @@ impl BTreeCursor { .zip(page_numbers.iter().rev().take(sibling_count_new)) { let page = page.as_ref().unwrap(); - if *new_id != page.get().id { - page.get().id = *new_id; - self.pager.put_loaded_page(*new_id, page.clone()); + if *new_id != page.get().get().id { + page.get().get().id = *new_id; + self.pager + .update_dirty_loaded_page_in_cache(*new_id, page.get())?; } } @@ -2826,7 +2876,7 @@ impl BTreeCursor { for page in pages_to_balance_new.iter().take(sibling_count_new) { tracing::debug!( "balance_non_root(new_sibling page_id={})", - page.as_ref().unwrap().get().id + page.as_ref().unwrap().get().get().id ); } } @@ -2844,6 +2894,7 @@ impl BTreeCursor { .as_ref() .unwrap() .get() + .get() .id as u32; let rightmost_pointer = balance_info.rightmost_pointer; let rightmost_pointer = @@ -2865,11 +2916,12 @@ impl BTreeCursor { let last_page = balance_info.pages_to_balance[balance_info.sibling_count - 1] .as_ref() .unwrap(); - let right_pointer = last_page.get_contents().rightmost_pointer().unwrap(); + let right_pointer = last_page.get().get_contents().rightmost_pointer().unwrap(); let new_last_page = pages_to_balance_new[sibling_count_new - 1] .as_ref() .unwrap(); new_last_page + .get() .get_contents() .write_u32(offset::BTREE_RIGHTMOST_PTR, right_pointer); } @@ -2890,10 +2942,12 @@ impl BTreeCursor { // Interior // Make this page's rightmost pointer point to pointer of divider cell before modification let previous_pointer_divider = read_u32(÷r_cell, 0); - page.get_contents() + page.get() + .get_contents() .write_u32(offset::BTREE_RIGHTMOST_PTR, previous_pointer_divider); // divider cell now points to this page - new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); + new_divider_cell + .extend_from_slice(&(page.get().get().id as u32).to_be_bytes()); // now copy the rest of the divider cell: // Table Interior page: // * varint rowid @@ -2910,11 +2964,13 @@ impl BTreeCursor { divider_cell = &mut cell_array.cells[divider_cell_idx - 1]; let (_, n_bytes_payload) = read_varint(divider_cell)?; let (rowid, _) = read_varint(÷r_cell[n_bytes_payload..])?; - new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); + new_divider_cell + .extend_from_slice(&(page.get().get().id as u32).to_be_bytes()); write_varint_to_vec(rowid, &mut new_divider_cell); } else { // Leaf index - new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); + new_divider_cell + .extend_from_slice(&(page.get().get().id as u32).to_be_bytes()); new_divider_cell.extend_from_slice(divider_cell); } @@ -2928,7 +2984,7 @@ impl BTreeCursor { i, left_pointer ); - assert_eq!(left_pointer, page.get().id as u32); + assert_eq!(left_pointer, page.get().get().id as u32); // FIXME: remove this lock assert!( left_pointer <= self.pager.db_header.lock().database_size, @@ -2949,7 +3005,7 @@ impl BTreeCursor { balance_info, parent_contents, i, - page, + &page.get(), ); } tracing::debug!( @@ -2963,9 +3019,9 @@ impl BTreeCursor { for page in pages_to_balance_new.iter().take(sibling_count_new) { let page = page.as_ref().unwrap(); assert!( - pages_pointed_to.contains(&(page.get().id as u32)), + pages_pointed_to.contains(&(page.get().get().id as u32)), "page {} not pointed to by divider cell or rightmost pointer", - 
page.get().id + page.get().get().id ); } } @@ -3023,6 +3079,7 @@ impl BTreeCursor { ) }; let page = pages_to_balance_new[page_idx].as_ref().unwrap(); + let page = page.get(); tracing::debug!("pre_edit_page(page={})", page.get().id); let page_contents = page.get_contents(); edit_page( @@ -3047,6 +3104,7 @@ impl BTreeCursor { // TODO: vacuum support let first_child_page = pages_to_balance_new[0].as_ref().unwrap(); + let first_child_page = first_child_page.get(); let first_child_contents = first_child_page.get_contents(); if parent_is_root && parent_contents.cell_count() == 0 @@ -3103,7 +3161,7 @@ impl BTreeCursor { #[cfg(debug_assertions)] self.post_balance_non_root_validation( - &parent_page, + &parent_page_btree, balance_info, parent_contents, pages_to_balance_new, @@ -3117,7 +3175,8 @@ impl BTreeCursor { // We have to free pages that are not used anymore for i in sibling_count_new..balance_info.sibling_count { let page = balance_info.pages_to_balance[i].as_ref().unwrap(); - self.pager.free_page(Some(page.clone()), page.get().id)?; + self.pager + .free_page(Some(page.get().clone()), page.get().get().id)?; } (WriteState::BalanceStart, Ok(CursorResult::Ok(()))) } @@ -3182,10 +3241,10 @@ impl BTreeCursor { #[cfg(debug_assertions)] fn post_balance_non_root_validation( &self, - parent_page: &PageRef, + parent_page: &BTreePage, balance_info: &mut BalanceInfo, parent_contents: &mut PageContent, - pages_to_balance_new: [Option>; 5], + pages_to_balance_new: [Option; 5], page_type: PageType, leaf_data: bool, mut cells_debug: Vec>, @@ -3212,9 +3271,9 @@ impl BTreeCursor { match cell { BTreeCell::TableInteriorCell(table_interior_cell) => { let left_child_page = table_interior_cell._left_child_page; - if left_child_page == parent_page.get().id as u32 { + if left_child_page == parent_page.get().get().id as u32 { tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})", - parent_page.get().id, + parent_page.get().get().id, left_child_page, ); valid = false; @@ -3222,9 +3281,9 @@ impl BTreeCursor { } BTreeCell::IndexInteriorCell(index_interior_cell) => { let left_child_page = index_interior_cell.left_child_page; - if left_child_page == parent_page.get().id as u32 { + if left_child_page == parent_page.get().get().id as u32 { tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})", - parent_page.get().id, + parent_page.get().get().id, left_child_page, ); valid = false; @@ -3240,6 +3299,7 @@ impl BTreeCursor { .enumerate() { let page = page.as_ref().unwrap(); + let page = page.get(); let contents = page.get_contents(); debug_validate_cells!(contents, self.usable_space() as u16); // Cells are distributed in order @@ -3293,7 +3353,7 @@ impl BTreeCursor { ); valid = false; } - if left_child_page == parent_page.get().id as u32 { + if left_child_page == parent_page.get().get().id as u32 { tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})", page.get().id, left_child_page, @@ -3312,7 +3372,7 @@ impl BTreeCursor { ); valid = false; } - if left_child_page == parent_page.get().id as u32 { + if left_child_page == parent_page.get().get().id as u32 { tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})", page.get().id, left_child_page, @@ -3365,10 +3425,12 @@ impl BTreeCursor { valid = false; } - if rightmost == page.get().id as u32 || rightmost == parent_page.get().id as u32 { + if 
rightmost == page.get().id as u32 + || rightmost == parent_page.get().get().id as u32 + { tracing::error!("balance_non_root(balance_shallower_rightmost_pointer, page_id={}, parent_page_id={}, rightmost={})", page.get().id, - parent_page.get().id, + parent_page.get().get().id, rightmost, ); valid = false; @@ -3660,26 +3722,29 @@ impl BTreeCursor { let is_page_1 = { let current_root = self.stack.top(); - current_root.get().id == 1 + current_root.get().get().id == 1 }; let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; - let root = self.stack.top(); + let root_btree = self.stack.top(); + let root = root_btree.get(); let root_contents = root.get_contents(); - let child = self.pager.do_allocate_page(root_contents.page_type(), 0); + // FIXME: handle page cache is full + let child_btree = self.pager.do_allocate_page(root_contents.page_type(), 0); tracing::debug!( "balance_root(root={}, rightmost={}, page_type={:?})", root.get().id, - child.get().id, + child_btree.get().get().id, root.get_contents().page_type() ); self.pager.add_dirty(root.get().id); - self.pager.add_dirty(child.get().id); + self.pager.add_dirty(child_btree.get().get().id); let root_buf = root_contents.as_ptr(); + let child = child_btree.get(); let child_contents = child.get_contents(); let child_buf = child_contents.as_ptr(); let (root_pointer_start, root_pointer_len) = @@ -3721,8 +3786,8 @@ impl BTreeCursor { root_contents.overflow_cells.clear(); self.root_page = root.get().id; self.stack.clear(); - self.stack.push(root.clone()); - self.stack.push(child.clone()); + self.stack.push(root_btree.clone()); + self.stack.push(child_btree.clone()); } fn usable_space(&self) -> usize { @@ -3787,10 +3852,11 @@ impl BTreeCursor { self.move_to_root(); loop { let mem_page = self.stack.top(); - let page_id = mem_page.get().id; - let page = self.pager.read_page(page_id)?; - return_if_locked!(page); + let page_id = mem_page.get().get().id; + let page = self.read_page(page_id)?; + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { // set cursor just past the last cell to append @@ -3801,7 +3867,7 @@ impl BTreeCursor { match contents.rightmost_pointer() { Some(right_most_pointer) => { self.stack.set_cell_index(contents.cell_count() as i32 + 1); // invalid on interior - let child = self.pager.read_page(right_most_pointer as usize)?; + let child = self.read_page(right_most_pointer as usize)?; self.stack.push(child); } None => unreachable!("interior page must have rightmost pointer"), @@ -3968,10 +4034,10 @@ impl BTreeCursor { match delete_state { DeleteState::Start => { let page = self.stack.top(); - page.set_dirty(); - self.pager.add_dirty(page.get().id); + page.get().set_dirty(); + self.pager.add_dirty(page.get().get().id); if matches!( - page.get_contents().page_type(), + page.get().get_contents().page_type(), PageType::TableLeaf | PageType::TableInterior ) { let _target_rowid = match self.has_record.get() { @@ -4007,6 +4073,7 @@ impl BTreeCursor { cell_idx -= 1; } + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); if cell_idx >= contents.cell_count() { return_corrupt!(format!( @@ -4051,7 +4118,8 @@ impl BTreeCursor { return_if_io!(self.clear_overflow_pages(&cell)); let page = self.stack.top(); - let contents = page.get().contents.as_ref().unwrap(); + let page = page.get(); + let contents = page.get_contents(); let delete_info = self.state.mut_delete_info().unwrap(); if !contents.is_leaf() { @@ -4085,15 +4153,16 @@ impl 
BTreeCursor { return_if_locked_maybe_load!(self.pager, leaf_page); assert!( matches!( - leaf_page.get_contents().page_type(), + leaf_page.get().get_contents().page_type(), PageType::TableLeaf | PageType::IndexLeaf ), "self.prev should have returned a leaf page" ); let parent_page = self.stack.parent_page().unwrap(); - assert!(parent_page.is_loaded(), "parent page"); + assert!(parent_page.get().is_loaded(), "parent page"); + let leaf_page = leaf_page.get(); let leaf_contents = leaf_page.get().contents.as_ref().unwrap(); // The index of the cell to removed must be the last one. let leaf_cell_idx = leaf_contents.cell_count() - 1; @@ -4110,9 +4179,10 @@ impl BTreeCursor { self.usable_space(), )?; - parent_page.set_dirty(); - self.pager.add_dirty(parent_page.get().id); + parent_page.get().set_dirty(); + self.pager.add_dirty(parent_page.get().get().id); + let parent_page = parent_page.get(); let parent_contents = parent_page.get().contents.as_mut().unwrap(); // Create an interior cell from a predecessor @@ -4146,6 +4216,7 @@ impl BTreeCursor { let page = self.stack.top(); return_if_locked_maybe_load!(self.pager, page); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let free_space = compute_free_space(contents, self.usable_space() as u16); let needs_balancing = free_space as usize * 3 > self.usable_space() * 2; @@ -4285,8 +4356,9 @@ impl BTreeCursor { let _ = return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); let page = self.stack.top(); // TODO(pere): request load - return_if_locked!(page); + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); // find cell @@ -4343,9 +4415,10 @@ impl BTreeCursor { self.overflow_state = None; return Err(LimboError::Corrupt("Invalid overflow page number".into())); } - let page = self.pager.read_page(next_page as usize)?; - return_if_locked!(page); + let page = self.read_page(next_page as usize)?; + return_if_locked!(page.get()); + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let next = contents.read_u32(0); @@ -4417,8 +4490,9 @@ impl BTreeCursor { } DestroyState::ProcessPage => { let page = self.stack.top(); - assert!(page.is_loaded()); // page should be loaded at this time + assert!(page.get().is_loaded()); // page should be loaded at this time + let page = page.get(); let contents = page.get().contents.as_ref().unwrap(); let cell_idx = self.stack.current_cell_index(); @@ -4436,8 +4510,7 @@ impl BTreeCursor { // Non-leaf page which has processed all children but not it's potential right child (false, n) if n == contents.cell_count() as i32 => { if let Some(rightmost) = contents.rightmost_pointer() { - let rightmost_page = - self.pager.read_page(rightmost as usize)?; + let rightmost_page = self.read_page(rightmost as usize)?; self.stack.advance(); self.stack.push(rightmost_page); let destroy_info = self.state.mut_destroy_info().expect( @@ -4506,7 +4579,7 @@ impl BTreeCursor { BTreeCell::IndexInteriorCell(cell) => cell.left_child_page, _ => panic!("expected interior cell"), }; - let child_page = self.pager.read_page(child_page_id as usize)?; + let child_page = self.read_page(child_page_id as usize)?; self.stack.advance(); self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( @@ -4523,9 +4596,8 @@ impl BTreeCursor { CursorResult::Ok(_) => match cell { // For an index interior cell, clear the left child page now that overflow pages have been cleared 
BTreeCell::IndexInteriorCell(index_int_cell) => { - let child_page = self - .pager - .read_page(index_int_cell.left_child_page as usize)?; + let child_page = + self.read_page(index_int_cell.left_child_page as usize)?; self.stack.advance(); self.stack.push(child_page); let destroy_info = self.state.mut_destroy_info().expect( @@ -4549,9 +4621,9 @@ impl BTreeCursor { } DestroyState::FreePage => { let page = self.stack.top(); - let page_id = page.get().id; + let page_id = page.get().get().id; - self.pager.free_page(Some(page), page_id)?; + self.pager.free_page(Some(page.get()), page_id)?; if self.stack.has_parent() { self.stack.pop(); @@ -4575,12 +4647,12 @@ impl BTreeCursor { pub fn overwrite_cell( &mut self, - page_ref: PageRef, + page_ref: BTreePage, cell_idx: usize, record: &ImmutableRecord, ) -> Result> { // build the new payload - let page_type = page_ref.get().contents.as_ref().unwrap().page_type(); + let page_type = page_ref.get().get().contents.as_ref().unwrap().page_type(); let mut new_payload = Vec::with_capacity(record.len()); let CursorHasRecord::Yes { rowid } = self.has_record.get() else { panic!("cursor should be pointing to a record"); @@ -4596,6 +4668,7 @@ impl BTreeCursor { // figure out old cell offset & size let (old_offset, old_local_size) = { + let page_ref = page_ref.get(); let page = page_ref.get().contents.as_ref().unwrap(); page.cell_get_raw_region( cell_idx, @@ -4612,12 +4685,12 @@ impl BTreeCursor { } else { // doesn't fit, drop it and insert a new one drop_cell( - page_ref.get_contents(), + page_ref.get().get_contents(), cell_idx, self.usable_space() as u16, )?; insert_into_cell( - page_ref.get_contents(), + page_ref.get().get_contents(), &new_payload, cell_idx, self.usable_space() as u16, @@ -4628,11 +4701,12 @@ impl BTreeCursor { pub fn overwrite_content( &mut self, - page_ref: PageRef, + page_ref: BTreePage, dest_offset: usize, new_payload: &[u8], ) -> Result> { - return_if_locked!(page_ref); + return_if_locked!(page_ref.get()); + let page_ref = page_ref.get(); let buf = page_ref.get().contents.as_mut().unwrap().as_ptr(); buf[dest_offset..dest_offset + new_payload.len()].copy_from_slice(&new_payload); @@ -4678,7 +4752,7 @@ impl BTreeCursor { mem_page_rc = self.stack.top(); return_if_locked_maybe_load!(self.pager, mem_page_rc); mem_page = mem_page_rc.get(); - contents = mem_page.contents.as_ref().unwrap(); + contents = mem_page.get().contents.as_ref().unwrap(); /* If this is a leaf page or the tree is not an int-key tree, then ** this page contains countable entries. Increment the entry counter @@ -4707,7 +4781,7 @@ impl BTreeCursor { mem_page_rc = self.stack.top(); return_if_locked_maybe_load!(self.pager, mem_page_rc); mem_page = mem_page_rc.get(); - contents = mem_page.contents.as_ref().unwrap(); + contents = mem_page.get().contents.as_ref().unwrap(); let cell_idx = self.stack.current_cell_index() as usize; @@ -4727,7 +4801,7 @@ impl BTreeCursor { // should be safe as contents is not a leaf page let right_most_pointer = contents.rightmost_pointer().unwrap(); self.stack.advance(); - let mem_page = self.pager.read_page(right_most_pointer as usize)?; + let mem_page = self.read_page(right_most_pointer as usize)?; self.going_upwards = false; self.stack.push(mem_page); } else { @@ -4754,7 +4828,7 @@ impl BTreeCursor { left_child_page, .. 
}) => { self.stack.advance(); - let mem_page = self.pager.read_page(left_child_page as usize)?; + let mem_page = self.read_page(left_child_page as usize)?; self.going_upwards = false; self.stack.push(mem_page); } @@ -4768,7 +4842,7 @@ impl BTreeCursor { pub fn save_context(&mut self) { if let CursorHasRecord::Yes { rowid } = self.has_record.get() { self.valid_state = CursorValidState::RequireSeek; - match self.stack.top().get_contents().page_type() { + match self.stack.top().get().get_contents().page_type() { PageType::TableInterior | PageType::TableLeaf => { self.context = Some(CursorContext::TableRowId(rowid.expect( "table cells should have a rowid since we don't support WITHOUT ROWID tables", @@ -4813,6 +4887,18 @@ impl BTreeCursor { pub fn collations(&self) -> &[CollationSeq] { &self.collations } + + pub fn read_page(&self, page_idx: usize) -> Result { + self.pager.read_page(page_idx).map(|page| { + Arc::new(BTreePageInner { + page: RefCell::new(page), + }) + }) + } + + pub fn allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage { + self.pager.do_allocate_page(page_type, offset) + } } #[cfg(debug_assertions)] @@ -4833,7 +4919,7 @@ struct PageStack { /// Pointer to the current page being consumed current_page: Cell, /// List of pages in the stack. Root page will be in index 0 - stack: RefCell<[Option; BTCURSOR_MAX_DEPTH + 1]>, + stack: RefCell<[Option; BTCURSOR_MAX_DEPTH + 1]>, /// List of cell indices in the stack. /// cell_indices[current_page] is the current cell index being consumed. Similarly /// cell_indices[current_page-1] is the cell index of the parent of the current page @@ -4854,11 +4940,11 @@ impl PageStack { } /// Push a new page onto the stack. /// This effectively means traversing to a child page. - fn _push(&self, page: PageRef, starting_cell_idx: i32) { + fn _push(&self, page: BTreePage, starting_cell_idx: i32) { tracing::trace!( "pagestack::push(current={}, new_page_id={})", self.current_page.get(), - page.get().id + page.get().get().id ); self.increment_current(); let current = self.current_page.get(); @@ -4871,11 +4957,11 @@ impl PageStack { self.cell_indices.borrow_mut()[current as usize] = starting_cell_idx; } - fn push(&self, page: PageRef) { + fn push(&self, page: BTreePage) { self._push(page, 0); } - fn push_backwards(&self, page: PageRef) { + fn push_backwards(&self, page: BTreePage) { self._push(page, i32::MAX); } @@ -4892,7 +4978,7 @@ impl PageStack { /// Get the top page on the stack. /// This is the page that is currently being traversed. 
- fn top(&self) -> PageRef { + fn top(&self) -> BTreePage { let page = self.stack.borrow()[self.current()] .as_ref() .unwrap() @@ -4900,7 +4986,7 @@ impl PageStack { tracing::trace!( "pagestack::top(current={}, page_id={})", self.current(), - page.get().id + page.get().get().id ); page } @@ -4971,7 +5057,7 @@ impl PageStack { fn clear(&self) { self.current_page.set(-1); } - pub fn parent_page(&self) -> Option { + pub fn parent_page(&self) -> Option { if self.current_page.get() > 0 { Some( self.stack.borrow()[self.current() - 1] @@ -5002,6 +5088,12 @@ impl CellArray { } } +impl BTreePageInner { + pub fn get(&self) -> PageRef { + self.page.borrow().clone() + } +} + /// Try to find a free block available and allocate it if found fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> Result { // NOTE: freelist is in ascending order of keys and pc @@ -5057,11 +5149,15 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> R Ok(0) } -pub fn btree_init_page(page: &PageRef, page_type: PageType, offset: usize, usable_space: u16) { +pub fn btree_init_page(page: &BTreePage, page_type: PageType, offset: usize, usable_space: u16) { // setup btree page let contents = page.get(); - tracing::debug!("btree_init_page(id={}, offset={})", contents.id, offset); - let contents = contents.contents.as_mut().unwrap(); + tracing::debug!( + "btree_init_page(id={}, offset={})", + contents.get().id, + offset + ); + let contents = contents.get().contents.as_mut().unwrap(); contents.offset = offset; let id = page_type as u8; contents.write_u8(offset::BTREE_PAGE_TYPE, id); @@ -5742,6 +5838,7 @@ fn fill_cell_payload( } // we still have bytes to add, we will need to allocate new overflow page + // FIXME: handle page cache is full let overflow_page = pager.allocate_overflow_page(); overflow_pages.push(overflow_page.clone()); { @@ -5873,7 +5970,6 @@ mod tests { compute_free_space, fill_cell_payload, payload_overflow_threshold_max, payload_overflow_threshold_min, }, - pager::PageRef, sqlite3_ondisk::{BTreeCell, PageContent, PageType}, }, types::Value, @@ -5883,7 +5979,7 @@ mod tests { use super::{btree_init_page, defragment_page, drop_cell, insert_into_cell}; #[allow(clippy::arc_with_non_send_sync)] - fn get_page(id: usize) -> PageRef { + fn get_page(id: usize) -> BTreePage { let page = Arc::new(Page::new(id)); let drop_fn = Rc::new(|_| {}); @@ -5895,6 +5991,9 @@ mod tests { ))), ); page.get().contents.replace(inner); + let page = Arc::new(BTreePageInner { + page: RefCell::new(page), + }); btree_init_page(&page, PageType::TableLeaf, 0, 4096); page @@ -5954,6 +6053,7 @@ mod tests { let db = get_database(); let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; let record = ImmutableRecord::from_registers(&[Register::Value(Value::Integer(1))]); @@ -5977,6 +6077,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -6007,9 +6108,14 @@ mod tests { fn validate_btree(pager: Rc, page_idx: usize) -> (usize, bool) { let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx); - let page = pager.read_page(page_idx).unwrap(); + let page = cursor.read_page(page_idx).unwrap(); + while page.get().is_locked() { + pager.io.run_once().unwrap(); + } let page = page.get(); - let contents = page.contents.as_ref().unwrap(); + // Pin page in order to not drop it in between + page.set_dirty(); + let contents = 
page.get().contents.as_ref().unwrap(); let page_type = contents.page_type(); let mut previous_key = None; let mut valid = true; @@ -6030,8 +6136,12 @@ mod tests { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, .. }) => { - child_pages.push(pager.read_page(_left_child_page as usize).unwrap()); - if _left_child_page == page.id as u32 { + let child_page = cursor.read_page(_left_child_page as usize).unwrap(); + while child_page.get().is_locked() { + pager.io.run_once().unwrap(); + } + child_pages.push(child_page); + if _left_child_page == page.get().id as u32 { valid = false; tracing::error!( "left child page is the same as parent {}", @@ -6048,6 +6158,7 @@ mod tests { }; if current_depth >= 100 { tracing::error!("depth is too big"); + page.clear_dirty(); return (100, false); } depth = Some(depth.unwrap_or(current_depth + 1)); @@ -6080,10 +6191,26 @@ mod tests { valid = false; } } - let first_page_type = child_pages.first().map(|p| p.get_contents().page_type()); + let first_page_type = child_pages.first().map(|p| { + if !p.get().is_loaded() { + let new_page = pager.read_page(p.get().get().id).unwrap(); + p.page.replace(new_page); + } + while p.get().is_locked() { + pager.io.run_once().unwrap(); + } + p.get().get_contents().page_type() + }); if let Some(child_type) = first_page_type { for page in child_pages.iter().skip(1) { - if page.get_contents().page_type() != child_type { + if !page.get().is_loaded() { + let new_page = pager.read_page(page.get().get().id).unwrap(); + page.page.replace(new_page); + } + while page.get().is_locked() { + pager.io.run_once().unwrap(); + } + if page.get().get_contents().page_type() != child_type { tracing::error!("child pages have different types"); valid = false; } @@ -6092,14 +6219,20 @@ mod tests { if contents.rightmost_pointer().is_none() && contents.cell_count() == 0 { valid = false; } + page.clear_dirty(); (depth.unwrap(), valid) } fn format_btree(pager: Rc, page_idx: usize, depth: usize) -> String { let cursor = BTreeCursor::new_table(None, pager.clone(), page_idx); - let page = pager.read_page(page_idx).unwrap(); + let page = cursor.read_page(page_idx).unwrap(); + while page.get().is_locked() { + pager.io.run_once().unwrap(); + } let page = page.get(); - let contents = page.contents.as_ref().unwrap(); + // Pin page in order to not drop it in between loading of different pages. If not contents will be a dangling reference. 
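The pin trick used by this test helper leans on the cache not evicting dirty pages: the `set_dirty()` call that follows keeps the page resident while child pages are loaded, and the matching `clear_dirty()` at the end of the function releases it. A hedged sketch of the same idea as an RAII guard; the `Pinnable` trait and `PinGuard` type are hypothetical, not part of this diff:

```rust
// Illustrative pin guard: mark a page dirty on creation so the LRU cache
// keeps it resident, and clear the flag again when the guard goes away.
trait Pinnable {
    fn set_dirty(&self);
    fn clear_dirty(&self);
}

struct PinGuard<'a, P: Pinnable> {
    page: &'a P,
}

impl<'a, P: Pinnable> PinGuard<'a, P> {
    fn new(page: &'a P) -> Self {
        page.set_dirty();
        Self { page }
    }
}

impl<P: Pinnable> Drop for PinGuard<'_, P> {
    fn drop(&mut self) {
        self.page.clear_dirty();
    }
}
```

Whether a guard is worth it for a debug-only helper is a judgment call; the test keeps the explicit set/clear calls.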
+ page.set_dirty(); + let contents = page.get().contents.as_ref().unwrap(); let page_type = contents.page_type(); let mut current = Vec::new(); let mut child = Vec::new(); @@ -6146,6 +6279,7 @@ mod tests { " ".repeat(depth), current.join(", ") ); + page.clear_dirty(); if child.is_empty() { current } else { @@ -6167,15 +6301,19 @@ mod tests { let wal_file = WalFile::new(io.clone(), page_size, wal_shared, buffer_pool.clone()); let wal = Rc::new(RefCell::new(wal_file)); - let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))); + let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(2000))); let pager = { let db_header = Arc::new(SpinLock::new(db_header.clone())); Pager::finish_open(db_header, db_file, Some(wal), io, page_cache, buffer_pool).unwrap() }; let pager = Rc::new(pager); + // FIXME: handle page cache is full let page1 = pager.allocate_page().unwrap(); + let page1 = Arc::new(BTreePageInner { + page: RefCell::new(page1), + }); btree_init_page(&page1, PageType::TableLeaf, 0, 4096); - (pager, page1.get().id) + (pager, page1.get().get().id) } #[test] @@ -6298,6 +6436,8 @@ mod tests { tracing::info!("seed: {}", seed); for insert_id in 0..inserts { let do_validate = do_validate_btree || (insert_id % VALIDATE_INTERVAL == 0); + pager.begin_read_tx().unwrap(); + pager.begin_write_tx().unwrap(); let size = size(&mut rng); let key = { let result; @@ -6340,6 +6480,17 @@ mod tests { pager.deref(), ) .unwrap(); + loop { + match pager.end_tx().unwrap() { + crate::CheckpointStatus::Done(_) => break, + crate::CheckpointStatus::IO => { + pager.io.run_once().unwrap(); + } + } + } + pager.begin_read_tx().unwrap(); + // FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now + cursor.move_to_root(); let mut valid = true; if do_validate { cursor.move_to_root(); @@ -6363,7 +6514,9 @@ mod tests { println!("btree after:\n{}", btree_after); panic!("invalid btree"); } + pager.end_read_tx().unwrap(); } + pager.begin_read_tx().unwrap(); tracing::info!( "=========== btree ===========\n{}\n\n", format_btree(pager.clone(), root_page, 0) @@ -6382,6 +6535,7 @@ mod tests { key, cursor_rowid ); } + pager.end_read_tx().unwrap(); } } @@ -6403,7 +6557,10 @@ mod tests { let mut cursor = BTreeCursor::new_table(None, pager.clone(), index_root_page); let mut keys = SortedVec::new(); tracing::info!("seed: {}", seed); - for _ in 0..inserts { + for i in 0..inserts { + tracing::info!("insert {}", i); + pager.begin_read_tx().unwrap(); + pager.begin_write_tx().unwrap(); let key = { let result; loop { @@ -6437,7 +6594,16 @@ mod tests { ) .unwrap(); cursor.move_to_root(); + loop { + match pager.end_tx().unwrap() { + crate::CheckpointStatus::Done(_) => break, + crate::CheckpointStatus::IO => { + pager.io.run_once().unwrap(); + } + } + } } + pager.begin_read_tx().unwrap(); cursor.move_to_root(); for key in keys.iter() { tracing::trace!("seeking key: {:?}", key); @@ -6454,6 +6620,7 @@ mod tests { key ); } + pager.end_read_tx().unwrap(); } } @@ -6463,6 +6630,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -6511,7 +6679,7 @@ mod tests { #[test] pub fn btree_index_insert_fuzz_run_equal_size() { - btree_index_insert_fuzz_run(2, 1024 * 32); + btree_index_insert_fuzz_run(2, 1024); } #[test] @@ -6552,25 +6720,25 @@ mod tests { #[test] #[ignore] pub fn fuzz_long_btree_insert_fuzz_run_random() { - btree_insert_fuzz_run(128, 2_000, |rng| 
(rng.next_u32() % 4096) as usize); + btree_insert_fuzz_run(2, 10_000, |rng| (rng.next_u32() % 4096) as usize); } #[test] #[ignore] pub fn fuzz_long_btree_insert_fuzz_run_small() { - btree_insert_fuzz_run(1, 10_000, |rng| (rng.next_u32() % 128) as usize); + btree_insert_fuzz_run(2, 10_000, |rng| (rng.next_u32() % 128) as usize); } #[test] #[ignore] pub fn fuzz_long_btree_insert_fuzz_run_big() { - btree_insert_fuzz_run(64, 2_000, |rng| 3 * 1024 + (rng.next_u32() % 1024) as usize); + btree_insert_fuzz_run(2, 10_000, |rng| 3 * 1024 + (rng.next_u32() % 1024) as usize); } #[test] #[ignore] pub fn fuzz_long_btree_insert_fuzz_run_overflow() { - btree_insert_fuzz_run(64, 5_000, |rng| (rng.next_u32() % 32 * 1024) as usize); + btree_insert_fuzz_run(2, 5_000, |rng| (rng.next_u32() % 32 * 1024) as usize); } #[allow(clippy::arc_with_non_send_sync)] @@ -6659,13 +6827,14 @@ mod tests { .write_page(current_page as usize, buf.clone(), c)?; pager.io.run_once()?; - let page = cursor.pager.read_page(current_page as usize)?; - while page.is_locked() { + let page = cursor.read_page(current_page as usize)?; + while page.get().is_locked() { cursor.pager.io.run_once()?; } { - let contents = page.get().contents.as_mut().unwrap(); + let page = page.get(); + let contents = page.get_contents(); let next_page = if current_page < 4 { current_page + 1 @@ -6706,8 +6875,8 @@ mod tests { let trunk_page_id = db_header.lock().freelist_trunk_page; if trunk_page_id > 0 { // Verify trunk page structure - let trunk_page = cursor.pager.read_page(trunk_page_id as usize)?; - if let Some(contents) = trunk_page.get().contents.as_ref() { + let trunk_page = cursor.read_page(trunk_page_id as usize)?; + if let Some(contents) = trunk_page.get().get().contents.as_ref() { // Read number of leaf pages in trunk let n_leaf = contents.read_u32(4); assert!(n_leaf > 0, "Trunk page should have leaf entries"); @@ -6786,32 +6955,33 @@ mod tests { ); // Initialize page 2 as a root page (interior) - let root_page = cursor.pager.read_page(2)?; + let root_page = cursor.read_page(2)?; { btree_init_page(&root_page, PageType::TableInterior, 0, 512); // Use proper page size } // Allocate two leaf pages - let page3 = cursor.pager.allocate_page()?; - btree_init_page(&page3, PageType::TableLeaf, 0, 512); + // FIXME: handle page cache is full + let page3 = cursor.allocate_page(PageType::TableLeaf, 0); - let page4 = cursor.pager.allocate_page()?; - btree_init_page(&page4, PageType::TableLeaf, 0, 512); + // FIXME: handle page cache is full + let page4 = cursor.allocate_page(PageType::TableLeaf, 0); // Configure the root page to point to the two leaf pages { + let root_page = root_page.get(); let contents = root_page.get().contents.as_mut().unwrap(); // Set rightmost pointer to page4 - contents.write_u32(offset::BTREE_RIGHTMOST_PTR, page4.get().id as u32); + contents.write_u32(offset::BTREE_RIGHTMOST_PTR, page4.get().get().id as u32); // Create a cell with pointer to page3 let cell_content = vec![ // First 4 bytes: left child pointer (page3) - (page3.get().id >> 24) as u8, - (page3.get().id >> 16) as u8, - (page3.get().id >> 8) as u8, - page3.get().id as u8, + (page3.get().get().id >> 24) as u8, + (page3.get().get().id >> 16) as u8, + (page3.get().get().id >> 8) as u8, + page3.get().get().id as u8, // Next byte: rowid as varint (simple value 100) 100, ]; @@ -6822,6 +6992,7 @@ mod tests { // Add a simple record to each leaf page for page in [&page3, &page4] { + let page = page.get(); let contents = page.get().contents.as_mut().unwrap(); // Simple record with just a 
rowid and payload @@ -6863,6 +7034,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -6903,6 +7075,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -6949,6 +7122,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -7027,6 +7201,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; @@ -7196,6 +7371,7 @@ mod tests { let db = get_database(); let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let header_size = 8; let usable_space = 4096; @@ -7212,6 +7388,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let usable_space = 4096; @@ -7237,6 +7414,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let usable_space = 4096; @@ -7270,6 +7448,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let usable_space = 4096; @@ -7305,6 +7484,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let usable_space = 4096; @@ -7327,6 +7507,7 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); + let page = page.get(); let page = page.get_contents(); let usable_space = 4096; @@ -7365,6 +7546,7 @@ mod tests { let defragment = |page| { defragment_page(page, usable_space); }; + let page = page.get(); defragment(page.get_contents()); defragment(page.get_contents()); insert(0, page.get_contents()); @@ -7407,13 +7589,14 @@ mod tests { let record = ImmutableRecord::from_registers(&[Register::Value(Value::Integer(0))]); let mut payload: Vec = Vec::new(); fill_cell_payload( - page.get_contents().page_type(), + page.get().get_contents().page_type(), Some(0), &mut payload, &record, 4096, conn.pager.clone(), ); + let page = page.get(); insert(0, page.get_contents()); defragment(page.get_contents()); insert(0, page.get_contents()); @@ -7480,19 +7663,19 @@ mod tests { ImmutableRecord::from_registers(&[Register::Value(Value::Blob(vec![0; 3600]))]); let mut payload: Vec = Vec::new(); fill_cell_payload( - page.get_contents().page_type(), + page.get().get_contents().page_type(), Some(0), &mut payload, &record, 4096, conn.pager.clone(), ); - insert_into_cell(page.get_contents(), &payload, 0, 4096).unwrap(); - let free = compute_free_space(page.get_contents(), usable_space); + insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap(); + let free = compute_free_space(page.get().get_contents(), usable_space); let total_size = payload.len() + 2; assert_eq!( free, - usable_space - page.get_contents().header_size() as u16 - total_size as u16 + usable_space - page.get().get_contents().header_size() as u16 - total_size as u16 ); dbg!(free); } @@ -7816,7 +7999,11 @@ mod tests { let (pager, _) = empty_btree(); let page_type = PageType::TableLeaf; let page = pager.allocate_page().unwrap(); + let page = Arc::new(BTreePageInner { + page: RefCell::new(page), + }); btree_init_page(&page, page_type, 0, pager.usable_space() as 
u16); + let page = page.get(); let mut size = (rng.next_u64() % 100) as u16; let mut i = 0; // add a bunch of cells diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index bd82e5849..3dffd27bc 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -1,9 +1,12 @@ -use std::{cell::RefCell, collections::HashMap, ptr::NonNull}; +use std::{cell::RefCell, ptr::NonNull}; +use std::sync::Arc; use tracing::{debug, trace}; use super::pager::PageRef; +const DEFAULT_PAGE_CACHE_SIZE_IN_PAGES: usize = 2000; + // In limbo, page cache is shared by default, meaning that multiple frames from WAL can reside in // the cache, meaning, we need a way to differentiate between pages cached in different // connections. For this we include the max_frame that a connection will read from so that if two @@ -25,21 +28,44 @@ struct PageCacheEntry { next: Option>, } -impl PageCacheEntry { - fn as_non_null(&mut self) -> NonNull { - NonNull::new(&mut *self).unwrap() - } -} - pub struct DumbLruPageCache { capacity: usize, - map: RefCell>>, + map: RefCell, head: RefCell>>, tail: RefCell>>, } unsafe impl Send for DumbLruPageCache {} unsafe impl Sync for DumbLruPageCache {} +struct PageHashMap { + // FIXME: do we prefer array buckets or list? Deletes will be slower here which I guess happens often. I will do this for now to test how well it does. + buckets: Vec>, + capacity: usize, + size: usize, +} + +#[derive(Clone)] +struct HashMapNode { + key: PageCacheKey, + value: NonNull, +} + +#[derive(Debug, PartialEq)] +pub enum CacheError { + InternalError(String), + Locked, + Dirty, + ActiveRefs, + Full, + KeyExists, +} + +#[derive(Debug, PartialEq)] +pub enum CacheResizeResult { + Done, + PendingEvictions, +} + impl PageCacheKey { pub fn new(pgno: usize, max_frame: Option) -> Self { Self { pgno, max_frame } @@ -47,9 +73,10 @@ impl PageCacheKey { } impl DumbLruPageCache { pub fn new(capacity: usize) -> Self { + assert!(capacity > 0, "capacity of cache should be at least 1"); Self { capacity, - map: RefCell::new(HashMap::new()), + map: RefCell::new(PageHashMap::new(capacity)), head: RefCell::new(None), tail: RefCell::new(None), } @@ -59,9 +86,36 @@ impl DumbLruPageCache { self.map.borrow().contains_key(key) } - pub fn insert(&mut self, key: PageCacheKey, value: PageRef) { - self._delete(key.clone(), false); - trace!("cache_insert(key={:?})", key); + pub fn insert(&mut self, key: PageCacheKey, value: PageRef) -> Result<(), CacheError> { + self._insert(key, value, false) + } + + pub fn insert_ignore_existing( + &mut self, + key: PageCacheKey, + value: PageRef, + ) -> Result<(), CacheError> { + self._insert(key, value, true) + } + + pub fn _insert( + &mut self, + key: PageCacheKey, + value: PageRef, + ignore_exists: bool, + ) -> Result<(), CacheError> { + trace!("insert(key={:?})", key); + // Check first if page already exists in cache + if !ignore_exists { + if let Some(existing_page_ref) = self.get(&key) { + assert!( + Arc::ptr_eq(&value, &existing_page_ref), + "Attempted to insert different page with same key" + ); + return Err(CacheError::KeyExists); + } + } + self.make_room_for(1)?; let entry = Box::new(PageCacheEntry { key: key.clone(), next: None, @@ -69,28 +123,30 @@ impl DumbLruPageCache { page: value, }); let ptr_raw = Box::into_raw(entry); - let ptr = unsafe { ptr_raw.as_mut().unwrap().as_non_null() }; + let ptr = unsafe { NonNull::new_unchecked(ptr_raw) }; self.touch(ptr); self.map.borrow_mut().insert(key, ptr); - if self.len() > self.capacity { - self.pop_if_not_dirty(); - } + Ok(()) } 
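
The insert path above (reject a different page under an existing key, make room before allocating the entry, only then link it into the LRU list) is easier to follow without the NonNull bookkeeping. Below is a rough standalone sketch of that ordering using only std types and invented names (ToyCache, a VecDeque for recency order, dirty pages treated as unevictable); it is a simplified model for illustration, not the crate's PageRef/PageCacheEntry API.

use std::collections::{HashMap, VecDeque};

#[derive(Debug, PartialEq)]
enum CacheError {
    KeyExists,
    Full,
}

/// Toy model: keys are page numbers, values are (payload, dirty) pairs.
/// Dirty entries cannot be evicted, mirroring the real cache's rules.
struct ToyCache {
    capacity: usize,
    map: HashMap<usize, (String, bool)>,
    lru: VecDeque<usize>, // front = most recent, back = oldest
}

impl ToyCache {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity of cache should be at least 1");
        Self { capacity, map: HashMap::new(), lru: VecDeque::new() }
    }

    /// Walk from the tail toward the head, evicting clean entries until
    /// `n` slots are free; report Full if that is not possible.
    fn make_room_for(&mut self, n: usize) -> Result<(), CacheError> {
        if n > self.capacity {
            return Err(CacheError::Full);
        }
        let mut i = self.lru.len();
        while self.capacity.saturating_sub(self.map.len()) < n && i > 0 {
            i -= 1;
            let key = self.lru[i];
            if !self.map[&key].1 {
                // Clean entry: safe to drop.
                let _ = self.lru.remove(i);
                let _ = self.map.remove(&key);
            }
        }
        if self.capacity.saturating_sub(self.map.len()) < n {
            Err(CacheError::Full)
        } else {
            Ok(())
        }
    }

    /// Reject duplicate keys, make room first, then link the new entry as most recent.
    fn insert(&mut self, key: usize, value: String, dirty: bool) -> Result<(), CacheError> {
        if self.map.contains_key(&key) {
            return Err(CacheError::KeyExists);
        }
        self.make_room_for(1)?;
        self.map.insert(key, (value, dirty));
        self.lru.push_front(key);
        Ok(())
    }
}

fn main() {
    let mut cache = ToyCache::new(2);
    cache.insert(1, "page 1".into(), true).unwrap();  // dirty: pinned in cache
    cache.insert(2, "page 2".into(), false).unwrap(); // clean: evictable
    assert_eq!(cache.insert(2, "other".into(), false), Err(CacheError::KeyExists));
    cache.insert(3, "page 3".into(), false).unwrap(); // evicts clean page 2, not dirty page 1
    assert!(cache.map.contains_key(&1) && cache.map.contains_key(&3));
    assert!(!cache.map.contains_key(&2));
}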
- pub fn delete(&mut self, key: PageCacheKey) { + pub fn delete(&mut self, key: PageCacheKey) -> Result<(), CacheError> { trace!("cache_delete(key={:?})", key); self._delete(key, true) } - pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) { - let ptr = self.map.borrow_mut().remove(&key); - if ptr.is_none() { - return; + // Returns Ok if key is not found + pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) -> Result<(), CacheError> { + if !self.contains_key(&key) { + return Ok(()); } - let ptr = ptr.unwrap(); - self.detach(ptr, clean_page); + + let ptr = *self.map.borrow().get(&key).unwrap(); + // Try to detach from LRU list first, can fail + self.detach(ptr, clean_page)?; + let ptr = self.map.borrow_mut().remove(&key).unwrap(); unsafe { std::ptr::drop_in_place(ptr.as_ptr()) }; + Ok(()) } fn get_ptr(&mut self, key: &PageCacheKey) -> Option> { @@ -109,26 +165,47 @@ impl DumbLruPageCache { let mut ptr = self.get_ptr(key)?; let page = unsafe { ptr.as_mut().page.clone() }; if touch { - self.detach(ptr, false); + self.unlink(ptr); self.touch(ptr); } Some(page) } - pub fn resize(&mut self, capacity: usize) { - let _ = capacity; - todo!(); + // To match SQLite behavior, just set capacity and try to shrink as much as possible. + // In case of failure, the caller should request further evictions (e.g. after I/O). + pub fn resize(&mut self, capacity: usize) -> CacheResizeResult { + let new_map = self.map.borrow().rehash(capacity); + self.map.replace(new_map); + self.capacity = capacity; + match self.make_room_for(0) { + Ok(_) => CacheResizeResult::Done, + Err(_) => CacheResizeResult::PendingEvictions, + } } - fn detach(&mut self, mut entry: NonNull, clean_page: bool) { - if clean_page { - // evict buffer - let page = unsafe { &entry.as_mut().page }; - page.clear_loaded(); - debug!("cleaning up page {}", page.get().id); - let _ = page.get().contents.take(); + fn detach( + &mut self, + mut entry: NonNull, + clean_page: bool, + ) -> Result<(), CacheError> { + let entry_mut = unsafe { entry.as_mut() }; + if entry_mut.page.is_locked() { + return Err(CacheError::Locked); + } + if entry_mut.page.is_dirty() { + return Err(CacheError::Dirty); } + if clean_page { + entry_mut.page.clear_loaded(); + debug!("cleaning up page {}", entry_mut.page.get().id); + let _ = entry_mut.page.get().contents.take(); + } + self.unlink(entry); + Ok(()) + } + + fn unlink(&mut self, mut entry: NonNull) { let (next, prev) = unsafe { let c = entry.as_mut(); let next = c.next; @@ -138,7 +215,6 @@ impl DumbLruPageCache { (next, prev) }; - // detach match (prev, next) { (None, None) => { self.head.replace(None); @@ -177,42 +253,352 @@ impl DumbLruPageCache { self.head.borrow_mut().replace(entry); } - fn pop_if_not_dirty(&mut self) { - let tail = *self.tail.borrow(); - if tail.is_none() { - return; + pub fn make_room_for(&mut self, n: usize) -> Result<(), CacheError> { + if n > self.capacity { + return Err(CacheError::Full); } - let mut tail = tail.unwrap(); - let tail_entry = unsafe { tail.as_mut() }; - if tail_entry.page.is_dirty() { - // TODO: drop from another clean entry? 
- return; - } - tracing::debug!("pop_if_not_dirty(key={:?})", tail_entry.key); - self.detach(tail, true); - assert!(self.map.borrow_mut().remove(&tail_entry.key).is_some()); - } - pub fn clear(&mut self) { - let to_remove: Vec = self.map.borrow().iter().map(|v| v.0.clone()).collect(); - for key in to_remove { - self.delete(key); + let len = self.len(); + let available = self.capacity.saturating_sub(len); + if n <= available && len <= self.capacity { + return Ok(()); + } + + let tail = self.tail.borrow().ok_or_else(|| { + CacheError::InternalError(format!( + "Page cache of len {} expected to have a tail pointer", + self.len() + )) + })?; + + // Handle len > capacity, too + let available = self.capacity.saturating_sub(len); + let x = n.saturating_sub(available); + let mut need_to_evict = x.saturating_add(len.saturating_sub(self.capacity)); + + let mut current_opt = Some(tail); + while need_to_evict > 0 && current_opt.is_some() { + let current = current_opt.unwrap(); + let entry = unsafe { current.as_ref() }; + current_opt = entry.prev; // Pick prev before modifying entry + match self.delete(entry.key.clone()) { + Err(_) => {} + Ok(_) => need_to_evict -= 1, + } + } + + match need_to_evict > 0 { + true => Err(CacheError::Full), + false => Ok(()), } } - pub fn print(&mut self) { - println!("page_cache={}", self.map.borrow().len()); - println!("page_cache={:?}", self.map.borrow()) + pub fn clear(&mut self) -> Result<(), CacheError> { + let mut current = *self.head.borrow(); + while let Some(current_entry) = current { + unsafe { + self.map.borrow_mut().remove(¤t_entry.as_ref().key); + } + let next = unsafe { current_entry.as_ref().next }; + self.detach(current_entry, true)?; + unsafe { + assert!(!current_entry.as_ref().page.is_dirty()); + } + unsafe { std::ptr::drop_in_place(current_entry.as_ptr()) }; + current = next; + } + let _ = self.head.take(); + let _ = self.tail.take(); + + assert!(self.head.borrow().is_none()); + assert!(self.tail.borrow().is_none()); + assert!(self.map.borrow().is_empty()); + Ok(()) + } + + pub fn print(&self) { + tracing::debug!("page_cache_len={}", self.map.borrow().len()); + let head_ptr = *self.head.borrow(); + let mut current = head_ptr; + while let Some(node) = current { + unsafe { + tracing::debug!("page={:?}", node.as_ref().key); + let node_ref = node.as_ref(); + current = node_ref.next; + } + } + } + + #[cfg(test)] + pub fn keys(&mut self) -> Vec { + let mut this_keys = Vec::new(); + let head_ptr = *self.head.borrow(); + let mut current = head_ptr; + while let Some(node) = current { + unsafe { + this_keys.push(node.as_ref().key.clone()); + let node_ref = node.as_ref(); + current = node_ref.next; + } + } + this_keys } pub fn len(&self) -> usize { self.map.borrow().len() } + + #[cfg(test)] + fn get_entry_ptr(&self, key: &PageCacheKey) -> Option> { + self.map.borrow().get(key).copied() + } + + #[cfg(test)] + fn verify_list_integrity(&self) { + let map_len = self.map.borrow().len(); + let head_ptr = *self.head.borrow(); + let tail_ptr: Option> = *self.tail.borrow(); + + if map_len == 0 { + assert!(head_ptr.is_none(), "Head should be None when map is empty"); + assert!(tail_ptr.is_none(), "Tail should be None when map is empty"); + return; + } + + assert!( + head_ptr.is_some(), + "Head should be Some when map is not empty" + ); + assert!( + tail_ptr.is_some(), + "Tail should be Some when map is not empty" + ); + + unsafe { + assert!( + head_ptr.unwrap().as_ref().prev.is_none(), + "Head's prev pointer mismatch" + ); + } + + unsafe { + assert!( + 
tail_ptr.unwrap().as_ref().next.is_none(), + "Tail's next pointer mismatch" + ); + } + + // Forward traversal + let mut forward_count = 0; + let mut current = head_ptr; + let mut last_ptr: Option> = None; + while let Some(node) = current { + forward_count += 1; + unsafe { + let node_ref = node.as_ref(); + assert_eq!( + node_ref.prev, last_ptr, + "Backward pointer mismatch during forward traversal for key {:?}", + node_ref.key + ); + assert!( + self.map.borrow().contains_key(&node_ref.key), + "Node key {:?} not found in map during forward traversal", + node_ref.key + ); + assert_eq!( + self.map.borrow().get(&node_ref.key).copied(), + Some(node), + "Map pointer mismatch for key {:?}", + node_ref.key + ); + + last_ptr = Some(node); + current = node_ref.next; + } + + if forward_count > map_len + 5 { + panic!( + "Infinite loop suspected in forward integrity check. Size {}, count {}", + map_len, forward_count + ); + } + } + assert_eq!( + forward_count, map_len, + "Forward count mismatch (counted {}, map has {})", + forward_count, map_len + ); + assert_eq!( + tail_ptr, last_ptr, + "Tail pointer mismatch after forward traversal" + ); + + // Backward traversal + let mut backward_count = 0; + current = tail_ptr; + last_ptr = None; + while let Some(node) = current { + backward_count += 1; + unsafe { + let node_ref = node.as_ref(); + assert_eq!( + node_ref.next, last_ptr, + "Forward pointer mismatch during backward traversal for key {:?}", + node_ref.key + ); + assert!( + self.map.borrow().contains_key(&node_ref.key), + "Node key {:?} not found in map during backward traversal", + node_ref.key + ); + + last_ptr = Some(node); + current = node_ref.prev; + } + if backward_count > map_len + 5 { + panic!( + "Infinite loop suspected in backward integrity check. Size {}, count {}", + map_len, backward_count + ); + } + } + assert_eq!( + backward_count, map_len, + "Backward count mismatch (counted {}, map has {})", + backward_count, map_len + ); + assert_eq!( + head_ptr, last_ptr, + "Head pointer mismatch after backward traversal" + ); + } + + #[cfg(test)] + /// For testing purposes, in case we use cursor api directly, we might want to unmark pages as dirty because we bypass the WAL transaction layer + pub fn unset_dirty_all_pages(&mut self) { + for node in self.map.borrow_mut().iter_mut() { + unsafe { + let entry = node.value.as_mut(); + entry.page.clear_dirty() + }; + } + } +} + +impl Default for DumbLruPageCache { + fn default() -> Self { + DumbLruPageCache::new(DEFAULT_PAGE_CACHE_SIZE_IN_PAGES) + } +} + +impl PageHashMap { + pub fn new(capacity: usize) -> PageHashMap { + PageHashMap { + buckets: vec![vec![]; capacity], + capacity, + size: 0, + } + } + + /// Insert page into hashmap. If a key was already in the hashmap, then update it and return the previous value. 
+ pub fn insert( + &mut self, + key: PageCacheKey, + value: NonNull, + ) -> Option> { + let bucket = self.hash(&key); + let bucket = &mut self.buckets[bucket]; + let mut idx = 0; + while let Some(node) = bucket.get_mut(idx) { + if node.key == key { + let prev = node.value; + node.value = value; + return Some(prev); + } + idx += 1; + } + bucket.push(HashMapNode { key, value }); + self.size += 1; + None + } + + pub fn contains_key(&self, key: &PageCacheKey) -> bool { + let bucket = self.hash(&key); + self.buckets[bucket].iter().any(|node| node.key == *key) + } + + pub fn get(&self, key: &PageCacheKey) -> Option<&NonNull> { + let bucket = self.hash(&key); + let bucket = &self.buckets[bucket]; + let mut idx = 0; + while let Some(node) = bucket.get(idx) { + if node.key == *key { + return Some(&node.value); + } + idx += 1; + } + None + } + + pub fn remove(&mut self, key: &PageCacheKey) -> Option> { + let bucket = self.hash(&key); + let bucket = &mut self.buckets[bucket]; + let mut idx = 0; + while let Some(node) = bucket.get(idx) { + if node.key == *key { + break; + } + idx += 1; + } + if idx == bucket.len() { + None + } else { + let v = bucket.remove(idx); + self.size -= 1; + Some(v.value) + } + } + + pub fn is_empty(&self) -> bool { + self.size == 0 + } + + pub fn len(&self) -> usize { + self.size + } + + pub fn iter(&self) -> impl Iterator { + self.buckets.iter().flat_map(|bucket| bucket.iter()) + } + + #[cfg(test)] + pub fn iter_mut(&mut self) -> impl Iterator { + self.buckets.iter_mut().flat_map(|bucket| bucket.iter_mut()) + } + + fn hash(&self, key: &PageCacheKey) -> usize { + key.pgno % self.capacity + } + + pub fn rehash(&self, new_capacity: usize) -> PageHashMap { + let mut new_hash_map = PageHashMap::new(new_capacity); + for node in self.iter() { + new_hash_map.insert(node.key.clone(), node.value); + } + new_hash_map + } } #[cfg(test)] mod tests { - use std::{num::NonZeroUsize, sync::Arc}; + use super::*; + use crate::io::{Buffer, BufferData}; + use crate::storage::page_cache::CacheError; + use crate::storage::pager::{Page, PageRef}; + use crate::storage::sqlite3_ondisk::PageContent; + use std::ptr::NonNull; + use std::{cell::RefCell, num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; use lru::LruCache; use rand_chacha::{ @@ -220,9 +606,277 @@ mod tests { ChaCha8Rng, }; - use crate::{storage::page_cache::DumbLruPageCache, Page}; + fn create_key(id: usize) -> PageCacheKey { + PageCacheKey::new(id, Some(id as u64)) + } - use super::PageCacheKey; + #[allow(clippy::arc_with_non_send_sync)] + pub fn page_with_content(page_id: usize) -> PageRef { + let page = Arc::new(Page::new(page_id)); + { + let buffer_drop_fn = Rc::new(|_data: BufferData| {}); + let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn); + let page_content = PageContent { + offset: 0, + buffer: Arc::new(RefCell::new(buffer)), + overflow_cells: Vec::new(), + }; + page.get().contents = Some(page_content); + page.set_loaded(); + } + page + } + + fn insert_page(cache: &mut DumbLruPageCache, id: usize) -> PageCacheKey { + let key = create_key(id); + let page = page_with_content(id); + assert!(cache.insert(key.clone(), page).is_ok()); + key + } + + fn page_has_content(page: &PageRef) -> bool { + page.is_loaded() && page.get().contents.is_some() + } + + fn insert_and_get_entry( + cache: &mut DumbLruPageCache, + id: usize, + ) -> (PageCacheKey, NonNull) { + let key = create_key(id); + let page = page_with_content(id); + assert!(cache.insert(key.clone(), page).is_ok()); + let entry = cache.get_ptr(&key).expect("Entry should 
exist"); + (key, entry) + } + + #[test] + fn test_detach_only_element() { + let mut cache = DumbLruPageCache::default(); + let key1 = insert_page(&mut cache, 1); + cache.verify_list_integrity(); + assert_eq!(cache.len(), 1); + assert!(cache.head.borrow().is_some()); + assert!(cache.tail.borrow().is_some()); + assert_eq!(*cache.head.borrow(), *cache.tail.borrow()); + + assert!(cache.delete(key1.clone()).is_ok()); + + assert_eq!( + cache.len(), + 0, + "Length should be 0 after deleting only element" + ); + assert!( + cache.map.borrow().get(&key1).is_none(), + "Map should not contain key after delete" + ); + assert!(cache.head.borrow().is_none(), "Head should be None"); + assert!(cache.tail.borrow().is_none(), "Tail should be None"); + cache.verify_list_integrity(); + } + + #[test] + fn test_detach_head() { + let mut cache = DumbLruPageCache::default(); + let _key1 = insert_page(&mut cache, 1); // Tail + let key2 = insert_page(&mut cache, 2); // Middle + let key3 = insert_page(&mut cache, 3); // Head + cache.verify_list_integrity(); + assert_eq!(cache.len(), 3); + + let head_ptr_before = cache.head.borrow().unwrap(); + assert_eq!( + unsafe { &head_ptr_before.as_ref().key }, + &key3, + "Initial head check" + ); + + assert!(cache.delete(key3.clone()).is_ok()); + + assert_eq!(cache.len(), 2, "Length should be 2 after deleting head"); + assert!( + cache.map.borrow().get(&key3).is_none(), + "Map should not contain deleted head key" + ); + cache.verify_list_integrity(); + + let new_head_ptr = cache.head.borrow().unwrap(); + assert_eq!( + unsafe { &new_head_ptr.as_ref().key }, + &key2, + "New head should be key2" + ); + assert!( + unsafe { new_head_ptr.as_ref().prev.is_none() }, + "New head's prev should be None" + ); + + let tail_ptr = cache.tail.borrow().unwrap(); + assert_eq!( + unsafe { new_head_ptr.as_ref().next }, + Some(tail_ptr), + "New head's next should point to tail (key1)" + ); + } + + #[test] + fn test_detach_tail() { + let mut cache = DumbLruPageCache::default(); + let key1 = insert_page(&mut cache, 1); // Tail + let key2 = insert_page(&mut cache, 2); // Middle + let _key3 = insert_page(&mut cache, 3); // Head + cache.verify_list_integrity(); + assert_eq!(cache.len(), 3); + + let tail_ptr_before = cache.tail.borrow().unwrap(); + assert_eq!( + unsafe { &tail_ptr_before.as_ref().key }, + &key1, + "Initial tail check" + ); + + assert!(cache.delete(key1.clone()).is_ok()); // Delete tail + + assert_eq!(cache.len(), 2, "Length should be 2 after deleting tail"); + assert!( + cache.map.borrow().get(&key1).is_none(), + "Map should not contain deleted tail key" + ); + cache.verify_list_integrity(); + + let new_tail_ptr = cache.tail.borrow().unwrap(); + assert_eq!( + unsafe { &new_tail_ptr.as_ref().key }, + &key2, + "New tail should be key2" + ); + assert!( + unsafe { new_tail_ptr.as_ref().next.is_none() }, + "New tail's next should be None" + ); + + let head_ptr = cache.head.borrow().unwrap(); + assert_eq!( + unsafe { head_ptr.as_ref().prev }, + None, + "Head's prev should point to new tail (key2)" + ); + assert_eq!( + unsafe { head_ptr.as_ref().next }, + Some(new_tail_ptr), + "Head's next should point to new tail (key2)" + ); + assert_eq!( + unsafe { new_tail_ptr.as_ref().next }, + None, + "Double check new tail's next is None" + ); + } + + #[test] + fn test_detach_middle() { + let mut cache = DumbLruPageCache::default(); + let key1 = insert_page(&mut cache, 1); // Tail + let key2 = insert_page(&mut cache, 2); // Middle + let key3 = insert_page(&mut cache, 3); // Middle + let _key4 = 
insert_page(&mut cache, 4); // Head + cache.verify_list_integrity(); + assert_eq!(cache.len(), 4); + + let head_ptr_before = cache.head.borrow().unwrap(); + let tail_ptr_before = cache.tail.borrow().unwrap(); + + assert!(cache.delete(key2.clone()).is_ok()); // Detach a middle element (key2) + + assert_eq!(cache.len(), 3, "Length should be 3 after deleting middle"); + assert!( + cache.map.borrow().get(&key2).is_none(), + "Map should not contain deleted middle key2" + ); + cache.verify_list_integrity(); + + // Check neighbors + let key1_ptr = cache.get_entry_ptr(&key1).expect("Key1 should still exist"); + let key3_ptr = cache.get_entry_ptr(&key3).expect("Key3 should still exist"); + assert_eq!( + unsafe { key3_ptr.as_ref().next }, + Some(key1_ptr), + "Key3's next should point to key1" + ); + assert_eq!( + unsafe { key1_ptr.as_ref().prev }, + Some(key3_ptr), + "Key1's prev should point to key2" + ); + + assert_eq!( + cache.head.borrow().unwrap(), + head_ptr_before, + "Head should remain key4" + ); + assert_eq!( + cache.tail.borrow().unwrap(), + tail_ptr_before, + "Tail should remain key1" + ); + } + + #[test] + #[ignore = "for now let's not track active refs"] + fn test_detach_via_delete() { + let mut cache = DumbLruPageCache::default(); + let key1 = create_key(1); + let page1 = page_with_content(1); + assert!(cache.insert(key1.clone(), page1.clone()).is_ok()); + assert!(page_has_content(&page1)); + cache.verify_list_integrity(); + + let result = cache.delete(key1.clone()); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), CacheError::ActiveRefs); + assert_eq!(cache.len(), 1); + + drop(page1); + + assert!(cache.delete(key1).is_ok()); + assert_eq!(cache.len(), 0); + cache.verify_list_integrity(); + } + + #[test] + fn test_insert_same_id_different_frame() { + let mut cache = DumbLruPageCache::default(); + let key1_1 = PageCacheKey::new(1, Some(1 as u64)); + let key1_2 = PageCacheKey::new(1, Some(2 as u64)); + let page1_1 = page_with_content(1); + let page1_2 = page_with_content(1); + + assert!(cache.insert(key1_1.clone(), page1_1.clone()).is_ok()); + assert!(cache.insert(key1_2.clone(), page1_2.clone()).is_ok()); + assert_eq!(cache.len(), 2); + cache.verify_list_integrity(); + } + + #[test] + #[should_panic(expected = "Attempted to insert different page with same key")] + fn test_insert_existing_key_fail() { + let mut cache = DumbLruPageCache::default(); + let key1 = create_key(1); + let page1_v1 = page_with_content(1); + let page1_v2 = page_with_content(1); + assert!(cache.insert(key1.clone(), page1_v1.clone()).is_ok()); + assert_eq!(cache.len(), 1); + cache.verify_list_integrity(); + let _ = cache.insert(key1.clone(), page1_v2.clone()); // Panic + } + + #[test] + fn test_detach_nonexistent_key() { + let mut cache = DumbLruPageCache::default(); + let key_nonexist = create_key(99); + + assert!(cache.delete(key_nonexist.clone()).is_ok()); // no-op + } #[test] fn test_page_cache_evict() { @@ -233,6 +887,116 @@ mod tests { assert!(cache.get(&key1).is_none()); } + #[test] + fn test_detach_locked_page() { + let mut cache = DumbLruPageCache::default(); + let (_, mut entry) = insert_and_get_entry(&mut cache, 1); + unsafe { entry.as_mut().page.set_locked() }; + assert_eq!(cache.detach(entry, false), Err(CacheError::Locked)); + cache.verify_list_integrity(); + } + + #[test] + fn test_detach_dirty_page() { + let mut cache = DumbLruPageCache::default(); + let (key, mut entry) = insert_and_get_entry(&mut cache, 1); + cache.get(&key).expect("Page should exist"); + unsafe { 
entry.as_mut().page.set_dirty() }; + assert_eq!(cache.detach(entry, false), Err(CacheError::Dirty)); + cache.verify_list_integrity(); + } + + #[test] + #[ignore = "for now let's not track active refs"] + fn test_detach_with_active_reference_clean() { + let mut cache = DumbLruPageCache::default(); + let (key, entry) = insert_and_get_entry(&mut cache, 1); + let page_ref = cache.get(&key); + assert_eq!(cache.detach(entry, true), Err(CacheError::ActiveRefs)); + drop(page_ref); + cache.verify_list_integrity(); + } + + #[test] + #[ignore = "for now let's not track active refs"] + fn test_detach_with_active_reference_no_clean() { + let mut cache = DumbLruPageCache::default(); + let (key, entry) = insert_and_get_entry(&mut cache, 1); + cache.get(&key).expect("Page should exist"); + assert!(cache.detach(entry, false).is_ok()); + assert!(cache.map.borrow_mut().remove(&key).is_some()); + cache.verify_list_integrity(); + } + + #[test] + fn test_detach_without_cleaning() { + let mut cache = DumbLruPageCache::default(); + let (key, entry) = insert_and_get_entry(&mut cache, 1); + assert!(cache.detach(entry, false).is_ok()); + assert!(cache.map.borrow_mut().remove(&key).is_some()); + cache.verify_list_integrity(); + assert_eq!(cache.len(), 0); + } + + #[test] + fn test_detach_with_cleaning() { + let mut cache = DumbLruPageCache::default(); + let (key, entry) = insert_and_get_entry(&mut cache, 1); + let page = cache.get(&key).expect("Page should exist"); + assert!(page_has_content(&page)); + drop(page); + assert!(cache.detach(entry, true).is_ok()); + // Internal testing: the page is still in map, so we use it to check content + let page = cache.peek(&key, false).expect("Page should exist in map"); + assert!(!page_has_content(&page)); + assert!(cache.map.borrow_mut().remove(&key).is_some()); + cache.verify_list_integrity(); + } + + #[test] + fn test_detach_only_element_preserves_integrity() { + let mut cache = DumbLruPageCache::default(); + let (_, entry) = insert_and_get_entry(&mut cache, 1); + assert!(cache.detach(entry, false).is_ok()); + assert!( + cache.head.borrow().is_none(), + "Head should be None after detaching only element" + ); + assert!( + cache.tail.borrow().is_none(), + "Tail should be None after detaching only element" + ); + } + + #[test] + fn test_detach_with_multiple_pages() { + let mut cache = DumbLruPageCache::default(); + let (key1, _) = insert_and_get_entry(&mut cache, 1); + let (key2, entry2) = insert_and_get_entry(&mut cache, 2); + let (key3, _) = insert_and_get_entry(&mut cache, 3); + let head_key = unsafe { cache.head.borrow().unwrap().as_ref().key.clone() }; + let tail_key = unsafe { cache.tail.borrow().unwrap().as_ref().key.clone() }; + assert_eq!(head_key, key3, "Head should be key3"); + assert_eq!(tail_key, key1, "Tail should be key1"); + assert!(cache.detach(entry2, false).is_ok()); + let head_entry = unsafe { cache.head.borrow().unwrap().as_ref() }; + let tail_entry = unsafe { cache.tail.borrow().unwrap().as_ref() }; + assert_eq!(head_entry.key, key3, "Head should still be key3"); + assert_eq!(tail_entry.key, key1, "Tail should still be key1"); + assert_eq!( + unsafe { head_entry.next.unwrap().as_ref().key.clone() }, + key1, + "Head's next should point to tail after middle element detached" + ); + assert_eq!( + unsafe { tail_entry.prev.unwrap().as_ref().key.clone() }, + key3, + "Tail's prev should point to head after middle element detached" + ); + assert!(cache.map.borrow_mut().remove(&key2).is_some()); + cache.verify_list_integrity(); + } + #[test] fn 
test_page_cache_fuzz() { let seed = std::time::SystemTime::now() @@ -246,7 +1010,11 @@ mod tests { let mut lru = LruCache::new(NonZeroUsize::new(10).unwrap()); for _ in 0..10000 { - match rng.next_u64() % 3 { + cache.print(); + for (key, _) in &lru { + tracing::debug!("lru_page={:?}", key); + } + match rng.next_u64() % 2 { 0 => { // add let id_page = rng.next_u64() % max_pages; @@ -254,44 +1022,74 @@ mod tests { let key = PageCacheKey::new(id_page as usize, Some(id_frame)); #[allow(clippy::arc_with_non_send_sync)] let page = Arc::new(Page::new(id_page as usize)); - // println!("inserting page {:?}", key); - cache.insert(key.clone(), page.clone()); - lru.push(key, page); + if let Some(_) = cache.peek(&key, false) { + continue; // skip duplicate page ids + } + tracing::debug!("inserting page {:?}", key); + match cache.insert(key.clone(), page.clone()) { + Err(CacheError::Full | CacheError::ActiveRefs) => {} // Ignore + Err(err) => { + // Any other error should fail the test + panic!("Cache insertion failed: {:?}", err); + } + Ok(_) => { + lru.push(key, page); + } + } assert!(cache.len() <= 10); } 1 => { // remove let random = rng.next_u64() % 2 == 0; let key = if random || lru.is_empty() { - let id_page = rng.next_u64() % max_pages; + let id_page: u64 = rng.next_u64() % max_pages; let id_frame = rng.next_u64() % max_pages; let key = PageCacheKey::new(id_page as usize, Some(id_frame)); key } else { let i = rng.next_u64() as usize % lru.len(); - let key = lru.iter().skip(i).next().unwrap().0.clone(); + let key: PageCacheKey = lru.iter().skip(i).next().unwrap().0.clone(); key }; - // println!("removing page {:?}", key); + tracing::debug!("removing page {:?}", key); lru.pop(&key); - cache.delete(key); - } - 2 => { - // test contents - for (key, page) in &lru { - // println!("getting page {:?}", key); - cache.peek(&key, false).unwrap(); - assert_eq!(page.get().id, key.pgno); - } + assert!(cache.delete(key).is_ok()); } _ => unreachable!(), } + compare_to_lru(&mut cache, &lru); + cache.print(); + for (key, _) in &lru { + tracing::debug!("lru_page={:?}", key); + } + cache.verify_list_integrity(); + for (key, page) in &lru { + println!("getting page {:?}", key); + cache.peek(&key, false).unwrap(); + assert_eq!(page.get().id, key.pgno); + } + } + assert!(lru.len() > 0); // Check it inserted some + } + + pub fn compare_to_lru(cache: &mut DumbLruPageCache, lru: &LruCache) { + let this_keys = cache.keys(); + let mut lru_keys = Vec::new(); + for (lru_key, _) in lru { + lru_keys.push(lru_key.clone()); + } + if this_keys != lru_keys { + cache.print(); + for (lru_key, _) in lru { + tracing::debug!("lru_page={:?}", lru_key); + } + assert_eq!(&this_keys, &lru_keys) } } #[test] fn test_page_cache_insert_and_get() { - let mut cache = DumbLruPageCache::new(2); + let mut cache = DumbLruPageCache::default(); let key1 = insert_page(&mut cache, 1); let key2 = insert_page(&mut cache, 2); assert_eq!(cache.get(&key1).unwrap().get().id, 1); @@ -311,36 +1109,101 @@ mod tests { #[test] fn test_page_cache_delete() { - let mut cache = DumbLruPageCache::new(2); + let mut cache = DumbLruPageCache::default(); let key1 = insert_page(&mut cache, 1); - cache.delete(key1.clone()); + assert!(cache.delete(key1.clone()).is_ok()); assert!(cache.get(&key1).is_none()); } #[test] fn test_page_cache_clear() { - let mut cache = DumbLruPageCache::new(2); + let mut cache = DumbLruPageCache::default(); let key1 = insert_page(&mut cache, 1); let key2 = insert_page(&mut cache, 2); - cache.clear(); + assert!(cache.clear().is_ok()); 
assert!(cache.get(&key1).is_none()); assert!(cache.get(&key2).is_none()); } - fn insert_page(cache: &mut DumbLruPageCache, id: usize) -> PageCacheKey { - let key = PageCacheKey::new(id, None); - #[allow(clippy::arc_with_non_send_sync)] - let page = Arc::new(Page::new(id)); - cache.insert(key.clone(), page.clone()); - key - } - #[test] fn test_page_cache_insert_sequential() { - let mut cache = DumbLruPageCache::new(2); + let mut cache = DumbLruPageCache::default(); for i in 0..10000 { let key = insert_page(&mut cache, i); assert_eq!(cache.peek(&key, false).unwrap().get().id, i); } } + + #[test] + fn test_resize_smaller_success() { + let mut cache = DumbLruPageCache::default(); + for i in 1..=5 { + let _ = insert_page(&mut cache, i); + } + assert_eq!(cache.len(), 5); + let result = cache.resize(3); + assert_eq!(result, CacheResizeResult::Done); + assert_eq!(cache.len(), 3); + assert_eq!(cache.capacity, 3); + assert!(cache.insert(create_key(6), page_with_content(6)).is_ok()); + } + + #[test] + #[should_panic(expected = "Attempted to insert different page with same key")] + fn test_resize_larger() { + let mut cache = DumbLruPageCache::default(); + let _ = insert_page(&mut cache, 1); + let _ = insert_page(&mut cache, 2); + assert_eq!(cache.len(), 2); + let result = cache.resize(5); + assert_eq!(result, CacheResizeResult::Done); + assert_eq!(cache.len(), 2); + assert_eq!(cache.capacity, 5); + assert!(cache.get(&create_key(1)).is_some()); + assert!(cache.get(&create_key(2)).is_some()); + for i in 3..=5 { + let _ = insert_page(&mut cache, i); + } + assert_eq!(cache.len(), 5); + // FIXME: For now this will assert because we cannot insert a page with same id but different contents of page. + assert!(cache.insert(create_key(4), page_with_content(4)).is_err()); + cache.verify_list_integrity(); + } + + #[test] + #[ignore = "for now let's not track active refs"] + fn test_resize_with_active_references() { + let mut cache = DumbLruPageCache::default(); + let page1 = page_with_content(1); + let page2 = page_with_content(2); + let page3 = page_with_content(3); + assert!(cache.insert(create_key(1), page1.clone()).is_ok()); + assert!(cache.insert(create_key(2), page2.clone()).is_ok()); + assert!(cache.insert(create_key(3), page3.clone()).is_ok()); + assert_eq!(cache.len(), 3); + cache.verify_list_integrity(); + assert_eq!(cache.resize(2), CacheResizeResult::PendingEvictions); + assert_eq!(cache.capacity, 2); + assert_eq!(cache.len(), 3); + drop(page2); + drop(page3); + assert_eq!(cache.resize(1), CacheResizeResult::Done); // Evicted 2 and 3 + assert_eq!(cache.len(), 1); + assert!(cache.insert(create_key(4), page_with_content(4)).is_err()); + cache.verify_list_integrity(); + } + + #[test] + fn test_resize_same_capacity() { + let mut cache = DumbLruPageCache::new(3); + for i in 1..=3 { + let _ = insert_page(&mut cache, i); + } + let result = cache.resize(3); + assert_eq!(result, CacheResizeResult::Done); // no-op + assert_eq!(cache.len(), 3); + assert_eq!(cache.capacity, 3); + cache.verify_list_integrity(); + assert!(cache.insert(create_key(4), page_with_content(4)).is_ok()); + } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 6dbd1c961..e289e8281 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,5 +1,6 @@ use crate::fast_lock::SpinLock; use crate::result::LimboResult; +use crate::storage::btree::BTreePageInner; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, 
PageType}; @@ -13,7 +14,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tracing::trace; -use super::page_cache::{DumbLruPageCache, PageCacheKey}; +use super::btree::BTreePage; +use super::page_cache::{CacheError, CacheResizeResult, DumbLruPageCache, PageCacheKey}; use super::wal::{CheckpointMode, CheckpointStatus}; pub struct PageInner { @@ -22,6 +24,7 @@ pub struct PageInner { pub id: usize, } +#[derive(Debug)] pub struct Page { pub inner: UnsafeCell, } @@ -213,6 +216,7 @@ impl Pager { }) } + // FIXME: handle no room in page cache pub fn btree_create(&self, flags: &CreateBTreeFlags) -> u32 { let page_type = match flags { _ if flags.is_table() => PageType::TableLeaf, @@ -220,12 +224,13 @@ impl Pager { _ => unreachable!("Invalid flags state"), }; let page = self.do_allocate_page(page_type, 0); - let id = page.get().id; + let id = page.get().get().id; id as u32 } /// Allocate a new overflow page. /// This is done when a cell overflows and new space is needed. + // FIXME: handle no room in page cache pub fn allocate_overflow_page(&self) -> PageRef { let page = self.allocate_page().unwrap(); tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id); @@ -240,13 +245,17 @@ impl Pager { /// Allocate a new page to the btree via the pager. /// This marks the page as dirty and writes the page header. - pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> PageRef { + // FIXME: handle no room in page cache + pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage { let page = self.allocate_page().unwrap(); + let page = Arc::new(BTreePageInner { + page: RefCell::new(page), + }); crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16); tracing::debug!( "do_allocate_page(id={}, page_type={:?})", - page.get().id, - page.get_contents().page_type() + page.get().get().id, + page.get().get_contents().page_type() ); page } @@ -302,7 +311,7 @@ impl Pager { } /// Reads a page from the database. 
- pub fn read_page(&self, page_idx: usize) -> Result { + pub fn read_page(&self, page_idx: usize) -> Result { tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); let max_frame = match &self.wal { @@ -324,9 +333,21 @@ impl Pager { { page.set_uptodate(); } - // TODO(pere) ensure page is inserted, we should probably first insert to page cache - // and if successful, read frame or page - page_cache.insert(page_key, page.clone()); + // TODO(pere) should probably first insert to page cache, and if successful, + // read frame or page + match page_cache.insert(page_key, page.clone()) { + Ok(_) => {} + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(CacheError::KeyExists) => { + unreachable!("Page should not exist in cache after get() miss") + } + Err(e) => { + return Err(LimboError::InternalError(format!( + "Failed to insert page into cache: {:?}", + e + ))) + } + } return Ok(page); } } @@ -336,49 +357,20 @@ impl Pager { page.clone(), page_idx, )?; - // TODO(pere) ensure page is inserted - page_cache.insert(page_key, page.clone()); - Ok(page) - } - - /// Loads pages if not loaded - pub fn load_page(&self, page: PageRef) -> Result<()> { - let id = page.get().id; - trace!("load_page(page_idx = {})", id); - let mut page_cache = self.page_cache.write(); - page.set_locked(); - let max_frame = match &self.wal { - Some(wal) => wal.borrow().get_max_frame(), - None => 0, - }; - let page_key = PageCacheKey::new(id, Some(max_frame)); - if let Some(wal) = &self.wal { - if let Some(frame_id) = wal.borrow().find_frame(id as u64)? { - wal.borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); - } - // TODO(pere) ensure page is inserted - if !page_cache.contains_key(&page_key) { - page_cache.insert(page_key, page.clone()); - } - return Ok(()); + match page_cache.insert(page_key, page.clone()) { + Ok(_) => {} + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(CacheError::KeyExists) => { + unreachable!("Page should not exist in cache after get() miss") + } + Err(e) => { + return Err(LimboError::InternalError(format!( + "Failed to insert page into cache: {:?}", + e + ))) } } - - // TODO(pere) ensure page is inserted - if !page_cache.contains_key(&page_key) { - page_cache.insert(page_key, page.clone()); - } - sqlite3_ondisk::begin_read_page( - self.db_file.clone(), - self.buffer_pool.clone(), - page.clone(), - id, - )?; - - Ok(()) + Ok(page) } /// Writes the database header. @@ -387,9 +379,9 @@ impl Pager { } /// Changes the size of the page cache. 
- pub fn change_page_cache_size(&self, capacity: usize) { + pub fn change_page_cache_size(&self, capacity: usize) -> Result { let mut page_cache = self.page_cache.write(); - page_cache.resize(capacity); + Ok(page_cache.resize(capacity)) } pub fn add_dirty(&self, page_id: usize) { @@ -422,8 +414,8 @@ impl Pager { for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(*page_id, Some(max_frame)); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); if let Some(wal) = &self.wal { - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); wal.borrow_mut().append_frame( @@ -432,10 +424,12 @@ impl Pager { self.flush_info.borrow().in_flight_writes.clone(), )?; } - // This page is no longer valid. - // For example: - // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame so it must be invalidated. - cache.delete(page_key); + page.clear_dirty(); + } + // This is okay assuming we use shared cache by default. + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; @@ -560,7 +554,10 @@ impl Pager { } } // TODO: only clear cache of things that are really invalidated - self.page_cache.write().clear(); + self.page_cache + .write() + .clear() + .expect("Failed to clear page cache"); checkpoint_result } @@ -640,6 +637,7 @@ impl Pager { Gets a new page that increasing the size of the page or uses a free page. Currently free list pages are not yet supported. 
*/ + // FIXME: handle no room in page cache #[allow(clippy::readonly_write_lock)] pub fn allocate_page(&self) -> Result { let header = &self.db_header; @@ -663,33 +661,55 @@ impl Pager { } } + // FIXME: should reserve page cache entry before modifying the database let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0); { // setup page and add to cache page.set_dirty(); self.add_dirty(page.get().id); - let mut cache = self.page_cache.write(); let max_frame = match &self.wal { Some(wal) => wal.borrow().get_max_frame(), None => 0, }; let page_key = PageCacheKey::new(page.get().id, Some(max_frame)); - cache.insert(page_key, page.clone()); + let mut cache = self.page_cache.write(); + match cache.insert(page_key, page.clone()) { + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(_) => { + return Err(LimboError::InternalError( + "Unknown error inserting page to cache".into(), + )) + } + Ok(_) => return Ok(page), + } } - Ok(page) } - pub fn put_loaded_page(&self, id: usize, page: PageRef) { + pub fn update_dirty_loaded_page_in_cache( + &self, + id: usize, + page: PageRef, + ) -> Result<(), LimboError> { let mut cache = self.page_cache.write(); - // cache insert invalidates previous page let max_frame = match &self.wal { Some(wal) => wal.borrow().get_max_frame(), None => 0, }; let page_key = PageCacheKey::new(id, Some(max_frame)); - cache.insert(page_key, page.clone()); + + // FIXME: use specific page key for writer instead of max frame, this will make readers not conflict + assert!(page.is_dirty()); + cache + .insert_ignore_existing(page_key, page.clone()) + .map_err(|e| { + LimboError::InternalError(format!( + "Failed to insert loaded page {} into cache: {:?}", + id, e + )) + })?; page.set_loaded(); + Ok(()) } pub fn usable_size(&self) -> usize { @@ -760,7 +780,7 @@ mod tests { std::thread::spawn(move || { let mut cache = cache.write(); let page_key = PageCacheKey::new(1, None); - cache.insert(page_key, Arc::new(Page::new(1))); + cache.insert(page_key, Arc::new(Page::new(1))).unwrap(); }) }; let _ = thread.join(); diff --git a/core/translate/pragma.rs b/core/translate/pragma.rs index 4e49a55d1..2de4ce996 100644 --- a/core/translate/pragma.rs +++ b/core/translate/pragma.rs @@ -305,5 +305,7 @@ fn update_cache_size(value: i64, header: Arc>, pager: R pager.write_database_header(&header_copy); // update cache size - pager.change_page_cache_size(cache_size); + pager + .change_page_cache_size(cache_size) + .expect("couldn't update page cache size"); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index a6b375651..ad0f9714a 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4323,6 +4323,7 @@ pub fn op_create_btree( // TODO: implement temp databases todo!("temp databases not implemented yet"); } + // FIXME: handle page cache is full let root_page = pager.btree_create(flags); state.registers[*root] = Register::Value(Value::Integer(root_page as i64)); state.pc += 1; @@ -4691,7 +4692,7 @@ pub fn op_open_ephemeral( let db_header = Pager::begin_open(db_file.clone())?; let buffer_pool = Rc::new(BufferPool::new(db_header.lock().get_page_size() as usize)); - let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); let pager = Rc::new(Pager::finish_open( db_header, @@ -4708,6 +4709,7 @@ pub fn op_open_ephemeral( &CreateBTreeFlags::new_index() }; + // FIXME: handle page cache is full let root_page = pager.btree_create(flag); let (_, cursor_type) = 
program.cursor_ref.get(cursor_id).unwrap();
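
Coming back to make_room_for in page_cache.rs: it has to cover both the normal case (len <= capacity) and the transient post-resize case where the cache already holds more pages than its new capacity. The saturating arithmetic reduces to a small pure function; the sketch below walks a few cases, assuming the reading of that math above is right.

/// Number of evictions needed before `n` new entries fit in a cache that
/// currently holds `len` entries with a target `capacity`. Covers the
/// post-resize case where `len` already exceeds `capacity`.
fn evictions_needed(len: usize, capacity: usize, n: usize) -> usize {
    let available = capacity.saturating_sub(len);
    let for_new_entries = n.saturating_sub(available);
    let over_capacity = len.saturating_sub(capacity);
    for_new_entries.saturating_add(over_capacity)
}

fn main() {
    // Plenty of room: nothing to evict.
    assert_eq!(evictions_needed(3, 10, 1), 0);
    // Cache exactly full: one eviction per incoming page.
    assert_eq!(evictions_needed(10, 10, 1), 1);
    // Cache shrunk from 10 to 4 while holding 10 pages: 6 evictions just to
    // reach the new capacity, plus 1 for the incoming page.
    assert_eq!(evictions_needed(10, 4, 1), 7);
    // resize() calls make_room_for(0): only the overflow has to go.
    assert_eq!(evictions_needed(10, 4, 0), 6);
}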
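
The PageHashMap that replaces std's HashMap hashes purely by pgno % capacity, which is why resize() must rehash before the bucket count changes. A simplified standalone model follows (invented name BucketMap, keyed by page number only, whereas the real PageCacheKey also carries a WAL max_frame) to illustrate the collision handling and the rehash; it is a sketch, not the patch's implementation.

/// Minimal model of a bucket-per-slot map keyed by page number.
struct BucketMap<V> {
    buckets: Vec<Vec<(usize, V)>>,
    len: usize,
}

impl<V> BucketMap<V> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "bucket count must be at least 1");
        Self { buckets: (0..capacity).map(|_| Vec::new()).collect(), len: 0 }
    }

    fn bucket(&self, pgno: usize) -> usize {
        pgno % self.buckets.len()
    }

    /// Insert or update; returns the previous value if the key was present.
    fn insert(&mut self, pgno: usize, value: V) -> Option<V> {
        let b = self.bucket(pgno);
        for slot in &mut self.buckets[b] {
            if slot.0 == pgno {
                return Some(std::mem::replace(&mut slot.1, value));
            }
        }
        self.buckets[b].push((pgno, value));
        self.len += 1;
        None
    }

    fn get(&self, pgno: usize) -> Option<&V> {
        self.buckets[self.bucket(pgno)]
            .iter()
            .find(|slot| slot.0 == pgno)
            .map(|slot| &slot.1)
    }

    fn remove(&mut self, pgno: usize) -> Option<V> {
        let b = self.bucket(pgno);
        let pos = self.buckets[b].iter().position(|slot| slot.0 == pgno)?;
        self.len -= 1;
        Some(self.buckets[b].remove(pos).1)
    }

    /// Rebuild with a new bucket count; every entry moves to pgno % new_capacity.
    /// (The real rehash copies out of &self; consuming self keeps the sketch short.)
    fn rehash(self, new_capacity: usize) -> Self {
        let mut rehashed = Self::new(new_capacity);
        for (pgno, value) in self.buckets.into_iter().flatten() {
            let _ = rehashed.insert(pgno, value);
        }
        rehashed
    }
}

fn main() {
    let mut map = BucketMap::new(4);
    assert_eq!(map.insert(2, "page 2"), None);
    assert_eq!(map.insert(6, "page 6"), None); // 6 % 4 == 2: collides with page 2
    assert_eq!(map.get(6), Some(&"page 6"));

    let mut map = map.rehash(8); // 2 % 8 and 6 % 8 now land in different buckets
    assert_eq!(map.get(2), Some(&"page 2"));
    assert_eq!(map.remove(6), Some("page 6"));
    assert_eq!(map.len, 1);
}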