mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-29 22:14:23 +01:00
Pager: add rollback_to_newest_savepoint() method
This commit is contained in:
@@ -792,6 +792,74 @@ impl Pager {
|
||||
assert!(c.succeeded(), "memory IO should complete immediately");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rollback to the newest savepoint. This basically just means reading the subjournal from the start offset
|
||||
/// of the savepoint to the end of the subjournal and restoring the page images to the page cache.
|
||||
pub fn rollback_to_newest_savepoint(&self) -> Result<()> {
|
||||
let subjournal = self.subjournal.read();
|
||||
let Some(subjournal) = subjournal.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
let mut savepoints = self.savepoints.write();
|
||||
let Some(savepoint) = savepoints.pop() else {
|
||||
return Ok(());
|
||||
};
|
||||
let journal_start_offset = savepoint.start_offset.load(Ordering::SeqCst);
|
||||
|
||||
let mut rollback_bitset = RoaringBitmap::new();
|
||||
|
||||
// Read the subjournal starting from start offset, first reading 4 bytes to get page id, then if rollback_bitset already has the page, skip reading the page
|
||||
// and just advance the offset. otherwise read the page and add the page id to the rollback_bitset + put the page image into the page cache
|
||||
let mut current_offset = journal_start_offset;
|
||||
let page_size = self.page_size.load(Ordering::SeqCst) as u64;
|
||||
let journal_end_offset = savepoint.write_offset.load(Ordering::SeqCst);
|
||||
|
||||
while current_offset < journal_end_offset {
|
||||
// Read 4 bytes for page id
|
||||
let page_id_buffer = Arc::new(self.buffer_pool.allocate(4));
|
||||
let c = subjournal.read_page_number(current_offset, page_id_buffer.clone())?;
|
||||
assert!(c.succeeded(), "memory IO should complete immediately");
|
||||
let page_id = u32::from_be_bytes(page_id_buffer.as_slice()[0..4].try_into().unwrap());
|
||||
current_offset += 4;
|
||||
|
||||
// Check if we've already rolled back this page
|
||||
if rollback_bitset.contains(page_id) {
|
||||
// Skip reading the page, just advance offset
|
||||
current_offset += page_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Read the page data
|
||||
let page_buffer = Arc::new(self.buffer_pool.allocate(page_size as usize));
|
||||
let page = Arc::new(Page::new(page_id as i64));
|
||||
let c = subjournal.read_page(
|
||||
current_offset,
|
||||
page_buffer.clone(),
|
||||
page.clone(),
|
||||
page_size as usize,
|
||||
)?;
|
||||
assert!(c.succeeded(), "memory IO should complete immediately");
|
||||
current_offset += page_size;
|
||||
|
||||
// Add page to rollback bitset
|
||||
rollback_bitset.insert(page_id);
|
||||
|
||||
// Put the page image into the page cache
|
||||
self.upsert_page_in_cache(page_id as usize, page, false)?;
|
||||
}
|
||||
|
||||
let truncate_completion = self
|
||||
.subjournal
|
||||
.read()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.truncate(journal_start_offset)?;
|
||||
assert!(
|
||||
truncate_completion.succeeded(),
|
||||
"memory IO should complete immediately"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "test_helper")]
|
||||
|
||||
Reference in New Issue
Block a user