add scoped locking for SharedWalFile to avoid holding locks longer than needed

This commit is contained in:
pedrocarlo
2025-10-19 18:28:11 -03:00
parent 97991a1934
commit b00a276960

View File

@@ -822,16 +822,16 @@ impl Wal for WalFile {
self.max_frame_read_lock_index.load(Ordering::Acquire),
NO_LOCK_HELD
);
let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = {
let shared = self.get_shared();
let mx = shared.max_frame.load(Ordering::Acquire);
let nb = shared.nbackfills.load(Ordering::Acquire);
let ck = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
let transaction_count = shared.transaction_count.load(Ordering::Acquire);
(mx, nb, ck, checkpoint_seq, transaction_count)
};
let db_changed = self.db_changed(&self.get_shared());
let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = self
.with_shared(|shared| {
let mx = shared.max_frame.load(Ordering::Acquire);
let nb = shared.nbackfills.load(Ordering::Acquire);
let ck = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
let transaction_count = shared.transaction_count.load(Ordering::Acquire);
(mx, nb, ck, checkpoint_seq, transaction_count)
});
let db_changed = self.with_shared(|shared| self.db_changed(shared));
// WAL is already fully backfilled into the main DB image
// (mxFrame == nBackfill). Readers can therefore ignore the
@@ -840,7 +840,7 @@ impl Wal for WalFile {
if shared_max == nbackfills {
tracing::debug!("begin_read_tx: WAL is already fully backfilled into the main DB image, shared_max={}, nbackfills={}", shared_max, nbackfills);
let lock_0_idx = 0;
if !self.get_shared().read_locks[lock_0_idx].read() {
if !self.with_shared(|shared| shared.read_locks[lock_0_idx].read()) {
tracing::debug!("begin_read_tx: read lock 0 is already held, returning Busy");
return Err(LimboError::Busy);
}
@@ -864,13 +864,15 @@ impl Wal for WalFile {
// Find largest mark <= mx among slots 1..N
let mut best_idx: i64 = -1;
let mut best_mark: u32 = 0;
for (idx, lock) in self.get_shared().read_locks.iter().enumerate().skip(1) {
let m = lock.get_value();
if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark {
best_mark = m;
best_idx = idx as i64;
self.with_shared(|shared| {
for (idx, lock) in shared.read_locks.iter().enumerate().skip(1) {
let m = lock.get_value();
if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark {
best_mark = m;
best_idx = idx as i64;
}
}
}
});
// If none found or lagging, try to claim/update a slot
if best_idx == -1 || (best_mark as u64) < shared_max {
@@ -901,20 +903,19 @@ impl Wal for WalFile {
// Now take a shared read on that slot, and if we are successful,
// grab another snapshot of the shared state.
let (mx2, nb2, cksm2, ckpt_seq2) = {
let shared = self.get_shared();
let (mx2, nb2, cksm2, ckpt_seq2) = self.with_shared(|shared| {
if !shared.read_locks[best_idx as usize].read() {
// TODO: we should retry here instead of always returning Busy
return Err(LimboError::Busy);
}
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(
Ok((
shared.max_frame.load(Ordering::Acquire),
shared.nbackfills.load(Ordering::Acquire),
shared.last_checksum,
checkpoint_seq,
)
};
))
})?;
// sqlite/src/wal.c 3225
// Now that the read-lock has been obtained, check that neither the
@@ -1008,7 +1009,7 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn end_write_tx(&self) {
tracing::debug!("end_write_txn");
self.get_shared().write_lock.unlock();
self.with_shared(|shared| shared.write_lock.unlock());
}
/// Find the latest frame containing a page.
@@ -1029,10 +1030,13 @@ impl Wal for WalFile {
//
// if it's not, than pages from WAL range [frame_watermark..nBackfill] are already in the DB file,
// and in case if page first occurrence in WAL was after frame_watermark - we will be unable to read proper previous version of the page
turso_assert!(
frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::Acquire),
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire)
);
self.with_shared(|shared| {
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
turso_assert!(
frame_watermark.is_none() || frame_watermark.unwrap() >= nbackfills,
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, nbackfills
);
});
// if we are holding read_lock 0 and didn't write anything to the WAL, skip and read right from db file.
//
@@ -1050,30 +1054,31 @@ impl Wal for WalFile {
);
return Ok(None);
}
let shared = self.get_shared();
let frames = shared.frame_cache.lock();
let range = frame_watermark.map(|x| 0..=x).unwrap_or(
self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire),
);
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}",
page_id,
frame_watermark,
self.min_frame.load(Ordering::Acquire),
self.max_frame.load(Ordering::Acquire)
);
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): found frame={}",
page_id,
frame_watermark,
*f
);
return Ok(Some(*f));
self.with_shared(|shared| {
let frames = shared.frame_cache.lock();
let range = frame_watermark.map(|x| 0..=x).unwrap_or(
self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire),
);
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}",
page_id,
frame_watermark,
self.min_frame.load(Ordering::Acquire),
self.max_frame.load(Ordering::Acquire)
);
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): found frame={}",
page_id,
frame_watermark,
*f
);
return Ok(Some(*f));
}
}
}
Ok(None)
Ok(None)
})
}
/// Read a frame from the WAL.
@@ -1110,8 +1115,7 @@ impl Wal for WalFile {
let epoch = shared_file.read().epoch.load(Ordering::Acquire);
frame.set_wal_tag(frame_id, epoch);
});
let file = {
let shared = self.get_shared();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
// important not to hold shared lock beyond this point to avoid deadlock scenario where:
// thread 1: takes readlock here, passes reference to shared.file to begin_read_wal_frame
@@ -1125,7 +1129,7 @@ impl Wal for WalFile {
// when there are writers waiting to acquire the lock.
// Because of this, attempts to recursively acquire a read lock within a single thread may result in a deadlock."
shared.file.as_ref().unwrap().clone()
};
});
begin_read_wal_frame(
file.as_ref(),
offset + WAL_FRAME_HEADER_SIZE as u64,
@@ -1184,9 +1188,10 @@ impl Wal for WalFile {
}
}
});
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
});
let c = begin_read_wal_frame_raw(&self.buffer_pool, file.as_ref(), offset, complete)?;
Ok(c)
}
@@ -1243,9 +1248,10 @@ impl Wal for WalFile {
}
}
});
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
});
let c = begin_read_wal_frame(
file.as_ref(),
offset + WAL_FRAME_HEADER_SIZE as u64,
@@ -1266,13 +1272,12 @@ impl Wal for WalFile {
// perform actual write
let offset = self.frame_offset(frame_id);
let (header, file) = {
let shared = self.get_shared();
let (header, file) = self.with_shared(|shared| {
let header = shared.wal_header.clone();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap().clone();
(header, file)
};
});
let header = header.lock();
let checksums = self.last_checksum;
let (checksums, frame_bytes) = prepare_wal_frame(
@@ -1296,10 +1301,11 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
let frame_id = shared.max_frame.load(Ordering::Acquire) as usize;
let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize;
frame_id > self.checkpoint_threshold + nbackfills
self.with_shared(|shared| {
let frame_id = shared.max_frame.load(Ordering::Acquire) as usize;
let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize;
frame_id > self.checkpoint_threshold + nbackfills
})
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -1322,11 +1328,10 @@ impl Wal for WalFile {
tracing::debug!("wal_sync finish");
syncing.store(false, Ordering::SeqCst);
});
let file = {
let shared = self.get_shared();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
};
});
self.syncing.store(true, Ordering::SeqCst);
let c = file.sync(completion)?;
Ok(c)
@@ -1338,11 +1343,11 @@ impl Wal for WalFile {
}
fn get_max_frame_in_wal(&self) -> u64 {
self.get_shared().max_frame.load(Ordering::Acquire)
self.with_shared(|shared| shared.max_frame.load(Ordering::Acquire))
}
fn get_checkpoint_seq(&self) -> u32 {
self.get_shared().wal_header.lock().checkpoint_seq
self.with_shared(|shared| shared.wal_header.lock().checkpoint_seq)
}
fn get_max_frame(&self) -> u64 {
@@ -1355,8 +1360,7 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn rollback(&mut self) {
let (max_frame, last_checksum) = {
let shared = self.get_shared();
let (max_frame, last_checksum) = self.with_shared(|shared| {
let max_frame = shared.max_frame.load(Ordering::Acquire);
let mut frame_cache = shared.frame_cache.lock();
frame_cache.retain(|_page_id, frames| {
@@ -1367,7 +1371,7 @@ impl Wal for WalFile {
!frames.is_empty()
});
(max_frame, shared.last_checksum)
};
});
self.last_checksum = last_checksum;
self.max_frame.store(max_frame, Ordering::Release);
self.reset_internal_states();
@@ -1413,7 +1417,7 @@ impl Wal for WalFile {
}
fn prepare_wal_start(&mut self, page_size: PageSize) -> Result<Option<Completion>> {
if self.get_shared().is_initialized()? {
if self.with_shared(|shared| shared.is_initialized())? {
return Ok(None);
}
tracing::debug!("ensure_header_if_needed");
@@ -1447,17 +1451,22 @@ impl Wal for WalFile {
};
self.max_frame.store(0, Ordering::Release);
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
let c = sqlite3_ondisk::begin_write_wal_header(file.as_ref(), &shared.wal_header.lock())?;
let (header, file) = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
(
*shared.wal_header.lock(),
shared.file.as_ref().unwrap().clone(),
)
});
let c = sqlite3_ondisk::begin_write_wal_header(file.as_ref(), &header)?;
Ok(Some(c))
}
fn prepare_wal_finish(&mut self) -> Result<Completion> {
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
});
let shared = self.shared.clone();
let c = file.sync(Completion::new_sync(move |_| {
shared.read().initialized.store(true, Ordering::Release);
@@ -1477,18 +1486,17 @@ impl Wal for WalFile {
"we limit number of iovecs to IOV_MAX"
);
turso_assert!(
self.get_shared().is_initialized()?,
self.with_shared(|shared| shared.is_initialized())?,
"WAL must be prepared with prepare_wal_start/prepare_wal_finish method"
);
let (header, shared_page_size, epoch) = {
let shared = self.get_shared();
let (header, shared_page_size, epoch) = self.with_shared(|shared| {
let hdr_guard = shared.wal_header.lock();
let header: WalHeader = *hdr_guard;
let shared_page_size = header.page_size;
let epoch = shared.epoch.load(Ordering::Acquire);
(header, shared_page_size, epoch)
};
});
turso_assert!(
shared_page_size == page_sz.get(),
"page size mismatch, tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}",
@@ -1577,11 +1585,10 @@ impl Wal for WalFile {
let c = Completion::new_write_linked(cmp);
let file = {
let shared = self.get_shared();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
};
});
let c = file.pwritev(start_off, iovecs, c)?;
Ok(c)
}
@@ -1596,8 +1603,10 @@ impl Wal for WalFile {
}
fn update_max_frame(&mut self) {
let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire);
self.max_frame.store(new_max_frame, Ordering::Release);
self.with_shared(|shared| {
let new_max_frame = shared.max_frame.load(Ordering::Acquire);
self.max_frame.store(new_max_frame, Ordering::Release);
})
}
}
@@ -1649,7 +1658,7 @@ impl WalFile {
}
fn page_size(&self) -> u32 {
self.get_shared().wal_header.lock().page_size
self.with_shared(|shared| shared.wal_header.lock().page_size)
}
fn frame_offset(&self, frame_id: u64) -> u64 {
@@ -1677,7 +1686,7 @@ impl WalFile {
}
}
fn get_shared(&self) -> parking_lot::RwLockReadGuard<'_, WalFileShared> {
fn _get_shared(&self) -> parking_lot::RwLockReadGuard<'_, WalFileShared> {
// WASM in browser main thread doesn't have a way to "park" a thread
// so, we spin way here instead of calling blocking lock
#[cfg(target_family = "wasm")]
@@ -1696,11 +1705,19 @@ impl WalFile {
}
}
#[inline]
/// Run `func` while holding a read guard on the shared WAL state
/// (acquired via `_get_shared`), dropping the guard as soon as the
/// closure returns. This scopes the lock to exactly the closure body,
/// so callers cannot accidentally hold the shared lock longer than
/// needed.
///
/// NOTE(review): `func` takes `&WalFileShared`, so only read access is
/// available inside the closure; mutation presumably goes through a
/// separate `get_shared_mut` path — confirm against the full impl.
fn with_shared<F, R>(&self, func: F) -> R
where
    F: FnOnce(&WalFileShared) -> R,
{
    // Guard lives only for the duration of this call; it is dropped
    // immediately after `func` returns.
    let shared = self._get_shared();
    func(&shared)
}
fn complete_append_frame(&mut self, page_id: u64, frame_id: u64, checksums: (u32, u32)) {
self.last_checksum = checksums;
self.max_frame.store(frame_id, Ordering::Release);
let shared = self.get_shared();
{
self.with_shared(|shared| {
let mut frame_cache = shared.frame_cache.lock();
match frame_cache.get_mut(&page_id) {
Some(frames) => {
@@ -1710,7 +1727,7 @@ impl WalFile {
frame_cache.insert(page_id, vec![frame_id]);
}
}
}
})
}
fn reset_internal_states(&mut self) {
@@ -1745,12 +1762,11 @@ impl WalFile {
// so no other checkpointer can run. fsync WAL if there are unapplied frames.
// Decide the largest frame we are allowed to backfill.
CheckpointState::Start => {
let (max_frame, nbackfills) = {
let shared = self.get_shared();
let (max_frame, nbackfills) = self.with_shared(|shared| {
let max_frame = shared.max_frame.load(Ordering::Acquire);
let n_backfills = shared.nbackfills.load(Ordering::Acquire);
(max_frame, n_backfills)
};
});
let needs_backfill = max_frame > nbackfills;
if !needs_backfill && !mode.should_restart_log() {
// there are no frames to copy over and we don't need to reset
@@ -1786,8 +1802,7 @@ impl WalFile {
self.ongoing_checkpoint.max_frame = max_frame;
self.ongoing_checkpoint.min_frame = nbackfills + 1;
let to_checkpoint = {
let shared = self.get_shared();
let to_checkpoint = self.with_shared(|shared| {
let frame_cache = shared.frame_cache.lock();
let mut list = Vec::with_capacity(
self.ongoing_checkpoint
@@ -1808,7 +1823,7 @@ impl WalFile {
// sort by frame_id for read locality
list.sort_unstable_by(|a, b| (a.1, a.0).cmp(&(b.1, b.0)));
list
};
});
self.ongoing_checkpoint.pages_to_checkpoint = to_checkpoint;
self.ongoing_checkpoint.current_page = 0;
self.ongoing_checkpoint.inflight_writes.clear();
@@ -1839,7 +1854,7 @@ impl WalFile {
if self.ongoing_checkpoint.process_pending_reads() {
tracing::trace!("Drained reads into batch");
}
let epoch = self.get_shared().epoch.load(Ordering::Acquire);
let epoch = self.with_shared(|shared| shared.epoch.load(Ordering::Acquire));
// Issue reads until we hit limits
'inner: while self.ongoing_checkpoint.should_issue_reads() {
let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint
@@ -1931,8 +1946,7 @@ impl WalFile {
self.ongoing_checkpoint.complete(),
"checkpoint pending flush must have finished"
);
let checkpoint_result = {
let shared = self.get_shared();
let checkpoint_result = self.with_shared(|shared| {
let current_mx = shared.max_frame.load(Ordering::Acquire);
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
// Record two num pages fields to return as checkpoint result to caller.
@@ -1964,14 +1978,16 @@ impl WalFile {
checkpoint_max_frame,
)
}
};
});
// store the max frame we were able to successfully checkpoint.
// NOTE: we don't have a .shm file yet, so it's safe to update nbackfills here
// before we sync, because if we crash and then recover, we will checkpoint the entire db anyway.
self.get_shared()
.nbackfills
.store(self.ongoing_checkpoint.max_frame, Ordering::Release);
self.with_shared(|shared| {
shared
.nbackfills
.store(self.ongoing_checkpoint.max_frame, Ordering::Release)
});
if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() {
return Err(LimboError::Busy);
@@ -2002,7 +2018,7 @@ impl WalFile {
checkpoint_result.take().unwrap()
};
// increment wal epoch to ensure no stale pages are used for backfilling
self.get_shared().epoch.fetch_add(1, Ordering::Release);
self.with_shared(|shared| shared.epoch.fetch_add(1, Ordering::Release));
// store a copy of the checkpoint result to return in the future if pragma
// wal_checkpoint is called and we haven't backfilled again since.
@@ -2129,7 +2145,7 @@ impl WalFile {
// reinitialize inmemory state
self.get_shared_mut().restart_wal_header(&self.io, mode);
let cksm = self.get_shared().last_checksum;
let cksm = self.with_shared(|shared| shared.last_checksum);
self.last_checksum = cksm;
self.max_frame.store(0, Ordering::Release);
self.min_frame.store(0, Ordering::Release);
@@ -2138,12 +2154,11 @@ impl WalFile {
}
fn truncate_log(&mut self) -> Result<IOResult<()>> {
let file = {
let shared = self.get_shared();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.initialized.store(false, Ordering::Release);
shared.file.as_ref().unwrap().clone()
};
});
let CheckpointState::Truncate {
sync_sent,
@@ -2250,9 +2265,10 @@ impl WalFile {
})
};
// schedule read of the page payload
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
let file = self.with_shared(|shared| {
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
shared.file.as_ref().unwrap().clone()
});
let c = begin_read_wal_frame(
file.as_ref(),
offset + WAL_FRAME_HEADER_SIZE as u64,