From adf72f2bf8475a7eef9cff5191fdfc4bbf9d65d2 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 19 May 2025 15:02:38 +0200 Subject: [PATCH] allow updating a page id in page cache --- core/storage/btree.rs | 4 +-- core/storage/page_cache.rs | 67 +++++++++++++++++++++++++------------- core/storage/pager.rs | 30 +++++++---------- 3 files changed, 59 insertions(+), 42 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 07fb65495..5958b1539 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2814,8 +2814,8 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); if *new_id != page.get().id { page.get().id = *new_id; - // FIXME: why would there be another page at this id/frame? - self.pager.put_loaded_page(*new_id, page.clone())?; + self.pager + .update_dirty_loaded_page_in_cache(*new_id, page.clone())?; } } diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 566c8063a..1626119a0 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -71,14 +71,33 @@ impl DumbLruPageCache { } pub fn insert(&mut self, key: PageCacheKey, value: PageRef) -> Result<(), CacheError> { + self._insert(key, value, false) + } + + pub fn insert_ignore_existing( + &mut self, + key: PageCacheKey, + value: PageRef, + ) -> Result<(), CacheError> { + self._insert(key, value, true) + } + + pub fn _insert( + &mut self, + key: PageCacheKey, + value: PageRef, + ignore_exists: bool, + ) -> Result<(), CacheError> { trace!("insert(key={:?})", key); // Check first if page already exists in cache - if let Some(existing_page_ref) = self.get(&key) { - assert!( - Arc::ptr_eq(&value, &existing_page_ref), - "Attempted to insert different page with same key" - ); - return Err(CacheError::KeyExists); + if !ignore_exists { + if let Some(existing_page_ref) = self.get(&key) { + assert!( + Arc::ptr_eq(&value, &existing_page_ref), + "Attempted to insert different page with same key" + ); + return Err(CacheError::KeyExists); + } } self.make_room_for(1)?; let entry = Box::new(PageCacheEntry { @@ -268,15 +287,13 @@ impl DumbLruPageCache { } pub fn print(&mut self) { - println!("page_cache_len={}", self.map.borrow().len()); + tracing::debug!("page_cache_len={}", self.map.borrow().len()); let head_ptr = *self.head.borrow(); let mut current = head_ptr; - let mut last_ptr: Option> = None; while let Some(node) = current { unsafe { - println!("page={:?}", node.as_ref().key); + tracing::debug!("page={:?}", node.as_ref().key); let node_ref = node.as_ref(); - last_ptr = Some(node); current = node_ref.next; } } @@ -287,12 +304,10 @@ impl DumbLruPageCache { let mut this_keys = Vec::new(); let head_ptr = *self.head.borrow(); let mut current = head_ptr; - let mut last_ptr: Option> = None; while let Some(node) = current { unsafe { this_keys.push(node.as_ref().key.clone()); let node_ref = node.as_ref(); - last_ptr = Some(node); current = node_ref.next; } } @@ -428,6 +443,17 @@ impl DumbLruPageCache { "Head pointer mismatch after backward traversal" ); } + + #[cfg(test)] + /// For testing purposes, in case we use cursor api directly, we might want to unmark pages as dirty because we bypass the WAL transaction layer + pub fn unset_dirty_all_pages(&mut self) { + for (_, page) in self.map.borrow_mut().iter_mut() { + unsafe { + let entry = page.as_mut(); + entry.page.clear_dirty() + }; + } + } } #[cfg(test)] @@ -850,10 +876,9 @@ mod tests { let mut lru = LruCache::new(NonZeroUsize::new(10).unwrap()); for _ in 0..10000 { - dbg!(&lru); cache.print(); - for (key, page) in &lru { - println!("lru_page={:?}", key); + for (key, _) in &lru { + tracing::debug!("lru_page={:?}", key); } match rng.next_u64() % 2 { 0 => { @@ -866,7 +891,7 @@ mod tests { if let Some(_) = cache.peek(&key, false) { continue; // skip duplicate page ids } - println!("inserting page {:?}", key); + tracing::debug!("inserting page {:?}", key); match cache.insert(key.clone(), page.clone()) { Err(CacheError::Full | CacheError::ActiveRefs) => {} // Ignore Err(err) => { @@ -892,7 +917,7 @@ mod tests { let key: PageCacheKey = lru.iter().skip(i).next().unwrap().0.clone(); key }; - println!("removing page {:?}", key); + tracing::debug!("removing page {:?}", key); lru.pop(&key); assert!(cache.delete(key).is_ok()); } @@ -900,8 +925,8 @@ mod tests { } compare_to_lru(&mut cache, &lru); cache.print(); - for (key, page) in &lru { - println!("lru_page={:?}", key); + for (key, _) in &lru { + tracing::debug!("lru_page={:?}", key); } cache.verify_list_integrity(); for (key, page) in &lru { @@ -914,8 +939,6 @@ mod tests { } pub fn compare_to_lru(cache: &mut DumbLruPageCache, lru: &LruCache) { - use lru::LruCache; - let this_keys = cache.keys(); let mut lru_keys = Vec::new(); for (lru_key, _) in lru { @@ -924,7 +947,7 @@ mod tests { if this_keys != lru_keys { cache.print(); for (lru_key, _) in lru { - println!("lru_page={:?}", lru_key); + tracing::debug!("lru_page={:?}", lru_key); } assert_eq!(&this_keys, &lru_keys) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f369c9d57..0e6f718da 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -750,7 +750,11 @@ impl Pager { } } - pub fn put_loaded_page(&self, id: usize, page: PageRef) -> Result<(), LimboError> { + pub fn update_dirty_loaded_page_in_cache( + &self, + id: usize, + page: PageRef, + ) -> Result<(), LimboError> { let mut cache = self.page_cache.write(); let max_frame = match &self.wal { Some(wal) => wal.borrow().get_max_frame(), @@ -758,26 +762,16 @@ impl Pager { }; let page_key = PageCacheKey::new(id, Some(max_frame)); - // FIXME: is this correct? Why would there be another version of the page? - // Check if there's an existing page at this key and remove it - // This can happen when a page is being updated during a transaction - // or when WAL frames are being managed - if let Some(_existing_page_ref) = cache.get(&page_key) { - cache.delete(page_key.clone()).map_err(|e| { + // FIXME: use specific page key for writer instead of max frame, this will make readers not conflict + assert!(page.is_dirty()); + cache + .insert_ignore_existing(page_key, page.clone()) + .map_err(|e| { LimboError::InternalError(format!( - "put_loaded_page failed to remove old version of page {:?}: {:?}.", - page_key, e + "Failed to insert loaded page {} into cache: {:?}", + id, e )) })?; - } - - cache.insert(page_key, page.clone()).map_err(|e| { - LimboError::InternalError(format!( - "Failed to insert loaded page {} into cache: {:?}", - id, e - )) - })?; - page.set_loaded(); Ok(()) }