diff --git a/core/storage/pager.rs b/core/storage/pager.rs index cbbcb9dcf..3d50d7943 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -792,6 +792,74 @@ impl Pager { assert!(c.succeeded(), "memory IO should complete immediately"); Ok(()) } + + /// Rollback to the newest savepoint. This basically just means reading the subjournal from the start offset + /// of the savepoint to the end of the subjournal and restoring the page images to the page cache. + pub fn rollback_to_newest_savepoint(&self) -> Result<()> { + let subjournal = self.subjournal.read(); + let Some(subjournal) = subjournal.as_ref() else { + return Ok(()); + }; + let mut savepoints = self.savepoints.write(); + let Some(savepoint) = savepoints.pop() else { + return Ok(()); + }; + let journal_start_offset = savepoint.start_offset.load(Ordering::SeqCst); + + let mut rollback_bitset = RoaringBitmap::new(); + + // Read the subjournal starting from start offset, first reading 4 bytes to get page id, then if rollback_bitset already has the page, skip reading the page + // and just advance the offset. otherwise read the page and add the page id to the rollback_bitset + put the page image into the page cache + let mut current_offset = journal_start_offset; + let page_size = self.page_size.load(Ordering::SeqCst) as u64; + let journal_end_offset = savepoint.write_offset.load(Ordering::SeqCst); + + while current_offset < journal_end_offset { + // Read 4 bytes for page id + let page_id_buffer = Arc::new(self.buffer_pool.allocate(4)); + let c = subjournal.read_page_number(current_offset, page_id_buffer.clone())?; + assert!(c.succeeded(), "memory IO should complete immediately"); + let page_id = u32::from_be_bytes(page_id_buffer.as_slice()[0..4].try_into().unwrap()); + current_offset += 4; + + // Check if we've already rolled back this page + if rollback_bitset.contains(page_id) { + // Skip reading the page, just advance offset + current_offset += page_size; + continue; + } + + // Read the page data + let page_buffer = Arc::new(self.buffer_pool.allocate(page_size as usize)); + let page = Arc::new(Page::new(page_id as i64)); + let c = subjournal.read_page( + current_offset, + page_buffer.clone(), + page.clone(), + page_size as usize, + )?; + assert!(c.succeeded(), "memory IO should complete immediately"); + current_offset += page_size; + + // Add page to rollback bitset + rollback_bitset.insert(page_id); + + // Put the page image into the page cache + self.upsert_page_in_cache(page_id as usize, page, false)?; + } + + let truncate_completion = self + .subjournal + .read() + .as_ref() + .unwrap() + .truncate(journal_start_offset)?; + assert!( + truncate_completion.succeeded(), + "memory IO should complete immediately" + ); + + Ok(()) } #[cfg(feature = "test_helper")]