From 459c01f93c7b655c4c7d2524d9c1919379b3c97e Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Tue, 21 Oct 2025 11:14:26 +0300 Subject: [PATCH] Add subjournal module The subjournal is a temporary file where stmt subtransactions write an 'undo log' of pages before modifying them. If a stmt subtransaction rolls back, the pages are restored from the subjournal. --- core/storage/mod.rs | 1 + core/storage/subjournal.rs | 88 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 core/storage/subjournal.rs diff --git a/core/storage/mod.rs b/core/storage/mod.rs index ee5029ba0..f64d26385 100644 --- a/core/storage/mod.rs +++ b/core/storage/mod.rs @@ -22,6 +22,7 @@ pub(crate) mod pager; pub(super) mod slot_bitmap; pub(crate) mod sqlite3_ondisk; mod state_machines; +pub(crate) mod subjournal; #[allow(clippy::arc_with_non_send_sync)] pub(crate) mod wal; diff --git a/core/storage/subjournal.rs b/core/storage/subjournal.rs new file mode 100644 index 000000000..d174230a5 --- /dev/null +++ b/core/storage/subjournal.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use crate::{ + storage::sqlite3_ondisk::finish_read_page, Buffer, Completion, CompletionError, PageRef, Result, +}; + +#[derive(Clone)] +pub struct Subjournal { + file: Arc, +} + +impl Subjournal { + pub fn new(file: Arc) -> Self { + Self { file } + } + + pub fn size(&self) -> Result { + self.file.size() + } + + pub fn write_page( + &self, + offset: u64, + page_size: usize, + buffer: Arc, + c: Completion, + ) -> Result { + assert!( + buffer.len() == page_size + 4, + "buffer length should be page_size + 4 bytes for page id" + ); + self.file.pwrite(offset, buffer, c) + } + + pub fn read_page_number(&self, offset: u64, page_id_buffer: Arc) -> Result { + assert!( + page_id_buffer.len() == 4, + "page_id_buffer length should be 4 bytes" + ); + let c = Completion::new_read( + page_id_buffer, + move |res: Result<(Arc, i32), CompletionError>| { + let Ok((_buffer, _bytes_read)) = res else { + return; + }; + }, + ); + let c = self.file.pread(offset, c)?; + Ok(c) + } + + pub fn read_page( + &self, + offset: u64, + buffer: Arc, + page: PageRef, + page_size: usize, + ) -> Result { + assert!( + buffer.len() == page_size, + "buffer length should be page_size" + ); + let c = Completion::new_read( + buffer, + move |res: Result<(Arc, i32), CompletionError>| { + let Ok((buffer, bytes_read)) = res else { + return; + }; + assert!( + bytes_read == page_size as i32, + "bytes_read should be page_size" + ); + finish_read_page(page.get().id as usize, buffer, page.clone()); + }, + ); + let c = self.file.pread(offset, c)?; + Ok(c) + } + + pub fn truncate(&self, offset: u64) -> Result { + let c = Completion::new_trunc(move |res: Result| { + let Ok(_) = res else { + return; + }; + }); + self.file.truncate(offset, c) + } +}