Merge 'core/storage: Wrap WalFile::max_frame_read_lock_index with AtomicUsize' from Pekka Enberg

Closes #3411
This commit is contained in:
Pekka Enberg
2025-09-28 14:16:49 +03:00
committed by GitHub

View File

@@ -8,7 +8,7 @@ use tracing::{instrument, Level};
use parking_lot::RwLock;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::{cell::Cell, fmt, sync::Arc};
use super::buffer_pool::BufferPool;
@@ -569,7 +569,7 @@ pub struct WalFile {
// min and max frames for this connection
/// This is the index to the read_lock in WalFileShared that we are holding. This lock contains
/// the max frame for this connection.
max_frame_read_lock_index: Cell<usize>,
max_frame_read_lock_index: AtomicUsize,
/// Max frame allowed to lookup range=(minframe..max_frame)
max_frame: u64,
/// Start of range to look for frames range=(minframe..max_frame)
@@ -812,9 +812,11 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_read_tx(&mut self) -> Result<bool> {
turso_assert!(
self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD),
self.max_frame_read_lock_index
.load(Ordering::Acquire)
.eq(&NO_LOCK_HELD),
"cannot start a new read tx without ending an existing one, lock_value={}, expected={}",
self.max_frame_read_lock_index.get(),
self.max_frame_read_lock_index.load(Ordering::Acquire),
NO_LOCK_HELD
);
let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
@@ -843,7 +845,8 @@ impl Wal for WalFile {
// we need to keep self.max_frame set to the appropriate
// max frame in the wal at the time this transaction starts.
self.max_frame = shared_max;
self.max_frame_read_lock_index.set(lock_0_idx);
self.max_frame_read_lock_index
.store(lock_0_idx, Ordering::Release);
self.min_frame = nbackfills + 1;
self.last_checksum = last_checksum;
return Ok(db_changed);
@@ -937,7 +940,8 @@ impl Wal for WalFile {
}
self.min_frame = nb2 + 1;
self.max_frame = best_mark as u64;
self.max_frame_read_lock_index.set(best_idx as usize);
self.max_frame_read_lock_index
.store(best_idx as usize, Ordering::Release);
tracing::debug!(
"begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})",
self.min_frame,
@@ -952,10 +956,11 @@ impl Wal for WalFile {
#[inline(always)]
#[instrument(skip_all, level = Level::DEBUG)]
fn end_read_tx(&self) {
let slot = self.max_frame_read_lock_index.get();
let slot = self.max_frame_read_lock_index.load(Ordering::Acquire);
if slot != NO_LOCK_HELD {
self.get_shared_mut().read_locks[slot].unlock();
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.max_frame_read_lock_index
.store(NO_LOCK_HELD, Ordering::Release);
tracing::debug!("end_read_tx(slot={slot})");
} else {
tracing::debug!("end_read_tx(slot=no_lock)");
@@ -972,7 +977,7 @@ impl Wal for WalFile {
// assert(pWal->readLock >= 0);
// assert(pWal->writeLock == 0 && pWal->iReCksum == 0);
turso_assert!(
self.max_frame_read_lock_index.get() != NO_LOCK_HELD,
self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD,
"must have a read transaction to begin a write transaction"
);
if !shared.write_lock.write() {
@@ -1033,7 +1038,9 @@ impl Wal for WalFile {
// min_frame is set to nbackfill + 1 and max_frame is set to shared_max_frame
//
// by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db
if self.max_frame_read_lock_index.get() == 0 && self.max_frame < self.min_frame {
if self.max_frame_read_lock_index.load(Ordering::Acquire) == 0
&& self.max_frame < self.min_frame
{
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file",
page_id,
@@ -1618,7 +1625,7 @@ impl WalFile {
checkpoint_seq: AtomicU32::new(0),
syncing: Arc::new(AtomicBool::new(false)),
min_frame: 0,
max_frame_read_lock_index: NO_LOCK_HELD.into(),
max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD),
last_checksum,
prev_checkpoint: CheckpointResult::default(),
checkpoint_guard: None,
@@ -1693,7 +1700,8 @@ impl WalFile {
}
fn reset_internal_states(&mut self) {
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.max_frame_read_lock_index
.store(NO_LOCK_HELD, Ordering::Release);
self.ongoing_checkpoint.reset();
self.syncing.store(false, Ordering::SeqCst);
}
@@ -3037,7 +3045,7 @@ pub mod test {
{
let wal_any = wal.as_any();
if let Some(wal_file) = wal_any.downcast_ref::<WalFile>() {
return wal_file.max_frame_read_lock_index.get() == expected_slot;
return wal_file.max_frame_read_lock_index.load(Ordering::Acquire) == expected_slot;
}
}