diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index f68182d9d..982118a36 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -294,10 +294,22 @@ impl DumbLruPageCache { } pub fn clear(&mut self) -> Result<(), CacheError> { - let keys_to_remove: Vec = self.map.borrow().keys_cloned(); - for key in keys_to_remove { - self.delete(key)?; + let mut current = *self.head.borrow(); + while let Some(current_entry) = current { + unsafe { + self.map.borrow_mut().remove(¤t_entry.as_ref().key); + } + let next = unsafe { current_entry.as_ref().next }; + self.detach(current_entry, true)?; + unsafe { + assert!(!current_entry.as_ref().page.is_dirty()); + } + unsafe { std::ptr::drop_in_place(current_entry.as_ptr()) }; + current = next; } + let _ = self.head.take(); + let _ = self.tail.take(); + assert!(self.head.borrow().is_none()); assert!(self.tail.borrow().is_none()); assert!(self.map.borrow().is_empty()); @@ -585,6 +597,13 @@ impl PageHashMap { } new_hash_map } + + pub fn clear(&mut self) { + for bucket in &mut self.buckets { + bucket.clear(); + } + self.size = 0; + } } #[cfg(test)] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2d6fd5637..e289e8281 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -411,11 +411,6 @@ impl Pager { Some(wal) => wal.borrow().get_max_frame(), None => 0, }; - let max_frame_after_append = self.wal.as_ref().map(|wal| { - wal.borrow().get_max_frame() + self.dirty_pages.borrow().len() as u64 - }); - tracing::error!("start flush"); - tracing::error!("pages={:?}", self.dirty_pages.borrow()); for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(*page_id, Some(max_frame)); @@ -428,24 +423,13 @@ impl Pager { db_size, self.flush_info.borrow().in_flight_writes.clone(), )?; - // Assuming writer will always end append frames at frameid == this_transaction.max_frame + dirty_pages, - // we can insert this page with a new key into that new "snapshot" that has the newest max frame. We don't - // simply clone the page and insert with new key because next cache.delete will invalidate the contents of the page, - // therefore we need to clone the contents itself and place them on the new page. Cloning contents should be fast because - // buffer is wrapped around an Arc. - let new_page = Page::new(*page_id); - new_page.get().contents = Some(page.get_contents().clone()); - new_page.set_loaded(); - let new_page: Arc = Arc::new(new_page); - let new_page_key = PageCacheKey::new(*page_id, max_frame_after_append); - cache.insert(new_page_key, new_page).map_err(|e| {LimboError::InternalError(format!("Failed to delete page {:?} from cache during flush: {:?}. Might be actively referenced.", page_id, e))})?; } page.clear_dirty(); - // This page is no longer valid. - // For example: - // We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame - // so it must be invalidated. There shouldn't be any active refs. - cache.delete(page_key).map_err(|e| {LimboError::InternalError(format!("Failed to delete page {:?} from cache during flush: {:?}. Might be actively referenced.", page_id, e))})?; + } + // This is okay assuming we use shared cache by default. + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;