mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-15 21:14:21 +01:00
cacheflush clear cache
This commit is contained in:
@@ -294,10 +294,22 @@ impl DumbLruPageCache {
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) -> Result<(), CacheError> {
|
||||
let keys_to_remove: Vec<PageCacheKey> = self.map.borrow().keys_cloned();
|
||||
for key in keys_to_remove {
|
||||
self.delete(key)?;
|
||||
let mut current = *self.head.borrow();
|
||||
while let Some(current_entry) = current {
|
||||
unsafe {
|
||||
self.map.borrow_mut().remove(¤t_entry.as_ref().key);
|
||||
}
|
||||
let next = unsafe { current_entry.as_ref().next };
|
||||
self.detach(current_entry, true)?;
|
||||
unsafe {
|
||||
assert!(!current_entry.as_ref().page.is_dirty());
|
||||
}
|
||||
unsafe { std::ptr::drop_in_place(current_entry.as_ptr()) };
|
||||
current = next;
|
||||
}
|
||||
let _ = self.head.take();
|
||||
let _ = self.tail.take();
|
||||
|
||||
assert!(self.head.borrow().is_none());
|
||||
assert!(self.tail.borrow().is_none());
|
||||
assert!(self.map.borrow().is_empty());
|
||||
@@ -585,6 +597,13 @@ impl PageHashMap {
|
||||
}
|
||||
new_hash_map
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
for bucket in &mut self.buckets {
|
||||
bucket.clear();
|
||||
}
|
||||
self.size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -411,11 +411,6 @@ impl Pager {
|
||||
Some(wal) => wal.borrow().get_max_frame(),
|
||||
None => 0,
|
||||
};
|
||||
let max_frame_after_append = self.wal.as_ref().map(|wal| {
|
||||
wal.borrow().get_max_frame() + self.dirty_pages.borrow().len() as u64
|
||||
});
|
||||
tracing::error!("start flush");
|
||||
tracing::error!("pages={:?}", self.dirty_pages.borrow());
|
||||
for page_id in self.dirty_pages.borrow().iter() {
|
||||
let mut cache = self.page_cache.write();
|
||||
let page_key = PageCacheKey::new(*page_id, Some(max_frame));
|
||||
@@ -428,24 +423,13 @@ impl Pager {
|
||||
db_size,
|
||||
self.flush_info.borrow().in_flight_writes.clone(),
|
||||
)?;
|
||||
// Assuming writer will always end append frames at frameid == this_transaction.max_frame + dirty_pages,
|
||||
// we can insert this page with a new key into that new "snapshot" that has the newest max frame. We don't
|
||||
// simply clone the page and insert with new key because next cache.delete will invalidate the contents of the page,
|
||||
// therefore we need to clone the contents itself and place them on the new page. Cloning contents should be fast because
|
||||
// buffer is wrapped around an Arc.
|
||||
let new_page = Page::new(*page_id);
|
||||
new_page.get().contents = Some(page.get_contents().clone());
|
||||
new_page.set_loaded();
|
||||
let new_page: Arc<Page> = Arc::new(new_page);
|
||||
let new_page_key = PageCacheKey::new(*page_id, max_frame_after_append);
|
||||
cache.insert(new_page_key, new_page).map_err(|e| {LimboError::InternalError(format!("Failed to delete page {:?} from cache during flush: {:?}. Might be actively referenced.", page_id, e))})?;
|
||||
}
|
||||
page.clear_dirty();
|
||||
// This page is no longer valid.
|
||||
// For example:
|
||||
// We took page with key (page_num, max_frame) -- this page is no longer valid for that max_frame
|
||||
// so it must be invalidated. There shouldn't be any active refs.
|
||||
cache.delete(page_key).map_err(|e| {LimboError::InternalError(format!("Failed to delete page {:?} from cache during flush: {:?}. Might be actively referenced.", page_id, e))})?;
|
||||
}
|
||||
// This is okay assuming we use shared cache by default.
|
||||
{
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.clear().unwrap();
|
||||
}
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
|
||||
|
||||
Reference in New Issue
Block a user