From a4297702bd5108702113f4fe9a9eae587ada8acc Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 21 Nov 2024 14:00:28 +0100 Subject: [PATCH 1/7] extract page cache to be multi threaded --- bindings/wasm/lib.rs | 2 +- core/lib.rs | 20 ++++++++++++-------- core/storage/pager.rs | 23 ++++++++++++----------- core/storage/wal.rs | 2 -- simulator/main.rs | 2 +- sqlite3/src/lib.rs | 4 ++-- 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 06a3a43ee..3d3290426 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*; #[wasm_bindgen] pub struct Database { - db: Rc, + db: Arc, conn: Rc, } diff --git a/core/lib.rs b/core/lib.rs index 83210483a..c883acc65 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -21,13 +21,13 @@ use schema::Schema; use sqlite3_parser::ast; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::cell::Cell; -use std::rc::Weak; -use std::sync::{Arc, OnceLock}; +use std::sync::Weak; +use std::sync::{Arc, OnceLock, RwLock}; use std::{cell::RefCell, rc::Rc}; use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; -use storage::pager::allocate_page; +use storage::pager::{allocate_page, DumbLruPageCache}; use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; use util::parse_schema_rows; @@ -64,11 +64,12 @@ pub struct Database { schema: Rc>, header: Rc>, transaction_state: RefCell, + shared_page_cache: Arc>, } impl Database { #[cfg(feature = "fs")] - pub fn open_file(io: Arc, path: &str) -> Result> { + pub fn open_file(io: Arc, path: &str) -> Result> { let file = io.open_file(path, io::OpenFlags::Create, true)?; maybe_init_database_file(&file, &io)?; let page_io = Rc::new(FileStorage::new(file)); @@ -87,18 +88,20 @@ impl Database { io: Arc, page_io: Rc, wal: Rc>, - ) -> Result> { + ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; DATABASE_VERSION.get_or_init(|| { let version = db_header.borrow().version_number; version.to_string() }); + let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let pager = Rc::new(Pager::finish_open( db_header.clone(), page_io, wal, io.clone(), + shared_page_cache.clone(), )?); let bootstrap_schema = Rc::new(RefCell::new(Schema::new())); let conn = Rc::new(Connection { @@ -113,21 +116,22 @@ impl Database { parse_schema_rows(rows, &mut schema, io)?; let schema = Rc::new(RefCell::new(schema)); let header = db_header; - Ok(Rc::new(Database { + Ok(Arc::new(Database { pager, schema, header, transaction_state: RefCell::new(TransactionState::None), + shared_page_cache, })) } - pub fn connect(self: &Rc) -> Rc { + pub fn connect(self: &Arc) -> Rc { Rc::new(Connection { pager: self.pager.clone(), schema: self.schema.clone(), header: self.header.clone(), - db: Rc::downgrade(self), last_insert_rowid: Cell::new(0), + db: Arc::downgrade(self), }) } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 1bfbe8392..a0e5a5969 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -11,7 +11,7 @@ use std::hash::Hash; use std::ptr::{drop_in_place, NonNull}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use super::wal::CheckpointStatus; @@ -117,7 +117,7 @@ impl PageCacheEntry { } } -struct DumbLruPageCache { +pub struct DumbLruPageCache { capacity: usize, map: RefCell>>, head: RefCell>>, @@ -325,7 +325,7 @@ pub struct Pager { /// The write-ahead log (WAL) for the database. wal: Rc>, /// A page cache for the database. - page_cache: RefCell, + page_cache: Arc>, /// Buffer pool for temporary data storage. buffer_pool: Rc, /// I/O interface for input/output operations. @@ -351,11 +351,11 @@ impl Pager { page_io: Rc, wal: Rc>, io: Arc, + page_cache: Arc>, ) -> Result { let db_header = RefCell::borrow(&db_header_ref); let page_size = db_header.page_size as usize; let buffer_pool = Rc::new(BufferPool::new(page_size)); - let page_cache = RefCell::new(DumbLruPageCache::new(10)); Ok(Self { page_io, wal, @@ -396,7 +396,7 @@ impl Pager { /// Reads a page from the database. pub fn read_page(&self, page_idx: usize) -> crate::Result>> { trace!("read_page(page_idx = {})", page_idx); - let mut page_cache = self.page_cache.borrow_mut(); + let mut page_cache = self.page_cache.write().unwrap(); if let Some(page) = page_cache.get(&page_idx) { trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); @@ -431,7 +431,7 @@ impl Pager { pub fn load_page(&self, page: Rc>) -> Result<()> { let id = page.borrow().id; trace!("load_page(page_idx = {})", id); - let mut page_cache = self.page_cache.borrow_mut(); + let mut page_cache = self.page_cache.write().unwrap(); page.borrow_mut().set_locked(); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { self.wal @@ -467,7 +467,8 @@ impl Pager { /// Changes the size of the page cache. pub fn change_page_cache_size(&self, capacity: usize) { - self.page_cache.borrow_mut().resize(capacity); + let mut page_cache = self.page_cache.write().unwrap(); + page_cache.resize(capacity); } pub fn add_dirty(&self, page_id: usize) { @@ -483,7 +484,7 @@ impl Pager { FlushState::Start => { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.borrow_mut(); + let mut cache = self.page_cache.write().unwrap(); let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type(); debug!("appending frame {} {:?}", page_id, page_type); @@ -589,7 +590,7 @@ impl Pager { Err(err) => panic!("error while clearing cache {}", err), } } - self.page_cache.borrow_mut().clear(); + self.page_cache.write().unwrap().clear(); } /* @@ -627,14 +628,14 @@ impl Pager { let page = page_ref.borrow_mut(); page.set_dirty(); self.add_dirty(page.id); - let mut cache = self.page_cache.borrow_mut(); + let mut cache = self.page_cache.write().unwrap(); cache.insert(page.id, page_ref.clone()); } Ok(page_ref) } pub fn put_loaded_page(&self, id: usize, page: Rc>) { - let mut cache = RefCell::borrow_mut(&self.page_cache); + let mut cache = self.page_cache.write().unwrap(); // cache insert invalidates previous page cache.insert(id, page.clone()); page.borrow_mut().set_loaded(); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 774e57cc9..ad5e9eb40 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -277,8 +277,6 @@ impl WalFile { self.io.run_once()?; self.wal_header.replace(Some(wal_header)); } else { - // magic is a single number represented as WAL_MAGIC_LE but the big endian - // counterpart is just the same number with LSB set to 1. let magic = if cfg!(target_endian = "big") { WAL_MAGIC_BE } else { diff --git a/simulator/main.rs b/simulator/main.rs index 3c71bfef5..b7af9854e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -13,7 +13,7 @@ struct SimulatorEnv { tables: Vec, connections: Vec, io: Arc, - db: Rc, + db: Arc, rng: ChaCha8Rng, } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index ac37d8ea2..6a86a04fe 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -33,7 +33,7 @@ pub mod util; use util::sqlite3_safety_check_sick_or_ok; pub struct sqlite3 { - pub(crate) _db: Rc, + pub(crate) _db: Arc, pub(crate) conn: Rc, pub(crate) err_code: ffi::c_int, pub(crate) err_mask: ffi::c_int, @@ -43,7 +43,7 @@ pub struct sqlite3 { } impl sqlite3 { - pub fn new(db: Rc, conn: Rc) -> Self { + pub fn new(db: Arc, conn: Rc) -> Self { Self { _db: db, conn, From 3fda2d09b998e0211596005c20bbb66fbf89bc46 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 22 Nov 2024 12:50:02 +0100 Subject: [PATCH 2/7] Extract multi threaded part from WalFile to WalFileShared Since we expect to ensure thread safety between multiple threads in the future, we extract what is important to be shared between multiple connections with regards to WAL. This is WIP so I just put whatever feels like important behind a RwLock but expect this to change to Atomics in the future as needed. Maybe even these locks might disappear because they will be better served with transaction locks. --- bindings/wasm/lib.rs | 14 ++- core/lib.rs | 16 ++- core/storage/sqlite3_ondisk.rs | 11 +- core/storage/wal.rs | 220 +++++++++++++++++---------------- 4 files changed, 141 insertions(+), 120 deletions(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 3d3290426..a1febb5d7 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -1,4 +1,4 @@ -use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile}; +use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared}; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; @@ -22,13 +22,21 @@ impl Database { maybe_init_database_file(&file, &io).unwrap(); let page_io = Rc::new(DatabaseStorage::new(file)); let db_header = Pager::begin_open(page_io.clone()).unwrap(); + + // ensure db header is there + io.run_once().unwrap(); + let wal_path = format!("{}-wal", path); + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size) + .unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - let db = limbo_core::Database::open(io, page_io, wal).unwrap(); + + let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap(); let conn = db.connect(); Database { db, conn } } diff --git a/core/lib.rs b/core/lib.rs index c883acc65..d5619cac4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -28,8 +28,9 @@ use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; use storage::pager::{allocate_page, DumbLruPageCache}; -use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; +use storage::sqlite3_ondisk::{DatabaseHeader, WalHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; +pub use storage::wal::WalFileShared; use util::parse_schema_rows; use translate::optimizer::optimize_plan; @@ -64,30 +65,38 @@ pub struct Database { schema: Rc>, header: Rc>, transaction_state: RefCell, + // Shared structures of a Database are the parts that are common to multiple threads that might + // create DB connections. shared_page_cache: Arc>, + shared_wal: Arc>, } impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str) -> Result> { + use storage::wal::WalFileShared; + let file = io.open_file(path, io::OpenFlags::Create, true)?; maybe_init_database_file(&file, &io)?; let page_io = Rc::new(FileStorage::new(file)); let wal_path = format!("{}-wal", path); let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?; let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - Self::open(io, page_io, wal) + Self::open(io, page_io, wal, wal_shared) } pub fn open( io: Arc, page_io: Rc, wal: Rc>, + shared_wal: Arc>, ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; @@ -122,6 +131,7 @@ impl Database { header, transaction_state: RefCell::new(TransactionState::None), shared_page_cache, + shared_wal, })) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 3f7935491..dd992615d 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -52,6 +52,7 @@ use log::trace; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; +use std::sync::{Arc, RwLock}; /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; @@ -95,7 +96,7 @@ pub const WAL_FRAME_HEADER_SIZE: usize = 24; pub const WAL_MAGIC_LE: u32 = 0x377f0682; pub const WAL_MAGIC_BE: u32 = 0x377f0683; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] #[repr(C)] // This helps with encoding because rust does not respect the order in structs, so in // this case we want to keep the order pub struct WalHeader { @@ -1006,10 +1007,10 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { payload.extend_from_slice(&varint); } -pub fn begin_read_wal_header(io: &Rc) -> Result>> { +pub fn begin_read_wal_header(io: &Rc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let result = Rc::new(RefCell::new(WalHeader::default())); + let result = Arc::new(RwLock::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: Rc>| { let header = header.clone(); @@ -1020,10 +1021,10 @@ pub fn begin_read_wal_header(io: &Rc) -> Result> Ok(result) } -fn finish_read_wal_header(buf: Rc>, header: Rc>) -> Result<()> { +fn finish_read_wal_header(buf: Rc>, header: Arc>) -> Result<()> { let buf = buf.borrow(); let buf = buf.as_slice(); - let mut header = header.borrow_mut(); + let mut header = header.write().unwrap(); header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index ad5e9eb40..68264ca35 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; use std::{cell::RefCell, rc::Rc, sync::Arc}; use log::{debug, trace}; @@ -7,8 +8,8 @@ use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::Completion; use crate::{storage::pager::Page, Result}; +use crate::{Completion, OpenFlags}; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -61,22 +62,24 @@ pub trait Wal { pub struct WalFile { io: Arc, - wal_path: String, - file: RefCell>>, - wal_header: RefCell>>>, - min_frame: RefCell, - max_frame: RefCell, - nbackfills: RefCell, - // Maps pgno to frame id and offset in wal file - frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file - checkpoint_threshold: usize, - ongoing_checkpoint: HashSet, syncing: Rc>, page_size: usize, - last_checksum: RefCell<(u32, u32)>, // Check of last frame in WAL, this is a cumulative checksum - // over all frames in the WAL + ongoing_checkpoint: HashSet, + shared: Arc>, + checkpoint_threshold: usize, +} + +pub struct WalFileShared { + wal_header: Arc>, + min_frame: u64, + max_frame: u64, + nbackfills: u64, + // Maps pgno to frame id and offset in wal file + frame_cache: HashMap>, // FIXME: for now let's use a simple hashmap instead of a shm file + last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL + file: Rc, } pub enum CheckpointStatus { @@ -87,7 +90,8 @@ pub enum CheckpointStatus { impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&self) -> Result<()> { - self.min_frame.replace(*self.nbackfills.borrow() + 1); + let mut shared = self.shared.write().unwrap(); + shared.min_frame = shared.nbackfills + 1; Ok(()) } @@ -98,15 +102,14 @@ impl Wal for WalFile { /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { - let frame_cache = self.frame_cache.borrow(); - let frames = frame_cache.get(&page_id); + let shared = self.shared.read().unwrap(); + let frames = shared.frame_cache.get(&page_id); if frames.is_none() { return Ok(None); } - self.ensure_init()?; let frames = frames.unwrap(); for frame in frames.iter().rev() { - if *frame <= *self.max_frame.borrow() { + if *frame <= shared.max_frame { return Ok(Some(*frame)); } } @@ -122,8 +125,9 @@ impl Wal for WalFile { ) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); + let shared = self.shared.read().unwrap(); begin_read_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, @@ -139,9 +143,9 @@ impl Wal for WalFile { _pager: &Pager, write_counter: Rc>, ) -> Result<()> { - self.ensure_init()?; let page_id = page.borrow().id; - let frame_id = *self.max_frame.borrow(); + let mut shared = self.shared.write().unwrap(); + let frame_id = shared.max_frame; let offset = self.frame_offset(frame_id); trace!( "append_frame(frame={}, offset={}, page_id={})", @@ -149,12 +153,11 @@ impl Wal for WalFile { offset, page_id ); - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap(); - let header = header.borrow(); - let checksums = *self.last_checksum.borrow(); + let header = shared.wal_header.clone(); + let header = header.read().unwrap(); + let checksums = shared.last_checksum; let checksums = begin_write_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset, &page, db_size, @@ -162,15 +165,14 @@ impl Wal for WalFile { &header, checksums, )?; - self.last_checksum.replace(checksums); - self.max_frame.replace(frame_id + 1); + shared.last_checksum = checksums; + shared.max_frame = frame_id + 1; { - let mut frame_cache = self.frame_cache.borrow_mut(); - let frames = frame_cache.get_mut(&(page_id as u64)); + let frames = shared.frame_cache.get_mut(&(page_id as u64)); match frames { Some(frames) => frames.push(frame_id), None => { - frame_cache.insert(page_id as u64, vec![frame_id]); + shared.frame_cache.insert(page_id as u64, vec![frame_id]); } } } @@ -188,7 +190,8 @@ impl Wal for WalFile { } fn should_checkpoint(&self) -> bool { - let frame_id = *self.max_frame.borrow() as usize; + let shared = self.shared.read().unwrap(); + let frame_id = shared.max_frame as usize; frame_id >= self.checkpoint_threshold } @@ -197,7 +200,8 @@ impl Wal for WalFile { pager: &Pager, write_counter: Rc>, ) -> Result { - for (page_id, _frames) in self.frame_cache.borrow().iter() { + let mut shared = self.shared.write().unwrap(); + for (page_id, _frames) in shared.frame_cache.iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf let page_id = *page_id as usize; @@ -214,16 +218,15 @@ impl Wal for WalFile { self.ongoing_checkpoint.insert(page_id); } - self.frame_cache.borrow_mut().clear(); - *self.max_frame.borrow_mut() = 0; + // TODO: only clear checkpointed frames + shared.frame_cache.clear(); + shared.max_frame = 0; self.ongoing_checkpoint.clear(); Ok(CheckpointStatus::Done) } fn sync(&mut self) -> Result { - self.ensure_init()?; - let file = self.file.borrow(); - let file = file.as_ref().unwrap(); + let shared = self.shared.write().unwrap(); { let syncing = self.syncing.clone(); let completion = Completion::Sync(SyncCompletion { @@ -231,7 +234,7 @@ impl Wal for WalFile { *syncing.borrow_mut() = false; }), }); - file.sync(Rc::new(completion))?; + shared.file.sync(Rc::new(completion))?; } if *self.syncing.borrow() { @@ -243,87 +246,86 @@ impl Wal for WalFile { } impl WalFile { - pub fn new(io: Arc, wal_path: String, page_size: usize) -> Self { + pub fn new(io: Arc, page_size: usize, shared: Arc>) -> Self { Self { io, - wal_path, - file: RefCell::new(None), - wal_header: RefCell::new(None), - frame_cache: RefCell::new(HashMap::new()), - min_frame: RefCell::new(0), - max_frame: RefCell::new(0), - nbackfills: RefCell::new(0), - checkpoint_threshold: 1000, + shared, ongoing_checkpoint: HashSet::new(), syncing: Rc::new(RefCell::new(false)), + checkpoint_threshold: 1000, page_size, - last_checksum: RefCell::new((0, 0)), } } - fn ensure_init(&self) -> Result<()> { - if self.file.borrow().is_none() { - match self - .io - .open_file(&self.wal_path, crate::io::OpenFlags::Create, false) - { - Ok(file) => { - if file.size()? > 0 { - let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { - Ok(header) => header, - Err(err) => panic!("Couldn't read header page: {:?}", err), - }; - // TODO: Return a completion instead. - self.io.run_once()?; - self.wal_header.replace(Some(wal_header)); - } else { - let magic = if cfg!(target_endian = "big") { - WAL_MAGIC_BE - } else { - WAL_MAGIC_LE - }; - let mut wal_header = WalHeader { - magic, - file_format: 3007000, - page_size: self.page_size as u32, - checkpoint_seq: 0, // TODO implement sequence number - salt_1: 0, // TODO implement salt - salt_2: 0, - checksum_1: 0, - checksum_2: 0, - }; - let native = cfg!(target_endian = "big"); // if target_endian is - // already big then we don't care but if isn't, header hasn't yet been - // encoded to big endian, therefore we wan't to swap bytes to compute this - // checksum. - let checksums = *self.last_checksum.borrow_mut(); - let checksums = checksum_wal( - &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes - &wal_header, - checksums, - native, // this is false because we haven't encoded the wal header yet - ); - wal_header.checksum_1 = checksums.0; - wal_header.checksum_2 = checksums.1; - self.last_checksum.replace(checksums); - sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - self.wal_header - .replace(Some(Rc::new(RefCell::new(wal_header)))); - } - *self.file.borrow_mut() = Some(file); - } - Err(err) => panic!("{:?} {}", err, &self.wal_path), - }; - } - Ok(()) - } - fn frame_offset(&self, frame_id: u64) -> usize { - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap().borrow(); - let page_size = header.page_size; + let page_size = self.page_size; let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } } + +impl WalFileShared { + pub fn open_shared( + io: &Arc, + path: &str, + page_size: u16, + ) -> Result>> { + let file = io.open_file(path, crate::io::OpenFlags::Create, false)?; + let header = if file.size()? > 0 { + let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { + Ok(header) => header, + Err(err) => panic!("Couldn't read header page: {:?}", err), + }; + log::info!("recover not implemented yet"); + // TODO: Return a completion instead. + io.run_once()?; + wal_header + } else { + let magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; + let mut wal_header = WalHeader { + magic, + file_format: 3007000, + page_size: page_size as u32, + checkpoint_seq: 0, // TODO implement sequence number + salt_1: 0, // TODO implement salt + salt_2: 0, + checksum_1: 0, + checksum_2: 0, + }; + let native = cfg!(target_endian = "big"); // if target_endian is + // already big then we don't care but if isn't, header hasn't yet been + // encoded to big endian, therefore we wan't to swap bytes to compute this + // checksum. + let checksums = (0, 0); + let checksums = checksum_wal( + &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes + &wal_header, + checksums, + native, // this is false because we haven't encoded the wal header yet + ); + wal_header.checksum_1 = checksums.0; + wal_header.checksum_2 = checksums.1; + sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; + Arc::new(RwLock::new(wal_header)) + }; + let checksum = { + let checksum = header.read().unwrap(); + (checksum.checksum_1, checksum.checksum_2) + }; + let shared = WalFileShared { + wal_header: header, + min_frame: 0, + max_frame: 0, + nbackfills: 0, + frame_cache: HashMap::new(), + last_checksum: checksum, + file, + }; + Ok(Arc::new(RwLock::new(shared))) + } +} From c816186326c4cfe44f90d9f6a72c0aeae9fa9dea Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 22 Nov 2024 12:53:04 +0100 Subject: [PATCH 3/7] fmt --- core/storage/wal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 68264ca35..9566c16cf 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -8,8 +8,8 @@ use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; +use crate::Completion; use crate::{storage::pager::Page, Result}; -use crate::{Completion, OpenFlags}; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; From 97dd95abea1f5c6b21bb909d53588f38c961895b Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 22 Nov 2024 15:34:51 +0100 Subject: [PATCH 4/7] core: change Rc> to Arc This includes an inner struct in Page wrapped with Unsafe cell to access it. This is done intentionally because concurrency control of pages is handled by pages and not by the page itself. --- core/lib.rs | 5 +- core/storage/btree.rs | 198 ++++++++++++++------------------- core/storage/pager.rs | 123 ++++++++++---------- core/storage/sqlite3_ondisk.rs | 37 +++--- core/storage/wal.rs | 26 ++--- sqlite3/src/lib.rs | 2 +- 6 files changed, 177 insertions(+), 214 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index d5619cac4..486e56ea6 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -28,7 +28,7 @@ use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; use storage::pager::{allocate_page, DumbLruPageCache}; -use storage::sqlite3_ondisk::{DatabaseHeader, WalHeader, DATABASE_HEADER_SIZE}; +use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; pub use storage::wal::WalFileShared; use util::parse_schema_rows; @@ -167,8 +167,7 @@ pub fn maybe_init_database_file(file: &Rc, io: &Arc) -> Result DATABASE_HEADER_SIZE, ); - let mut page = page1.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page1.get().contents.as_mut().unwrap(); contents.write_database_header(&db_header); // write the first page to disk synchronously let flag_complete = Rc::new(RefCell::new(false)); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 58eaa04d2..5ef2d8d6c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,6 +1,6 @@ use log::debug; -use crate::storage::pager::{Page, Pager}; +use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, TableLeafCell, @@ -12,6 +12,7 @@ use std::cell::{Ref, RefCell}; use std::pin::Pin; use std::rc::Rc; +use super::pager::PageRef; use super::sqlite3_ondisk::{ write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE, }; @@ -67,7 +68,7 @@ enum WriteState { struct WriteInfo { state: WriteState, - new_pages: RefCell>>>, + new_pages: RefCell>, scratch_cells: RefCell>, rightmost_pointer: RefCell>, page_copy: RefCell>, // this holds the copy a of a page needed for buffer references @@ -101,7 +102,7 @@ struct PageStack { /// Pointer to the currenet page being consumed current_page: RefCell, /// List of pages in the stack. Root page will be in index 0 - stack: RefCell<[Option>>; BTCURSOR_MAX_DEPTH + 1]>, + stack: RefCell<[Option; BTCURSOR_MAX_DEPTH + 1]>, /// List of cell indices in the stack. /// cell_indices[current_page] is the current cell index being consumed. Similarly /// cell_indices[current_page-1] is the cell index of the parent of the current page @@ -143,16 +144,15 @@ impl BTreeCursor { fn is_empty_table(&mut self) -> Result> { let page = self.pager.read_page(self.root_page)?; - let page = RefCell::borrow(&page); return_if_locked!(page); - let cell_count = page.contents.as_ref().unwrap().cell_count(); + let cell_count = page.get().contents.as_ref().unwrap().cell_count(); Ok(CursorResult::Ok(cell_count == 0)) } fn get_prev_record(&mut self) -> Result, Option)>> { loop { - let mem_page_rc = self.stack.top(); + let page = self.stack.top(); let cell_idx = self.stack.current_index(); // moved to current page begin @@ -177,18 +177,15 @@ impl BTreeCursor { let cell_idx = cell_idx as usize; debug!( "get_prev_record current id={} cell={}", - mem_page_rc.borrow().id, + page.get().id, cell_idx ); - if mem_page_rc.borrow().is_locked() { + return_if_locked!(page); + if !page.is_loaded() { + self.pager.load_page(page.clone())?; return Ok(CursorResult::IO); } - if !mem_page_rc.borrow().is_loaded() { - self.pager.load_page(mem_page_rc.clone())?; - return Ok(CursorResult::IO); - } - let mem_page = mem_page_rc.borrow(); - let contents = mem_page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); let cell_idx = if cell_idx >= cell_count { @@ -239,13 +236,13 @@ impl BTreeCursor { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_index() as usize; - debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx); - return_if_locked!(mem_page_rc.borrow()); - if !mem_page_rc.borrow().is_loaded() { + debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx); + return_if_locked!(mem_page_rc); + if !mem_page_rc.is_loaded() { self.pager.load_page(mem_page_rc.clone())?; return Ok(CursorResult::IO); } - let mem_page = mem_page_rc.borrow(); + let mem_page = mem_page_rc.get(); let contents = mem_page.contents.as_ref().unwrap(); @@ -397,11 +394,10 @@ impl BTreeCursor { return_if_io!(self.move_to(key.clone(), op.clone())); { - let page_rc = self.stack.top(); - let page = page_rc.borrow(); + let page = self.stack.top(); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); for cell_idx in 0..contents.cell_count() { let cell = contents.cell_get( @@ -491,11 +487,10 @@ impl BTreeCursor { loop { let mem_page = self.stack.top(); - let page_idx = mem_page.borrow().id; + let page_idx = mem_page.get().id; let page = self.pager.read_page(page_idx)?; - let page = RefCell::borrow(&page); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { if contents.cell_count() > 0 { self.stack.set_cell_index(contents.cell_count() as i32 - 1); @@ -545,11 +540,10 @@ impl BTreeCursor { self.move_to_root(); loop { - let page_rc = self.stack.top(); - let page = RefCell::borrow(&page_rc); + let page = self.stack.top(); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { return Ok(CursorResult::Ok(())); } @@ -649,7 +643,7 @@ impl BTreeCursor { let state = &self.write_info.state; match state { WriteState::Start => { - let page_ref = self.stack.top(); + let page = self.stack.top(); let int_key = match key { OwnedValue::Integer(i) => *i as u64, _ => unreachable!("btree tables are indexed by integers!"), @@ -657,13 +651,12 @@ impl BTreeCursor { // get page and find cell let (cell_idx, page_type) = { - let mut page = page_ref.borrow_mut(); return_if_locked!(page); page.set_dirty(); - self.pager.add_dirty(page.id); + self.pager.add_dirty(page.get().id); - let page = page.contents.as_mut().unwrap(); + let page = page.get().contents.as_mut().unwrap(); assert!(matches!(page.page_type(), PageType::TableLeaf)); // find cell @@ -679,8 +672,7 @@ impl BTreeCursor { // insert let overflow = { - let mut page = page_ref.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); log::debug!( "insert_into_page(overflow, cell_count={})", contents.cell_count() @@ -831,11 +823,10 @@ impl BTreeCursor { // can be a "rightmost pointer" or a "cell". // we always asumme there is a parent let current_page = self.stack.top(); - let mut page_rc = current_page.borrow_mut(); { // check if we don't need to balance // don't continue if there are no overflow cells - let page = page_rc.contents.as_mut().unwrap(); + let page = current_page.get().contents.as_mut().unwrap(); if page.overflow_cells.is_empty() { self.write_info.state = WriteState::Finish; return Ok(CursorResult::Ok(())); @@ -843,17 +834,15 @@ impl BTreeCursor { } if !self.stack.has_parent() { - drop(page_rc); - drop(current_page); self.balance_root(); return Ok(CursorResult::Ok(())); } - debug!("Balancing leaf. leaf={}", page_rc.id); + debug!("Balancing leaf. leaf={}", current_page.get().id); // Copy of page used to reference cell bytes. // This needs to be saved somewhere safe so taht references still point to here, // this will be store in write_info below - let page_copy = page_rc.contents.as_ref().unwrap().clone(); + let page_copy = current_page.get().contents.as_ref().unwrap().clone(); // In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split. // Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing. @@ -880,7 +869,7 @@ impl BTreeCursor { // allocate new pages and move cells to those new pages // split procedure - let page = page_rc.contents.as_mut().unwrap(); + let page = current_page.get().contents.as_mut().unwrap(); assert!( matches!( page.page_type(), @@ -889,9 +878,8 @@ impl BTreeCursor { "indexes still not supported " ); - let right_page_ref = self.allocate_page(page.page_type(), 0); - let right_page = right_page_ref.borrow_mut(); - let right_page_id = right_page.id; + let right_page = self.allocate_page(page.page_type(), 0); + let right_page_id = right_page.get().id; self.write_info.new_pages.borrow_mut().clear(); self.write_info @@ -901,41 +889,43 @@ impl BTreeCursor { self.write_info .new_pages .borrow_mut() - .push(right_page_ref.clone()); + .push(right_page.clone()); - debug!("splitting left={} right={}", page_rc.id, right_page_id); + debug!( + "splitting left={} right={}", + current_page.get().id, + right_page_id + ); self.write_info.state = WriteState::BalanceGetParentPage; Ok(CursorResult::Ok(())) } WriteState::BalanceGetParentPage => { - let parent_rc = self.stack.parent(); - let loaded = parent_rc.borrow().is_loaded(); - return_if_locked!(parent_rc.borrow()); + let parent = self.stack.parent(); + let loaded = parent.is_loaded(); + return_if_locked!(parent); if !loaded { debug!("balance_leaf(loading page)"); - self.pager.load_page(parent_rc.clone())?; + self.pager.load_page(parent.clone())?; return Ok(CursorResult::IO); } - parent_rc.borrow_mut().set_dirty(); + parent.set_dirty(); self.write_info.state = WriteState::BalanceMoveUp; Ok(CursorResult::Ok(())) } WriteState::BalanceMoveUp => { - let parent_ref = self.stack.parent(); - let mut parent = parent_ref.borrow_mut(); + let parent = self.stack.parent(); let (page_type, current_idx) = { let current_page = self.stack.top(); - let page_ref = current_page.borrow(); - let contents = page_ref.contents.as_ref().unwrap(); - (contents.page_type().clone(), page_ref.id) + let contents = current_page.get().contents.as_ref().unwrap(); + (contents.page_type().clone(), current_page.get().id) }; parent.set_dirty(); - self.pager.add_dirty(parent.id); - let parent_contents = parent.contents.as_mut().unwrap(); + self.pager.add_dirty(parent.get().id); + let parent_contents = parent.get().contents.as_mut().unwrap(); // if this isn't empty next loop won't work assert_eq!(parent_contents.overflow_cells.len(), 0); @@ -974,9 +964,8 @@ impl BTreeCursor { // reset pages for page in new_pages.iter() { - let mut page = page.borrow_mut(); assert!(page.is_dirty()); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); @@ -1005,9 +994,8 @@ impl BTreeCursor { ); for (i, page) in new_pages.iter_mut().enumerate() { - let mut page = page.borrow_mut(); - let page_id = page.id; - let contents = page.contents.as_mut().unwrap(); + let page_id = page.get().id; + let contents = page.get().contents.as_mut().unwrap(); let last_page = i == new_pages_len - 1; let cells_to_copy = if last_page { @@ -1033,16 +1021,14 @@ impl BTreeCursor { } let is_leaf = { let page = self.stack.top(); - let page = page.borrow(); - let page = page.contents.as_ref().unwrap(); + let page = page.get().contents.as_ref().unwrap(); page.is_leaf() }; // update rightmost pointer for each page if we are in interior page if !is_leaf { for page in new_pages.iter_mut().take(new_pages_len - 1) { - let mut page = page.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); assert!(contents.cell_count() == 1); let last_cell = contents @@ -1063,8 +1049,7 @@ impl BTreeCursor { } // last page right most pointer points to previous right most pointer before splitting let last_page = new_pages.last().unwrap(); - let mut last_page = last_page.borrow_mut(); - let last_page_contents = last_page.contents.as_mut().unwrap(); + let last_page_contents = last_page.get().contents.as_mut().unwrap(); last_page_contents.write_u32( BTREE_HEADER_OFFSET_RIGHTMOST, self.write_info.rightmost_pointer.borrow().unwrap(), @@ -1076,8 +1061,7 @@ impl BTreeCursor { for (page_id_index, page) in new_pages.iter_mut().take(new_pages_len - 1).enumerate() { - let mut page = page.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; let cell = read_btree_cell( @@ -1098,7 +1082,7 @@ impl BTreeCursor { _ => unreachable!(), }; let mut divider_cell = Vec::new(); - divider_cell.extend_from_slice(&(page.id as u32).to_be_bytes()); + divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); divider_cell.extend(std::iter::repeat(0).take(9)); let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); divider_cell.truncate(4 + n); @@ -1122,7 +1106,7 @@ impl BTreeCursor { { // copy last page id to right pointer - let last_pointer = new_pages.last().unwrap().borrow().id as u32; + let last_pointer = new_pages.last().unwrap().get().id as u32; parent_contents.write_u32(right_pointer, last_pointer); } self.stack.pop(); @@ -1141,20 +1125,17 @@ impl BTreeCursor { let is_page_1 = { let current_root = self.stack.top(); - let current_root_ref = current_root.borrow(); - current_root_ref.id == 1 + current_root.get().id == 1 }; let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; - let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset); + let new_root_page = self.allocate_page(PageType::TableInterior, offset); { let current_root = self.stack.top(); - let current_root_ref = current_root.borrow(); - let current_root_contents = current_root_ref.contents.as_ref().unwrap(); + let current_root_contents = current_root.get().contents.as_ref().unwrap(); - let mut new_root_page = new_root_page_ref.borrow_mut(); - let new_root_page_id = new_root_page.id; - let new_root_page_contents = new_root_page.contents.as_mut().unwrap(); + let new_root_page_id = new_root_page.get().id; + let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap(); if is_page_1 { // Copy header let current_root_buf = current_root_contents.as_ptr(); @@ -1166,8 +1147,6 @@ impl BTreeCursor { new_root_page_contents .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32); new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); - // TODO:: this page should have offset - // copy header bytes to here } /* swap splitted page buffer with new root buffer so we don't have to update page idx */ @@ -1175,18 +1154,16 @@ impl BTreeCursor { let (root_id, child_id, child) = { let page_ref = self.stack.top(); let child = page_ref.clone(); - let mut child_rc = page_ref.borrow_mut(); - let mut new_root_page = new_root_page_ref.borrow_mut(); // Swap the entire Page structs - std::mem::swap(&mut child_rc.id, &mut new_root_page.id); + std::mem::swap(&mut child.get().id, &mut new_root_page.get().id); // TODO:: shift bytes by offset to left on child because now child has offset 100 // and header bytes // Also change the offset of page // if is_page_1 { // Remove header from child and set offset to 0 - let contents = child_rc.contents.as_mut().unwrap(); + let contents = child.get().contents.as_mut().unwrap(); let (cell_pointer_offset, _) = contents.cell_get_raw_pointer_region(); // change cell pointers for cell_idx in 0..contents.cell_count() { @@ -1200,13 +1177,13 @@ impl BTreeCursor { buf.copy_within(DATABASE_HEADER_SIZE.., 0); } - self.pager.add_dirty(new_root_page.id); - self.pager.add_dirty(child_rc.id); - (new_root_page.id, child_rc.id, child) + self.pager.add_dirty(new_root_page.get().id); + self.pager.add_dirty(child.get().id); + (new_root_page.get().id, child.get().id, child) }; debug!("Balancing root. root={}, rightmost={}", root_id, child_id); - let root = new_root_page_ref.clone(); + let root = new_root_page.clone(); self.root_page = root_id; self.stack.clear(); @@ -1218,22 +1195,19 @@ impl BTreeCursor { } } - fn allocate_page(&self, page_type: PageType, offset: usize) -> Rc> { + fn allocate_page(&self, page_type: PageType, offset: usize) -> PageRef { let page = self.pager.allocate_page().unwrap(); btree_init_page(&page, page_type, &self.database_header.borrow(), offset); page } - fn allocate_overflow_page(&self) -> Rc> { + fn allocate_overflow_page(&self) -> PageRef { let page = self.pager.allocate_page().unwrap(); - { - // setup overflow page - let mut contents = page.borrow_mut(); - let contents = contents.contents.as_mut().unwrap(); - let buf = contents.as_ptr(); - buf.fill(0); - } + // setup overflow page + let contents = page.get().contents.as_mut().unwrap(); + let buf = contents.as_ptr(); + buf.fill(0); page } @@ -1493,9 +1467,8 @@ impl BTreeCursor { let overflow_page = self.allocate_overflow_page(); overflow_pages.push(overflow_page.clone()); { - let mut page = overflow_page.borrow_mut(); - let id = page.id as u32; - let contents = page.contents.as_mut().unwrap(); + let id = overflow_page.get().id as u32; + let contents = overflow_page.get().contents.as_mut().unwrap(); // TODO: take into account offset here? let buf = contents.as_ptr(); @@ -1572,11 +1545,11 @@ impl BTreeCursor { } impl PageStack { - fn push(&self, page: Rc>) { + fn push(&self, page: PageRef) { debug!( "pagestack::push(current={}, new_page_id={})", self.current_page.borrow(), - page.borrow().id + page.get().id ); *self.current_page.borrow_mut() += 1; let current = *self.current_page.borrow(); @@ -1596,7 +1569,7 @@ impl PageStack { *self.current_page.borrow_mut() -= 1; } - fn top(&self) -> Rc> { + fn top(&self) -> PageRef { let current = *self.current_page.borrow(); let page = self.stack.borrow()[current as usize] .as_ref() @@ -1605,12 +1578,12 @@ impl PageStack { debug!( "pagestack::top(current={}, page_id={})", current, - page.borrow().id + page.get().id ); page } - fn parent(&self) -> Rc> { + fn parent(&self) -> PageRef { let current = *self.current_page.borrow(); self.stack.borrow()[current as usize - 1] .as_ref() @@ -1794,12 +1767,11 @@ impl Cursor for BTreeCursor { _ => unreachable!("btree tables are indexed by integers!"), }; return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); - let page_ref = self.stack.top(); - let page = page_ref.borrow(); + let page = self.stack.top(); // TODO(pere): request load return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); // find cell let int_key = match key { @@ -1834,19 +1806,19 @@ impl Cursor for BTreeCursor { ), }; let page = self.allocate_page(page_type, 0); - let id = page.borrow().id; + let id = page.get().id; id as u32 } } pub fn btree_init_page( - page: &Rc>, + page: &PageRef, page_type: PageType, db_header: &DatabaseHeader, offset: usize, ) { // setup btree page - let mut contents = page.borrow_mut(); + let contents = page.get(); debug!("btree_init_page(id={}, offset={})", contents.id, offset); let contents = contents.contents.as_mut().unwrap(); contents.offset = offset; diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a0e5a5969..abf6257b8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -5,7 +5,7 @@ use crate::storage::wal::Wal; use crate::{Buffer, Result}; use log::{debug, trace}; use sieve_cache::SieveCache; -use std::cell::RefCell; +use std::cell::{RefCell, UnsafeCell}; use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::ptr::{drop_in_place, NonNull}; @@ -15,12 +15,20 @@ use std::sync::{Arc, RwLock}; use super::wal::CheckpointStatus; -pub struct Page { +pub struct PageInner { pub flags: AtomicUsize, pub contents: Option, pub id: usize, } +pub struct Page { + pub inner: UnsafeCell, +} + +// Concurrency control of pages will be handled by the pager, we won't wrap Page with RwLock +// because that is bad bad. +pub type PageRef = Arc; + /// Page is up-to-date. const PAGE_UPTODATE: usize = 0b001; /// Page is locked for I/O to prevent concurrent access. @@ -35,78 +43,84 @@ const PAGE_LOADED: usize = 0b10000; impl Page { pub fn new(id: usize) -> Page { Page { - flags: AtomicUsize::new(0), - contents: None, - id, + inner: UnsafeCell::new(PageInner { + flags: AtomicUsize::new(0), + contents: None, + id, + }), } } + pub fn get(&self) -> &mut PageInner { + unsafe { &mut *self.inner.get() } + } + pub fn is_uptodate(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0 } pub fn set_uptodate(&self) { - self.flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst); } pub fn clear_uptodate(&self) { - self.flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst); } pub fn is_locked(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0 } pub fn set_locked(&self) { - self.flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst); } pub fn clear_locked(&self) { - self.flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst); } pub fn is_error(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_ERROR != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_ERROR != 0 } pub fn set_error(&self) { - self.flags.fetch_or(PAGE_ERROR, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_ERROR, Ordering::SeqCst); } pub fn clear_error(&self) { - self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst); } pub fn is_dirty(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0 } pub fn set_dirty(&self) { - self.flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst); } pub fn clear_dirty(&self) { - self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); } pub fn is_loaded(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_LOADED != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_LOADED != 0 } pub fn set_loaded(&self) { - self.flags.fetch_or(PAGE_LOADED, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_LOADED, Ordering::SeqCst); } pub fn clear_loaded(&self) { - log::debug!("clear loaded {}", self.id); - self.flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst); + log::debug!("clear loaded {}", self.get().id); + self.get().flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst); } } #[allow(dead_code)] struct PageCacheEntry { key: usize, - page: Rc>, + page: PageRef, prev: Option>, next: Option>, } @@ -138,7 +152,7 @@ impl DumbLruPageCache { self.map.borrow().contains_key(&key) } - pub fn insert(&mut self, key: usize, value: Rc>) { + pub fn insert(&mut self, key: usize, value: PageRef) { self._delete(key, false); debug!("cache_insert(key={})", key); let mut entry = Box::new(PageCacheEntry { @@ -181,7 +195,7 @@ impl DumbLruPageCache { ptr.copied() } - pub fn get(&mut self, key: &usize) -> Option>> { + pub fn get(&mut self, key: &usize) -> Option { debug!("cache_get(key={})", key); let ptr = self.get_ptr(*key); ptr?; @@ -202,10 +216,10 @@ impl DumbLruPageCache { if clean_page { // evict buffer - let mut page = entry.page.borrow_mut(); + let page = &entry.page; page.clear_loaded(); - debug!("cleaning up page {}", page.id); - let _ = page.contents.take(); + debug!("cleaning up page {}", page.get().id); + let _ = page.get().contents.take(); } let (next, prev) = unsafe { @@ -254,7 +268,7 @@ impl DumbLruPageCache { return; } let tail = unsafe { tail.unwrap().as_mut() }; - if RefCell::borrow(&tail.page).is_dirty() { + if tail.page.is_dirty() { // TODO: drop from another clean entry? return; } @@ -394,21 +408,21 @@ impl Pager { } /// Reads a page from the database. - pub fn read_page(&self, page_idx: usize) -> crate::Result>> { + pub fn read_page(&self, page_idx: usize) -> crate::Result { trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write().unwrap(); if let Some(page) = page_cache.get(&page_idx) { trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } - let page = Rc::new(RefCell::new(Page::new(page_idx))); - RefCell::borrow(&page).set_locked(); + let page = Arc::new(Page::new(page_idx)); + page.set_locked(); + if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { self.wal .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { - let page = page.borrow_mut(); page.set_uptodate(); } // TODO(pere) ensure page is inserted, we should probably first insert to page cache @@ -428,17 +442,16 @@ impl Pager { } /// Loads pages if not loaded - pub fn load_page(&self, page: Rc>) -> Result<()> { - let id = page.borrow().id; + pub fn load_page(&self, page: PageRef) -> Result<()> { + let id = page.get().id; trace!("load_page(page_idx = {})", id); let mut page_cache = self.page_cache.write().unwrap(); - page.borrow_mut().set_locked(); + page.set_locked(); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { self.wal .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { - let page = page.borrow_mut(); page.set_uptodate(); } // TODO(pere) ensure page is inserted @@ -486,7 +499,7 @@ impl Pager { for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write().unwrap(); let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type(); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); debug!("appending frame {} {:?}", page_id, page_type); self.wal.borrow_mut().append_frame( page.clone(), @@ -598,7 +611,7 @@ impl Pager { Currently free list pages are not yet supported. */ #[allow(clippy::readonly_write_lock)] - pub fn allocate_page(&self) -> Result>> { + pub fn allocate_page(&self) -> Result { let header = &self.db_header; let mut header = RefCell::borrow_mut(header); header.database_size += 1; @@ -607,38 +620,35 @@ impl Pager { // read sync for now loop { let first_page_ref = self.read_page(1)?; - let first_page = RefCell::borrow_mut(&first_page_ref); - if first_page.is_locked() { - drop(first_page); + if first_page_ref.is_locked() { self.io.run_once()?; continue; } - first_page.set_dirty(); + first_page_ref.set_dirty(); self.add_dirty(1); - let contents = first_page.contents.as_ref().unwrap(); + let contents = first_page_ref.get().contents.as_ref().unwrap(); contents.write_database_header(&header); break; } } - let page_ref = allocate_page(header.database_size as usize, &self.buffer_pool, 0); + let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0); { // setup page and add to cache - let page = page_ref.borrow_mut(); page.set_dirty(); - self.add_dirty(page.id); + self.add_dirty(page.get().id); let mut cache = self.page_cache.write().unwrap(); - cache.insert(page.id, page_ref.clone()); + cache.insert(page.get().id, page.clone()); } - Ok(page_ref) + Ok(page) } - pub fn put_loaded_page(&self, id: usize, page: Rc>) { + pub fn put_loaded_page(&self, id: usize, page: PageRef) { let mut cache = self.page_cache.write().unwrap(); // cache insert invalidates previous page cache.insert(id, page.clone()); - page.borrow_mut().set_loaded(); + page.set_loaded(); } pub fn usable_size(&self) -> usize { @@ -647,14 +657,9 @@ impl Pager { } } -pub fn allocate_page( - page_id: usize, - buffer_pool: &Rc, - offset: usize, -) -> Rc> { - let page_ref = Rc::new(RefCell::new(Page::new(page_id))); +pub fn allocate_page(page_id: usize, buffer_pool: &Rc, offset: usize) -> PageRef { + let page = Arc::new(Page::new(page_id)); { - let mut page = RefCell::borrow_mut(&page_ref); let buffer = buffer_pool.get(); let bp = buffer_pool.clone(); let drop_fn = Rc::new(move |buf| { @@ -662,11 +667,11 @@ pub fn allocate_page( }); let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))); page.set_loaded(); - page.contents = Some(PageContent { + page.get().contents = Some(PageContent { offset, buffer, overflow_cells: Vec::new(), }); } - page_ref + page } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index dd992615d..3d7b7ae11 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -45,7 +45,7 @@ use crate::error::LimboError; use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion}; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; -use crate::storage::pager::{Page, Pager}; +use crate::storage::pager::Pager; use crate::types::{OwnedRecord, OwnedValue}; use crate::{File, Result}; use log::trace; @@ -54,6 +54,8 @@ use std::pin::Pin; use std::rc::Rc; use std::sync::{Arc, RwLock}; +use super::pager::PageRef; + /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; // DEFAULT_CACHE_SIZE negative values mean that we store the amount of pages a XKiB of memory can hold. @@ -532,7 +534,7 @@ impl PageContent { pub fn begin_read_page( page_io: Rc, buffer_pool: Rc, - page: Rc>, + page: PageRef, page_idx: usize, ) -> Result<()> { trace!("begin_read_btree_page(page_idx = {})", page_idx); @@ -545,7 +547,7 @@ pub fn begin_read_page( let complete = Box::new(move |buf: Rc>| { let page = page.clone(); if finish_read_page(page_idx, buf, page.clone()).is_err() { - page.borrow_mut().set_error(); + page.set_error(); } }); let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); @@ -553,11 +555,7 @@ pub fn begin_read_page( Ok(()) } -fn finish_read_page( - page_idx: usize, - buffer_ref: Rc>, - page: Rc>, -) -> Result<()> { +fn finish_read_page(page_idx: usize, buffer_ref: Rc>, page: PageRef) -> Result<()> { trace!("finish_read_btree_page(page_idx = {})", page_idx); let pos = if page_idx == 1 { DATABASE_HEADER_SIZE @@ -570,8 +568,7 @@ fn finish_read_page( overflow_cells: Vec::new(), }; { - let mut page = page.borrow_mut(); - page.contents.replace(inner); + page.get().contents.replace(inner); page.set_uptodate(); page.clear_locked(); page.set_loaded(); @@ -581,16 +578,16 @@ fn finish_read_page( pub fn begin_write_btree_page( pager: &Pager, - page: &Rc>, + page: &PageRef, write_counter: Rc>, ) -> Result<()> { let page_source = &pager.page_io; let page_finish = page.clone(); - let page_id = page.borrow().id; + let page_id = page.get().id; log::trace!("begin_write_btree_page(page_id={})", page_id); let buffer = { - let page = page.borrow(); + let page = page.get(); let contents = page.contents.as_ref().unwrap(); contents.buffer.clone() }; @@ -604,7 +601,7 @@ pub fn begin_write_btree_page( let buf_len = buf_copy.borrow().len(); *write_counter.borrow_mut() -= 1; - page_finish.borrow_mut().clear_dirty(); + page_finish.clear_dirty(); if bytes_written < buf_len as i32 { log::error!("wrote({bytes_written}) less than expected({buf_len})"); } @@ -771,7 +768,7 @@ fn read_payload(unread: &[u8], payload_size: usize, pager: Rc) -> (Vec, offset: usize, buffer_pool: Rc, - page: Rc>, + page: PageRef, ) -> Result<()> { let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { @@ -1061,14 +1058,14 @@ pub fn begin_read_wal_frame( pub fn begin_write_wal_frame( io: &Rc, offset: usize, - page: &Rc>, + page: &PageRef, db_size: u32, write_counter: Rc>, wal_header: &WalHeader, checksums: (u32, u32), ) -> Result<(u32, u32)> { let page_finish = page.clone(); - let page_id = page.borrow().id; + let page_id = page.get().id; trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id); let mut header = WalFrameHeader { @@ -1080,7 +1077,7 @@ pub fn begin_write_wal_frame( checksum_2: 0, }; let (buffer, checksums) = { - let page = page.borrow(); + let page = page.get(); let contents = page.contents.as_ref().unwrap(); let drop_fn = Rc::new(|_buf| {}); @@ -1123,7 +1120,7 @@ pub fn begin_write_wal_frame( let buf_len = buf_copy.borrow().len(); *write_counter.borrow_mut() -= 1; - page_finish.borrow_mut().clear_dirty(); + page_finish.clear_dirty(); if bytes_written < buf_len as i32 { log::error!("wrote({bytes_written}) less than expected({buf_len})"); } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9566c16cf..3ae854879 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -9,12 +9,12 @@ use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::Completion; -use crate::{storage::pager::Page, Result}; +use crate::Result; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; use super::buffer_pool::BufferPool; -use super::pager::Pager; +use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; /// Write-ahead log (WAL). @@ -35,17 +35,12 @@ pub trait Wal { fn find_frame(&self, page_id: u64) -> Result>; /// Read a frame from the WAL. - fn read_frame( - &self, - frame_id: u64, - page: Rc>, - buffer_pool: Rc, - ) -> Result<()>; + fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()>; /// Write a frame to the WAL. fn append_frame( &mut self, - page: Rc>, + page: PageRef, db_size: u32, pager: &Pager, write_counter: Rc>, @@ -117,12 +112,7 @@ impl Wal for WalFile { } /// Read a frame from the WAL. - fn read_frame( - &self, - frame_id: u64, - page: Rc>, - buffer_pool: Rc, - ) -> Result<()> { + fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); let shared = self.shared.read().unwrap(); @@ -138,12 +128,12 @@ impl Wal for WalFile { /// Write a frame to the WAL. fn append_frame( &mut self, - page: Rc>, + page: PageRef, db_size: u32, _pager: &Pager, write_counter: Rc>, ) -> Result<()> { - let page_id = page.borrow().id; + let page_id = page.get().id; let mut shared = self.shared.write().unwrap(); let frame_id = shared.max_frame; let offset = self.frame_offset(frame_id); @@ -210,7 +200,7 @@ impl Wal for WalFile { } let page = pager.read_page(page_id)?; - if page.borrow().is_locked() { + if page.is_locked() { return Ok(CheckpointStatus::IO); } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 6a86a04fe..3a2394099 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -885,7 +885,7 @@ fn sqlite3_errstr_impl(rc: i32) -> *const std::ffi::c_char { "datatype mismatch", // SQLITE_MISMATCH "bad parameter or other API misuse", // SQLITE_MISUSE #[cfg(feature = "lfs")] - "", // SQLITE_NOLFS + "", // SQLITE_NOLFS #[cfg(not(feature = "lfs"))] "large file support is disabled", // SQLITE_NOLFS "authorization denied", // SQLITE_AUTH From b43e8e46f67bd697b71f38eb50a59a3273d77edc Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 22 Nov 2024 16:03:31 +0100 Subject: [PATCH 5/7] impl sync/send for cache --- core/storage/pager.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index abf6257b8..43ed17e97 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -137,6 +137,8 @@ pub struct DumbLruPageCache { head: RefCell>>, tail: RefCell>>, } +unsafe impl Send for DumbLruPageCache {} +unsafe impl Sync for DumbLruPageCache {} impl DumbLruPageCache { pub fn new(capacity: usize) -> Self { @@ -675,3 +677,28 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc, offset: usize } page } + +#[cfg(test)] +mod tests { + use std::sync::{Arc, RwLock}; + + use super::{DumbLruPageCache, Page}; + + #[test] + fn test_shared_cache() { + // ensure cache can be shared between threads + let cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); + + let thread = { + let cache = cache.clone(); + std::thread::spawn(move || { + let mut cache = cache.write().unwrap(); + cache.insert(1, Arc::new(Page::new(1))); + }) + }; + let _ = thread.join(); + let mut cache = cache.write().unwrap(); + let page = cache.get(&1); + assert_eq!(page.unwrap().get().id, 1); + } +} From 1a663a6ed7a043808ecc79fbc2d9cc02f7a45d54 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 25 Nov 2024 10:11:08 +0100 Subject: [PATCH 6/7] cargo stuff rm --- Cargo.lock | 1 + Cargo.toml | 4 ++++ core/Cargo.toml | 1 + 3 files changed, 6 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ced65e88c..d0ea15bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1151,6 +1151,7 @@ dependencies = [ name = "limbo_core" version = "0.0.9" dependencies = [ + "bumpalo", "cfg_block", "chrono", "criterion", diff --git a/Cargo.toml b/Cargo.toml index a8beb431a..a9dccf37a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,10 @@ codegen-units = 1 panic = "abort" lto = true +[profile.bench-profile] +inherits = "release" +debug = true + [profile.dist] inherits = "release" lto = "thin" diff --git a/core/Cargo.toml b/core/Cargo.toml index 52acb9e49..9c9ed5521 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -52,6 +52,7 @@ serde = { version = "1.0", features = ["derive"] } pest = { version = "2.0", optional = true } pest_derive = { version = "2.0", optional = true } rand = "0.8.5" +bumpalo = { version = "3.16.0", features = ["collections", "boxed"] } [target.'cfg(not(target_family = "windows"))'.dev-dependencies] pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] } From 3e59da439cbd165a085ad87a48f8ef79bff23e3c Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 13 Dec 2024 13:10:33 +0100 Subject: [PATCH 7/7] fmt --- sqlite3/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index 3a2394099..6a86a04fe 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -885,7 +885,7 @@ fn sqlite3_errstr_impl(rc: i32) -> *const std::ffi::c_char { "datatype mismatch", // SQLITE_MISMATCH "bad parameter or other API misuse", // SQLITE_MISUSE #[cfg(feature = "lfs")] - "", // SQLITE_NOLFS + "", // SQLITE_NOLFS #[cfg(not(feature = "lfs"))] "large file support is disabled", // SQLITE_NOLFS "authorization denied", // SQLITE_AUTH