From ba9a1ebbef91bd45fb875e80193f20004fc4564d Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Sun, 19 Oct 2025 19:45:05 -0300 Subject: [PATCH] add mutable scoped locking for SharedWalFile --- core/storage/wal.rs | 162 +++++++++++++++++++++++--------------------- 1 file changed, 84 insertions(+), 78 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 87863b1a3..8a7ed81a6 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -876,23 +876,19 @@ impl Wal for WalFile { // If none found or lagging, try to claim/update a slot if best_idx == -1 || (best_mark as u64) < shared_max { - for (idx, lock) in self - .get_shared_mut() - .read_locks - .iter_mut() - .enumerate() - .skip(1) - { - if !lock.write() { - continue; // busy slot + self.with_shared_mut(|shared| { + for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) { + if !lock.write() { + continue; // busy slot + } + // claim or bump this slot + lock.set_value_exclusive(shared_max as u32); + best_idx = idx as i64; + best_mark = shared_max as u32; + lock.unlock(); + break; } - // claim or bump this slot - lock.set_value_exclusive(shared_max as u32); - best_idx = idx as i64; - best_mark = shared_max as u32; - lock.unlock(); - break; - } + }) } if best_idx == -1 || best_mark != shared_max as u32 { @@ -968,7 +964,7 @@ impl Wal for WalFile { fn end_read_tx(&self) { let slot = self.max_frame_read_lock_index.load(Ordering::Acquire); if slot != NO_LOCK_HELD { - self.get_shared_mut().read_locks[slot].unlock(); + self.with_shared_mut(|shared| shared.read_locks[slot].unlock()); self.max_frame_read_lock_index .store(NO_LOCK_HELD, Ordering::Release); tracing::debug!("end_read_tx(slot={slot})"); @@ -980,29 +976,29 @@ impl Wal for WalFile { /// Begin a write transaction #[instrument(skip_all, level = Level::DEBUG)] fn begin_write_tx(&mut self) -> Result<()> { - let shared = self.get_shared_mut(); - // sqlite/src/wal.c 3702 - // Cannot start a write transaction without first holding a read - // transaction. - // assert(pWal->readLock >= 0); - // assert(pWal->writeLock == 0 && pWal->iReCksum == 0); - turso_assert!( - self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD, - "must have a read transaction to begin a write transaction" - ); - if !shared.write_lock.write() { - return Err(LimboError::Busy); - } - let db_changed = self.db_changed(&shared); - if !db_changed { - drop(shared); - return Ok(()); - } + self.with_shared_mut(|shared| { + // sqlite/src/wal.c 3702 + // Cannot start a write transaction without first holding a read + // transaction. + // assert(pWal->readLock >= 0); + // assert(pWal->writeLock == 0 && pWal->iReCksum == 0); + turso_assert!( + self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD, + "must have a read transaction to begin a write transaction" + ); + if !shared.write_lock.write() { + return Err(LimboError::Busy); + } + let db_changed = self.db_changed(shared); + if !db_changed { + return Ok(()); + } - // Snapshot is stale, give up and let caller retry from scratch - tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire)); - shared.write_lock.unlock(); - Err(LimboError::Busy) + // Snapshot is stale, give up and let caller retry from scratch + tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire)); + shared.write_lock.unlock(); + Err(LimboError::Busy) + }) } /// End a write transaction @@ -1379,18 +1375,19 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn finish_append_frames_commit(&mut self) -> Result<()> { - let mut shared = self.get_shared_mut(); - shared - .max_frame - .store(self.max_frame.load(Ordering::Acquire), Ordering::Release); - tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum); - shared.last_checksum = self.last_checksum; - self.transaction_count.fetch_add(1, Ordering::Release); - shared.transaction_count.store( - self.transaction_count.load(Ordering::Acquire), - Ordering::Release, - ); - Ok(()) + self.with_shared_mut(|shared| { + shared + .max_frame + .store(self.max_frame.load(Ordering::Acquire), Ordering::Release); + tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum); + shared.last_checksum = self.last_checksum; + self.transaction_count.fetch_add(1, Ordering::Release); + shared.transaction_count.store( + self.transaction_count.load(Ordering::Acquire), + Ordering::Release, + ); + Ok(()) + }) } fn changed_pages_after(&self, frame_watermark: u64) -> Result> { @@ -1421,8 +1418,7 @@ impl Wal for WalFile { return Ok(None); } tracing::debug!("ensure_header_if_needed"); - self.last_checksum = { - let mut shared = self.get_shared_mut(); + self.last_checksum = self.with_shared_mut(|shared| { let checksum = { let mut hdr = shared.wal_header.lock(); hdr.magic = if cfg!(target_endian = "big") { @@ -1448,7 +1444,7 @@ impl Wal for WalFile { }; shared.last_checksum = checksum; checksum - }; + }); self.max_frame.store(0, Ordering::Release); let (header, file) = self.with_shared(|shared| { @@ -1667,7 +1663,7 @@ impl WalFile { WAL_HEADER_SIZE as u64 + page_offset } - fn get_shared_mut(&self) -> parking_lot::RwLockWriteGuard<'_, WalFileShared> { + fn _get_shared_mut(&self) -> parking_lot::RwLockWriteGuard<'_, WalFileShared> { // WASM in browser main thread doesn't have a way to "park" a thread // so, we spin way here instead of calling blocking lock #[cfg(target_family = "wasm")] @@ -1705,6 +1701,15 @@ impl WalFile { } } + #[inline] + fn with_shared_mut(&self, func: F) -> R + where + F: FnOnce(&mut WalFileShared) -> R, + { + let mut shared = self._get_shared_mut(); + func(&mut shared) + } + #[inline] fn with_shared(&self, func: F) -> R where @@ -2087,29 +2092,30 @@ impl WalFile { /// We never modify slot values while a reader holds that slot's lock. /// TOOD: implement proper BUSY handling behavior fn determine_max_safe_checkpoint_frame(&self) -> u64 { - let mut shared = self.get_shared_mut(); - let shared_max = shared.max_frame.load(Ordering::Acquire); - let mut max_safe_frame = shared_max; + self.with_shared_mut(|shared| { + let shared_max = shared.max_frame.load(Ordering::Acquire); + let mut max_safe_frame = shared_max; - for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { - let this_mark = read_lock.get_value(); - if this_mark < max_safe_frame as u32 { - let busy = !read_lock.write(); - if !busy { - let val = if read_lock_idx == 1 { - // store the max_frame for the default read slot 1 - max_safe_frame as u32 + for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { + let this_mark = read_lock.get_value(); + if this_mark < max_safe_frame as u32 { + let busy = !read_lock.write(); + if !busy { + let val = if read_lock_idx == 1 { + // store the max_frame for the default read slot 1 + max_safe_frame as u32 + } else { + READMARK_NOT_USED + }; + read_lock.set_value_exclusive(val); + read_lock.unlock(); } else { - READMARK_NOT_USED - }; - read_lock.set_value_exclusive(val); - read_lock.unlock(); - } else { - max_safe_frame = this_mark as u64; + max_safe_frame = this_mark as u64; + } } } - } - max_safe_frame + max_safe_frame + }) } /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode. @@ -2125,9 +2131,8 @@ impl WalFile { self.checkpoint_guard ); tracing::debug!("restart_log(mode={mode:?})"); - { + self.with_shared_mut(|shared| { // Block all readers - let mut shared = self.get_shared_mut(); for idx in 1..shared.read_locks.len() { let lock = &mut shared.read_locks[idx]; if !lock.write() { @@ -2141,10 +2146,11 @@ impl WalFile { // after the log is reset, we must set all secondary marks to READMARK_NOT_USED so the next reader selects a fresh slot lock.set_value_exclusive(READMARK_NOT_USED); } - } + Ok(()) + })?; // reinitialize in‑memory state - self.get_shared_mut().restart_wal_header(&self.io, mode); + self.with_shared_mut(|shared| shared.restart_wal_header(&self.io, mode)); let cksm = self.with_shared(|shared| shared.last_checksum); self.last_checksum = cksm; self.max_frame.store(0, Ordering::Release);