Merge 'WAL frame checksum support' from Daniel Boll

Closes #1151

Closes #1184
This commit is contained in:
Pekka Enberg
2025-03-26 17:48:52 +02:00
3 changed files with 41 additions and 27 deletions

View File

@@ -346,7 +346,7 @@ impl Pager {
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
let mut checkpoint_result = CheckpointResult::new();
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = self.flush_info.borrow().state;
trace!("cacheflush {:?}", state);
@@ -424,7 +424,7 @@ impl Pager {
}
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
let mut checkpoint_result = CheckpointResult::new();
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = *self.checkpoint_state.borrow();
trace!("pager_checkpoint(state={:?})", state);

View File

@@ -1354,8 +1354,8 @@ pub fn begin_write_wal_frame(
let mut header = WalFrameHeader {
page_number: page_id as u32,
db_size,
salt_1: 0,
salt_2: 0,
salt_1: wal_header.salt_1,
salt_2: wal_header.salt_2,
checksum_1: 0,
checksum_2: 0,
};
@@ -1371,28 +1371,34 @@ pub fn begin_write_wal_frame(
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.page_number.to_be_bytes());
buf[4..8].copy_from_slice(&header.db_size.to_be_bytes());
{
let contents_buf = contents.as_ptr();
let expects_be = wal_header.magic & 1; // LSB is set on big endian checksums
let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be; // check if checksum
// type and native type is the same so that we know when to swap bytes
let checksums = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian);
let checksums = checksum_wal(contents_buf, wal_header, checksums, use_native_endian);
header.checksum_1 = checksums.0;
header.checksum_2 = checksums.1;
header.salt_1 = wal_header.salt_1;
header.salt_2 = wal_header.salt_2;
}
buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes());
buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes());
let contents_buf = contents.as_ptr();
let content_len = contents_buf.len();
buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + content_len]
.copy_from_slice(contents_buf);
if content_len < 4096 {
buf[WAL_FRAME_HEADER_SIZE + content_len..WAL_FRAME_HEADER_SIZE + 4096].fill(0);
}
let expects_be = wal_header.magic & 1;
let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be;
let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian); // Only 8 bytes
let final_checksum = checksum_wal(
&buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + 4096],
wal_header,
header_checksum,
use_native_endian,
);
header.checksum_1 = final_checksum.0;
header.checksum_2 = final_checksum.1;
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(contents.as_ptr());
#[allow(clippy::arc_with_non_send_sync)]
(Arc::new(RefCell::new(buffer)), checksums)
(Arc::new(RefCell::new(buffer)), final_checksum)
};
*write_counter.borrow_mut() += 1;
@@ -1411,6 +1417,7 @@ pub fn begin_write_wal_frame(
};
let c = Completion::Write(WriteCompletion::new(write_complete));
io.pwrite(offset, buffer.clone(), c)?;
trace!("Frame written and synced at offset={offset}");
Ok(checksums)
}

View File

@@ -11,7 +11,7 @@ use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::{Buffer, Result};
use crate::{Buffer, LimboError, Result};
use crate::{Completion, Page};
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
@@ -34,6 +34,12 @@ pub struct CheckpointResult {
pub num_checkpointed_frames: u64,
}
/// `Default` implementation for `CheckpointResult`.
///
/// Delegates to `CheckpointResult::new()`, so `CheckpointResult::default()`
/// and `CheckpointResult::new()` produce the same value.
impl Default for CheckpointResult {
/// Returns the value built by `Self::new()`.
fn default() -> Self {
Self::new()
}
}
impl CheckpointResult {
pub fn new() -> Self {
Self {
@@ -443,7 +449,7 @@ impl Wal for WalFile {
let frame_id = if shared.max_frame == 0 {
1
} else {
shared.max_frame
shared.max_frame + 1
};
let offset = self.frame_offset(frame_id);
trace!(
@@ -465,7 +471,7 @@ impl Wal for WalFile {
checksums,
)?;
shared.last_checksum = checksums;
shared.max_frame = frame_id + 1;
shared.max_frame = frame_id;
{
let frames = shared.frame_cache.get_mut(&(page_id as u64));
match frames {
@@ -721,8 +727,9 @@ impl WalFile {
}
/// Computes the byte offset of the frame numbered `frame_id`.
///
/// Frame IDs are 1-based: frame 1 maps to `WAL_HEADER_SIZE`, and each
/// following frame is `page_size + WAL_FRAME_HEADER_SIZE` bytes further on.
///
/// # Panics
///
/// Panics if `frame_id` is 0.
fn frame_offset(&self, frame_id: u64) -> usize {
// Reject 0 up front; `frame_id - 1` below would otherwise underflow.
assert!(frame_id > 0, "Frame ID must be 1-based");
let page_size = self.page_size;
// NOTE(review): this binding is shadowed by the next statement and never
// read. It looks like the pre-change line of the diff (it treats
// `frame_id` as 0-based) — confirm it is absent from the real file.
let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64);
// Number of bytes occupied by the `frame_id - 1` frames before this one.
let page_offset = (frame_id - 1) * (page_size + WAL_FRAME_HEADER_SIZE) as u64;
// Skip the header that precedes the first frame.
let offset = WAL_HEADER_SIZE as u64 + page_offset;
// NOTE(review): `as usize` truncates silently if the offset exceeds
// `usize::MAX` (possible on 32-bit targets) — confirm this is acceptable.
offset as usize
}
@@ -738,7 +745,7 @@ impl WalFileShared {
let header = if file.size()? > 0 {
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
Ok(header) => header,
Err(err) => panic!("Couldn't read header page: {:?}", err),
Err(err) => return Err(LimboError::ParseError(err.to_string())),
};
tracing::info!("recover not implemented yet");
// TODO: Return a completion instead.
@@ -755,8 +762,8 @@ impl WalFileShared {
file_format: 3007000,
page_size: page_size as u32,
checkpoint_seq: 0, // TODO implement sequence number
salt_1: 0, // TODO implement salt
salt_2: 0,
salt_1: io.generate_random_number() as u32,
salt_2: io.generate_random_number() as u32,
checksum_1: 0,
checksum_2: 0,
};