diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 697a32d9b..88d125dfa 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -662,7 +662,7 @@ impl Pager { /// Open the subjournal if not yet open. /// The subjournal is a file that is used to store the "before images" of pages for the /// current savepoint. If the savepoint is rolled back, the pages can be restored from the subjournal. - /// + /// /// Currently uses MemoryIO, but should eventually be backed by temporary on-disk files. pub fn open_subjournal(&self) -> Result<()> { if self.subjournal.read().is_some() { @@ -676,6 +676,79 @@ impl Pager { *self.subjournal.write() = Some(db_file); Ok(()) } + + /// Write page to subjournal if the current savepoint does not currently + /// contain an an entry for it. In case of a statement-level rollback, + /// the page image can be restored from the subjournal. + /// + /// A buffer of length page_size + 4 bytes is allocated and the page id + /// is written to the beginning of the buffer. The rest of the buffer is filled with the page contents. + pub fn subjournal_page_if_required(&self, page: &Page) -> Result<()> { + if self.subjournal.read().is_none() { + return Ok(()); + } + let write_offset = { + let savepoints = self.savepoints.read(); + let Some(cur_savepoint) = savepoints.last() else { + return Ok(()); + }; + if cur_savepoint.has_dirty_page(page.get().id as u32) { + return Ok(()); + } + cur_savepoint.write_offset.load(Ordering::SeqCst) + }; + let page_id = page.get().id; + let page_size = self.page_size.load(Ordering::SeqCst) as usize; + let buffer = { + let page_id = page.get().id as u32; + let contents = page.get_contents(); + let buffer = self.buffer_pool.allocate(page_size + 4); + let contents_buffer = contents.buffer.as_slice(); + turso_assert!( + contents_buffer.len() == page_size, + "contents buffer length should be equal to page size" + ); + + buffer.as_mut_slice()[0..4].copy_from_slice(&page_id.to_be_bytes()); + buffer.as_mut_slice()[4..4 + page_size].copy_from_slice(&contents_buffer); + + Arc::new(buffer) + }; + + let savepoints = self.savepoints.clone(); + + let write_complete = { + let buf_copy = buffer.clone(); + Box::new(move |res: Result| { + let Ok(bytes_written) = res else { + return; + }; + let buf_copy = buf_copy.clone(); + let buf_len = buf_copy.len(); + + turso_assert!( + bytes_written == buf_len as i32, + "wrote({bytes_written}) != expected({buf_len})" + ); + + let savepoints = savepoints.read(); + let cur_savepoint = savepoints.last().unwrap(); + cur_savepoint.add_dirty_page(page_id as u32); + cur_savepoint + .write_offset + .fetch_add(page_size as u64 + 4, Ordering::SeqCst); + }) + }; + let c = Completion::new_write(write_complete); + + let subjournal = self.subjournal.read(); + let subjournal = subjournal.as_ref().unwrap(); + + let c = subjournal.write_page(write_offset, page_size, buffer.clone(), c)?; + assert!(c.succeeded(), "memory IO should complete immediately"); + Ok(()) + } + pub fn open_savepoint(&self) -> Result<()> { self.open_subjournal()?; let subjournal_offset = self.subjournal.read().as_ref().unwrap().size()?;