core/storage: Wrap WalFile::{max,min}_frame with AtomicU64

This commit is contained in:
Pekka Enberg
2025-09-28 14:09:20 +03:00
parent 2da6206c0b
commit d3abeb6281

View File

@@ -571,9 +571,9 @@ pub struct WalFile {
/// the max frame for this connection.
max_frame_read_lock_index: AtomicUsize,
/// Max frame allowed to lookup range=(minframe..max_frame)
max_frame: u64,
max_frame: AtomicU64,
/// Start of range to look for frames range=(minframe..max_frame)
min_frame: u64,
min_frame: AtomicU64,
/// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
last_checksum: (u32, u32),
checkpoint_seq: AtomicU32,
@@ -827,7 +827,7 @@ impl Wal for WalFile {
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(mx, nb, ck, checkpoint_seq)
};
let db_changed = shared_max != self.max_frame
let db_changed = shared_max != self.max_frame.load(Ordering::Acquire)
|| last_checksum != self.last_checksum
|| checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire);
@@ -844,10 +844,10 @@ impl Wal for WalFile {
}
// we need to keep self.max_frame set to the appropriate
// max frame in the wal at the time this transaction starts.
self.max_frame = shared_max;
self.max_frame.store(shared_max, Ordering::Release);
self.max_frame_read_lock_index
.store(lock_0_idx, Ordering::Release);
self.min_frame = nbackfills + 1;
self.min_frame.store(nbackfills + 1, Ordering::Release);
self.last_checksum = last_checksum;
return Ok(db_changed);
}
@@ -938,14 +938,14 @@ impl Wal for WalFile {
{
return Err(LimboError::Busy);
}
self.min_frame = nb2 + 1;
self.max_frame = best_mark as u64;
self.min_frame.store(nb2 + 1, Ordering::Release);
self.max_frame.store(best_mark as u64, Ordering::Release);
self.max_frame_read_lock_index
.store(best_idx as usize, Ordering::Release);
tracing::debug!(
"begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})",
self.min_frame,
self.max_frame,
self.min_frame.load(Ordering::Acquire),
self.max_frame.load(Ordering::Acquire),
best_idx,
shared_max
);
@@ -988,16 +988,16 @@ impl Wal for WalFile {
shared.nbackfills.load(Ordering::Acquire),
shared.last_checksum,
);
if self.max_frame == shared_max {
if self.max_frame.load(Ordering::Acquire) == shared_max {
// Snapshot still valid; adopt counters
drop(shared);
self.last_checksum = last_checksum;
self.min_frame = nbackfills + 1;
self.min_frame.store(nbackfills + 1, Ordering::Release);
return Ok(());
}
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max);
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared_max);
shared.write_lock.unlock();
Err(LimboError::Busy)
}
@@ -1019,7 +1019,7 @@ impl Wal for WalFile {
);
turso_assert!(
frame_watermark.unwrap_or(0) <= self.max_frame,
frame_watermark.unwrap_or(0) <= self.max_frame.load(Ordering::Acquire),
"frame_watermark must be <= than current WAL max_frame value"
);
@@ -1039,7 +1039,7 @@ impl Wal for WalFile {
//
// by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db
if self.max_frame_read_lock_index.load(Ordering::Acquire) == 0
&& self.max_frame < self.min_frame
&& self.max_frame.load(Ordering::Acquire) < self.min_frame.load(Ordering::Acquire)
{
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file",
@@ -1050,15 +1050,15 @@ impl Wal for WalFile {
}
let shared = self.get_shared();
let frames = shared.frame_cache.lock();
let range = frame_watermark
.map(|x| 0..=x)
.unwrap_or(self.min_frame..=self.max_frame);
let range = frame_watermark.map(|x| 0..=x).unwrap_or(
self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire),
);
tracing::debug!(
"find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}",
page_id,
frame_watermark,
self.min_frame,
self.max_frame
self.min_frame.load(Ordering::Acquire),
self.max_frame.load(Ordering::Acquire)
);
if let Some(list) = frames.get(&page_id) {
if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) {
@@ -1211,14 +1211,15 @@ impl Wal for WalFile {
self.page_size(),
)));
}
if frame_id > self.max_frame + 1 {
if frame_id > self.max_frame.load(Ordering::Acquire) + 1 {
// attempt to write frame out of sequential order - error out
return Err(LimboError::InvalidArgument(format!(
"frame_id is beyond next frame in the WAL: frame_id={}, max_frame={}",
frame_id, self.max_frame
frame_id,
self.max_frame.load(Ordering::Acquire)
)));
}
if frame_id <= self.max_frame {
if frame_id <= self.max_frame.load(Ordering::Acquire) {
// just validate if page content from the frame matches frame in the WAL
let offset = self.frame_offset(frame_id);
let conflict = Arc::new(Cell::new(false));
@@ -1341,11 +1342,11 @@ impl Wal for WalFile {
}
fn get_max_frame(&self) -> u64 {
self.max_frame
self.max_frame.load(Ordering::Acquire)
}
fn get_min_frame(&self) -> u64 {
self.min_frame
self.min_frame.load(Ordering::Acquire)
}
#[instrument(err, skip_all, level = Level::DEBUG)]
@@ -1364,7 +1365,7 @@ impl Wal for WalFile {
(max_frame, shared.last_checksum)
};
self.last_checksum = last_checksum;
self.max_frame = max_frame;
self.max_frame.store(max_frame, Ordering::Release);
self.reset_internal_states();
Ok(())
}
@@ -1372,8 +1373,10 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn finish_append_frames_commit(&mut self) -> Result<()> {
let mut shared = self.get_shared_mut();
shared.max_frame.store(self.max_frame, Ordering::Release);
tracing::trace!(self.max_frame, ?self.last_checksum);
shared
.max_frame
.store(self.max_frame.load(Ordering::Acquire), Ordering::Release);
tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum);
shared.last_checksum = self.last_checksum;
Ok(())
}
@@ -1435,7 +1438,7 @@ impl Wal for WalFile {
checksum
};
self.max_frame = 0;
self.max_frame.store(0, Ordering::Release);
let shared = self.get_shared();
assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled");
let file = shared.file.as_ref().unwrap();
@@ -1492,7 +1495,7 @@ impl Wal for WalFile {
// Rolling checksum input to each frame build
let mut rolling_checksum: (u32, u32) = self.last_checksum;
let mut next_frame_id = self.max_frame + 1;
let mut next_frame_id = self.max_frame.load(Ordering::Acquire) + 1;
// Build every frame in order, updating the rolling checksum
for (idx, page) in pages.iter().enumerate() {
let page_id = page.get().id;
@@ -1538,7 +1541,7 @@ impl Wal for WalFile {
next_frame_id += 1;
}
let first_frame_id = self.max_frame + 1;
let first_frame_id = self.max_frame.load(Ordering::Acquire) + 1;
let start_off = self.frame_offset(first_frame_id);
// pre-advance in-memory WAL state
@@ -1584,7 +1587,7 @@ impl Wal for WalFile {
fn update_max_frame(&mut self) {
let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire);
self.max_frame = new_max_frame;
self.max_frame.store(new_max_frame, Ordering::Release);
}
}
@@ -1607,7 +1610,7 @@ impl WalFile {
Self {
io,
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
max_frame,
max_frame: AtomicU64::new(max_frame),
shared,
ongoing_checkpoint: OngoingCheckpoint {
time: now,
@@ -1624,7 +1627,7 @@ impl WalFile {
buffer_pool,
checkpoint_seq: AtomicU32::new(0),
syncing: Arc::new(AtomicBool::new(false)),
min_frame: 0,
min_frame: AtomicU64::new(0),
max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD),
last_checksum,
prev_checkpoint: CheckpointResult::default(),
@@ -1684,7 +1687,7 @@ impl WalFile {
fn complete_append_frame(&mut self, page_id: u64, frame_id: u64, checksums: (u32, u32)) {
self.last_checksum = checksums;
self.max_frame = frame_id;
self.max_frame.store(frame_id, Ordering::Release);
let shared = self.get_shared();
{
let mut frame_cache = shared.frame_cache.lock();
@@ -2112,8 +2115,8 @@ impl WalFile {
self.get_shared_mut().restart_wal_header(&self.io, mode);
let cksm = self.get_shared().last_checksum;
self.last_checksum = cksm;
self.max_frame = 0;
self.min_frame = 0;
self.max_frame.store(0, Ordering::Release);
self.min_frame.store(0, Ordering::Release);
self.checkpoint_seq.fetch_add(1, Ordering::Release);
Ok(())
}