From 39a75147d4ed35f4dc79f5a65a5eb302a4ea3ced Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 13 Dec 2024 21:41:04 +0100 Subject: [PATCH] Page cache by page_number and frame_number Since page cache is now shared by default, we need to cache pages by page number and something else. I chose to go with max_frame of connection, because this connection will have a max_frame set from the start of a transaction until the end of it. With key pairs of (pgno, max_frame) we make sure each connection is caching based on the snapshot it is at, as two different connections might have the same pageno in use but a different frame. If both have the same max_frame then they will share the same page. --- bindings/wasm/lib.rs | 17 +-- core/lib.rs | 13 +- core/storage/mod.rs | 1 + core/storage/page_cache.rs | 191 +++++++++++++++++++++++++ core/storage/pager.rs | 245 ++++----------------------------- core/storage/sqlite3_ondisk.rs | 6 + core/storage/wal.rs | 155 +++++++++++++++++---- 7 files changed, 375 insertions(+), 253 deletions(-) create mode 100644 core/storage/page_cache.rs diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index a1febb5d7..c456cb617 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -1,4 +1,6 @@ -use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared}; +use limbo_core::{ + maybe_init_database_file, BufferPool, OpenFlags, Pager, Result, WalFile, WalFileShared, +}; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; @@ -26,17 +28,19 @@ impl Database { // ensure db header is there io.run_once().unwrap(); + let page_size = db_header.borrow().page_size; + let wal_path = format!("{}-wal", path); - let wal_shared = - WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size) - .unwrap(); + let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size).unwrap(); + let buffer_pool = Rc::new(BufferPool::new(page_size as usize)); let wal = 
Rc::new(RefCell::new(WalFile::new( io.clone(), db_header.borrow().page_size as usize, wal_shared.clone(), + buffer_pool.clone(), ))); - let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap(); + let db = limbo_core::Database::open(io, page_io, wal, wal_shared, buffer_pool).unwrap(); let conn = db.connect(); Database { db, conn } } @@ -258,9 +262,6 @@ impl DatabaseStorage { } } -#[allow(dead_code)] -struct BufferPool {} - impl limbo_core::DatabaseStorage for DatabaseStorage { fn read_page(&self, page_idx: usize, c: Rc) -> Result<()> { let r = match &(*c) { diff --git a/core/lib.rs b/core/lib.rs index 3a608af4d..48736c4c0 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -27,7 +27,8 @@ use std::{cell::RefCell, rc::Rc}; use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; -use storage::pager::{allocate_page, DumbLruPageCache}; +use storage::page_cache::DumbLruPageCache; +use storage::pager::allocate_page; use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; pub use storage::wal::WalFileShared; @@ -82,14 +83,16 @@ impl Database { let wal_path = format!("{}-wal", path); let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; - let wal_shared = - WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?; + let page_size = db_header.borrow().page_size; + let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?; + let buffer_pool = Rc::new(BufferPool::new(page_size as usize)); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), db_header.borrow().page_size as usize, wal_shared.clone(), + buffer_pool.clone(), ))); - Self::open(io, page_io, wal, wal_shared) + Self::open(io, page_io, wal, wal_shared, buffer_pool) } pub fn open( @@ -97,6 +100,7 @@ impl Database { page_io: Rc, wal: Rc>, shared_wal: Arc>, + buffer_pool: Rc, ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; @@ 
-111,6 +115,7 @@ impl Database { wal, io.clone(), shared_page_cache.clone(), + buffer_pool, )?); let bootstrap_schema = Rc::new(RefCell::new(Schema::new())); let conn = Rc::new(Connection { diff --git a/core/storage/mod.rs b/core/storage/mod.rs index a5454c062..b3e9c9df6 100644 --- a/core/storage/mod.rs +++ b/core/storage/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod btree; pub(crate) mod buffer_pool; pub(crate) mod database; +pub(crate) mod page_cache; pub(crate) mod pager; pub(crate) mod sqlite3_ondisk; pub(crate) mod wal; diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs new file mode 100644 index 000000000..e21433af1 --- /dev/null +++ b/core/storage/page_cache.rs @@ -0,0 +1,191 @@ +use std::{cell::RefCell, collections::HashMap, ptr::NonNull}; + +use log::debug; + +use super::pager::PageRef; + +// In limbo, page cache is shared by default, meaning that multiple frames from WAL can reside in +// the cache, meaning, we need a way to differentiate between pages cached in different +// connections. For this we include the max_frame that will read a connection from so that if two +// connections have different max_frames, they might or not have different frame read from WAL. +// +// WAL was introduced after Shared cache in SQLite, so this is why these two features don't work +// well together because pages with different snapshots may collide. 
+#[derive(Debug, Eq, Hash, PartialEq, Clone)] +pub struct PageCacheKey { + pgno: usize, + max_frame: Option, +} + +#[allow(dead_code)] +struct PageCacheEntry { + key: PageCacheKey, + page: PageRef, + prev: Option>, + next: Option>, +} + +impl PageCacheEntry { + fn as_non_null(&mut self) -> NonNull { + NonNull::new(&mut *self).unwrap() + } +} + +pub struct DumbLruPageCache { + capacity: usize, + map: RefCell>>, + head: RefCell>>, + tail: RefCell>>, +} +unsafe impl Send for DumbLruPageCache {} +unsafe impl Sync for DumbLruPageCache {} + +impl PageCacheKey { + pub fn new(pgno: usize, max_frame: Option) -> Self { + Self { pgno, max_frame } + } +} +impl DumbLruPageCache { + pub fn new(capacity: usize) -> Self { + Self { + capacity, + map: RefCell::new(HashMap::new()), + head: RefCell::new(None), + tail: RefCell::new(None), + } + } + + pub fn contains_key(&mut self, key: &PageCacheKey) -> bool { + self.map.borrow().contains_key(key) + } + + pub fn insert(&mut self, key: PageCacheKey, value: PageRef) { + self._delete(key.clone(), false); + debug!("cache_insert(key={:?})", key); + let mut entry = Box::new(PageCacheEntry { + key: key.clone(), + next: None, + prev: None, + page: value, + }); + self.touch(&mut entry); + + if self.map.borrow().len() >= self.capacity { + self.pop_if_not_dirty(); + } + let b = Box::into_raw(entry); + let as_non_null = NonNull::new(b).unwrap(); + self.map.borrow_mut().insert(key, as_non_null); + } + + pub fn delete(&mut self, key: PageCacheKey) { + self._delete(key, true) + } + + pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) { + debug!("cache_delete(key={:?}, clean={})", key, clean_page); + let ptr = self.map.borrow_mut().remove(&key); + if ptr.is_none() { + return; + } + let mut ptr = ptr.unwrap(); + { + let ptr = unsafe { ptr.as_mut() }; + self.detach(ptr, clean_page); + } + unsafe { std::ptr::drop_in_place(ptr.as_ptr()) }; + } + + fn get_ptr(&mut self, key: &PageCacheKey) -> Option> { + let m = self.map.borrow_mut(); + let ptr 
= m.get(key); + ptr.copied() + } + + pub fn get(&mut self, key: &PageCacheKey) -> Option { + debug!("cache_get(key={:?})", key); + let ptr = self.get_ptr(key); + ptr?; + let ptr = unsafe { ptr.unwrap().as_mut() }; + let page = ptr.page.clone(); + //self.detach(ptr); + self.touch(ptr); + Some(page) + } + + pub fn resize(&mut self, capacity: usize) { + let _ = capacity; + todo!(); + } + + fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) { + let mut current = entry.as_non_null(); + + if clean_page { + // evict buffer + let page = &entry.page; + page.clear_loaded(); + debug!("cleaning up page {}", page.get().id); + let _ = page.get().contents.take(); + } + + let (next, prev) = unsafe { + let c = current.as_mut(); + let next = c.next; + let prev = c.prev; + c.prev = None; + c.next = None; + (next, prev) + }; + + // detach + match (prev, next) { + (None, None) => {} + (None, Some(_)) => todo!(), + (Some(p), None) => { + self.tail = RefCell::new(Some(p)); + } + (Some(mut p), Some(mut n)) => unsafe { + let p_mut = p.as_mut(); + p_mut.next = Some(n); + let n_mut = n.as_mut(); + n_mut.prev = Some(p); + }, + }; + } + + fn touch(&mut self, entry: &mut PageCacheEntry) { + let mut current = entry.as_non_null(); + unsafe { + let c = current.as_mut(); + c.next = *self.head.borrow(); + } + + if let Some(mut head) = *self.head.borrow_mut() { + unsafe { + let head = head.as_mut(); + head.prev = Some(current); + } + } + } + + fn pop_if_not_dirty(&mut self) { + let tail = *self.tail.borrow(); + if tail.is_none() { + return; + } + let tail = unsafe { tail.unwrap().as_mut() }; + if tail.page.is_dirty() { + // TODO: drop from another clean entry? 
+ return; + } + self.detach(tail, true); + } + + pub fn clear(&mut self) { + let to_remove: Vec = self.map.borrow().iter().map(|v| v.0.clone()).collect(); + for key in to_remove { + self.delete(key); + } + } +} diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f0e8369fd..690d711bf 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -4,15 +4,13 @@ use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent}; use crate::storage::wal::Wal; use crate::{Buffer, Result}; use log::{debug, trace}; -use sieve_cache::SieveCache; use std::cell::{RefCell, UnsafeCell}; -use std::collections::{HashMap, HashSet}; -use std::hash::Hash; -use std::ptr::{drop_in_place, NonNull}; +use std::collections::HashSet; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; +use super::page_cache::{DumbLruPageCache, PageCacheKey}; use super::wal::CheckpointStatus; pub struct PageInner { @@ -117,198 +115,6 @@ impl Page { } } -#[allow(dead_code)] -struct PageCacheEntry { - key: usize, - page: PageRef, - prev: Option>, - next: Option>, -} - -impl PageCacheEntry { - fn as_non_null(&mut self) -> NonNull { - NonNull::new(&mut *self).unwrap() - } -} - -pub struct DumbLruPageCache { - capacity: usize, - map: RefCell>>, - head: RefCell>>, - tail: RefCell>>, -} -unsafe impl Send for DumbLruPageCache {} -unsafe impl Sync for DumbLruPageCache {} - -impl DumbLruPageCache { - pub fn new(capacity: usize) -> Self { - Self { - capacity, - map: RefCell::new(HashMap::new()), - head: RefCell::new(None), - tail: RefCell::new(None), - } - } - - pub fn contains_key(&mut self, key: usize) -> bool { - self.map.borrow().contains_key(&key) - } - - pub fn insert(&mut self, key: usize, value: PageRef) { - self._delete(key, false); - debug!("cache_insert(key={})", key); - let mut entry = Box::new(PageCacheEntry { - key, - next: None, - prev: None, - page: value, - }); - self.touch(&mut entry); - - if self.map.borrow().len() >= self.capacity { - 
self.pop_if_not_dirty(); - } - let b = Box::into_raw(entry); - let as_non_null = NonNull::new(b).unwrap(); - self.map.borrow_mut().insert(key, as_non_null); - } - - pub fn delete(&mut self, key: usize) { - self._delete(key, true) - } - - pub fn _delete(&mut self, key: usize, clean_page: bool) { - debug!("cache_delete(key={}, clean={})", key, clean_page); - let ptr = self.map.borrow_mut().remove(&key); - if ptr.is_none() { - return; - } - let mut ptr = ptr.unwrap(); - { - let ptr = unsafe { ptr.as_mut() }; - self.detach(ptr, clean_page); - } - unsafe { drop_in_place(ptr.as_ptr()) }; - } - - fn get_ptr(&mut self, key: usize) -> Option> { - let m = self.map.borrow_mut(); - let ptr = m.get(&key); - ptr.copied() - } - - pub fn get(&mut self, key: &usize) -> Option { - debug!("cache_get(key={})", key); - let ptr = self.get_ptr(*key); - ptr?; - let ptr = unsafe { ptr.unwrap().as_mut() }; - let page = ptr.page.clone(); - //self.detach(ptr); - self.touch(ptr); - Some(page) - } - - pub fn resize(&mut self, capacity: usize) { - let _ = capacity; - todo!(); - } - - fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) { - let mut current = entry.as_non_null(); - - if clean_page { - // evict buffer - let page = &entry.page; - page.clear_loaded(); - debug!("cleaning up page {}", page.get().id); - let _ = page.get().contents.take(); - } - - let (next, prev) = unsafe { - let c = current.as_mut(); - let next = c.next; - let prev = c.prev; - c.prev = None; - c.next = None; - (next, prev) - }; - - // detach - match (prev, next) { - (None, None) => {} - (None, Some(_)) => todo!(), - (Some(p), None) => { - self.tail = RefCell::new(Some(p)); - } - (Some(mut p), Some(mut n)) => unsafe { - let p_mut = p.as_mut(); - p_mut.next = Some(n); - let n_mut = n.as_mut(); - n_mut.prev = Some(p); - }, - }; - } - - fn touch(&mut self, entry: &mut PageCacheEntry) { - let mut current = entry.as_non_null(); - unsafe { - let c = current.as_mut(); - c.next = *self.head.borrow(); - } - - if 
let Some(mut head) = *self.head.borrow_mut() { - unsafe { - let head = head.as_mut(); - head.prev = Some(current); - } - } - } - - fn pop_if_not_dirty(&mut self) { - let tail = *self.tail.borrow(); - if tail.is_none() { - return; - } - let tail = unsafe { tail.unwrap().as_mut() }; - if tail.page.is_dirty() { - // TODO: drop from another clean entry? - return; - } - self.detach(tail, true); - } - - fn clear(&mut self) { - let to_remove: Vec = self.map.borrow().iter().map(|v| *v.0).collect(); - for key in to_remove { - self.delete(key); - } - } -} - -#[allow(dead_code)] -pub struct PageCache { - cache: SieveCache, -} - -#[allow(dead_code)] -impl PageCache { - pub fn new(cache: SieveCache) -> Self { - Self { cache } - } - - pub fn insert(&mut self, key: K, value: V) { - self.cache.insert(key, value); - } - - pub fn get(&mut self, key: &K) -> Option<&V> { - self.cache.get(key) - } - - pub fn resize(&mut self, capacity: usize) { - self.cache = SieveCache::new(capacity).unwrap(); - } -} - #[derive(Clone)] enum FlushState { Start, @@ -368,14 +174,11 @@ impl Pager { wal: Rc>, io: Arc, page_cache: Arc>, + buffer_pool: Rc, ) -> Result { - let db_header = RefCell::borrow(&db_header_ref); - let page_size = db_header.page_size as usize; - let buffer_pool = Rc::new(BufferPool::new(page_size)); Ok(Self { page_io, wal, - buffer_pool, page_cache, io, dirty_pages: Rc::new(RefCell::new(HashSet::new())), @@ -387,6 +190,7 @@ impl Pager { syncing: Rc::new(RefCell::new(false)), checkpoint_state: RefCell::new(CheckpointState::Checkpoint), checkpoint_inflight: Rc::new(RefCell::new(0)), + buffer_pool, }) } @@ -413,7 +217,8 @@ impl Pager { pub fn read_page(&self, page_idx: usize) -> crate::Result { trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write().unwrap(); - if let Some(page) = page_cache.get(&page_idx) { + let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame())); + if let Some(page) = page_cache.get(&page_key) { 
trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } @@ -429,7 +234,7 @@ impl Pager { } // TODO(pere) ensure page is inserted, we should probably first insert to page cache // and if successful, read frame or page - page_cache.insert(page_idx, page.clone()); + page_cache.insert(page_key, page.clone()); return Ok(page); } sqlite3_ondisk::begin_read_page( @@ -439,7 +244,7 @@ impl Pager { page_idx, )?; // TODO(pere) ensure page is inserted - page_cache.insert(page_idx, page.clone()); + page_cache.insert(page_key, page.clone()); Ok(page) } @@ -449,6 +254,7 @@ impl Pager { trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write().unwrap(); page.set_locked(); + let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { self.wal .borrow() @@ -457,8 +263,8 @@ impl Pager { page.set_uptodate(); } // TODO(pere) ensure page is inserted - if !page_cache.contains_key(id) { - page_cache.insert(id, page.clone()); + if !page_cache.contains_key(&page_key) { + page_cache.insert(page_key, page.clone()); } return Ok(()); } @@ -469,8 +275,8 @@ impl Pager { id, )?; // TODO(pere) ensure page is inserted - if !page_cache.contains_key(id) { - page_cache.insert(id, page.clone()); + if !page_cache.contains_key(&page_key) { + page_cache.insert(page_key, page.clone()); } Ok(()) } @@ -500,9 +306,11 @@ impl Pager { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write().unwrap(); - let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_key = + PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame())); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = 
page.get().contents.as_ref().unwrap().maybe_page_type(); - debug!("appending frame {} {:?}", page_id, page_type); + log::trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); self.wal.borrow_mut().append_frame( page.clone(), db_size, @@ -565,7 +373,7 @@ impl Pager { pub fn checkpoint(&self) -> Result { loop { let state = self.checkpoint_state.borrow().clone(); - log::trace!("checkpoint(state={:?})", state); + log::trace!("pager_checkpoint(state={:?})", state); match state { CheckpointState::Checkpoint => { let in_flight = self.checkpoint_inflight.clone(); @@ -641,7 +449,9 @@ impl Pager { page.set_dirty(); self.add_dirty(page.get().id); let mut cache = self.page_cache.write().unwrap(); - cache.insert(page.get().id, page.clone()); + let page_key = + PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame())); + cache.insert(page_key, page.clone()); } Ok(page) } @@ -649,7 +459,8 @@ impl Pager { pub fn put_loaded_page(&self, id: usize, page: PageRef) { let mut cache = self.page_cache.write().unwrap(); // cache insert invalidates previous page - cache.insert(id, page.clone()); + let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); + cache.insert(page_key, page.clone()); page.set_loaded(); } @@ -682,7 +493,9 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc, offset: usize mod tests { use std::sync::{Arc, RwLock}; - use super::{DumbLruPageCache, Page}; + use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey}; + + use super::Page; #[test] fn test_shared_cache() { @@ -693,12 +506,14 @@ mod tests { let cache = cache.clone(); std::thread::spawn(move || { let mut cache = cache.write().unwrap(); - cache.insert(1, Arc::new(Page::new(1))); + let page_key = PageCacheKey::new(1, None); + cache.insert(page_key, Arc::new(Page::new(1))); }) }; let _ = thread.join(); let mut cache = cache.write().unwrap(); - let page = cache.get(&1); + let page_key = PageCacheKey::new(1, None); + let page = cache.get(&page_key); 
assert_eq!(page.unwrap().get().id, 1); } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 3d7b7ae11..a1f58d38a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -581,6 +581,7 @@ pub fn begin_write_btree_page( page: &PageRef, write_counter: Rc>, ) -> Result<()> { + log::trace!("begin_write_btree_page(page={})", page.get().id); let page_source = &pager.page_io; let page_finish = page.clone(); @@ -1039,6 +1040,11 @@ pub fn begin_read_wal_frame( buffer_pool: Rc, page: PageRef, ) -> Result<()> { + log::trace!( + "begin_read_wal_frame(offset={}, page={})", + offset, + page.get().id + ); let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { let buffer_pool = buffer_pool.clone(); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c7938330a..9a6f281b5 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -8,12 +8,13 @@ use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::Completion; use crate::Result; +use crate::{Completion, Page}; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; use super::buffer_pool::BufferPool; +use super::page_cache::PageCacheKey; use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; @@ -53,16 +54,27 @@ pub trait Wal { write_counter: Rc>, ) -> Result; fn sync(&mut self) -> Result; + fn get_max_frame(&self) -> u64; + fn get_min_frame(&self) -> u64; +} + +struct OngoingCheckpoint { + page: PageRef, + state: CheckpointState, + min_frame: u64, + max_frame: u64, + current_page: u64, } pub struct WalFile { io: Arc, + buffer_pool: Rc, syncing: Rc>, page_size: usize, - ongoing_checkpoint: HashSet, shared: Arc>, + ongoing_checkpoint: OngoingCheckpoint, checkpoint_threshold: usize, // min and max frames for this connection max_frame: u64, @@ -75,11 +87,22 @@ pub 
struct WalFileShared { max_frame: u64, nbackfills: u64, // Maps pgno to frame id and offset in wal file - frame_cache: HashMap>, // FIXME: for now let's use a simple hashmap instead of a shm file + frame_cache: HashMap>, // we will avoid shm files because we are not + // multiprocess + pages_in_frames: Vec, last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL file: Rc, } +#[derive(Debug, Copy, Clone)] +pub enum CheckpointState { + Start, + ReadFrame, + WaitReadFrame, + WritePage, + Done, +} + pub enum CheckpointStatus { Done, IO, @@ -167,6 +190,7 @@ impl Wal for WalFile { Some(frames) => frames.push(frame_id), None => { shared.frame_cache.insert(page_id as u64, vec![frame_id]); + shared.pages_in_frames.push(page_id as u64); } } } @@ -194,30 +218,88 @@ impl Wal for WalFile { pager: &Pager, write_counter: Rc>, ) -> Result { - let mut shared = self.shared.write().unwrap(); - for (page_id, _frames) in shared.frame_cache.iter() { - // move page from WAL to database file - // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf - let page_id = *page_id as usize; - if self.ongoing_checkpoint.contains(&page_id) { - continue; - } + 'checkpoint_loop: loop { + let state = self.ongoing_checkpoint.state; + log::debug!("checkpoint(state={:?})", state); + match state { + CheckpointState::Start => { + // TODO(pere): check what frames are safe to checkpoint between many readers! 
+ self.ongoing_checkpoint.min_frame = self.min_frame; + self.ongoing_checkpoint.max_frame = self.max_frame; + self.ongoing_checkpoint.current_page = 0; + self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + } + CheckpointState::ReadFrame => { + let shared = self.shared.read().unwrap(); + for page in shared + .pages_in_frames + .iter() + .skip(self.ongoing_checkpoint.current_page as usize) + { + let frames = shared + .frame_cache + .get(page) + .expect("page must be in frame cache if it's in list"); - let page = pager.read_page(page_id)?; - if page.is_locked() { - return Ok(CheckpointStatus::IO); + for frame in frames.iter().rev() { + if *frame <= self.ongoing_checkpoint.max_frame { + log::debug!( + "checkpoint page(state={:?}, page={}, frame={})", + state, + *page, + *frame + ); + self.ongoing_checkpoint.page.get().id = *page as usize; + self.read_frame( + *frame, + self.ongoing_checkpoint.page.clone(), + self.buffer_pool.clone(), + ); + self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; + self.ongoing_checkpoint.current_page += 1; + continue 'checkpoint_loop; + } + } + self.ongoing_checkpoint.current_page += 1; + } + self.ongoing_checkpoint.state = CheckpointState::Done; + } + CheckpointState::WaitReadFrame => { + if self.ongoing_checkpoint.page.is_locked() { + return Ok(CheckpointStatus::IO); + } else { + self.ongoing_checkpoint.state = CheckpointState::WritePage; + } + } + CheckpointState::WritePage => { + begin_write_btree_page( + pager, + &self.ongoing_checkpoint.page, + write_counter.clone(), + )?; + let shared = self.shared.read().unwrap(); + if (self.ongoing_checkpoint.current_page as usize) + < shared.pages_in_frames.len() + { + self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + } else { + self.ongoing_checkpoint.state = CheckpointState::Done; + } + } + CheckpointState::Done => { + if *write_counter.borrow() > 0 { + return Ok(CheckpointStatus::IO); + } + let mut shared = self.shared.write().unwrap(); + 
shared.frame_cache.clear(); + shared.pages_in_frames.clear(); + shared.max_frame = 0; + shared.nbackfills = 0; + self.ongoing_checkpoint.state = CheckpointState::Start; + return Ok(CheckpointStatus::Done); + } } - - begin_write_btree_page(pager, &page, write_counter.clone())?; - self.ongoing_checkpoint.insert(page_id); } - - // TODO: only clear checkpointed frames - shared.frame_cache.clear(); - shared.max_frame = 0; - shared.nbackfills = 0; - self.ongoing_checkpoint.clear(); - Ok(CheckpointStatus::Done) } fn sync(&mut self) -> Result { @@ -238,19 +320,39 @@ impl Wal for WalFile { Ok(CheckpointStatus::Done) } } + + fn get_max_frame(&self) -> u64 { + self.max_frame + } + + fn get_min_frame(&self) -> u64 { + self.min_frame + } } impl WalFile { - pub fn new(io: Arc, page_size: usize, shared: Arc>) -> Self { + pub fn new( + io: Arc, + page_size: usize, + shared: Arc>, + buffer_pool: Rc, + ) -> Self { Self { io, shared, - ongoing_checkpoint: HashSet::new(), + ongoing_checkpoint: OngoingCheckpoint { + page: Arc::new(Page::new(0)), + state: CheckpointState::Start, + min_frame: 0, + max_frame: 0, + current_page: 0, + }, syncing: Rc::new(RefCell::new(false)), checkpoint_threshold: 1000, page_size, max_frame: 0, min_frame: 0, + buffer_pool, } } @@ -322,6 +424,7 @@ impl WalFileShared { frame_cache: HashMap::new(), last_checksum: checksum, file, + pages_in_frames: Vec::new(), }; Ok(Arc::new(RwLock::new(shared))) }