From 2a49fe9bd2a0e62c95ee56023df7b814f6437392 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 1 Apr 2025 17:52:18 +0200 Subject: [PATCH 1/3] Remove RWLock from Shared wal state WalShared state can be shared without having to wrap everything with a lock, and instead use atomics on some places and rwlock on others -- for now. ## Results: From: ---- Execute `SELECT 1`/limbo_execute_select_1 time: [34.125 ns 34.218 ns 34.324 ns] Execute `SELECT 1`/sqlite_execute_select_1 time: [28.124 ns 28.254 ns 28.385 ns] To: ---- Gnuplot not found, using plotters backend Execute `SELECT 1`/limbo_execute_select_1 time: [31.919 ns 32.113 ns 32.327 ns] Execute `SELECT 1`/sqlite_execute_select_1 time: [29.662 ns 29.900 ns 30.139 ns] --- core/lib.rs | 6 +- core/storage/sqlite3_ondisk.rs | 12 ++- core/storage/wal.rs | 164 +++++++++++++++++---------------- 3 files changed, 93 insertions(+), 89 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index f1a1b37a0..e827c3d0d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -38,7 +38,7 @@ use parking_lot::RwLock; use schema::{Column, Schema}; use std::{ borrow::Cow, - cell::{Cell, RefCell}, + cell::{Cell, RefCell, UnsafeCell}, collections::HashMap, io::Write, num::NonZero, @@ -92,7 +92,7 @@ pub struct Database { // Shared structures of a Database are the parts that are common to multiple threads that might // create DB connections. shared_page_cache: Arc>, - shared_wal: Arc>, + shared_wal: Arc>, } unsafe impl Send for Database {} @@ -118,7 +118,7 @@ impl Database { pub fn open( io: Arc, db_file: Arc, - shared_wal: Arc>, + shared_wal: Arc>, enable_mvcc: bool, ) -> Result> { let db_header = Pager::begin_open(db_file.clone())?; diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 636b1acaf..c2790740e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -49,7 +49,6 @@ use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; use crate::types::{ImmutableRecord, RawSlice, RefValue, TextRef, TextSubtype}; use crate::{File, Result}; -use parking_lot::RwLock; use std::cell::RefCell; use std::mem::MaybeUninit; use std::pin::Pin; @@ -1329,11 +1328,11 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { payload.extend_from_slice(&varint[0..n]); } -pub fn begin_read_wal_header(io: &Arc) -> Result>> { +pub fn begin_read_wal_header(io: &Arc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); #[allow(clippy::arc_with_non_send_sync)] let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let result = Arc::new(RwLock::new(WalHeader::default())); + let result = Arc::new(SpinLock::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: Arc>| { let header = header.clone(); @@ -1344,10 +1343,13 @@ pub fn begin_read_wal_header(io: &Arc) -> Result Ok(result) } -fn finish_read_wal_header(buf: Arc>, header: Arc>) -> Result<()> { +fn finish_read_wal_header( + buf: Arc>, + header: Arc>, +) -> Result<()> { let buf = buf.borrow(); let buf = buf.as_slice(); - let mut header = header.write(); + let mut header = header.lock(); header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d2c8a41b8..363abbcf2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,11 +1,12 @@ +use std::cell::UnsafeCell; use std::collections::HashMap; use tracing::{debug, trace}; -use parking_lot::RwLock; use std::fmt::Formatter; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::{cell::RefCell, fmt, rc::Rc, sync::Arc}; +use crate::fast_lock::SpinLock; use crate::io::{File, SyncCompletion, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ @@ -247,7 +248,7 @@ pub struct WalFile { syncing: Rc>, page_size: usize, - shared: Arc>, + shared: Arc>, ongoing_checkpoint: OngoingCheckpoint, checkpoint_threshold: usize, // min and max frames for this connection @@ -282,19 +283,19 @@ impl fmt::Debug for WalFile { /// that needs to be communicated between threads so this struct does the job. #[allow(dead_code)] pub struct WalFileShared { - wal_header: Arc>, - min_frame: u64, - max_frame: u64, - nbackfills: u64, + wal_header: Arc>, + min_frame: AtomicU64, + max_frame: AtomicU64, + nbackfills: AtomicU64, // Frame cache maps a Page to all the frames it has stored in WAL in ascending order. // This is to easily find the frame it must checkpoint each connection if a checkpoint is // necessary. // One difference between SQLite and limbo is that we will never support multi process, meaning // we don't need WAL's index file. So we can do stuff like this without shared memory. // TODO: this will need refactoring because this is incredible memory inefficient. - frame_cache: HashMap>, + frame_cache: Arc>>>, // Another memory inefficient array made to just keep track of pages that are in frame_cache. - pages_in_frames: Vec, + pages_in_frames: Arc>>, last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL file: Arc, /// read_locks is a list of read locks that can coexist with the max_frame number stored in @@ -325,15 +326,13 @@ impl fmt::Debug for WalFileShared { impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&mut self) -> Result { - let mut shared = self.shared.write(); - let max_frame_in_wal = shared.max_frame; - self.min_frame = shared.nbackfills + 1; + let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst); let mut max_read_mark = 0; let mut max_read_mark_index = -1; // Find the largest mark we can find, ignore frames that are impossible to be in range and // that are not set - for (index, lock) in shared.read_locks.iter().enumerate() { + for (index, lock) in self.get_shared().read_locks.iter().enumerate() { let this_mark = lock.value.load(Ordering::SeqCst); if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 { max_read_mark = this_mark; @@ -343,7 +342,7 @@ impl Wal for WalFile { // If we didn't find any mark or we can update, let's update them if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 { - for (index, lock) in shared.read_locks.iter_mut().enumerate() { + for (index, lock) in self.get_shared().read_locks.iter_mut().enumerate() { let busy = !lock.write(); if !busy { // If this was busy then it must mean >1 threads tried to set this read lock @@ -360,14 +359,17 @@ impl Wal for WalFile { return Ok(LimboResult::Busy); } - let lock = &mut shared.read_locks[max_read_mark_index as usize]; - let busy = !lock.read(); - if busy { - return Ok(LimboResult::Busy); + let shared = self.get_shared(); + { + let lock = &mut shared.read_locks[max_read_mark_index as usize]; + let busy = !lock.read(); + if busy { + return Ok(LimboResult::Busy); + } } + self.min_frame = shared.nbackfills.load(Ordering::SeqCst) + 1; self.max_frame_read_lock_index = max_read_mark_index as usize; self.max_frame = max_read_mark as u64; - self.min_frame = shared.nbackfills + 1; tracing::debug!( "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})", self.min_frame, @@ -381,16 +383,14 @@ impl Wal for WalFile { /// End a read transaction. fn end_read_tx(&self) -> Result { tracing::debug!("end_read_tx"); - let mut shared = self.shared.write(); - let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index]; + let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index]; read_lock.unlock(); Ok(LimboResult::Ok) } /// Begin a write transaction fn begin_write_tx(&mut self) -> Result { - let mut shared = self.shared.write(); - let busy = !shared.write_lock.write(); + let busy = !self.get_shared().write_lock.write(); tracing::debug!("begin_write_transaction(busy={})", busy); if busy { return Ok(LimboResult::Busy); @@ -401,15 +401,15 @@ impl Wal for WalFile { /// End a write transaction fn end_write_tx(&self) -> Result { tracing::debug!("end_write_txn"); - let mut shared = self.shared.write(); - shared.write_lock.unlock(); + self.get_shared().write_lock.unlock(); Ok(LimboResult::Ok) } /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { - let shared = self.shared.read(); - let frames = shared.frame_cache.get(&page_id); + let shared = self.get_shared(); + let frames = shared.frame_cache.lock(); + let frames = frames.get(&page_id); if frames.is_none() { return Ok(None); } @@ -426,10 +426,9 @@ impl Wal for WalFile { fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); - let shared = self.shared.read(); page.set_locked(); begin_read_wal_frame( - &shared.file, + &self.get_shared().file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, @@ -445,12 +444,9 @@ impl Wal for WalFile { write_counter: Rc>, ) -> Result<()> { let page_id = page.get().id; - let mut shared = self.shared.write(); - let frame_id = if shared.max_frame == 0 { - 1 - } else { - shared.max_frame + 1 - }; + let shared = self.get_shared(); + let max_frame = shared.max_frame.load(Ordering::SeqCst); + let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 }; let offset = self.frame_offset(frame_id); tracing::debug!( "append_frame(frame={}, offset={}, page_id={})", @@ -459,7 +455,7 @@ impl Wal for WalFile { page_id ); let header = shared.wal_header.clone(); - let header = header.read(); + let header = header.lock(); let checksums = shared.last_checksum; let checksums = begin_write_wal_frame( &shared.file, @@ -471,14 +467,15 @@ impl Wal for WalFile { checksums, )?; shared.last_checksum = checksums; - shared.max_frame = frame_id; + shared.max_frame.store(frame_id, Ordering::SeqCst); { - let frames = shared.frame_cache.get_mut(&(page_id as u64)); + let mut frame_cache = shared.frame_cache.lock(); + let frames = frame_cache.get_mut(&(page_id as u64)); match frames { Some(frames) => frames.push(frame_id), None => { - shared.frame_cache.insert(page_id as u64, vec![frame_id]); - shared.pages_in_frames.push(page_id as u64); + frame_cache.insert(page_id as u64, vec![frame_id]); + shared.pages_in_frames.lock().push(page_id as u64); } } } @@ -486,8 +483,8 @@ impl Wal for WalFile { } fn should_checkpoint(&self) -> bool { - let shared = self.shared.read(); - let frame_id = shared.max_frame as usize; + let shared = self.get_shared(); + let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize; frame_id >= self.checkpoint_threshold } @@ -508,8 +505,8 @@ impl Wal for WalFile { CheckpointState::Start => { // TODO(pere): check what frames are safe to checkpoint between many readers! self.ongoing_checkpoint.min_frame = self.min_frame; - let mut shared = self.shared.write(); - let mut max_safe_frame = shared.max_frame; + let shared = self.get_shared(); + let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst); for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() { let this_mark = read_lock.value.load(Ordering::SeqCst); if this_mark < max_safe_frame as u32 { @@ -537,27 +534,26 @@ impl Wal for WalFile { ); } CheckpointState::ReadFrame => { - let shared = self.shared.read(); - assert!( - self.ongoing_checkpoint.current_page as usize - <= shared.pages_in_frames.len() - ); - if self.ongoing_checkpoint.current_page as usize == shared.pages_in_frames.len() - { + let shared = self.get_shared(); + let min_frame = self.ongoing_checkpoint.min_frame; + let max_frame = self.ongoing_checkpoint.max_frame; + let pages_in_frames = shared.pages_in_frames.clone(); + let pages_in_frames = pages_in_frames.lock(); + + let frame_cache = shared.frame_cache.clone(); + let frame_cache = frame_cache.lock(); + assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len()); + if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() { self.ongoing_checkpoint.state = CheckpointState::Done; continue 'checkpoint_loop; } - let page = - shared.pages_in_frames[self.ongoing_checkpoint.current_page as usize]; - let frames = shared - .frame_cache + let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize]; + let frames = frame_cache .get(&page) .expect("page must be in frame cache if it's in list"); for frame in frames.iter().rev() { - if *frame >= self.ongoing_checkpoint.min_frame - && *frame <= self.ongoing_checkpoint.max_frame - { + if *frame >= min_frame && *frame <= max_frame { debug!( "checkpoint page(state={:?}, page={}, frame={})", state, page, *frame @@ -596,9 +592,9 @@ impl Wal for WalFile { if *write_counter.borrow() > 0 { return Ok(CheckpointStatus::IO); } - let shared = self.shared.read(); + let shared = self.get_shared(); if (self.ongoing_checkpoint.current_page as usize) - < shared.pages_in_frames.len() + < shared.pages_in_frames.lock().len() { self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { @@ -609,26 +605,28 @@ impl Wal for WalFile { if *write_counter.borrow() > 0 { return Ok(CheckpointStatus::IO); } - let mut shared = self.shared.write(); + let shared = self.get_shared(); // Record two num pages fields to return as checkpoint result to caller. // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html let checkpoint_result = CheckpointResult { - num_wal_frames: shared.max_frame, + num_wal_frames: shared.max_frame.load(Ordering::SeqCst), num_checkpointed_frames: self.ongoing_checkpoint.max_frame, }; - let everything_backfilled = - shared.max_frame == self.ongoing_checkpoint.max_frame; + let everything_backfilled = shared.max_frame.load(Ordering::SeqCst) + == self.ongoing_checkpoint.max_frame; if everything_backfilled { // Here we know that we backfilled everything, therefore we can safely // reset the wal. - shared.frame_cache.clear(); - shared.pages_in_frames.clear(); - shared.max_frame = 0; - shared.nbackfills = 0; + shared.frame_cache.lock().clear(); + shared.pages_in_frames.lock().clear(); + shared.max_frame.store(0, Ordering::SeqCst); + shared.nbackfills.store(0, Ordering::SeqCst); // TODO(pere): truncate wal file here. } else { - shared.nbackfills = self.ongoing_checkpoint.max_frame; + shared + .nbackfills + .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); } self.ongoing_checkpoint.state = CheckpointState::Start; return Ok(CheckpointStatus::Done(checkpoint_result)); @@ -641,7 +639,7 @@ impl Wal for WalFile { let state = *self.sync_state.borrow(); match state { SyncState::NotSyncing => { - let shared = self.shared.write(); + let shared = self.get_shared(); debug!("wal_sync"); { let syncing = self.syncing.clone(); @@ -673,7 +671,7 @@ impl Wal for WalFile { } fn get_max_frame_in_wal(&self) -> u64 { - self.shared.read().max_frame + self.get_shared().max_frame.load(Ordering::SeqCst) } fn get_max_frame(&self) -> u64 { @@ -689,7 +687,7 @@ impl WalFile { pub fn new( io: Arc, page_size: usize, - shared: Arc>, + shared: Arc>, buffer_pool: Rc, ) -> Self { let checkpoint_page = Arc::new(Page::new(0)); @@ -733,6 +731,10 @@ impl WalFile { let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } + + fn get_shared(&self) -> &mut WalFileShared { + unsafe { self.shared.get().as_mut().unwrap() } + } } impl WalFileShared { @@ -740,7 +742,7 @@ impl WalFileShared { io: &Arc, path: &str, page_size: u16, - ) -> Result>> { + ) -> Result>> { let file = io.open_file(path, crate::io::OpenFlags::Create, false)?; let header = if file.size()? > 0 { let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { @@ -781,21 +783,21 @@ impl WalFileShared { wal_header.checksum_1 = checksums.0; wal_header.checksum_2 = checksums.1; sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - Arc::new(RwLock::new(wal_header)) + Arc::new(SpinLock::new(wal_header)) }; let checksum = { - let checksum = header.read(); + let checksum = header.lock(); (checksum.checksum_1, checksum.checksum_2) }; let shared = WalFileShared { wal_header: header, - min_frame: 0, - max_frame: 0, - nbackfills: 0, - frame_cache: HashMap::new(), + min_frame: AtomicU64::new(0), + max_frame: AtomicU64::new(0), + nbackfills: AtomicU64::new(0), + frame_cache: Arc::new(SpinLock::new(HashMap::new())), last_checksum: checksum, file, - pages_in_frames: Vec::new(), + pages_in_frames: Arc::new(SpinLock::new(Vec::new())), read_locks: [ LimboRwLock { lock: AtomicU32::new(NO_LOCK), @@ -829,6 +831,6 @@ impl WalFileShared { value: AtomicU32::new(READMARK_NOT_USED), }, }; - Ok(Arc::new(RwLock::new(shared))) + Ok(Arc::new(UnsafeCell::new(shared))) } } From e2d00a9f96f03f58dc1e54ce55f33c7087931053 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 1 Apr 2025 18:18:43 +0200 Subject: [PATCH 2/3] inline start transactions from pager and wal Execute `SELECT 1`/limbo_execute_select_1 time: [30.543 ns 30.585 ns 30.632 ns] --- core/storage/pager.rs | 2 ++ core/storage/wal.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 01bae1478..213e285c8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -240,10 +240,12 @@ impl Pager { (db_header.page_size - db_header.reserved_space as u16) as usize } + #[inline(always)] pub fn begin_read_tx(&self) -> Result { self.wal.borrow_mut().begin_read_tx() } + #[inline(always)] pub fn begin_write_tx(&self) -> Result { self.wal.borrow_mut().begin_write_tx() } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 363abbcf2..6cfff19d7 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -381,6 +381,7 @@ impl Wal for WalFile { } /// End a read transaction. + #[inline(always)] fn end_read_tx(&self) -> Result { tracing::debug!("end_read_tx"); let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index]; From 46814d2bd72d7ed902ffc1d5cf083d573f6aac1e Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 1 Apr 2025 18:32:35 +0200 Subject: [PATCH 3/3] ignore warning mut_from_ref --- core/storage/wal.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 6cfff19d7..b56246a78 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -733,6 +733,7 @@ impl WalFile { offset as usize } + #[allow(clippy::mut_from_ref)] fn get_shared(&self) -> &mut WalFileShared { unsafe { self.shared.get().as_mut().unwrap() } }