diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 0114e58ef..f0bffb2c4 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -321,7 +321,7 @@ enum CommitState { /// Sync WAL header after prepare PrepareWalSync, /// Appends all frames to the WAL. - Start, + PrepareFrames, /// Fsync the on-disk WAL. SyncWal, /// Checkpoint the WAL to the database file (if needed). @@ -1263,7 +1263,8 @@ impl Pager { let mut pages = Vec::with_capacity(len); let page_sz = self.page_size.get().unwrap_or_default(); - if let Some(c) = wal.borrow_mut().prepare_wal_start(page_sz)? { + let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?; + if let Some(c) = prepare { self.io.wait_for_completion(c)?; let c = wal.borrow_mut().prepare_wal_finish()?; self.io.wait_for_completion(c)?; @@ -1340,7 +1341,7 @@ impl Pager { let page_sz = self.page_size.get().expect("page size not set"); let c = wal.borrow_mut().prepare_wal_start(page_sz)?; let Some(c) = c else { - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareFrames); continue; }; self.commit_info.state.set(CommitState::PrepareWalSync); @@ -1350,12 +1351,12 @@ impl Pager { } CommitState::PrepareWalSync => { let c = wal.borrow_mut().prepare_wal_finish()?; - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareFrames); if !c.is_completed() { io_yield_one!(c); } } - CommitState::Start => { + CommitState::PrepareFrames => { let now = self.io.now(); self.commit_info.time.set(now); let db_size_after = { @@ -1503,7 +1504,7 @@ impl Pager { let mut completions = self.commit_info.completions.borrow_mut(); if completions.iter().all(|c| c.is_completed()) { completions.clear(); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); wal.borrow_mut().finish_append_frames_commit()?; let result = self.commit_info.result.borrow_mut().take(); return Ok(IOResult::Done(result.expect("commit result should be set"))); @@ -2229,7 +2230,7 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; self.syncing.replace(false); - self.commit_info.state.set(CommitState::Start); + self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); self.allocate_page_state.replace(AllocatePageState::Start); self.free_page_state.replace(FreePageState::Start); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7ac294346..6c4b2a242 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1936,7 +1936,9 @@ impl WalFile { if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() { return Err(LimboError::Busy); } - self.restart_log_if_needed(mode)?; + if mode.should_restart_log() { + self.restart_log(mode)?; + } self.ongoing_checkpoint.state = CheckpointState::Truncate { checkpoint_result: Some(checkpoint_result), truncate_sent: false, @@ -1944,7 +1946,12 @@ impl WalFile { }; } CheckpointState::Truncate { .. } => { - return_if_io!(self.truncate_log_if_needed(mode)); + if matches!(mode, CheckpointMode::Truncate { .. }) { + return_if_io!(self.truncate_log()); + } + if mode.should_restart_log() { + Self::unlock(&self.shared, None); + } let mut checkpoint_result = { let CheckpointState::Truncate { checkpoint_result, .. @@ -2051,10 +2058,11 @@ impl WalFile { /// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode. /// Must be invoked while writer and checkpoint locks are still held. - fn restart_log_if_needed(&mut self, mode: CheckpointMode) -> Result<()> { - if !mode.should_restart_log() { - return Ok(()); - } + fn restart_log(&mut self, mode: CheckpointMode) -> Result<()> { + turso_assert!( + mode.should_restart_log(), + "CheckpointMode must be Restart or Truncate" + ); turso_assert!( matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })), "We must hold writer and checkpoint locks to restart the log, found: {:?}", @@ -2089,11 +2097,7 @@ impl WalFile { Ok(()) } - fn truncate_log_if_needed(&mut self, mode: CheckpointMode) -> Result> { - if !matches!(mode, CheckpointMode::Truncate { .. }) { - Self::unlock(&self.shared, None); - return Ok(IOResult::Done(())); - } + fn truncate_log(&mut self) -> Result> { let file = { let shared = self.get_shared(); assert!( @@ -2143,8 +2147,6 @@ impl WalFile { .inspect_err(|e| Self::unlock(&self.shared, Some(e)))?; *sync_sent = true; io_yield_one!(c); - } else { - Self::unlock(&self.shared, None); } Ok(IOResult::Done(())) }