Merge 'Remove RWLock from Shared wal state' from Pere Diaz Bou

Fixes #780
WalShared state can be shared without having to wrap everything with a
lock, and instead use atomics on some places and rwlock on others -- for
now.
## Results:
From:
----
```
Execute `SELECT 1`/limbo_execute_select_1
                        time:   [34.125 ns 34.218 ns 34.324 ns]
Execute `SELECT 1`/sqlite_execute_select_1
                        time:   [28.124 ns 28.254 ns 28.385 ns]
```
To:
----
```bash
Execute `SELECT 1`/limbo_execute_select_1
                        time:   [31.919 ns 32.113 ns 32.327 ns]
Execute `SELECT 1`/sqlite_execute_select_1
                        time:   [29.662 ns 29.900 ns 30.139 ns]

```
And with `begin_read_tx` inlined:
----
```bash
Execute `SELECT 1`/limbo_execute_select_1
                        time:   [30.543 ns 30.585 ns 30.632 ns]
```

Closes #1225
This commit is contained in:
Pekka Enberg
2025-04-02 17:47:45 +03:00
4 changed files with 97 additions and 89 deletions

View File

@@ -38,7 +38,7 @@ use parking_lot::RwLock;
use schema::{Column, Schema};
use std::{
borrow::Cow,
cell::{Cell, RefCell},
cell::{Cell, RefCell, UnsafeCell},
collections::HashMap,
io::Write,
num::NonZero,
@@ -92,7 +92,7 @@ pub struct Database {
// Shared structures of a Database are the parts that are common to multiple threads that might
// create DB connections.
shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
shared_wal: Arc<RwLock<WalFileShared>>,
shared_wal: Arc<UnsafeCell<WalFileShared>>,
}
unsafe impl Send for Database {}
@@ -118,7 +118,7 @@ impl Database {
pub fn open(
io: Arc<dyn IO>,
db_file: Arc<dyn DatabaseStorage>,
shared_wal: Arc<RwLock<WalFileShared>>,
shared_wal: Arc<UnsafeCell<WalFileShared>>,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(db_file.clone())?;

View File

@@ -240,10 +240,12 @@ impl Pager {
(db_header.page_size - db_header.reserved_space as u16) as usize
}
#[inline(always)]
pub fn begin_read_tx(&self) -> Result<LimboResult> {
self.wal.borrow_mut().begin_read_tx()
}
#[inline(always)]
pub fn begin_write_tx(&self) -> Result<LimboResult> {
self.wal.borrow_mut().begin_write_tx()
}

View File

@@ -49,7 +49,6 @@ use crate::storage::database::DatabaseStorage;
use crate::storage::pager::Pager;
use crate::types::{ImmutableRecord, RawSlice, RefValue, TextRef, TextSubtype};
use crate::{File, Result};
use parking_lot::RwLock;
use std::cell::RefCell;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -1329,11 +1328,11 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
payload.extend_from_slice(&varint[0..n]);
}
pub fn begin_read_wal_header(io: &Arc<dyn File>) -> Result<Arc<RwLock<WalHeader>>> {
pub fn begin_read_wal_header(io: &Arc<dyn File>) -> Result<Arc<SpinLock<WalHeader>>> {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Arc::new(RwLock::new(WalHeader::default()));
let result = Arc::new(SpinLock::new(WalHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let header = header.clone();
@@ -1344,10 +1343,13 @@ pub fn begin_read_wal_header(io: &Arc<dyn File>) -> Result<Arc<RwLock<WalHeader>
Ok(result)
}
fn finish_read_wal_header(buf: Arc<RefCell<Buffer>>, header: Arc<RwLock<WalHeader>>) -> Result<()> {
fn finish_read_wal_header(
buf: Arc<RefCell<Buffer>>,
header: Arc<SpinLock<WalHeader>>,
) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = header.write();
let mut header = header.lock();
header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);

View File

@@ -1,11 +1,12 @@
use std::cell::UnsafeCell;
use std::collections::HashMap;
use tracing::{debug, trace};
use parking_lot::RwLock;
use std::fmt::Formatter;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::{cell::RefCell, fmt, rc::Rc, sync::Arc};
use crate::fast_lock::SpinLock;
use crate::io::{File, SyncCompletion, IO};
use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::{
@@ -247,7 +248,7 @@ pub struct WalFile {
syncing: Rc<RefCell<bool>>,
page_size: usize,
shared: Arc<RwLock<WalFileShared>>,
shared: Arc<UnsafeCell<WalFileShared>>,
ongoing_checkpoint: OngoingCheckpoint,
checkpoint_threshold: usize,
// min and max frames for this connection
@@ -282,19 +283,19 @@ impl fmt::Debug for WalFile {
/// that needs to be communicated between threads so this struct does the job.
#[allow(dead_code)]
pub struct WalFileShared {
wal_header: Arc<RwLock<WalHeader>>,
min_frame: u64,
max_frame: u64,
nbackfills: u64,
wal_header: Arc<SpinLock<WalHeader>>,
min_frame: AtomicU64,
max_frame: AtomicU64,
nbackfills: AtomicU64,
// Frame cache maps a Page to all the frames it has stored in WAL in ascending order.
// This is to easily find the frame it must checkpoint each connection if a checkpoint is
// necessary.
// One difference between SQLite and limbo is that we will never support multi process, meaning
// we don't need WAL's index file. So we can do stuff like this without shared memory.
// TODO: this will need refactoring because this is incredible memory inefficient.
frame_cache: HashMap<u64, Vec<u64>>,
frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
// Another memory inefficient array made to just keep track of pages that are in frame_cache.
pages_in_frames: Vec<u64>,
pages_in_frames: Arc<SpinLock<Vec<u64>>>,
last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
file: Arc<dyn File>,
/// read_locks is a list of read locks that can coexist with the max_frame number stored in
@@ -325,15 +326,13 @@ impl fmt::Debug for WalFileShared {
impl Wal for WalFile {
/// Begin a read transaction.
fn begin_read_tx(&mut self) -> Result<LimboResult> {
let mut shared = self.shared.write();
let max_frame_in_wal = shared.max_frame;
self.min_frame = shared.nbackfills + 1;
let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
let mut max_read_mark = 0;
let mut max_read_mark_index = -1;
// Find the largest mark we can find, ignore frames that are impossible to be in range and
// that are not set
for (index, lock) in shared.read_locks.iter().enumerate() {
for (index, lock) in self.get_shared().read_locks.iter().enumerate() {
let this_mark = lock.value.load(Ordering::SeqCst);
if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 {
max_read_mark = this_mark;
@@ -343,7 +342,7 @@ impl Wal for WalFile {
// If we didn't find any mark or we can update, let's update them
if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 {
for (index, lock) in shared.read_locks.iter_mut().enumerate() {
for (index, lock) in self.get_shared().read_locks.iter_mut().enumerate() {
let busy = !lock.write();
if !busy {
// If this was busy then it must mean >1 threads tried to set this read lock
@@ -360,14 +359,17 @@ impl Wal for WalFile {
return Ok(LimboResult::Busy);
}
let lock = &mut shared.read_locks[max_read_mark_index as usize];
let busy = !lock.read();
if busy {
return Ok(LimboResult::Busy);
let shared = self.get_shared();
{
let lock = &mut shared.read_locks[max_read_mark_index as usize];
let busy = !lock.read();
if busy {
return Ok(LimboResult::Busy);
}
}
self.min_frame = shared.nbackfills.load(Ordering::SeqCst) + 1;
self.max_frame_read_lock_index = max_read_mark_index as usize;
self.max_frame = max_read_mark as u64;
self.min_frame = shared.nbackfills + 1;
tracing::debug!(
"begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
self.min_frame,
@@ -379,18 +381,17 @@ impl Wal for WalFile {
}
/// End a read transaction.
#[inline(always)]
fn end_read_tx(&self) -> Result<LimboResult> {
tracing::debug!("end_read_tx");
let mut shared = self.shared.write();
let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index];
let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
read_lock.unlock();
Ok(LimboResult::Ok)
}
/// Begin a write transaction
fn begin_write_tx(&mut self) -> Result<LimboResult> {
let mut shared = self.shared.write();
let busy = !shared.write_lock.write();
let busy = !self.get_shared().write_lock.write();
tracing::debug!("begin_write_transaction(busy={})", busy);
if busy {
return Ok(LimboResult::Busy);
@@ -401,15 +402,15 @@ impl Wal for WalFile {
/// End a write transaction
fn end_write_tx(&self) -> Result<LimboResult> {
tracing::debug!("end_write_txn");
let mut shared = self.shared.write();
shared.write_lock.unlock();
self.get_shared().write_lock.unlock();
Ok(LimboResult::Ok)
}
/// Find the latest frame containing a page.
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
let shared = self.shared.read();
let frames = shared.frame_cache.get(&page_id);
let shared = self.get_shared();
let frames = shared.frame_cache.lock();
let frames = frames.get(&page_id);
if frames.is_none() {
return Ok(None);
}
@@ -426,10 +427,9 @@ impl Wal for WalFile {
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc<BufferPool>) -> Result<()> {
debug!("read_frame({})", frame_id);
let offset = self.frame_offset(frame_id);
let shared = self.shared.read();
page.set_locked();
begin_read_wal_frame(
&shared.file,
&self.get_shared().file,
offset + WAL_FRAME_HEADER_SIZE,
buffer_pool,
page,
@@ -445,12 +445,9 @@ impl Wal for WalFile {
write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
let page_id = page.get().id;
let mut shared = self.shared.write();
let frame_id = if shared.max_frame == 0 {
1
} else {
shared.max_frame + 1
};
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::SeqCst);
let frame_id = if max_frame == 0 { 1 } else { max_frame + 1 };
let offset = self.frame_offset(frame_id);
tracing::debug!(
"append_frame(frame={}, offset={}, page_id={})",
@@ -459,7 +456,7 @@ impl Wal for WalFile {
page_id
);
let header = shared.wal_header.clone();
let header = header.read();
let header = header.lock();
let checksums = shared.last_checksum;
let checksums = begin_write_wal_frame(
&shared.file,
@@ -471,14 +468,15 @@ impl Wal for WalFile {
checksums,
)?;
shared.last_checksum = checksums;
shared.max_frame = frame_id;
shared.max_frame.store(frame_id, Ordering::SeqCst);
{
let frames = shared.frame_cache.get_mut(&(page_id as u64));
let mut frame_cache = shared.frame_cache.lock();
let frames = frame_cache.get_mut(&(page_id as u64));
match frames {
Some(frames) => frames.push(frame_id),
None => {
shared.frame_cache.insert(page_id as u64, vec![frame_id]);
shared.pages_in_frames.push(page_id as u64);
frame_cache.insert(page_id as u64, vec![frame_id]);
shared.pages_in_frames.lock().push(page_id as u64);
}
}
}
@@ -486,8 +484,8 @@ impl Wal for WalFile {
}
fn should_checkpoint(&self) -> bool {
let shared = self.shared.read();
let frame_id = shared.max_frame as usize;
let shared = self.get_shared();
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
frame_id >= self.checkpoint_threshold
}
@@ -508,8 +506,8 @@ impl Wal for WalFile {
CheckpointState::Start => {
// TODO(pere): check what frames are safe to checkpoint between many readers!
self.ongoing_checkpoint.min_frame = self.min_frame;
let mut shared = self.shared.write();
let mut max_safe_frame = shared.max_frame;
let shared = self.get_shared();
let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() {
let this_mark = read_lock.value.load(Ordering::SeqCst);
if this_mark < max_safe_frame as u32 {
@@ -537,27 +535,26 @@ impl Wal for WalFile {
);
}
CheckpointState::ReadFrame => {
let shared = self.shared.read();
assert!(
self.ongoing_checkpoint.current_page as usize
<= shared.pages_in_frames.len()
);
if self.ongoing_checkpoint.current_page as usize == shared.pages_in_frames.len()
{
let shared = self.get_shared();
let min_frame = self.ongoing_checkpoint.min_frame;
let max_frame = self.ongoing_checkpoint.max_frame;
let pages_in_frames = shared.pages_in_frames.clone();
let pages_in_frames = pages_in_frames.lock();
let frame_cache = shared.frame_cache.clone();
let frame_cache = frame_cache.lock();
assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len());
if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() {
self.ongoing_checkpoint.state = CheckpointState::Done;
continue 'checkpoint_loop;
}
let page =
shared.pages_in_frames[self.ongoing_checkpoint.current_page as usize];
let frames = shared
.frame_cache
let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize];
let frames = frame_cache
.get(&page)
.expect("page must be in frame cache if it's in list");
for frame in frames.iter().rev() {
if *frame >= self.ongoing_checkpoint.min_frame
&& *frame <= self.ongoing_checkpoint.max_frame
{
if *frame >= min_frame && *frame <= max_frame {
debug!(
"checkpoint page(state={:?}, page={}, frame={})",
state, page, *frame
@@ -596,9 +593,9 @@ impl Wal for WalFile {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
}
let shared = self.shared.read();
let shared = self.get_shared();
if (self.ongoing_checkpoint.current_page as usize)
< shared.pages_in_frames.len()
< shared.pages_in_frames.lock().len()
{
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
@@ -609,26 +606,28 @@ impl Wal for WalFile {
if *write_counter.borrow() > 0 {
return Ok(CheckpointStatus::IO);
}
let mut shared = self.shared.write();
let shared = self.get_shared();
// Record two num pages fields to return as checkpoint result to caller.
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
let checkpoint_result = CheckpointResult {
num_wal_frames: shared.max_frame,
num_wal_frames: shared.max_frame.load(Ordering::SeqCst),
num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
};
let everything_backfilled =
shared.max_frame == self.ongoing_checkpoint.max_frame;
let everything_backfilled = shared.max_frame.load(Ordering::SeqCst)
== self.ongoing_checkpoint.max_frame;
if everything_backfilled {
// Here we know that we backfilled everything, therefore we can safely
// reset the wal.
shared.frame_cache.clear();
shared.pages_in_frames.clear();
shared.max_frame = 0;
shared.nbackfills = 0;
shared.frame_cache.lock().clear();
shared.pages_in_frames.lock().clear();
shared.max_frame.store(0, Ordering::SeqCst);
shared.nbackfills.store(0, Ordering::SeqCst);
// TODO(pere): truncate wal file here.
} else {
shared.nbackfills = self.ongoing_checkpoint.max_frame;
shared
.nbackfills
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
}
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(CheckpointStatus::Done(checkpoint_result));
@@ -641,7 +640,7 @@ impl Wal for WalFile {
let state = *self.sync_state.borrow();
match state {
SyncState::NotSyncing => {
let shared = self.shared.write();
let shared = self.get_shared();
debug!("wal_sync");
{
let syncing = self.syncing.clone();
@@ -673,7 +672,7 @@ impl Wal for WalFile {
}
fn get_max_frame_in_wal(&self) -> u64 {
self.shared.read().max_frame
self.get_shared().max_frame.load(Ordering::SeqCst)
}
fn get_max_frame(&self) -> u64 {
@@ -689,7 +688,7 @@ impl WalFile {
pub fn new(
io: Arc<dyn IO>,
page_size: usize,
shared: Arc<RwLock<WalFileShared>>,
shared: Arc<UnsafeCell<WalFileShared>>,
buffer_pool: Rc<BufferPool>,
) -> Self {
let checkpoint_page = Arc::new(Page::new(0));
@@ -733,6 +732,11 @@ impl WalFile {
let offset = WAL_HEADER_SIZE as u64 + page_offset;
offset as usize
}
#[allow(clippy::mut_from_ref)]
fn get_shared(&self) -> &mut WalFileShared {
unsafe { self.shared.get().as_mut().unwrap() }
}
}
impl WalFileShared {
@@ -740,7 +744,7 @@ impl WalFileShared {
io: &Arc<dyn IO>,
path: &str,
page_size: u16,
) -> Result<Arc<RwLock<WalFileShared>>> {
) -> Result<Arc<UnsafeCell<WalFileShared>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
let header = if file.size()? > 0 {
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
@@ -781,21 +785,21 @@ impl WalFileShared {
wal_header.checksum_1 = checksums.0;
wal_header.checksum_2 = checksums.1;
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
Arc::new(RwLock::new(wal_header))
Arc::new(SpinLock::new(wal_header))
};
let checksum = {
let checksum = header.read();
let checksum = header.lock();
(checksum.checksum_1, checksum.checksum_2)
};
let shared = WalFileShared {
wal_header: header,
min_frame: 0,
max_frame: 0,
nbackfills: 0,
frame_cache: HashMap::new(),
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: checksum,
file,
pages_in_frames: Vec::new(),
pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
read_locks: [
LimboRwLock {
lock: AtomicU32::new(NO_LOCK),
@@ -829,6 +833,6 @@ impl WalFileShared {
value: AtomicU32::new(READMARK_NOT_USED),
},
};
Ok(Arc::new(RwLock::new(shared)))
Ok(Arc::new(UnsafeCell::new(shared)))
}
}