diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e077bd321..a5ad02e51 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -375,6 +375,11 @@ struct OngoingCheckpoint { max_frame: u64, current_page: u64, } +impl OngoingCheckpoint { + fn has_work(&self) -> bool { + self.min_frame < self.max_frame + } +} pub(super) struct PendingFlush { // page ids to clear @@ -598,9 +603,6 @@ impl CheckpointLocks { return Err(LimboError::Busy); } match mode { - CheckpointMode::Full => Err(LimboError::InternalError( - "Full checkpoint mode is not yet supported".into(), - )), // Passive mode is the only mode not requiring a write lock, as it doesn't block // readers or writers. It acquires the checkpoint lock to ensure that no other // concurrent checkpoint happens, and acquires the exclusive read lock 0 @@ -615,6 +617,22 @@ impl CheckpointLocks { } Ok(Self::Read0 { ptr }) } + CheckpointMode::Full => { + // Full blocks writers and holds read0 exclusively (readers may still use >0 slots) + let read0 = &mut shared.read_locks[0]; + if !read0.write() { + shared.checkpoint_lock.unlock(); + tracing::trace!("CheckpointGuard: read0 lock failed (Full), Busy"); + return Err(LimboError::Busy); + } + if !shared.write_lock.write() { + read0.unlock(); + shared.checkpoint_lock.unlock(); + tracing::trace!("CheckpointGuard: write lock failed (Full), Busy"); + return Err(LimboError::Busy); + } + Ok(Self::Writer { ptr }) + } CheckpointMode::Restart | CheckpointMode::Truncate => { // like all modes, we must acquire an exclusive checkpoint lock and lock on read 0 // to prevent a reader from reading a partially checkpointed db file. @@ -1093,11 +1111,6 @@ impl Wal for WalFile { _write_counter: Rc>, mode: CheckpointMode, ) -> Result> { - if matches!(mode, CheckpointMode::Full) { - return Err(LimboError::InternalError( - "Full checkpoint mode is not implemented yet".into(), - )); - } self.checkpoint_inner(pager, _write_counter, mode) .inspect_err(|_| { let _ = self.checkpoint_guard.take(); @@ -1369,10 +1382,20 @@ impl WalFile { } // acquire the appropriate exclusive locks depending on the checkpoint mode self.acquire_proper_checkpoint_guard(mode)?; - self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); + self.ongoing_checkpoint.max_frame = + match self.determine_max_safe_checkpoint_frame(mode) { + Err(_) => return Err(LimboError::Busy), + Ok(res) => res, + }; + self.ongoing_checkpoint.state = if matches!(mode, CheckpointMode::Full) + && !self.ongoing_checkpoint.has_work() + { + CheckpointState::Done + } else { + CheckpointState::ReadFrame + }; self.ongoing_checkpoint.min_frame = nbackfills + 1; self.ongoing_checkpoint.current_page = 0; - self.ongoing_checkpoint.state = CheckpointState::ReadFrame; tracing::trace!( "checkpoint_start(min_frame={}, max_frame={})", self.ongoing_checkpoint.min_frame, @@ -1614,29 +1637,36 @@ impl WalFile { /// Others: lower `mxSafeFrame` and continue scanning. /// /// We never modify slot values while a reader holds that slot. - fn determine_max_safe_checkpoint_frame(&self) -> u64 { + fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> Result { let shared = self.get_shared(); - let mut max_safe_frame = shared.max_frame.load(Ordering::Acquire); + let shared_max = shared.max_frame.load(Ordering::Acquire); - for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) { - let this_mark = read_lock.get_value(); - if this_mark < max_safe_frame as u32 { - let busy = !read_lock.write(); - if !busy { - let val = if read_lock_idx == 1 { - // store the shared max_frame for the default read slot 1 - max_safe_frame as u32 - } else { - READMARK_NOT_USED - }; - read_lock.set_value_exclusive(val); - read_lock.unlock(); + // Start optimistic: we want to advance everyone to shared_max + for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) { + let mark = lock.get_value(); + if mark == READMARK_NOT_USED || mark >= shared_max as u32 { + continue; + } + // Try to bump this slot to shared_max (requires exclusive on the slot) + if lock.write() { + // Slot is free to edit, bump to shared_max (slot 1 keeps a real mark, others can be cleaned) + let val = if idx == 1 { + shared_max as u32 } else { - max_safe_frame = this_mark as u64; - } + READMARK_NOT_USED + }; + lock.set_value_exclusive(val); + lock.unlock(); + } else { + // Reader is using this slot. + return match mode { + // clamp down for passive + CheckpointMode::Passive => Ok(mark as u64), + _ => Err(LimboError::Busy), // all others must wait + }; } } - max_safe_frame + Ok(shared_max) } /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode.