Merge 'Initial pass on WAL reader' from Pekka Enberg

Closes #249
This commit is contained in:
Pekka Enberg
2024-08-01 11:35:55 +03:00
5 changed files with 168 additions and 3 deletions

View File

@@ -13,6 +13,7 @@ mod translate;
mod types;
mod util;
mod vdbe;
mod wal;
#[cfg(not(target_family = "wasm"))]
#[global_allocator]

View File

@@ -1,10 +1,10 @@
use crate::buffer_pool::BufferPool;
use crate::sqlite3_ondisk::PageContent;
use crate::sqlite3_ondisk::{self, DatabaseHeader};
use crate::wal::Wal;
use crate::{Buffer, PageSource, Result};
use log::trace;
use sieve_cache::SieveCache;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::Hash;
@@ -264,6 +264,9 @@ impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
pub struct Pager {
/// Source of the database pages.
pub page_source: PageSource,
/// The write-ahead log (WAL) for the database.
wal: Option<Wal>,
/// A page cache for the database.
page_cache: RefCell<DumbLruPageCache>,
/// Buffer pool for temporary data storage.
buffer_pool: Rc<BufferPool>,
@@ -291,6 +294,7 @@ impl Pager {
let page_cache = RefCell::new(DumbLruPageCache::new(10));
Ok(Self {
page_source,
wal: None,
buffer_pool,
page_cache,
io,
@@ -299,6 +303,20 @@ impl Pager {
})
}
pub fn begin_read_tx(&self) -> Result<()> {
if let Some(wal) = &self.wal {
wal.begin_read_tx()?;
}
Ok(())
}
pub fn end_read_tx(&self) -> Result<()> {
if let Some(wal) = &self.wal {
wal.end_read_tx()?;
}
Ok(())
}
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> crate::Result<Rc<RefCell<Page>>> {
trace!("read_page(page_idx = {})", page_idx);
@@ -308,6 +326,17 @@ impl Pager {
}
let page = Rc::new(RefCell::new(Page::new(page_idx)));
RefCell::borrow(&page).set_locked();
if let Some(wal) = &self.wal {
if let Some(frame_id) = wal.find_frame(page_idx as u64)? {
wal.read_frame(frame_id, page.clone())?;
{
let page = page.borrow_mut();
page.set_uptodate();
}
page_cache.insert(page_idx, page.clone());
return Ok(page);
}
}
sqlite3_ondisk::begin_read_page(
&self.page_source,
self.buffer_pool.clone(),

View File

@@ -15,20 +15,37 @@
///
/// Each page consists of a page header and N cells, which contain the records.
///
/// ```text
/// +-----------------+----------------+---------------------+----------------+
/// | | | | |
/// | Page header | Cell pointer | Unallocated | Cell content |
/// | (8 or 12 bytes) | array | space | area |
/// | | | | |
/// +-----------------+----------------+---------------------+----------------+
/// ```
///
/// For more information, see: https://www.sqlite.org/fileformat.html
/// The write-ahead log (WAL) is a separate file that contains the physical
/// log of changes to a database file. The file starts with a WAL header and
/// is followed by a sequence of WAL frames, which are database pages with
/// additional metadata.
///
/// ```text
/// +-----------------+-----------------+-----------------+-----------------+
/// | | | | |
/// | WAL header | WAL frame 1 | WAL frame 2 | WAL frame N |
/// | | | | |
/// +-----------------+-----------------+-----------------+-----------------+
/// ```
///
/// For more information, see the SQLite file format specification:
///
/// https://www.sqlite.org/fileformat.html
use crate::buffer_pool::BufferPool;
use crate::error::LimboError;
use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion};
use crate::pager::{Page, Pager};
use crate::types::{OwnedRecord, OwnedValue};
use crate::{PageSource, Result};
use crate::{File, PageSource, Result};
use log::trace;
use std::cell::RefCell;
use std::rc::Rc;
@@ -68,6 +85,28 @@ pub struct DatabaseHeader {
version_number: u32,
}
#[derive(Debug, Default)]
pub struct WalHeader {
magic: [u8; 4],
file_format: u32,
page_size: u32,
checkpoint_seq: u32,
salt_1: u32,
salt_2: u32,
checksum_1: u32,
checksum_2: u32,
}
#[derive(Debug, Default)]
pub struct WalFrameHeader {
page_number: u32,
db_size: u32,
salt_1: u32,
salt_2: u32,
checksum_1: u32,
checksum_2: u32,
}
pub fn begin_read_database_header(page_source: &PageSource) -> Result<Rc<RefCell<DatabaseHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
@@ -785,6 +824,68 @@ pub fn write_varint(buf: &mut [u8], value: u64) -> usize {
return n;
}
pub fn begin_read_wal_header(io: &Box<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn)));
let result = Rc::new(RefCell::new(WalHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let header = header.clone();
finish_read_wal_header(buf, header).unwrap();
});
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
io.pread(0, c)?;
Ok(result)
}
fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Rc<RefCell<WalHeader>>) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = header.borrow_mut();
header.magic.copy_from_slice(&buf[0..4]);
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
header.checkpoint_seq = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
header.salt_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]);
header.salt_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]);
header.checksum_1 = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]);
header.checksum_2 = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]);
Ok(())
}
pub fn begin_read_wal_frame_header(
io: &Box<dyn File>,
offset: usize,
) -> Result<Rc<RefCell<WalFrameHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn)));
let result = Rc::new(RefCell::new(WalFrameHeader::default()));
let frame = result.clone();
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let frame = frame.clone();
finish_read_wal_frame_header(buf, frame).unwrap();
});
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
io.pread(offset, c)?;
Ok(result)
}
fn finish_read_wal_frame_header(
buf: Rc<RefCell<Buffer>>,
frame: Rc<RefCell<WalFrameHeader>>,
) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut frame = frame.borrow_mut();
frame.page_number = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
frame.db_size = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
frame.salt_1 = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
frame.salt_2 = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
frame.checksum_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]);
frame.checksum_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -742,9 +742,11 @@ impl Program {
}
}
Insn::Halt => {
pager.end_read_tx()?;
return Ok(StepResult::Done);
}
Insn::Transaction => {
pager.begin_read_tx()?;
state.pc += 1;
}
Insn::Goto { target_pc } => {

32
core/wal.rs Normal file
View File

@@ -0,0 +1,32 @@
use std::{cell::RefCell, rc::Rc};
use crate::{pager::Page, Result};
/// Write-ahead log (WAL).
pub struct Wal {}
impl Wal {
pub fn new() -> Self {
Self {}
}
/// Begin a write transaction.
pub fn begin_read_tx(&self) -> Result<()> {
Ok(())
}
/// End a write transaction.
pub fn end_read_tx(&self) -> Result<()> {
Ok(())
}
/// Find the latest frame containing a page.
pub fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
Ok(None)
}
/// Read a frame from the WAL.
pub fn read_frame(&self, _frame_id: u64, _page: Rc<RefCell<Page>>) -> Result<()> {
todo!();
}
}