add mutable scoped locking for SharedWalFile

This commit is contained in:
pedrocarlo
2025-10-19 19:45:05 -03:00
parent b00a276960
commit ba9a1ebbef

View File

@@ -876,23 +876,19 @@ impl Wal for WalFile {
// If none found or lagging, try to claim/update a slot
if best_idx == -1 || (best_mark as u64) < shared_max {
for (idx, lock) in self
.get_shared_mut()
.read_locks
.iter_mut()
.enumerate()
.skip(1)
{
if !lock.write() {
continue; // busy slot
self.with_shared_mut(|shared| {
for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
if !lock.write() {
continue; // busy slot
}
// claim or bump this slot
lock.set_value_exclusive(shared_max as u32);
best_idx = idx as i64;
best_mark = shared_max as u32;
lock.unlock();
break;
}
// claim or bump this slot
lock.set_value_exclusive(shared_max as u32);
best_idx = idx as i64;
best_mark = shared_max as u32;
lock.unlock();
break;
}
})
}
if best_idx == -1 || best_mark != shared_max as u32 {
@@ -968,7 +964,7 @@ impl Wal for WalFile {
fn end_read_tx(&self) {
let slot = self.max_frame_read_lock_index.load(Ordering::Acquire);
if slot != NO_LOCK_HELD {
self.get_shared_mut().read_locks[slot].unlock();
self.with_shared_mut(|shared| shared.read_locks[slot].unlock());
self.max_frame_read_lock_index
.store(NO_LOCK_HELD, Ordering::Release);
tracing::debug!("end_read_tx(slot={slot})");
@@ -980,29 +976,29 @@ impl Wal for WalFile {
/// Begin a write transaction
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_write_tx(&mut self) -> Result<()> {
let shared = self.get_shared_mut();
// sqlite/src/wal.c 3702
// Cannot start a write transaction without first holding a read
// transaction.
// assert(pWal->readLock >= 0);
// assert(pWal->writeLock == 0 && pWal->iReCksum == 0);
turso_assert!(
self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD,
"must have a read transaction to begin a write transaction"
);
if !shared.write_lock.write() {
return Err(LimboError::Busy);
}
let db_changed = self.db_changed(&shared);
if !db_changed {
drop(shared);
return Ok(());
}
self.with_shared_mut(|shared| {
// sqlite/src/wal.c 3702
// Cannot start a write transaction without first holding a read
// transaction.
// assert(pWal->readLock >= 0);
// assert(pWal->writeLock == 0 && pWal->iReCksum == 0);
turso_assert!(
self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD,
"must have a read transaction to begin a write transaction"
);
if !shared.write_lock.write() {
return Err(LimboError::Busy);
}
let db_changed = self.db_changed(shared);
if !db_changed {
return Ok(());
}
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire));
shared.write_lock.unlock();
Err(LimboError::Busy)
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire));
shared.write_lock.unlock();
Err(LimboError::Busy)
})
}
/// End a write transaction
@@ -1379,18 +1375,19 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn finish_append_frames_commit(&mut self) -> Result<()> {
let mut shared = self.get_shared_mut();
shared
.max_frame
.store(self.max_frame.load(Ordering::Acquire), Ordering::Release);
tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum);
shared.last_checksum = self.last_checksum;
self.transaction_count.fetch_add(1, Ordering::Release);
shared.transaction_count.store(
self.transaction_count.load(Ordering::Acquire),
Ordering::Release,
);
Ok(())
self.with_shared_mut(|shared| {
shared
.max_frame
.store(self.max_frame.load(Ordering::Acquire), Ordering::Release);
tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum);
shared.last_checksum = self.last_checksum;
self.transaction_count.fetch_add(1, Ordering::Release);
shared.transaction_count.store(
self.transaction_count.load(Ordering::Acquire),
Ordering::Release,
);
Ok(())
})
}
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>> {
@@ -1421,8 +1418,7 @@ impl Wal for WalFile {
return Ok(None);
}
tracing::debug!("ensure_header_if_needed");
self.last_checksum = {
let mut shared = self.get_shared_mut();
self.last_checksum = self.with_shared_mut(|shared| {
let checksum = {
let mut hdr = shared.wal_header.lock();
hdr.magic = if cfg!(target_endian = "big") {
@@ -1448,7 +1444,7 @@ impl Wal for WalFile {
};
shared.last_checksum = checksum;
checksum
};
});
self.max_frame.store(0, Ordering::Release);
let (header, file) = self.with_shared(|shared| {
@@ -1667,7 +1663,7 @@ impl WalFile {
WAL_HEADER_SIZE as u64 + page_offset
}
fn get_shared_mut(&self) -> parking_lot::RwLockWriteGuard<'_, WalFileShared> {
fn _get_shared_mut(&self) -> parking_lot::RwLockWriteGuard<'_, WalFileShared> {
// WASM in browser main thread doesn't have a way to "park" a thread
// so, we spin way here instead of calling blocking lock
#[cfg(target_family = "wasm")]
@@ -1705,6 +1701,15 @@ impl WalFile {
}
}
#[inline]
fn with_shared_mut<F, R>(&self, func: F) -> R
where
F: FnOnce(&mut WalFileShared) -> R,
{
let mut shared = self._get_shared_mut();
func(&mut shared)
}
#[inline]
fn with_shared<F, R>(&self, func: F) -> R
where
@@ -2087,29 +2092,30 @@ impl WalFile {
/// We never modify slot values while a reader holds that slot's lock.
/// TOOD: implement proper BUSY handling behavior
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
let mut shared = self.get_shared_mut();
let shared_max = shared.max_frame.load(Ordering::Acquire);
let mut max_safe_frame = shared_max;
self.with_shared_mut(|shared| {
let shared_max = shared.max_frame.load(Ordering::Acquire);
let mut max_safe_frame = shared_max;
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
let this_mark = read_lock.get_value();
if this_mark < max_safe_frame as u32 {
let busy = !read_lock.write();
if !busy {
let val = if read_lock_idx == 1 {
// store the max_frame for the default read slot 1
max_safe_frame as u32
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
let this_mark = read_lock.get_value();
if this_mark < max_safe_frame as u32 {
let busy = !read_lock.write();
if !busy {
let val = if read_lock_idx == 1 {
// store the max_frame for the default read slot 1
max_safe_frame as u32
} else {
READMARK_NOT_USED
};
read_lock.set_value_exclusive(val);
read_lock.unlock();
} else {
READMARK_NOT_USED
};
read_lock.set_value_exclusive(val);
read_lock.unlock();
} else {
max_safe_frame = this_mark as u64;
max_safe_frame = this_mark as u64;
}
}
}
}
max_safe_frame
max_safe_frame
})
}
/// Called once the entire WAL has been backfilled in RESTART or TRUNCATE mode.
@@ -2125,9 +2131,8 @@ impl WalFile {
self.checkpoint_guard
);
tracing::debug!("restart_log(mode={mode:?})");
{
self.with_shared_mut(|shared| {
// Block all readers
let mut shared = self.get_shared_mut();
for idx in 1..shared.read_locks.len() {
let lock = &mut shared.read_locks[idx];
if !lock.write() {
@@ -2141,10 +2146,11 @@ impl WalFile {
// after the log is reset, we must set all secondary marks to READMARK_NOT_USED so the next reader selects a fresh slot
lock.set_value_exclusive(READMARK_NOT_USED);
}
}
Ok(())
})?;
// reinitialize inmemory state
self.get_shared_mut().restart_wal_header(&self.io, mode);
self.with_shared_mut(|shared| shared.restart_wal_header(&self.io, mode));
let cksm = self.with_shared(|shared| shared.last_checksum);
self.last_checksum = cksm;
self.max_frame.store(0, Ordering::Release);