diff --git a/core/lib.rs b/core/lib.rs index eefd2acde..be94d17a1 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -13,6 +13,7 @@ mod translate; mod types; mod util; mod vdbe; +mod wal; #[cfg(not(target_family = "wasm"))] #[global_allocator] diff --git a/core/pager.rs b/core/pager.rs index 876c5c242..24a023c3f 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -1,10 +1,10 @@ use crate::buffer_pool::BufferPool; use crate::sqlite3_ondisk::PageContent; use crate::sqlite3_ondisk::{self, DatabaseHeader}; +use crate::wal::Wal; use crate::{Buffer, PageSource, Result}; use log::trace; use sieve_cache::SieveCache; -use std::borrow::Borrow; use std::cell::RefCell; use std::collections::HashMap; use std::hash::Hash; @@ -264,6 +264,9 @@ impl PageCache { pub struct Pager { /// Source of the database pages. pub page_source: PageSource, + /// The write-ahead log (WAL) for the database. + wal: Option, + /// A page cache for the database. page_cache: RefCell, /// Buffer pool for temporary data storage. buffer_pool: Rc, @@ -291,6 +294,7 @@ impl Pager { let page_cache = RefCell::new(DumbLruPageCache::new(10)); Ok(Self { page_source, + wal: None, buffer_pool, page_cache, io, @@ -299,6 +303,20 @@ impl Pager { }) } + pub fn begin_read_tx(&self) -> Result<()> { + if let Some(wal) = &self.wal { + wal.begin_read_tx()?; + } + Ok(()) + } + + pub fn end_read_tx(&self) -> Result<()> { + if let Some(wal) = &self.wal { + wal.end_read_tx()?; + } + Ok(()) + } + /// Reads a page from the database. pub fn read_page(&self, page_idx: usize) -> crate::Result>> { trace!("read_page(page_idx = {})", page_idx); @@ -308,6 +326,17 @@ impl Pager { } let page = Rc::new(RefCell::new(Page::new(page_idx))); RefCell::borrow(&page).set_locked(); + if let Some(wal) = &self.wal { + if let Some(frame_id) = wal.find_frame(page_idx as u64)? { + wal.read_frame(frame_id, page.clone())?; + { + let page = page.borrow_mut(); + page.set_uptodate(); + } + page_cache.insert(page_idx, page.clone()); + return Ok(page); + } + } sqlite3_ondisk::begin_read_page( &self.page_source, self.buffer_pool.clone(), diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 0c8cc60ce..82c136b51 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -15,20 +15,37 @@ /// /// Each page consists of a page header and N cells, which contain the records. /// +/// ```text /// +-----------------+----------------+---------------------+----------------+ /// | | | | | /// | Page header | Cell pointer | Unallocated | Cell content | /// | (8 or 12 bytes) | array | space | area | /// | | | | | /// +-----------------+----------------+---------------------+----------------+ +/// ``` /// -/// For more information, see: https://www.sqlite.org/fileformat.html +/// The write-ahead log (WAL) is a separate file that contains the physical +/// log of changes to a database file. The file starts with a WAL header and +/// is followed by a sequence of WAL frames, which are database pages with +/// additional metadata. +/// +/// ```text +/// +-----------------+-----------------+-----------------+-----------------+ +/// | | | | | +/// | WAL header | WAL frame 1 | WAL frame 2 | WAL frame N | +/// | | | | | +/// +-----------------+-----------------+-----------------+-----------------+ +/// ``` +/// +/// For more information, see the SQLite file format specification: +/// +/// https://www.sqlite.org/fileformat.html use crate::buffer_pool::BufferPool; use crate::error::LimboError; use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion}; use crate::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; -use crate::{PageSource, Result}; +use crate::{File, PageSource, Result}; use log::trace; use std::cell::RefCell; use std::rc::Rc; @@ -68,6 +85,28 @@ pub struct DatabaseHeader { version_number: u32, } +#[derive(Debug, Default)] +pub struct WalHeader { + magic: [u8; 4], + file_format: u32, + page_size: u32, + checkpoint_seq: u32, + salt_1: u32, + salt_2: u32, + checksum_1: u32, + checksum_2: u32, +} + +#[derive(Debug, Default)] +pub struct WalFrameHeader { + page_number: u32, + db_size: u32, + salt_1: u32, + salt_2: u32, + checksum_1: u32, + checksum_2: u32, +} + pub fn begin_read_database_header(page_source: &PageSource) -> Result>> { let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); @@ -785,6 +824,68 @@ pub fn write_varint(buf: &mut [u8], value: u64) -> usize { return n; } +pub fn begin_read_wal_header(io: &Box) -> Result>> { + let drop_fn = Rc::new(|_buf| {}); + let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn))); + let result = Rc::new(RefCell::new(WalHeader::default())); + let header = result.clone(); + let complete = Box::new(move |buf: Rc>| { + let header = header.clone(); + finish_read_wal_header(buf, header).unwrap(); + }); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); + io.pread(0, c)?; + Ok(result) +} + +fn finish_read_wal_header(buf: Rc>, header: Rc>) -> Result<()> { + let buf = buf.borrow(); + let buf = buf.as_slice(); + let mut header = header.borrow_mut(); + header.magic.copy_from_slice(&buf[0..4]); + header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); + header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); + header.checkpoint_seq = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]); + header.salt_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]); + header.salt_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]); + header.checksum_1 = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]); + header.checksum_2 = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]); + Ok(()) +} + +pub fn begin_read_wal_frame_header( + io: &Box, + offset: usize, +) -> Result>> { + let drop_fn = Rc::new(|_buf| {}); + let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn))); + let result = Rc::new(RefCell::new(WalFrameHeader::default())); + let frame = result.clone(); + let complete = Box::new(move |buf: Rc>| { + let frame = frame.clone(); + finish_read_wal_frame_header(buf, frame).unwrap(); + }); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); + io.pread(offset, c)?; + Ok(result) +} + +fn finish_read_wal_frame_header( + buf: Rc>, + frame: Rc>, +) -> Result<()> { + let buf = buf.borrow(); + let buf = buf.as_slice(); + let mut frame = frame.borrow_mut(); + frame.page_number = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + frame.db_size = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); + frame.salt_1 = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); + frame.salt_2 = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]); + frame.checksum_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]); + frame.checksum_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]); + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 25f166e40..9213a062c 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -742,9 +742,11 @@ impl Program { } } Insn::Halt => { + pager.end_read_tx()?; return Ok(StepResult::Done); } Insn::Transaction => { + pager.begin_read_tx()?; state.pc += 1; } Insn::Goto { target_pc } => { diff --git a/core/wal.rs b/core/wal.rs new file mode 100644 index 000000000..cadcb275e --- /dev/null +++ b/core/wal.rs @@ -0,0 +1,32 @@ +use std::{cell::RefCell, rc::Rc}; + +use crate::{pager::Page, Result}; + +/// Write-ahead log (WAL). +pub struct Wal {} + +impl Wal { + pub fn new() -> Self { + Self {} + } + + /// Begin a write transaction. + pub fn begin_read_tx(&self) -> Result<()> { + Ok(()) + } + + /// End a write transaction. + pub fn end_read_tx(&self) -> Result<()> { + Ok(()) + } + + /// Find the latest frame containing a page. + pub fn find_frame(&self, _page_id: u64) -> Result> { + Ok(None) + } + + /// Read a frame from the WAL. + pub fn read_frame(&self, _frame_id: u64, _page: Rc>) -> Result<()> { + todo!(); + } +}