From f9916e8149e76183a9d664f162bc52aeda7d3f46 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Mar 2025 15:41:17 +0100 Subject: [PATCH] update max frame in case we got a read lock with outdated max frame --- core/storage/wal.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e9e610253..d6442b2bf 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -176,6 +176,7 @@ pub trait Wal { mode: CheckpointMode, ) -> Result; fn sync(&mut self) -> Result; + fn get_max_frame_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; } @@ -333,8 +334,8 @@ impl Wal for WalFile { } } - // If we didn't find any mark, then let's add a new one - if max_read_mark_index == -1 { + // If we didn't find any mark or we can update, let's update them + if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 { for (index, lock) in shared.read_locks.iter_mut().enumerate() { let busy = !lock.write(); if !busy { @@ -361,10 +362,11 @@ impl Wal for WalFile { self.max_frame = max_read_mark as u64; self.min_frame = shared.nbackfills + 1; tracing::debug!( - "begin_read_tx(min_frame={}, max_frame={}, lock={})", + "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})", self.min_frame, self.max_frame, - self.max_frame_read_lock_index + self.max_frame_read_lock_index, + max_frame_in_wal ); Ok(LimboResult::Ok) } @@ -500,14 +502,18 @@ impl Wal for WalFile { // TODO(pere): check what frames are safe to checkpoint between many readers! self.ongoing_checkpoint.min_frame = self.min_frame; let mut shared = self.shared.write(); - let max_frame_in_wal = shared.max_frame as u32; let mut max_safe_frame = shared.max_frame; - for read_lock in shared.read_locks.iter_mut() { + for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() { let this_mark = read_lock.value.load(Ordering::SeqCst); if this_mark < max_safe_frame as u32 { let busy = !read_lock.write(); if !busy { - read_lock.value.store(max_frame_in_wal, Ordering::SeqCst); + let new_mark = if read_lock_idx == 0 { + max_safe_frame as u32 + } else { + READMARK_NOT_USED + }; + read_lock.value.store(new_mark, Ordering::SeqCst); read_lock.unlock(); } else { max_safe_frame = this_mark as u64; @@ -613,6 +619,7 @@ impl Wal for WalFile { shared.pages_in_frames.clear(); shared.max_frame = 0; shared.nbackfills = 0; + // TODO(pere): truncate wal file here. } else { shared.nbackfills = self.ongoing_checkpoint.max_frame; } @@ -658,6 +665,10 @@ impl Wal for WalFile { } } + fn get_max_frame_in_wal(&self) -> u64 { + self.shared.read().max_frame + } + fn get_max_frame(&self) -> u64 { self.max_frame }