From d6d72d2966cec7ad30dd8224541f2d93cfd8b03d Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 13 Aug 2025 15:41:09 -0400 Subject: [PATCH] Update Page to carry epoch of frame + checkpont seq to ensure proper cached page for chkpt --- core/storage/pager.rs | 9 ++++---- core/storage/wal.rs | 51 +++++++++++++------------------------------ 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ef3f9f7b6..d1da59d4b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -287,7 +287,6 @@ impl Page { self.get().wal_tag.store(TAG_UNSET, Ordering::Release) } - /// Check if this page is suitable for checkpointing pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool { let (f, s) = self.wal_tag_pair(); f == target_frame && s == seq && !self.is_dirty() @@ -916,10 +915,9 @@ impl Pager { let page = return_if_io!(self.allocate_page()); if let Some(wal) = &self.wal { let max_frame = wal.borrow().get_max_frame_in_wal(); + let seq = wal.borrow().checkpoint_seq(); // brand new page gets share max frame + 1 - page.set_wal_frame(max_frame + 1); - } else { - page.set_wal_frame(0); + page.set_wal_tag(max_frame + 1, seq); } let page = Arc::new(BTreePageInner { page: RefCell::new(page), @@ -1417,7 +1415,8 @@ impl Pager { page.get().id == header.page_number as usize, "page has unexpected id" ); - page.set_wal_frame(frame_no); + let seq = wal.checkpoint_seq(); + page.set_wal_tag(frame_no, seq); self.add_dirty(&page); } if header.is_commit_frame() { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 42f65d854..492460534 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -281,7 +281,6 @@ pub trait Wal: Debug { fn sync(&mut self) -> Result; fn is_syncing(&self) -> bool; fn get_max_frame_in_wal(&self) -> u64; - fn get_nbackfills_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; fn rollback(&mut self) -> Result<()>; @@ -303,7 +302,7 @@ pub enum CheckpointState { } /// IOV_MAX is 1024 on most systems -pub const CKPT_BATCH_PAGES: usize = 1024; +pub const CKPT_BATCH_PAGES: usize = 512; /// TODO: *ALL* of these need to be tuned for perf. It is tricky /// trying to figure out the ideal numbers here to work together concurrently @@ -414,7 +413,7 @@ impl std::ops::DerefMut for WriteBatch { } } -/// Information and +/// Information and structures for processing a checkpoint operation. struct OngoingCheckpoint { /// Used for benchmarking/debugging a checkpoint operation. time: std::time::Instant, @@ -437,6 +436,17 @@ struct OngoingCheckpoint { } impl OngoingCheckpoint { + fn reset(&mut self) { + self.min_frame = 0; + self.max_frame = 0; + self.current_page = 0; + self.pages_to_checkpoint.clear(); + self.pending_writes.clear(); + self.inflight_reads.clear(); + self.inflight_writes.clear(); + self.state = CheckpointState::Start; + } + #[inline] /// Whether or not new reads should be issued during checkpoint processing. fn should_issue_reads(&self) -> bool { @@ -482,7 +492,7 @@ impl OngoingCheckpoint { let mut moved = false; // retain only those still pending self.inflight_reads.retain(|slot| { - if slot.done.load(Ordering::Acquire) { + if slot.completion.is_completed() { if let Some(buf) = slot.buf.lock().take() { // read is done, take the buffer and add it to the batch to write self.pending_writes.insert(slot.page_id, buf); @@ -1196,7 +1206,6 @@ impl Wal for WalFile { let shared = self.get_shared(); let header = shared.wal_header.clone(); let header = header.lock(); - let seq = header.checkpoint_seq; let checksums = self.last_checksum; let page_content = page.get_contents(); let page_buf = page_content.as_ptr(); @@ -1300,10 +1309,6 @@ impl Wal for WalFile { self.min_frame } - fn get_nbackfills_in_wal(&self) -> u64 { - self.get_shared().nbackfills.load(Ordering::Acquire) - } - #[instrument(err, skip_all, level = Level::DEBUG)] fn rollback(&mut self) -> Result<()> { // TODO(pere): have to remove things from frame_cache because they are no longer valid. @@ -1463,15 +1468,8 @@ impl WalFile { } fn reset_internal_states(&mut self) { - self.ongoing_checkpoint.state = CheckpointState::Start; - self.ongoing_checkpoint.min_frame = 0; - self.ongoing_checkpoint.max_frame = 0; - self.ongoing_checkpoint.current_page = 0; self.max_frame_read_lock_index.set(NO_LOCK_HELD); - self.ongoing_checkpoint.batch.clear(); - self.ongoing_checkpoint.pending_flush.clear(); - self.sync_state.set(SyncState::NotSyncing); - self.ongoing_checkpoint.pages_to_checkpoint.clear(); + self.ongoing_checkpoint.reset(); self.syncing.set(false); } @@ -1970,25 +1968,6 @@ impl WalFile { buf: buf_slot, }) } - - /// Drain completed inflight reads into the batch. - /// Returns true if we moved at least one item. - fn drain_inflight_into_batch(&mut self) -> bool { - let mut moved = false; - // retain only those still pending - self.ongoing_checkpoint.inflight.retain(|slot| { - if slot.done.load(Ordering::Acquire) { - if let Some(buf) = slot.buf.lock().take() { - self.ongoing_checkpoint.batch.insert(slot.page_id, buf); - moved = true; - } - false // drop this slot - } else { - true // keep pending - } - }); - moved - } } impl WalFileShared {