Pager: add subjournal_page_if_required() method

This commit is contained in:
Jussi Saurio
2025-10-21 11:27:01 +03:00
parent e8226c0e4b
commit 5b01605fae

View File

@@ -662,7 +662,7 @@ impl Pager {
/// Open the subjournal if not yet open.
/// The subjournal is a file that is used to store the "before images" of pages for the
/// current savepoint. If the savepoint is rolled back, the pages can be restored from the subjournal.
///
///
/// Currently uses MemoryIO, but should eventually be backed by temporary on-disk files.
pub fn open_subjournal(&self) -> Result<()> {
if self.subjournal.read().is_some() {
@@ -676,6 +676,79 @@ impl Pager {
*self.subjournal.write() = Some(db_file);
Ok(())
}
/// Write page to subjournal if the current savepoint does not currently
/// contain an an entry for it. In case of a statement-level rollback,
/// the page image can be restored from the subjournal.
///
/// A buffer of length page_size + 4 bytes is allocated and the page id
/// is written to the beginning of the buffer. The rest of the buffer is filled with the page contents.
pub fn subjournal_page_if_required(&self, page: &Page) -> Result<()> {
if self.subjournal.read().is_none() {
return Ok(());
}
let write_offset = {
let savepoints = self.savepoints.read();
let Some(cur_savepoint) = savepoints.last() else {
return Ok(());
};
if cur_savepoint.has_dirty_page(page.get().id as u32) {
return Ok(());
}
cur_savepoint.write_offset.load(Ordering::SeqCst)
};
let page_id = page.get().id;
let page_size = self.page_size.load(Ordering::SeqCst) as usize;
let buffer = {
let page_id = page.get().id as u32;
let contents = page.get_contents();
let buffer = self.buffer_pool.allocate(page_size + 4);
let contents_buffer = contents.buffer.as_slice();
turso_assert!(
contents_buffer.len() == page_size,
"contents buffer length should be equal to page size"
);
buffer.as_mut_slice()[0..4].copy_from_slice(&page_id.to_be_bytes());
buffer.as_mut_slice()[4..4 + page_size].copy_from_slice(&contents_buffer);
Arc::new(buffer)
};
let savepoints = self.savepoints.clone();
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.len();
turso_assert!(
bytes_written == buf_len as i32,
"wrote({bytes_written}) != expected({buf_len})"
);
let savepoints = savepoints.read();
let cur_savepoint = savepoints.last().unwrap();
cur_savepoint.add_dirty_page(page_id as u32);
cur_savepoint
.write_offset
.fetch_add(page_size as u64 + 4, Ordering::SeqCst);
})
};
let c = Completion::new_write(write_complete);
let subjournal = self.subjournal.read();
let subjournal = subjournal.as_ref().unwrap();
let c = subjournal.write_page(write_offset, page_size, buffer.clone(), c)?;
assert!(c.succeeded(), "memory IO should complete immediately");
Ok(())
}
pub fn open_savepoint(&self) -> Result<()> {
self.open_subjournal()?;
let subjournal_offset = self.subjournal.read().as_ref().unwrap().size()?;