Page cache by page_number and frame_number

Since the page cache is now shared by default, we need to cache pages by
page number and something else. I chose to go with the connection's
max_frame, because each connection has a max_frame set from the start of
a transaction until the end of it.

With key pairs of (pgno, max_frame) we make sure each connection caches
based on the snapshot it is at, as two different connections might be
using the same page number but a different frame. If both have the same
max_frame then they will share the same page.
This commit is contained in:
Pere Diaz Bou
2024-12-13 21:41:04 +01:00
parent 138b3a00e8
commit 39a75147d4
7 changed files with 375 additions and 253 deletions

View File

@@ -1,4 +1,6 @@
use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared};
use limbo_core::{
maybe_init_database_file, BufferPool, OpenFlags, Pager, Result, WalFile, WalFileShared,
};
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
@@ -26,17 +28,19 @@ impl Database {
// ensure db header is there
io.run_once().unwrap();
let page_size = db_header.borrow().page_size;
let wal_path = format!("{}-wal", path);
let wal_shared =
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)
.unwrap();
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size).unwrap();
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
db_header.borrow().page_size as usize,
wal_shared.clone(),
buffer_pool.clone(),
)));
let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap();
let db = limbo_core::Database::open(io, page_io, wal, wal_shared, buffer_pool).unwrap();
let conn = db.connect();
Database { db, conn }
}
@@ -258,9 +262,6 @@ impl DatabaseStorage {
}
}
#[allow(dead_code)]
struct BufferPool {}
impl limbo_core::DatabaseStorage for DatabaseStorage {
fn read_page(&self, page_idx: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
let r = match &(*c) {

View File

@@ -27,7 +27,8 @@ use std::{cell::RefCell, rc::Rc};
use storage::btree::btree_init_page;
#[cfg(feature = "fs")]
use storage::database::FileStorage;
use storage::pager::{allocate_page, DumbLruPageCache};
use storage::page_cache::DumbLruPageCache;
use storage::pager::allocate_page;
use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE};
pub use storage::wal::WalFile;
pub use storage::wal::WalFileShared;
@@ -82,14 +83,16 @@ impl Database {
let wal_path = format!("{}-wal", path);
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
let wal_shared =
WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?;
let page_size = db_header.borrow().page_size;
let wal_shared = WalFileShared::open_shared(&io, wal_path.as_str(), page_size)?;
let buffer_pool = Rc::new(BufferPool::new(page_size as usize));
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
db_header.borrow().page_size as usize,
wal_shared.clone(),
buffer_pool.clone(),
)));
Self::open(io, page_io, wal, wal_shared)
Self::open(io, page_io, wal, wal_shared, buffer_pool)
}
pub fn open(
@@ -97,6 +100,7 @@ impl Database {
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<RefCell<dyn Wal>>,
shared_wal: Arc<RwLock<WalFileShared>>,
buffer_pool: Rc<BufferPool>,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(page_io.clone())?;
io.run_once()?;
@@ -111,6 +115,7 @@ impl Database {
wal,
io.clone(),
shared_page_cache.clone(),
buffer_pool,
)?);
let bootstrap_schema = Rc::new(RefCell::new(Schema::new()));
let conn = Rc::new(Connection {

View File

@@ -14,6 +14,7 @@
pub(crate) mod btree;
pub(crate) mod buffer_pool;
pub(crate) mod database;
pub(crate) mod page_cache;
pub(crate) mod pager;
pub(crate) mod sqlite3_ondisk;
pub(crate) mod wal;

191
core/storage/page_cache.rs Normal file
View File

@@ -0,0 +1,191 @@
use std::{cell::RefCell, collections::HashMap, ptr::NonNull};
use log::debug;
use super::pager::PageRef;
// In limbo, the page cache is shared by default, meaning that multiple frames from the WAL can
// reside in the cache, so we need a way to differentiate between pages cached by different
// connections. For this we include the max_frame a connection will read from, so that if two
// connections have different max_frames, they may read different frames from the WAL.
//
// WAL was introduced after shared cache in SQLite, which is why these two features don't work
// well together: pages with different snapshots may collide.
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
// Cache key pairing a page number with the owning connection's WAL snapshot,
// so different connections reading different frames of the same page do not
// collide in the shared cache.
pub struct PageCacheKey {
    // Database page number.
    pgno: usize,
    // Highest WAL frame the connection reads up to (its snapshot);
    // `None` when the lookup is not tied to a WAL snapshot.
    max_frame: Option<u64>,
}
#[allow(dead_code)]
// One node of the intrusive doubly-linked LRU list. The entry is heap-allocated
// via `Box::into_raw` and referenced by raw `NonNull` pointers from the map and
// from its list neighbors.
struct PageCacheEntry {
    key: PageCacheKey,
    page: PageRef,
    // Neighbor toward the head (more recently used); `None` at the head.
    prev: Option<NonNull<PageCacheEntry>>,
    // Neighbor toward the tail (less recently used); `None` at the tail.
    next: Option<NonNull<PageCacheEntry>>,
}
impl PageCacheEntry {
    /// Returns a raw `NonNull` pointer to this entry for linking it into the
    /// intrusive LRU list. The entry must live in a stable heap allocation
    /// (boxed) for the pointer to remain valid after this borrow ends.
    fn as_non_null(&mut self) -> NonNull<PageCacheEntry> {
        // `NonNull::from` on a reference is infallible, so no `unwrap` needed.
        NonNull::from(&mut *self)
    }
}
// A simple LRU page cache: a hash map for O(1) lookup by key plus an intrusive
// doubly-linked list ordering entries from most recently used (head) to least
// recently used (tail, the eviction candidate).
pub struct DumbLruPageCache {
    // Target maximum number of entries before eviction is attempted.
    capacity: usize,
    // Key -> raw pointer to the heap-allocated entry (`Box::into_raw`).
    map: RefCell<HashMap<PageCacheKey, NonNull<PageCacheEntry>>>,
    // Most recently used entry.
    head: RefCell<Option<NonNull<PageCacheEntry>>>,
    // Least recently used entry.
    tail: RefCell<Option<NonNull<PageCacheEntry>>>,
}
// SAFETY(review): these impls assert thread-safety, but the `RefCell`s inside
// provide no synchronization on their own — this appears to rely on callers
// always wrapping the cache in an external `RwLock` (as the pager does).
// Confirm no unsynchronized shared access exists before trusting these.
unsafe impl Send for DumbLruPageCache {}
unsafe impl Sync for DumbLruPageCache {}
impl PageCacheKey {
pub fn new(pgno: usize, max_frame: Option<u64>) -> Self {
Self { pgno, max_frame }
}
}
impl DumbLruPageCache {
    /// Creates an empty cache that aims to hold at most `capacity` pages.
    /// Dirty pages are never evicted, so the cache may temporarily exceed
    /// its capacity.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            map: RefCell::new(HashMap::new()),
            head: RefCell::new(None),
            tail: RefCell::new(None),
        }
    }

    /// Returns true if `key` is currently cached.
    pub fn contains_key(&mut self, key: &PageCacheKey) -> bool {
        self.map.borrow().contains_key(key)
    }

    /// Inserts `value` under `key`, replacing (without cleaning) any previous
    /// entry for the same key. May evict the least recently used clean page
    /// when the cache is full.
    pub fn insert(&mut self, key: PageCacheKey, value: PageRef) {
        self._delete(key.clone(), false);
        debug!("cache_insert(key={:?})", key);
        // Evict *before* linking the new entry so the new entry can never be
        // picked as its own eviction victim.
        if self.map.borrow().len() >= self.capacity {
            self.pop_if_not_dirty();
        }
        let mut entry = Box::new(PageCacheEntry {
            key: key.clone(),
            next: None,
            prev: None,
            page: value,
        });
        // The pointer taken inside `touch` aims into the box's heap
        // allocation, so it stays valid across `Box::into_raw`.
        self.touch(&mut entry);
        let as_non_null = NonNull::new(Box::into_raw(entry)).unwrap();
        self.map.borrow_mut().insert(key, as_non_null);
    }

    /// Removes `key` from the cache and cleans the page (drops its buffer).
    pub fn delete(&mut self, key: PageCacheKey) {
        self._delete(key, true)
    }

    /// Removes `key` from the cache; cleans the page only when `clean_page`
    /// is set (insert-replacement keeps the buffer alive).
    pub fn _delete(&mut self, key: PageCacheKey, clean_page: bool) {
        debug!("cache_delete(key={:?}, clean={})", key, clean_page);
        let ptr = self.map.borrow_mut().remove(&key);
        let Some(mut ptr) = ptr else {
            return;
        };
        {
            let entry = unsafe { ptr.as_mut() };
            self.detach(entry, clean_page);
        }
        // Rebuild the box so the entry is both dropped *and* deallocated; a
        // bare `drop_in_place` runs the destructor but leaks the allocation.
        drop(unsafe { Box::from_raw(ptr.as_ptr()) });
    }

    fn get_ptr(&mut self, key: &PageCacheKey) -> Option<NonNull<PageCacheEntry>> {
        self.map.borrow().get(key).copied()
    }

    /// Looks up `key` and, on a hit, moves the entry to the front of the LRU
    /// list (most recently used).
    pub fn get(&mut self, key: &PageCacheKey) -> Option<PageRef> {
        debug!("cache_get(key={:?})", key);
        let mut ptr = self.get_ptr(key)?;
        let entry = unsafe { ptr.as_mut() };
        let page = entry.page.clone();
        // Unlink first, then relink at the head; relinking a still-linked
        // entry would corrupt the list.
        self.detach(entry, false);
        self.touch(entry);
        Some(page)
    }

    pub fn resize(&mut self, capacity: usize) {
        let _ = capacity;
        todo!();
    }

    /// Unlinks `entry` from the LRU list, fixing up head/tail and neighbor
    /// pointers. When `clean_page` is set, the page's buffer is dropped.
    fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
        if clean_page {
            // evict buffer
            let page = &entry.page;
            page.clear_loaded();
            debug!("cleaning up page {}", page.get().id);
            let _ = page.get().contents.take();
        }
        let current = entry.as_non_null();
        let prev = entry.prev.take();
        let next = entry.next.take();
        match prev {
            Some(mut p) => unsafe { p.as_mut().next = next },
            None => {
                // `entry` was the head; advance the head pointer.
                let mut head = self.head.borrow_mut();
                if *head == Some(current) {
                    *head = next;
                }
            }
        }
        match next {
            Some(mut n) => unsafe { n.as_mut().prev = prev },
            None => {
                // `entry` was the tail; retreat the tail pointer.
                let mut tail = self.tail.borrow_mut();
                if *tail == Some(current) {
                    *tail = prev;
                }
            }
        }
    }

    /// Links `entry` at the head of the LRU list (most recently used).
    /// The entry must not currently be linked; callers `detach` first.
    fn touch(&mut self, entry: &mut PageCacheEntry) {
        let current = entry.as_non_null();
        let old_head = *self.head.borrow();
        entry.prev = None;
        entry.next = old_head;
        if let Some(mut head) = old_head {
            unsafe { head.as_mut().prev = Some(current) };
        }
        *self.head.borrow_mut() = Some(current);
        // The first entry linked into an empty list is also the tail.
        if self.tail.borrow().is_none() {
            *self.tail.borrow_mut() = Some(current);
        }
    }

    /// Evicts the least recently used page unless it is dirty. The evicted
    /// entry is removed from both list and map, and its allocation is freed.
    fn pop_if_not_dirty(&mut self) {
        let tail = *self.tail.borrow();
        let Some(mut tail_ptr) = tail else {
            return;
        };
        let tail_entry = unsafe { tail_ptr.as_mut() };
        if tail_entry.page.is_dirty() {
            // TODO: drop from another clean entry?
            return;
        }
        let key = tail_entry.key.clone();
        self.detach(tail_entry, true);
        // Also drop the map entry; leaving it behind would keep a pointer to
        // freed memory.
        self.map.borrow_mut().remove(&key);
        drop(unsafe { Box::from_raw(tail_ptr.as_ptr()) });
    }

    /// Deletes and cleans every entry in the cache.
    pub fn clear(&mut self) {
        let keys: Vec<PageCacheKey> = self.map.borrow().keys().cloned().collect();
        for key in keys {
            self.delete(key);
        }
    }
}

View File

@@ -4,15 +4,13 @@ use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
use crate::storage::wal::Wal;
use crate::{Buffer, Result};
use log::{debug, trace};
use sieve_cache::SieveCache;
use std::cell::{RefCell, UnsafeCell};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ptr::{drop_in_place, NonNull};
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use super::page_cache::{DumbLruPageCache, PageCacheKey};
use super::wal::CheckpointStatus;
pub struct PageInner {
@@ -117,198 +115,6 @@ impl Page {
}
}
#[allow(dead_code)]
struct PageCacheEntry {
key: usize,
page: PageRef,
prev: Option<NonNull<PageCacheEntry>>,
next: Option<NonNull<PageCacheEntry>>,
}
impl PageCacheEntry {
fn as_non_null(&mut self) -> NonNull<PageCacheEntry> {
NonNull::new(&mut *self).unwrap()
}
}
pub struct DumbLruPageCache {
capacity: usize,
map: RefCell<HashMap<usize, NonNull<PageCacheEntry>>>,
head: RefCell<Option<NonNull<PageCacheEntry>>>,
tail: RefCell<Option<NonNull<PageCacheEntry>>>,
}
unsafe impl Send for DumbLruPageCache {}
unsafe impl Sync for DumbLruPageCache {}
impl DumbLruPageCache {
pub fn new(capacity: usize) -> Self {
Self {
capacity,
map: RefCell::new(HashMap::new()),
head: RefCell::new(None),
tail: RefCell::new(None),
}
}
pub fn contains_key(&mut self, key: usize) -> bool {
self.map.borrow().contains_key(&key)
}
pub fn insert(&mut self, key: usize, value: PageRef) {
self._delete(key, false);
debug!("cache_insert(key={})", key);
let mut entry = Box::new(PageCacheEntry {
key,
next: None,
prev: None,
page: value,
});
self.touch(&mut entry);
if self.map.borrow().len() >= self.capacity {
self.pop_if_not_dirty();
}
let b = Box::into_raw(entry);
let as_non_null = NonNull::new(b).unwrap();
self.map.borrow_mut().insert(key, as_non_null);
}
pub fn delete(&mut self, key: usize) {
self._delete(key, true)
}
pub fn _delete(&mut self, key: usize, clean_page: bool) {
debug!("cache_delete(key={}, clean={})", key, clean_page);
let ptr = self.map.borrow_mut().remove(&key);
if ptr.is_none() {
return;
}
let mut ptr = ptr.unwrap();
{
let ptr = unsafe { ptr.as_mut() };
self.detach(ptr, clean_page);
}
unsafe { drop_in_place(ptr.as_ptr()) };
}
fn get_ptr(&mut self, key: usize) -> Option<NonNull<PageCacheEntry>> {
let m = self.map.borrow_mut();
let ptr = m.get(&key);
ptr.copied()
}
pub fn get(&mut self, key: &usize) -> Option<PageRef> {
debug!("cache_get(key={})", key);
let ptr = self.get_ptr(*key);
ptr?;
let ptr = unsafe { ptr.unwrap().as_mut() };
let page = ptr.page.clone();
//self.detach(ptr);
self.touch(ptr);
Some(page)
}
pub fn resize(&mut self, capacity: usize) {
let _ = capacity;
todo!();
}
fn detach(&mut self, entry: &mut PageCacheEntry, clean_page: bool) {
let mut current = entry.as_non_null();
if clean_page {
// evict buffer
let page = &entry.page;
page.clear_loaded();
debug!("cleaning up page {}", page.get().id);
let _ = page.get().contents.take();
}
let (next, prev) = unsafe {
let c = current.as_mut();
let next = c.next;
let prev = c.prev;
c.prev = None;
c.next = None;
(next, prev)
};
// detach
match (prev, next) {
(None, None) => {}
(None, Some(_)) => todo!(),
(Some(p), None) => {
self.tail = RefCell::new(Some(p));
}
(Some(mut p), Some(mut n)) => unsafe {
let p_mut = p.as_mut();
p_mut.next = Some(n);
let n_mut = n.as_mut();
n_mut.prev = Some(p);
},
};
}
fn touch(&mut self, entry: &mut PageCacheEntry) {
let mut current = entry.as_non_null();
unsafe {
let c = current.as_mut();
c.next = *self.head.borrow();
}
if let Some(mut head) = *self.head.borrow_mut() {
unsafe {
let head = head.as_mut();
head.prev = Some(current);
}
}
}
fn pop_if_not_dirty(&mut self) {
let tail = *self.tail.borrow();
if tail.is_none() {
return;
}
let tail = unsafe { tail.unwrap().as_mut() };
if tail.page.is_dirty() {
// TODO: drop from another clean entry?
return;
}
self.detach(tail, true);
}
fn clear(&mut self) {
let to_remove: Vec<usize> = self.map.borrow().iter().map(|v| *v.0).collect();
for key in to_remove {
self.delete(key);
}
}
}
#[allow(dead_code)]
pub struct PageCache<K: Eq + Hash + Clone, V> {
cache: SieveCache<K, V>,
}
#[allow(dead_code)]
impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
pub fn new(cache: SieveCache<K, V>) -> Self {
Self { cache }
}
pub fn insert(&mut self, key: K, value: V) {
self.cache.insert(key, value);
}
pub fn get(&mut self, key: &K) -> Option<&V> {
self.cache.get(key)
}
pub fn resize(&mut self, capacity: usize) {
self.cache = SieveCache::new(capacity).unwrap();
}
}
#[derive(Clone)]
enum FlushState {
Start,
@@ -368,14 +174,11 @@ impl Pager {
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
buffer_pool: Rc<BufferPool>,
) -> Result<Self> {
let db_header = RefCell::borrow(&db_header_ref);
let page_size = db_header.page_size as usize;
let buffer_pool = Rc::new(BufferPool::new(page_size));
Ok(Self {
page_io,
wal,
buffer_pool,
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
@@ -387,6 +190,7 @@ impl Pager {
syncing: Rc::new(RefCell::new(false)),
checkpoint_state: RefCell::new(CheckpointState::Checkpoint),
checkpoint_inflight: Rc::new(RefCell::new(0)),
buffer_pool,
})
}
@@ -413,7 +217,8 @@ impl Pager {
pub fn read_page(&self, page_idx: usize) -> crate::Result<PageRef> {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write().unwrap();
if let Some(page) = page_cache.get(&page_idx) {
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
if let Some(page) = page_cache.get(&page_key) {
trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
}
@@ -429,7 +234,7 @@ impl Pager {
}
// TODO(pere) ensure page is inserted, we should probably first insert to page cache
// and if successful, read frame or page
page_cache.insert(page_idx, page.clone());
page_cache.insert(page_key, page.clone());
return Ok(page);
}
sqlite3_ondisk::begin_read_page(
@@ -439,7 +244,7 @@ impl Pager {
page_idx,
)?;
// TODO(pere) ensure page is inserted
page_cache.insert(page_idx, page.clone());
page_cache.insert(page_key, page.clone());
Ok(page)
}
@@ -449,6 +254,7 @@ impl Pager {
trace!("load_page(page_idx = {})", id);
let mut page_cache = self.page_cache.write().unwrap();
page.set_locked();
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? {
self.wal
.borrow()
@@ -457,8 +263,8 @@ impl Pager {
page.set_uptodate();
}
// TODO(pere) ensure page is inserted
if !page_cache.contains_key(id) {
page_cache.insert(id, page.clone());
if !page_cache.contains_key(&page_key) {
page_cache.insert(page_key, page.clone());
}
return Ok(());
}
@@ -469,8 +275,8 @@ impl Pager {
id,
)?;
// TODO(pere) ensure page is inserted
if !page_cache.contains_key(id) {
page_cache.insert(id, page.clone());
if !page_cache.contains_key(&page_key) {
page_cache.insert(page_key, page.clone());
}
Ok(())
}
@@ -500,9 +306,11 @@ impl Pager {
let db_size = self.db_header.borrow().database_size;
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.write().unwrap();
let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_key =
PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame()));
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
debug!("appending frame {} {:?}", page_id, page_type);
log::trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
@@ -565,7 +373,7 @@ impl Pager {
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
loop {
let state = self.checkpoint_state.borrow().clone();
log::trace!("checkpoint(state={:?})", state);
log::trace!("pager_checkpoint(state={:?})", state);
match state {
CheckpointState::Checkpoint => {
let in_flight = self.checkpoint_inflight.clone();
@@ -641,7 +449,9 @@ impl Pager {
page.set_dirty();
self.add_dirty(page.get().id);
let mut cache = self.page_cache.write().unwrap();
cache.insert(page.get().id, page.clone());
let page_key =
PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame()));
cache.insert(page_key, page.clone());
}
Ok(page)
}
@@ -649,7 +459,8 @@ impl Pager {
pub fn put_loaded_page(&self, id: usize, page: PageRef) {
let mut cache = self.page_cache.write().unwrap();
// cache insert invalidates previous page
cache.insert(id, page.clone());
let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame()));
cache.insert(page_key, page.clone());
page.set_loaded();
}
@@ -682,7 +493,9 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc<BufferPool>, offset: usize
mod tests {
use std::sync::{Arc, RwLock};
use super::{DumbLruPageCache, Page};
use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey};
use super::Page;
#[test]
fn test_shared_cache() {
@@ -693,12 +506,14 @@ mod tests {
let cache = cache.clone();
std::thread::spawn(move || {
let mut cache = cache.write().unwrap();
cache.insert(1, Arc::new(Page::new(1)));
let page_key = PageCacheKey::new(1, None);
cache.insert(page_key, Arc::new(Page::new(1)));
})
};
let _ = thread.join();
let mut cache = cache.write().unwrap();
let page = cache.get(&1);
let page_key = PageCacheKey::new(1, None);
let page = cache.get(&page_key);
assert_eq!(page.unwrap().get().id, 1);
}
}

View File

@@ -581,6 +581,7 @@ pub fn begin_write_btree_page(
page: &PageRef,
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
log::trace!("begin_write_btree_page(page={})", page.get().id);
let page_source = &pager.page_io;
let page_finish = page.clone();
@@ -1039,6 +1040,11 @@ pub fn begin_read_wal_frame(
buffer_pool: Rc<BufferPool>,
page: PageRef,
) -> Result<()> {
log::trace!(
"begin_read_wal_frame(offset={}, page={})",
offset,
page.get().id
);
let buf = buffer_pool.get();
let drop_fn = Rc::new(move |buf| {
let buffer_pool = buffer_pool.clone();

View File

@@ -8,12 +8,13 @@ use crate::io::{File, SyncCompletion, IO};
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::Completion;
use crate::Result;
use crate::{Completion, Page};
use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE};
use super::buffer_pool::BufferPool;
use super::page_cache::PageCacheKey;
use super::pager::{PageRef, Pager};
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
@@ -53,16 +54,27 @@ pub trait Wal {
write_counter: Rc<RefCell<usize>>,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<CheckpointStatus>;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
}
struct OngoingCheckpoint {
page: PageRef,
state: CheckpointState,
min_frame: u64,
max_frame: u64,
current_page: u64,
}
pub struct WalFile {
io: Arc<dyn crate::io::IO>,
buffer_pool: Rc<BufferPool>,
syncing: Rc<RefCell<bool>>,
page_size: usize,
ongoing_checkpoint: HashSet<usize>,
shared: Arc<RwLock<WalFileShared>>,
ongoing_checkpoint: OngoingCheckpoint,
checkpoint_threshold: usize,
// min and max frames for this connection
max_frame: u64,
@@ -75,11 +87,22 @@ pub struct WalFileShared {
max_frame: u64,
nbackfills: u64,
// Maps pgno to frame id and offset in wal file
frame_cache: HashMap<u64, Vec<u64>>, // FIXME: for now let's use a simple hashmap instead of a shm file
frame_cache: HashMap<u64, Vec<u64>>, // we will avoid shm files because we are not
// multiprocess
pages_in_frames: Vec<u64>,
last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
file: Rc<dyn File>,
}
#[derive(Debug, Copy, Clone)]
pub enum CheckpointState {
Start,
ReadFrame,
WaitReadFrame,
WritePage,
Done,
}
pub enum CheckpointStatus {
Done,
IO,
@@ -167,6 +190,7 @@ impl Wal for WalFile {
Some(frames) => frames.push(frame_id),
None => {
shared.frame_cache.insert(page_id as u64, vec![frame_id]);
shared.pages_in_frames.push(page_id as u64);
}
}
}
@@ -194,30 +218,88 @@ impl Wal for WalFile {
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
) -> Result<CheckpointStatus> {
let mut shared = self.shared.write().unwrap();
for (page_id, _frames) in shared.frame_cache.iter() {
// move page from WAL to database file
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
let page_id = *page_id as usize;
if self.ongoing_checkpoint.contains(&page_id) {
continue;
}
'checkpoint_loop: loop {
let state = self.ongoing_checkpoint.state;
log::debug!("checkpoint(state={:?})", state);
match state {
CheckpointState::Start => {
// TODO(pere): check what frames are safe to checkpoint between many readers!
self.ongoing_checkpoint.min_frame = self.min_frame;
self.ongoing_checkpoint.max_frame = self.max_frame;
self.ongoing_checkpoint.current_page = 0;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
}
CheckpointState::ReadFrame => {
let shared = self.shared.read().unwrap();
for page in shared
.pages_in_frames
.iter()
.skip(self.ongoing_checkpoint.current_page as usize)
{
let frames = shared
.frame_cache
.get(page)
.expect("page must be in frame cache if it's in list");
let page = pager.read_page(page_id)?;
if page.is_locked() {
return Ok(CheckpointStatus::IO);
for frame in frames.iter().rev() {
if *frame <= self.ongoing_checkpoint.max_frame {
log::debug!(
"checkpoint page(state={:?}, page={}, frame={})",
state,
*page,
*frame
);
self.ongoing_checkpoint.page.get().id = *page as usize;
self.read_frame(
*frame,
self.ongoing_checkpoint.page.clone(),
self.buffer_pool.clone(),
);
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
self.ongoing_checkpoint.current_page += 1;
continue 'checkpoint_loop;
}
}
self.ongoing_checkpoint.current_page += 1;
}
self.ongoing_checkpoint.state = CheckpointState::Done;
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.page.is_locked() {
return Ok(CheckpointStatus::IO);
} else {
self.ongoing_checkpoint.state = CheckpointState::WritePage;
}
}
CheckpointState::WritePage => {
begin_write_btree_page(
pager,
&self.ongoing_checkpoint.page,
write_counter.clone(),
)?;
let shared = self.shared.read().unwrap();
if (self.ongoing_checkpoint.current_page as usize)
< shared.pages_in_frames.len()
{
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
self.ongoing_checkpoint.state = CheckpointState::Done;
}
}
CheckpointState::Done => {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
}
let mut shared = self.shared.write().unwrap();
shared.frame_cache.clear();
shared.pages_in_frames.clear();
shared.max_frame = 0;
shared.nbackfills = 0;
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(CheckpointStatus::Done);
}
}
begin_write_btree_page(pager, &page, write_counter.clone())?;
self.ongoing_checkpoint.insert(page_id);
}
// TODO: only clear checkpointed frames
shared.frame_cache.clear();
shared.max_frame = 0;
shared.nbackfills = 0;
self.ongoing_checkpoint.clear();
Ok(CheckpointStatus::Done)
}
fn sync(&mut self) -> Result<CheckpointStatus> {
@@ -238,19 +320,39 @@ impl Wal for WalFile {
Ok(CheckpointStatus::Done)
}
}
fn get_max_frame(&self) -> u64 {
self.max_frame
}
fn get_min_frame(&self) -> u64 {
self.min_frame
}
}
impl WalFile {
pub fn new(io: Arc<dyn IO>, page_size: usize, shared: Arc<RwLock<WalFileShared>>) -> Self {
pub fn new(
io: Arc<dyn IO>,
page_size: usize,
shared: Arc<RwLock<WalFileShared>>,
buffer_pool: Rc<BufferPool>,
) -> Self {
Self {
io,
shared,
ongoing_checkpoint: HashSet::new(),
ongoing_checkpoint: OngoingCheckpoint {
page: Arc::new(Page::new(0)),
state: CheckpointState::Start,
min_frame: 0,
max_frame: 0,
current_page: 0,
},
syncing: Rc::new(RefCell::new(false)),
checkpoint_threshold: 1000,
page_size,
max_frame: 0,
min_frame: 0,
buffer_pool,
}
}
@@ -322,6 +424,7 @@ impl WalFileShared {
frame_cache: HashMap::new(),
last_checksum: checksum,
file,
pages_in_frames: Vec::new(),
};
Ok(Arc::new(RwLock::new(shared)))
}