From e4d7474372f757f648e30bc1ffdf7a084e1fa2d5 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 4 Feb 2025 18:13:01 +0200 Subject: [PATCH] core: Switch to parking_lot for RwLock We really need to make the WAL lock less expensive, but switching to `parking_lot` is anyway something we should do. Before: ``` Execute `SELECT 1`/Limbo time: [56.230 ns 56.463 ns 56.688 ns] ``` After: ``` Execute `SELECT 1`/Limbo time: [52.003 ns 52.132 ns 52.287 ns] ``` --- Cargo.lock | 3 ++- core/Cargo.toml | 1 + core/lib.rs | 3 ++- core/storage/pager.rs | 25 ++++++++++++++----------- core/storage/sqlite3_ondisk.rs | 5 +++-- core/storage/wal.rs | 33 +++++++++++++++++---------------- 6 files changed, 39 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63f0cc3ef..dc49a85f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1590,6 +1590,7 @@ dependencies = [ "miette", "mimalloc", "mockall", + "parking_lot", "pest", "pest_derive", "polling", @@ -1681,7 +1682,7 @@ dependencies = [ [[package]] name = "limbo_time" -version = "0.0.13" +version = "0.0.14" dependencies = [ "chrono", "limbo_ext", diff --git a/core/Cargo.toml b/core/Cargo.toml index de6dc7dfe..f2d43a156 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,6 +69,7 @@ limbo_percentile = { path = "../extensions/percentile", optional = true, feature limbo_time = { path = "../extensions/time", optional = true, features = ["static"] } miette = "7.4.0" strum = "0.26" +parking_lot = "0.12.3" [build-dependencies] chrono = "0.4.38" diff --git a/core/lib.rs b/core/lib.rs index b98c0c416..d02312dbc 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -24,13 +24,14 @@ use fallible_iterator::FallibleIterator; use libloading::{Library, Symbol}; use limbo_ext::{ExtensionApi, ExtensionEntryPoint}; use log::trace; +use parking_lot::RwLock; use schema::Schema; use sqlite3_parser::ast; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::cell::Cell; use std::collections::HashMap; use std::num::NonZero; -use std::sync::{Arc, OnceLock, RwLock}; +use std::sync::{Arc, OnceLock}; use std::{cell::RefCell, rc::Rc}; use storage::btree::btree_init_page; #[cfg(feature = "fs")] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 7833acdef..1be78cdb6 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -5,11 +5,12 @@ use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent}; use crate::storage::wal::{CheckpointResult, Wal}; use crate::{Buffer, Result}; use log::trace; +use parking_lot::RwLock; use std::cell::{RefCell, UnsafeCell}; use std::collections::HashSet; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use super::page_cache::{DumbLruPageCache, PageCacheKey}; use super::wal::{CheckpointMode, CheckpointStatus}; @@ -225,7 +226,7 @@ impl Pager { /// Reads a page from the database. pub fn read_page(&self, page_idx: usize) -> Result { trace!("read_page(page_idx = {})", page_idx); - let mut page_cache = self.page_cache.write().unwrap(); + let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame())); if let Some(page) = page_cache.get(&page_key) { trace!("read_page(page_idx = {}) = cached", page_idx); @@ -261,7 +262,7 @@ impl Pager { pub fn load_page(&self, page: PageRef) -> Result<()> { let id = page.get().id; trace!("load_page(page_idx = {})", id); - let mut page_cache = self.page_cache.write().unwrap(); + let mut page_cache = self.page_cache.write(); page.set_locked(); let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { @@ -297,7 +298,7 @@ impl Pager { /// Changes the size of the page cache. pub fn change_page_cache_size(&self, capacity: usize) { - let mut page_cache = self.page_cache.write().unwrap(); + let mut page_cache = self.page_cache.write(); page_cache.resize(capacity); } @@ -315,7 +316,7 @@ impl Pager { FlushState::Start => { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.write().unwrap(); + let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(*page_id, Some(self.wal.borrow().get_max_frame())); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); @@ -446,7 +447,7 @@ impl Pager { } } // TODO: only clear cache of things that are really invalidated - self.page_cache.write().unwrap().clear(); + self.page_cache.write().clear(); checkpoint_result } @@ -482,7 +483,7 @@ impl Pager { // setup page and add to cache page.set_dirty(); self.add_dirty(page.get().id); - let mut cache = self.page_cache.write().unwrap(); + let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(page.get().id, Some(self.wal.borrow().get_max_frame())); cache.insert(page_key, page.clone()); @@ -491,7 +492,7 @@ impl Pager { } pub fn put_loaded_page(&self, id: usize, page: PageRef) { - let mut cache = self.page_cache.write().unwrap(); + let mut cache = self.page_cache.write(); // cache insert invalidates previous page let page_key = PageCacheKey::new(id, Some(self.wal.borrow().get_max_frame())); cache.insert(page_key, page.clone()); @@ -525,7 +526,9 @@ pub fn allocate_page(page_id: usize, buffer_pool: &Rc, offset: usize #[cfg(test)] mod tests { - use std::sync::{Arc, RwLock}; + use std::sync::Arc; + + use parking_lot::RwLock; use crate::storage::page_cache::{DumbLruPageCache, PageCacheKey}; @@ -539,13 +542,13 @@ mod tests { let thread = { let cache = cache.clone(); std::thread::spawn(move || { - let mut cache = cache.write().unwrap(); + let mut cache = cache.write(); let page_key = PageCacheKey::new(1, None); cache.insert(page_key, Arc::new(Page::new(1))); }) }; let _ = thread.join(); - let mut cache = cache.write().unwrap(); + let mut cache = cache.write(); let page_key = PageCacheKey::new(1, None); let page = cache.get(&page_key); assert_eq!(page.unwrap().get().id, 1); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 5543fa7db..e86d89520 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -49,10 +49,11 @@ use crate::storage::pager::Pager; use crate::types::{OwnedRecord, OwnedValue}; use crate::{File, Result}; use log::trace; +use parking_lot::RwLock; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use super::pager::PageRef; @@ -1147,7 +1148,7 @@ pub fn begin_read_wal_header(io: &Rc) -> Result> fn finish_read_wal_header(buf: Rc>, header: Arc>) -> Result<()> { let buf = buf.borrow(); let buf = buf.as_slice(); - let mut header = header.write().unwrap(); + let mut header = header.write(); header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c514eae84..e42fde2b6 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,8 +1,9 @@ use log::{debug, trace}; use std::collections::HashMap; + +use parking_lot::RwLock; use std::fmt::Formatter; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::RwLock; use std::{cell::RefCell, fmt, rc::Rc, sync::Arc}; use crate::io::{File, SyncCompletion, IO}; @@ -311,7 +312,7 @@ impl fmt::Debug for WalFileShared { impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&mut self) -> Result { - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); let max_frame_in_wal = shared.max_frame; self.min_frame = shared.nbackfills + 1; @@ -365,7 +366,7 @@ impl Wal for WalFile { /// End a read transaction. fn end_read_tx(&self) -> Result { - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index]; read_lock.unlock(); Ok(LimboResult::Ok) @@ -373,7 +374,7 @@ impl Wal for WalFile { /// Begin a write transaction fn begin_write_tx(&mut self) -> Result { - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); let busy = !shared.write_lock.write(); if busy { return Ok(LimboResult::Busy); @@ -383,14 +384,14 @@ impl Wal for WalFile { /// End a write transaction fn end_write_tx(&self) -> Result { - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); shared.write_lock.unlock(); Ok(LimboResult::Ok) } /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { - let shared = self.shared.read().unwrap(); + let shared = self.shared.read(); let frames = shared.frame_cache.get(&page_id); if frames.is_none() { return Ok(None); @@ -408,7 +409,7 @@ impl Wal for WalFile { fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); - let shared = self.shared.read().unwrap(); + let shared = self.shared.read(); page.set_locked(); begin_read_wal_frame( &shared.file, @@ -427,7 +428,7 @@ impl Wal for WalFile { write_counter: Rc>, ) -> Result<()> { let page_id = page.get().id; - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); let frame_id = if shared.max_frame == 0 { 1 } else { @@ -441,7 +442,7 @@ impl Wal for WalFile { page_id ); let header = shared.wal_header.clone(); - let header = header.read().unwrap(); + let header = header.read(); let checksums = shared.last_checksum; let checksums = begin_write_wal_frame( &shared.file, @@ -468,7 +469,7 @@ impl Wal for WalFile { } fn should_checkpoint(&self) -> bool { - let shared = self.shared.read().unwrap(); + let shared = self.shared.read(); let frame_id = shared.max_frame as usize; frame_id >= self.checkpoint_threshold } @@ -490,7 +491,7 @@ impl Wal for WalFile { CheckpointState::Start => { // TODO(pere): check what frames are safe to checkpoint between many readers! self.ongoing_checkpoint.min_frame = self.min_frame; - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); let max_frame_in_wal = shared.max_frame as u32; let mut max_safe_frame = shared.max_frame; for read_lock in shared.read_locks.iter_mut() { @@ -515,7 +516,7 @@ impl Wal for WalFile { ); } CheckpointState::ReadFrame => { - let shared = self.shared.read().unwrap(); + let shared = self.shared.read(); assert!( self.ongoing_checkpoint.current_page as usize <= shared.pages_in_frames.len() @@ -574,7 +575,7 @@ impl Wal for WalFile { if *write_counter.borrow() > 0 { return Ok(CheckpointStatus::IO); } - let shared = self.shared.read().unwrap(); + let shared = self.shared.read(); if (self.ongoing_checkpoint.current_page as usize) < shared.pages_in_frames.len() { @@ -587,7 +588,7 @@ impl Wal for WalFile { if *write_counter.borrow() > 0 { return Ok(CheckpointStatus::IO); } - let mut shared = self.shared.write().unwrap(); + let mut shared = self.shared.write(); // Record two num pages fields to return as checkpoint result to caller. // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html @@ -618,7 +619,7 @@ impl Wal for WalFile { let state = *self.sync_state.borrow(); match state { SyncState::NotSyncing => { - let shared = self.shared.write().unwrap(); + let shared = self.shared.write(); debug!("wal_sync"); { let syncing = self.syncing.clone(); @@ -756,7 +757,7 @@ impl WalFileShared { Arc::new(RwLock::new(wal_header)) }; let checksum = { - let checksum = header.read().unwrap(); + let checksum = header.read(); (checksum.checksum_1, checksum.checksum_2) }; let shared = WalFileShared {