mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-09 19:24:21 +01:00
Merge 'wal: checksums' from Pere Diaz Bou
Implemented checksums so that sqlite3 is able to read our WAL. This also helps with future work on proper recovery of the WAL. To try it out, create some frames with CREATE TABLE and kill the process so that no checkpoint takes place. ``` Limbo v0.0.6 Enter ".help" for usage hints. limbo> create table x(x); limbo> [1] 15910 killed cargo run xlimbo.db ``` Now sqlite3 is able to recover from this WAL created in limbo: ``` sqlite3 xlimbo.db SQLite version 3.43.2 2023-10-10 13:08:14 Enter ".help" for usage hints. sqlite> .schema CREATE TABLE x (x); ``` Closes #413
This commit is contained in:
@@ -48,6 +48,7 @@ use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::pager::{Page, Pager};
|
||||
use crate::types::{OwnedRecord, OwnedValue};
|
||||
use crate::{File, Result};
|
||||
use cfg_block::cfg_block;
|
||||
use log::trace;
|
||||
use std::cell::RefCell;
|
||||
use std::pin::Pin;
|
||||
@@ -90,10 +91,16 @@ pub struct DatabaseHeader {
|
||||
|
||||
pub const WAL_HEADER_SIZE: usize = 32;
|
||||
pub const WAL_FRAME_HEADER_SIZE: usize = 24;
|
||||
// magic is a single number represented as WAL_MAGIC_LE but the big endian
|
||||
// counterpart is just the same number with LSB set to 1.
|
||||
pub const WAL_MAGIC_LE: u32 = 0x377f0682;
|
||||
pub const WAL_MAGIC_BE: u32 = 0x377f0683;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[repr(C)] // This helps with encoding because rust does not respect the order in structs, so in
|
||||
// this case we want to keep the order
|
||||
pub struct WalHeader {
|
||||
pub magic: [u8; 4],
|
||||
pub magic: u32,
|
||||
pub file_format: u32,
|
||||
pub page_size: u32,
|
||||
pub checkpoint_seq: u32,
|
||||
@@ -1018,7 +1025,7 @@ fn finish_read_wal_header(buf: Rc<RefCell<Buffer>>, header: Rc<RefCell<WalHeader
|
||||
let buf = buf.borrow();
|
||||
let buf = buf.as_slice();
|
||||
let mut header = header.borrow_mut();
|
||||
header.magic.copy_from_slice(&buf[0..4]);
|
||||
header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
|
||||
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
|
||||
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
|
||||
header.checkpoint_seq = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
|
||||
@@ -1057,12 +1064,14 @@ pub fn begin_write_wal_frame(
|
||||
page: &Rc<RefCell<Page>>,
|
||||
db_size: u32,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<()> {
|
||||
wal_header: &WalHeader,
|
||||
checksums: (u32, u32),
|
||||
) -> Result<(u32, u32)> {
|
||||
let page_finish = page.clone();
|
||||
let page_id = page.borrow().id;
|
||||
trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id);
|
||||
|
||||
let header = WalFrameHeader {
|
||||
let mut header = WalFrameHeader {
|
||||
page_number: page_id as u32,
|
||||
db_size,
|
||||
salt_1: 0,
|
||||
@@ -1070,7 +1079,7 @@ pub fn begin_write_wal_frame(
|
||||
checksum_1: 0,
|
||||
checksum_2: 0,
|
||||
};
|
||||
let buffer = {
|
||||
let (buffer, checksums) = {
|
||||
let page = page.borrow();
|
||||
let contents = page.contents.as_ref().unwrap();
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
@@ -1080,16 +1089,29 @@ pub fn begin_write_wal_frame(
|
||||
drop_fn,
|
||||
);
|
||||
let buf = buffer.as_mut_slice();
|
||||
|
||||
buf[0..4].copy_from_slice(&header.page_number.to_be_bytes());
|
||||
buf[4..8].copy_from_slice(&header.db_size.to_be_bytes());
|
||||
|
||||
{
|
||||
let contents_buf = contents.as_ptr();
|
||||
let expects_be = wal_header.magic & 1; // LSB is set on big endian checksums
|
||||
let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be; // check if checksum
|
||||
// type and native type is the same so that we know when to swap bytes
|
||||
let checksums = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian);
|
||||
let checksums = checksum_wal(contents_buf, wal_header, checksums, use_native_endian);
|
||||
header.checksum_1 = checksums.0;
|
||||
header.checksum_2 = checksums.1;
|
||||
header.salt_1 = wal_header.salt_1;
|
||||
header.salt_2 = wal_header.salt_2;
|
||||
}
|
||||
|
||||
buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes());
|
||||
buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes());
|
||||
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
|
||||
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
|
||||
buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(contents.as_ptr());
|
||||
|
||||
Rc::new(RefCell::new(buffer))
|
||||
(Rc::new(RefCell::new(buffer)), checksums)
|
||||
};
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
@@ -1109,7 +1131,7 @@ pub fn begin_write_wal_frame(
|
||||
};
|
||||
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
|
||||
io.pwrite(offset, buffer.clone(), c)?;
|
||||
Ok(())
|
||||
Ok(checksums)
|
||||
}
|
||||
|
||||
pub fn begin_write_wal_header(io: &Rc<dyn File>, header: &WalHeader) -> Result<()> {
|
||||
@@ -1119,7 +1141,7 @@ pub fn begin_write_wal_header(io: &Rc<dyn File>, header: &WalHeader) -> Result<(
|
||||
let mut buffer = Buffer::allocate(512, drop_fn);
|
||||
let buf = buffer.as_mut_slice();
|
||||
|
||||
buf[0..4].copy_from_slice(&header.magic);
|
||||
buf[0..4].copy_from_slice(&header.magic.to_be_bytes());
|
||||
buf[4..8].copy_from_slice(&header.file_format.to_be_bytes());
|
||||
buf[8..12].copy_from_slice(&header.page_size.to_be_bytes());
|
||||
buf[12..16].copy_from_slice(&header.checkpoint_seq.to_be_bytes());
|
||||
@@ -1167,6 +1189,42 @@ pub fn payload_overflows(
|
||||
(true, space_left + 4)
|
||||
}
|
||||
|
||||
pub fn checksum_wal(
|
||||
buf: &[u8],
|
||||
wal_header: &WalHeader,
|
||||
input: (u32, u32),
|
||||
native_endian: bool, // Sqlite interprets big endian as "native"
|
||||
) -> (u32, u32) {
|
||||
assert!(buf.len() % 8 == 0, "buffer must be a multiple of 8");
|
||||
let mut s0: u32 = input.0;
|
||||
let mut s1: u32 = input.1;
|
||||
let mut i = 0;
|
||||
if native_endian {
|
||||
while i < buf.len() {
|
||||
let v0 = u32::from_ne_bytes(buf[i..i + 4].try_into().unwrap());
|
||||
let v1 = u32::from_ne_bytes(buf[i + 4..i + 8].try_into().unwrap());
|
||||
s0 = s0.wrapping_add(v0.wrapping_add(s1));
|
||||
s1 = s1.wrapping_add(v1.wrapping_add(s0));
|
||||
i += 8;
|
||||
}
|
||||
} else {
|
||||
while i < buf.len() {
|
||||
let v0 = u32::from_ne_bytes(buf[i..i + 4].try_into().unwrap()).swap_bytes();
|
||||
let v1 = u32::from_ne_bytes(buf[i + 4..i + 8].try_into().unwrap()).swap_bytes();
|
||||
s0 = s0.wrapping_add(v0.wrapping_add(s1));
|
||||
s1 = s1.wrapping_add(v1.wrapping_add(s0));
|
||||
i += 8;
|
||||
}
|
||||
}
|
||||
(s0, s1)
|
||||
}
|
||||
|
||||
impl WalHeader {
    /// Views this header's in-memory representation as a byte slice.
    ///
    /// The returned slice borrows from `self` and is exactly
    /// `size_of::<WalHeader>()` bytes long. It exposes the struct as it sits in
    /// memory, so multi-byte fields appear in the host's byte order.
    pub fn as_bytes(&self) -> &[u8] {
        let len = std::mem::size_of::<WalHeader>();
        let start = (self as *const WalHeader).cast::<u8>();
        // SAFETY: `start` is derived from a live `&WalHeader`, so it is non-null,
        // suitably aligned for `u8`, and readable for `len` bytes while `self` is
        // borrowed. NOTE(review): this also relies on `WalHeader` containing no
        // padding bytes — confirm against the struct definition.
        unsafe { std::slice::from_raw_parts(start, len) }
    }
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -10,6 +10,8 @@ use crate::storage::sqlite3_ondisk::{
|
||||
use crate::Completion;
|
||||
use crate::{storage::pager::Page, Result};
|
||||
|
||||
use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE};
|
||||
|
||||
use super::buffer_pool::BufferPool;
|
||||
use super::pager::Pager;
|
||||
use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader};
|
||||
@@ -72,6 +74,9 @@ pub struct WalFile {
|
||||
|
||||
syncing: Rc<RefCell<bool>>,
|
||||
page_size: usize,
|
||||
|
||||
last_checksum: RefCell<(u32, u32)>, // Check of last frame in WAL, this is a cumulative checksum
|
||||
// over all frames in the WAL
|
||||
}
|
||||
|
||||
pub enum CheckpointStatus {
|
||||
@@ -144,13 +149,20 @@ impl Wal for WalFile {
|
||||
offset,
|
||||
page_id
|
||||
);
|
||||
begin_write_wal_frame(
|
||||
let header = self.wal_header.borrow();
|
||||
let header = header.as_ref().unwrap();
|
||||
let header = header.borrow();
|
||||
let checksums = *self.last_checksum.borrow();
|
||||
let checksums = begin_write_wal_frame(
|
||||
self.file.borrow().as_ref().unwrap(),
|
||||
offset,
|
||||
&page,
|
||||
db_size,
|
||||
write_counter,
|
||||
&*header,
|
||||
checksums,
|
||||
)?;
|
||||
self.last_checksum.replace(checksums);
|
||||
self.max_frame.replace(frame_id + 1);
|
||||
{
|
||||
let mut frame_cache = self.frame_cache.borrow_mut();
|
||||
@@ -245,6 +257,7 @@ impl WalFile {
|
||||
ongoing_checkpoint: HashSet::new(),
|
||||
syncing: Rc::new(RefCell::new(false)),
|
||||
page_size,
|
||||
last_checksum: RefCell::new((0, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,16 +277,37 @@ impl WalFile {
|
||||
self.io.run_once()?;
|
||||
self.wal_header.replace(Some(wal_header));
|
||||
} else {
|
||||
let wal_header = WalHeader {
|
||||
magic: (0x377f0682_u32).to_be_bytes(),
|
||||
// magic is a single number represented as WAL_MAGIC_LE but the big endian
|
||||
// counterpart is just the same number with LSB set to 1.
|
||||
let magic = if cfg!(target_endian = "big") {
|
||||
WAL_MAGIC_BE
|
||||
} else {
|
||||
WAL_MAGIC_LE
|
||||
};
|
||||
let mut wal_header = WalHeader {
|
||||
magic,
|
||||
file_format: 3007000,
|
||||
page_size: self.page_size as u32,
|
||||
checkpoint_seq: 0, // TODO implement sequence number
|
||||
salt_1: 0, // TODO implement salt
|
||||
salt_2: 0,
|
||||
checksum_1: 0,
|
||||
checksum_2: 0, // TODO implement checksum header
|
||||
checksum_2: 0,
|
||||
};
|
||||
let native = cfg!(target_endian = "big"); // if target_endian is
|
||||
// already big then we don't care but if isn't, header hasn't yet been
|
||||
// encoded to big endian, therefore we want to swap bytes to compute this
|
||||
// checksum.
|
||||
let checksums = *self.last_checksum.borrow_mut();
|
||||
let checksums = checksum_wal(
|
||||
&wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes
|
||||
&wal_header,
|
||||
checksums,
|
||||
native, // this is false because we haven't encoded the wal header yet
|
||||
);
|
||||
wal_header.checksum_1 = checksums.0;
|
||||
wal_header.checksum_2 = checksums.1;
|
||||
self.last_checksum.replace(checksums);
|
||||
sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?;
|
||||
self.wal_header
|
||||
.replace(Some(Rc::new(RefCell::new(wal_header))));
|
||||
|
||||
Reference in New Issue
Block a user