From b00a27696004f0ccb98dd20cc6b39429ebcfdaeb Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sun, 19 Oct 2025 18:28:11 -0300 Subject: [PATCH] add scoped locking for SharedWalFile to avoid holding locks for longer than needed --- core/storage/wal.rs | 250 +++++++++++++++++++++++--------------------- 1 file changed, 133 insertions(+), 117 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 1fd114a94..87863b1a3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -822,16 +822,16 @@ impl Wal for WalFile { self.max_frame_read_lock_index.load(Ordering::Acquire), NO_LOCK_HELD ); - let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = { - let shared = self.get_shared(); - let mx = shared.max_frame.load(Ordering::Acquire); - let nb = shared.nbackfills.load(Ordering::Acquire); - let ck = shared.last_checksum; - let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; - let transaction_count = shared.transaction_count.load(Ordering::Acquire); - (mx, nb, ck, checkpoint_seq, transaction_count) - }; - let db_changed = self.db_changed(&self.get_shared()); + let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = self + .with_shared(|shared| { + let mx = shared.max_frame.load(Ordering::Acquire); + let nb = shared.nbackfills.load(Ordering::Acquire); + let ck = shared.last_checksum; + let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; + let transaction_count = shared.transaction_count.load(Ordering::Acquire); + (mx, nb, ck, checkpoint_seq, transaction_count) + }); + let db_changed = self.with_shared(|shared| self.db_changed(shared)); // WAL is already fully back‑filled into the main DB image // (mxFrame == nBackfill). Readers can therefore ignore the @@ -840,7 +840,7 @@ impl Wal for WalFile { if shared_max == nbackfills { tracing::debug!("begin_read_tx: WAL is already fully back‑filled into the main DB image, shared_max={}, nbackfills={}", shared_max, nbackfills); let lock_0_idx = 0; - if !self.get_shared().read_locks[lock_0_idx].read() { + if !self.with_shared(|shared| shared.read_locks[lock_0_idx].read()) { tracing::debug!("begin_read_tx: read lock 0 is already held, returning Busy"); return Err(LimboError::Busy); } @@ -864,13 +864,15 @@ impl Wal for WalFile { // Find largest mark <= mx among slots 1..N let mut best_idx: i64 = -1; let mut best_mark: u32 = 0; - for (idx, lock) in self.get_shared().read_locks.iter().enumerate().skip(1) { - let m = lock.get_value(); - if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark { - best_mark = m; - best_idx = idx as i64; + self.with_shared(|shared| { + for (idx, lock) in shared.read_locks.iter().enumerate().skip(1) { + let m = lock.get_value(); + if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark { + best_mark = m; + best_idx = idx as i64; + } } - } + }); // If none found or lagging, try to claim/update a slot if best_idx == -1 || (best_mark as u64) < shared_max { @@ -901,20 +903,19 @@ impl Wal for WalFile { // Now take a shared read on that slot, and if we are successful, // grab another snapshot of the shared state. - let (mx2, nb2, cksm2, ckpt_seq2) = { - let shared = self.get_shared(); + let (mx2, nb2, cksm2, ckpt_seq2) = self.with_shared(|shared| { if !shared.read_locks[best_idx as usize].read() { // TODO: we should retry here instead of always returning Busy return Err(LimboError::Busy); } let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; - ( + Ok(( shared.max_frame.load(Ordering::Acquire), shared.nbackfills.load(Ordering::Acquire), shared.last_checksum, checkpoint_seq, - ) - }; + )) + })?; // sqlite/src/wal.c 3225 // Now that the read-lock has been obtained, check that neither the @@ -1008,7 +1009,7 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn end_write_tx(&self) { tracing::debug!("end_write_txn"); - self.get_shared().write_lock.unlock(); + self.with_shared(|shared| shared.write_lock.unlock()); } /// Find the latest frame containing a page. @@ -1029,10 +1030,13 @@ impl Wal for WalFile { // // if it's not, than pages from WAL range [frame_watermark..nBackfill] are already in the DB file, // and in case if page first occurrence in WAL was after frame_watermark - we will be unable to read proper previous version of the page - turso_assert!( - frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::Acquire), - "frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire) - ); + self.with_shared(|shared| { + let nbackfills = shared.nbackfills.load(Ordering::Acquire); + turso_assert!( + frame_watermark.is_none() || frame_watermark.unwrap() >= nbackfills, + "frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, nbackfills + ); + }); // if we are holding read_lock 0 and didn't write anything to the WAL, skip and read right from db file. // @@ -1050,30 +1054,31 @@ impl Wal for WalFile { ); return Ok(None); } - let shared = self.get_shared(); - let frames = shared.frame_cache.lock(); - let range = frame_watermark.map(|x| 0..=x).unwrap_or( - self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire), - ); - tracing::debug!( - "find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}", - page_id, - frame_watermark, - self.min_frame.load(Ordering::Acquire), - self.max_frame.load(Ordering::Acquire) - ); - if let Some(list) = frames.get(&page_id) { - if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { - tracing::debug!( - "find_frame(page_id={}, frame_watermark={:?}): found frame={}", - page_id, - frame_watermark, - *f - ); - return Ok(Some(*f)); + self.with_shared(|shared| { + let frames = shared.frame_cache.lock(); + let range = frame_watermark.map(|x| 0..=x).unwrap_or( + self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire), + ); + tracing::debug!( + "find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}", + page_id, + frame_watermark, + self.min_frame.load(Ordering::Acquire), + self.max_frame.load(Ordering::Acquire) + ); + if let Some(list) = frames.get(&page_id) { + if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { + tracing::debug!( + "find_frame(page_id={}, frame_watermark={:?}): found frame={}", + page_id, + frame_watermark, + *f + ); + return Ok(Some(*f)); + } } - } - Ok(None) + Ok(None) + }) } /// Read a frame from the WAL. @@ -1110,8 +1115,7 @@ impl Wal for WalFile { let epoch = shared_file.read().epoch.load(Ordering::Acquire); frame.set_wal_tag(frame_id, epoch); }); - let file = { - let shared = self.get_shared(); + let file = self.with_shared(|shared| { assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); // important not to hold shared lock beyond this point to avoid deadlock scenario where: // thread 1: takes readlock here, passes reference to shared.file to begin_read_wal_frame @@ -1125,7 +1129,7 @@ impl Wal for WalFile { // when there are writers waiting to acquire the lock. // Because of this, attempts to recursively acquire a read lock within a single thread may result in a deadlock." shared.file.as_ref().unwrap().clone() - }; + }); begin_read_wal_frame( file.as_ref(), offset + WAL_FRAME_HEADER_SIZE as u64, @@ -1184,9 +1188,10 @@ impl Wal for WalFile { } } }); - let shared = self.get_shared(); - assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); - let file = shared.file.as_ref().unwrap(); + let file = self.with_shared(|shared| { + assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); + shared.file.as_ref().unwrap().clone() + }); let c = begin_read_wal_frame_raw(&self.buffer_pool, file.as_ref(), offset, complete)?; Ok(c) } @@ -1243,9 +1248,10 @@ impl Wal for WalFile { } } }); - let shared = self.get_shared(); - assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); - let file = shared.file.as_ref().unwrap(); + let file = self.with_shared(|shared| { + assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); + shared.file.as_ref().unwrap().clone() + }); let c = begin_read_wal_frame( file.as_ref(), offset + WAL_FRAME_HEADER_SIZE as u64, @@ -1266,13 +1272,12 @@ impl Wal for WalFile { // perform actual write let offset = self.frame_offset(frame_id); - let (header, file) = { - let shared = self.get_shared(); + let (header, file) = self.with_shared(|shared| { let header = shared.wal_header.clone(); assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); let file = shared.file.as_ref().unwrap().clone(); (header, file) - }; + }); let header = header.lock(); let checksums = self.last_checksum; let (checksums, frame_bytes) = prepare_wal_frame( @@ -1296,10 +1301,11 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn should_checkpoint(&self) -> bool { - let shared = self.get_shared(); - let frame_id = shared.max_frame.load(Ordering::Acquire) as usize; - let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize; - frame_id > self.checkpoint_threshold + nbackfills + self.with_shared(|shared| { + let frame_id = shared.max_frame.load(Ordering::Acquire) as usize; + let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize; + frame_id > self.checkpoint_threshold + nbackfills + }) } #[instrument(skip_all, level = Level::DEBUG)] @@ -1322,11 +1328,10 @@ impl Wal for WalFile { tracing::debug!("wal_sync finish"); syncing.store(false, Ordering::SeqCst); }); - let file = { - let shared = self.get_shared(); + let file = self.with_shared(|shared| { assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); shared.file.as_ref().unwrap().clone() - }; + }); self.syncing.store(true, Ordering::SeqCst); let c = file.sync(completion)?; Ok(c) @@ -1338,11 +1343,11 @@ impl Wal for WalFile { } fn get_max_frame_in_wal(&self) -> u64 { - self.get_shared().max_frame.load(Ordering::Acquire) + self.with_shared(|shared| shared.max_frame.load(Ordering::Acquire)) } fn get_checkpoint_seq(&self) -> u32 { - self.get_shared().wal_header.lock().checkpoint_seq + self.with_shared(|shared| shared.wal_header.lock().checkpoint_seq) } fn get_max_frame(&self) -> u64 { @@ -1355,8 +1360,7 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn rollback(&mut self) { - let (max_frame, last_checksum) = { - let shared = self.get_shared(); + let (max_frame, last_checksum) = self.with_shared(|shared| { let max_frame = shared.max_frame.load(Ordering::Acquire); let mut frame_cache = shared.frame_cache.lock(); frame_cache.retain(|_page_id, frames| { @@ -1367,7 +1371,7 @@ impl Wal for WalFile { !frames.is_empty() }); (max_frame, shared.last_checksum) - }; + }); self.last_checksum = last_checksum; self.max_frame.store(max_frame, Ordering::Release); self.reset_internal_states(); @@ -1413,7 +1417,7 @@ impl Wal for WalFile { } fn prepare_wal_start(&mut self, page_size: PageSize) -> Result> { - if self.get_shared().is_initialized()? { + if self.with_shared(|shared| shared.is_initialized())? { return Ok(None); } tracing::debug!("ensure_header_if_needed"); @@ -1447,17 +1451,22 @@ impl Wal for WalFile { }; self.max_frame.store(0, Ordering::Release); - let shared = self.get_shared(); - assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); - let file = shared.file.as_ref().unwrap(); - let c = sqlite3_ondisk::begin_write_wal_header(file.as_ref(), &shared.wal_header.lock())?; + let (header, file) = self.with_shared(|shared| { + assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); + ( + *shared.wal_header.lock(), + shared.file.as_ref().unwrap().clone(), + ) + }); + let c = sqlite3_ondisk::begin_write_wal_header(file.as_ref(), &header)?; Ok(Some(c)) } fn prepare_wal_finish(&mut self) -> Result { - let shared = self.get_shared(); - assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); - let file = shared.file.as_ref().unwrap(); + let file = self.with_shared(|shared| { + assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); + shared.file.as_ref().unwrap().clone() + }); let shared = self.shared.clone(); let c = file.sync(Completion::new_sync(move |_| { shared.read().initialized.store(true, Ordering::Release); @@ -1477,18 +1486,17 @@ impl Wal for WalFile { "we limit number of iovecs to IOV_MAX" ); turso_assert!( - self.get_shared().is_initialized()?, + self.with_shared(|shared| shared.is_initialized())?, "WAL must be prepared with prepare_wal_start/prepare_wal_finish method" ); - let (header, shared_page_size, epoch) = { - let shared = self.get_shared(); + let (header, shared_page_size, epoch) = self.with_shared(|shared| { let hdr_guard = shared.wal_header.lock(); let header: WalHeader = *hdr_guard; let shared_page_size = header.page_size; let epoch = shared.epoch.load(Ordering::Acquire); (header, shared_page_size, epoch) - }; + }); turso_assert!( shared_page_size == page_sz.get(), "page size mismatch, tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}", @@ -1577,11 +1585,10 @@ impl Wal for WalFile { let c = Completion::new_write_linked(cmp); - let file = { - let shared = self.get_shared(); + let file = self.with_shared(|shared| { assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); shared.file.as_ref().unwrap().clone() - }; + }); let c = file.pwritev(start_off, iovecs, c)?; Ok(c) } @@ -1596,8 +1603,10 @@ impl Wal for WalFile { } fn update_max_frame(&mut self) { - let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire); - self.max_frame.store(new_max_frame, Ordering::Release); + self.with_shared(|shared| { + let new_max_frame = shared.max_frame.load(Ordering::Acquire); + self.max_frame.store(new_max_frame, Ordering::Release); + }) } } @@ -1649,7 +1658,7 @@ impl WalFile { } fn page_size(&self) -> u32 { - self.get_shared().wal_header.lock().page_size + self.with_shared(|shared| shared.wal_header.lock().page_size) } fn frame_offset(&self, frame_id: u64) -> u64 { @@ -1677,7 +1686,7 @@ impl WalFile { } } - fn get_shared(&self) -> parking_lot::RwLockReadGuard<'_, WalFileShared> { + fn _get_shared(&self) -> parking_lot::RwLockReadGuard<'_, WalFileShared> { // WASM in browser main thread doesn't have a way to "park" a thread // so, we spin way here instead of calling blocking lock #[cfg(target_family = "wasm")] @@ -1696,11 +1705,19 @@ impl WalFile { } } + #[inline] + fn with_shared(&self, func: F) -> R + where + F: FnOnce(&WalFileShared) -> R, + { + let shared = self._get_shared(); + func(&shared) + } + fn complete_append_frame(&mut self, page_id: u64, frame_id: u64, checksums: (u32, u32)) { self.last_checksum = checksums; self.max_frame.store(frame_id, Ordering::Release); - let shared = self.get_shared(); - { + self.with_shared(|shared| { let mut frame_cache = shared.frame_cache.lock(); match frame_cache.get_mut(&page_id) { Some(frames) => { @@ -1710,7 +1727,7 @@ impl WalFile { frame_cache.insert(page_id, vec![frame_id]); } } - } + }) } fn reset_internal_states(&mut self) { @@ -1745,12 +1762,11 @@ impl WalFile { // so no other checkpointer can run. fsync WAL if there are unapplied frames. // Decide the largest frame we are allowed to back‑fill. CheckpointState::Start => { - let (max_frame, nbackfills) = { - let shared = self.get_shared(); + let (max_frame, nbackfills) = self.with_shared(|shared| { let max_frame = shared.max_frame.load(Ordering::Acquire); let n_backfills = shared.nbackfills.load(Ordering::Acquire); (max_frame, n_backfills) - }; + }); let needs_backfill = max_frame > nbackfills; if !needs_backfill && !mode.should_restart_log() { // there are no frames to copy over and we don't need to reset @@ -1786,8 +1802,7 @@ impl WalFile { self.ongoing_checkpoint.max_frame = max_frame; self.ongoing_checkpoint.min_frame = nbackfills + 1; - let to_checkpoint = { - let shared = self.get_shared(); + let to_checkpoint = self.with_shared(|shared| { let frame_cache = shared.frame_cache.lock(); let mut list = Vec::with_capacity( self.ongoing_checkpoint @@ -1808,7 +1823,7 @@ impl WalFile { // sort by frame_id for read locality list.sort_unstable_by(|a, b| (a.1, a.0).cmp(&(b.1, b.0))); list - }; + }); self.ongoing_checkpoint.pages_to_checkpoint = to_checkpoint; self.ongoing_checkpoint.current_page = 0; self.ongoing_checkpoint.inflight_writes.clear(); @@ -1839,7 +1854,7 @@ impl WalFile { if self.ongoing_checkpoint.process_pending_reads() { tracing::trace!("Drained reads into batch"); } - let epoch = self.get_shared().epoch.load(Ordering::Acquire); + let epoch = self.with_shared(|shared| shared.epoch.load(Ordering::Acquire)); // Issue reads until we hit limits 'inner: while self.ongoing_checkpoint.should_issue_reads() { let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint @@ -1931,8 +1946,7 @@ impl WalFile { self.ongoing_checkpoint.complete(), "checkpoint pending flush must have finished" ); - let checkpoint_result = { - let shared = self.get_shared(); + let checkpoint_result = self.with_shared(|shared| { let current_mx = shared.max_frame.load(Ordering::Acquire); let nbackfills = shared.nbackfills.load(Ordering::Acquire); // Record two num pages fields to return as checkpoint result to caller. @@ -1964,14 +1978,16 @@ impl WalFile { checkpoint_max_frame, ) } - }; + }); // store the max frame we were able to successfully checkpoint. // NOTE: we don't have a .shm file yet, so it's safe to update nbackfills here // before we sync, because if we crash and then recover, we will checkpoint the entire db anyway. - self.get_shared() - .nbackfills - .store(self.ongoing_checkpoint.max_frame, Ordering::Release); + self.with_shared(|shared| { + shared + .nbackfills + .store(self.ongoing_checkpoint.max_frame, Ordering::Release) + }); if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() { return Err(LimboError::Busy); @@ -2002,7 +2018,7 @@ impl WalFile { checkpoint_result.take().unwrap() }; // increment wal epoch to ensure no stale pages are used for backfilling - self.get_shared().epoch.fetch_add(1, Ordering::Release); + self.with_shared(|shared| shared.epoch.fetch_add(1, Ordering::Release)); // store a copy of the checkpoint result to return in the future if pragma // wal_checkpoint is called and we haven't backfilled again since. @@ -2129,7 +2145,7 @@ impl WalFile { // reinitialize in‑memory state self.get_shared_mut().restart_wal_header(&self.io, mode); - let cksm = self.get_shared().last_checksum; + let cksm = self.with_shared(|shared| shared.last_checksum); self.last_checksum = cksm; self.max_frame.store(0, Ordering::Release); self.min_frame.store(0, Ordering::Release); @@ -2138,12 +2154,11 @@ impl WalFile { } fn truncate_log(&mut self) -> Result> { - let file = { - let shared = self.get_shared(); + let file = self.with_shared(|shared| { assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); shared.initialized.store(false, Ordering::Release); shared.file.as_ref().unwrap().clone() - }; + }); let CheckpointState::Truncate { sync_sent, @@ -2250,9 +2265,10 @@ impl WalFile { }) }; // schedule read of the page payload - let shared = self.get_shared(); - assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); - let file = shared.file.as_ref().unwrap(); + let file = self.with_shared(|shared| { + assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); + shared.file.as_ref().unwrap().clone() + }); let c = begin_read_wal_frame( file.as_ref(), offset + WAL_FRAME_HEADER_SIZE as u64,