From a4297702bd5108702113f4fe9a9eae587ada8acc Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 21 Nov 2024 14:00:28 +0100 Subject: [PATCH] extract page cache to be multi threaded --- bindings/wasm/lib.rs | 2 +- core/lib.rs | 20 ++++++++++++-------- core/storage/pager.rs | 23 ++++++++++++----------- core/storage/wal.rs | 2 -- simulator/main.rs | 2 +- sqlite3/src/lib.rs | 4 ++-- 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 06a3a43ee..3d3290426 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*; #[wasm_bindgen] pub struct Database { - db: Rc, + db: Arc, conn: Rc, } diff --git a/core/lib.rs b/core/lib.rs index 83210483a..c883acc65 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -21,13 +21,13 @@ use schema::Schema; use sqlite3_parser::ast; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::cell::Cell; -use std::rc::Weak; -use std::sync::{Arc, OnceLock}; +use std::sync::Weak; +use std::sync::{Arc, OnceLock, RwLock}; use std::{cell::RefCell, rc::Rc}; use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; -use storage::pager::allocate_page; +use storage::pager::{allocate_page, DumbLruPageCache}; use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; use util::parse_schema_rows; @@ -64,11 +64,12 @@ pub struct Database { schema: Rc>, header: Rc>, transaction_state: RefCell, + shared_page_cache: Arc>, } impl Database { #[cfg(feature = "fs")] - pub fn open_file(io: Arc, path: &str) -> Result> { + pub fn open_file(io: Arc, path: &str) -> Result> { let file = io.open_file(path, io::OpenFlags::Create, true)?; maybe_init_database_file(&file, &io)?; let page_io = Rc::new(FileStorage::new(file)); @@ -87,18 +88,20 @@ impl Database { io: Arc, page_io: Rc, wal: Rc>, - ) -> Result> { + ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; DATABASE_VERSION.get_or_init(|| { let version = db_header.borrow().version_number; version.to_string() }); + let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let pager = Rc::new(Pager::finish_open( db_header.clone(), page_io, wal, io.clone(), + shared_page_cache.clone(), )?); let bootstrap_schema = Rc::new(RefCell::new(Schema::new())); let conn = Rc::new(Connection { @@ -113,21 +116,22 @@ impl Database { parse_schema_rows(rows, &mut schema, io)?; let schema = Rc::new(RefCell::new(schema)); let header = db_header; - Ok(Rc::new(Database { + Ok(Arc::new(Database { pager, schema, header, transaction_state: RefCell::new(TransactionState::None), + shared_page_cache, })) } - pub fn connect(self: &Rc) -> Rc { + pub fn connect(self: &Arc) -> Rc { Rc::new(Connection { pager: self.pager.clone(), schema: self.schema.clone(), header: self.header.clone(), - db: Rc::downgrade(self), last_insert_rowid: Cell::new(0), + db: Arc::downgrade(self), }) } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 1bfbe8392..a0e5a5969 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -11,7 +11,7 @@ use std::hash::Hash; use std::ptr::{drop_in_place, NonNull}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use super::wal::CheckpointStatus; @@ -117,7 +117,7 @@ impl PageCacheEntry { } } -struct DumbLruPageCache { +pub struct DumbLruPageCache { capacity: usize, map: RefCell>>, head: RefCell>>, @@ -325,7 +325,7 @@ pub struct Pager { /// The write-ahead log (WAL) for the database. wal: Rc>, /// A page cache for the database. - page_cache: RefCell, + page_cache: Arc>, /// Buffer pool for temporary data storage. buffer_pool: Rc, /// I/O interface for input/output operations. @@ -351,11 +351,11 @@ impl Pager { page_io: Rc, wal: Rc>, io: Arc, + page_cache: Arc>, ) -> Result { let db_header = RefCell::borrow(&db_header_ref); let page_size = db_header.page_size as usize; let buffer_pool = Rc::new(BufferPool::new(page_size)); - let page_cache = RefCell::new(DumbLruPageCache::new(10)); Ok(Self { page_io, wal, @@ -396,7 +396,7 @@ impl Pager { /// Reads a page from the database. pub fn read_page(&self, page_idx: usize) -> crate::Result>> { trace!("read_page(page_idx = {})", page_idx); - let mut page_cache = self.page_cache.borrow_mut(); + let mut page_cache = self.page_cache.write().unwrap(); if let Some(page) = page_cache.get(&page_idx) { trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); @@ -431,7 +431,7 @@ impl Pager { pub fn load_page(&self, page: Rc>) -> Result<()> { let id = page.borrow().id; trace!("load_page(page_idx = {})", id); - let mut page_cache = self.page_cache.borrow_mut(); + let mut page_cache = self.page_cache.write().unwrap(); page.borrow_mut().set_locked(); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { self.wal @@ -467,7 +467,8 @@ impl Pager { /// Changes the size of the page cache. pub fn change_page_cache_size(&self, capacity: usize) { - self.page_cache.borrow_mut().resize(capacity); + let mut page_cache = self.page_cache.write().unwrap(); + page_cache.resize(capacity); } pub fn add_dirty(&self, page_id: usize) { @@ -483,7 +484,7 @@ impl Pager { FlushState::Start => { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.borrow_mut(); + let mut cache = self.page_cache.write().unwrap(); let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type(); debug!("appending frame {} {:?}", page_id, page_type); @@ -589,7 +590,7 @@ impl Pager { Err(err) => panic!("error while clearing cache {}", err), } } - self.page_cache.borrow_mut().clear(); + self.page_cache.write().unwrap().clear(); } /* @@ -627,14 +628,14 @@ impl Pager { let page = page_ref.borrow_mut(); page.set_dirty(); self.add_dirty(page.id); - let mut cache = self.page_cache.borrow_mut(); + let mut cache = self.page_cache.write().unwrap(); cache.insert(page.id, page_ref.clone()); } Ok(page_ref) } pub fn put_loaded_page(&self, id: usize, page: Rc>) { - let mut cache = RefCell::borrow_mut(&self.page_cache); + let mut cache = self.page_cache.write().unwrap(); // cache insert invalidates previous page cache.insert(id, page.clone()); page.borrow_mut().set_loaded(); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 774e57cc9..ad5e9eb40 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -277,8 +277,6 @@ impl WalFile { self.io.run_once()?; self.wal_header.replace(Some(wal_header)); } else { - // magic is a single number represented as WAL_MAGIC_LE but the big endian - // counterpart is just the same number with LSB set to 1. let magic = if cfg!(target_endian = "big") { WAL_MAGIC_BE } else { diff --git a/simulator/main.rs b/simulator/main.rs index 3c71bfef5..b7af9854e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -13,7 +13,7 @@ struct SimulatorEnv { tables: Vec, connections: Vec, io: Arc, - db: Rc, + db: Arc, rng: ChaCha8Rng, } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index ac37d8ea2..6a86a04fe 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -33,7 +33,7 @@ pub mod util; use util::sqlite3_safety_check_sick_or_ok; pub struct sqlite3 { - pub(crate) _db: Rc, + pub(crate) _db: Arc, pub(crate) conn: Rc, pub(crate) err_code: ffi::c_int, pub(crate) err_mask: ffi::c_int, @@ -43,7 +43,7 @@ pub struct sqlite3 { } impl sqlite3 { - pub fn new(db: Rc, conn: Rc) -> Self { + pub fn new(db: Arc, conn: Rc) -> Self { Self { _db: db, conn,