Merge 'Ignore WAL frames after bad checksum' from Pere Diaz Bou

SQLite basically ignores bad frames instead of panicking, let's try to
do the same.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1956
This commit is contained in:
Pere Diaz Bou
2025-07-25 15:31:12 +02:00
2 changed files with 113 additions and 91 deletions

View File

@@ -1401,15 +1401,22 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
use_native_endian_checksum,
);
if calculated_header_checksum != (header_locked.checksum_1, header_locked.checksum_2) {
panic!(
"WAL header checksum mismatch. Expected ({}, {}), Got ({}, {})",
let checksum_header_failed = if calculated_header_checksum
!= (header_locked.checksum_1, header_locked.checksum_2)
{
tracing::error!(
"WAL header checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
header_locked.checksum_1,
header_locked.checksum_2,
calculated_header_checksum.0,
calculated_header_checksum.1
calculated_header_checksum.1,
0
);
}
true
} else {
false
};
let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
let page_size_u32 = header_locked.page_size;
@@ -1426,87 +1433,105 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
let wfs_data = unsafe { &mut *wal_file_shared_for_completion.get() };
while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
let frame_header_slice =
&buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
if !checksum_header_failed {
while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
let frame_header_slice =
&buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
let frame_h_page_number =
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
let frame_h_db_size = u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
let frame_h_salt_1 = u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
let frame_h_salt_2 = u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
let frame_h_checksum_1 =
u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
let frame_h_checksum_2 =
u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
let frame_h_page_number =
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
let frame_h_db_size =
u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
let frame_h_salt_1 =
u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
let frame_h_salt_2 =
u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
let frame_h_checksum_1 =
u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
let frame_h_checksum_2 =
u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
// It contains more frames with mismatched SALT values, which means they're leftovers from previous checkpoints
if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2 {
tracing::trace!(
"WAL frame salt mismatch: expected ({}, {}), got ({}, {}), ignoring frame",
header_locked.salt_1,
header_locked.salt_2,
frame_h_salt_1,
frame_h_salt_2
if frame_h_page_number == 0 {
tracing::trace!(
"WAL frame with page number 0. Ignoring frames starting from frame {}",
frame_idx
);
break;
}
// It contains more frames with mismatched SALT values, which means they're leftovers from previous checkpoints
if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2
{
tracing::trace!(
"WAL frame salt mismatch: expected ({}, {}), got ({}, {}). Ignoring frames starting from frame {}",
header_locked.salt_1,
header_locked.salt_2,
frame_h_salt_1,
frame_h_salt_2,
frame_idx
);
break;
}
let checksum_after_fh_meta = checksum_wal(
&frame_header_slice[0..8],
&header_locked,
cumulative_checksum,
use_native_endian_checksum,
);
break;
}
let checksum_after_fh_meta = checksum_wal(
&frame_header_slice[0..8],
&header_locked,
cumulative_checksum,
use_native_endian_checksum,
);
let calculated_frame_checksum = checksum_wal(
page_data_slice,
&header_locked,
checksum_after_fh_meta,
use_native_endian_checksum,
);
tracing::debug!(
"read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1
);
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
panic!(
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {})",
let calculated_frame_checksum = checksum_wal(
page_data_slice,
&header_locked,
checksum_after_fh_meta,
use_native_endian_checksum,
);
tracing::debug!(
"read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1
);
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
tracing::error!(
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1,
frame_idx
);
break;
}
cumulative_checksum = calculated_frame_checksum;
wfs_data
.frame_cache
.lock()
.entry(frame_h_page_number as u64)
.or_default()
.push(frame_idx);
wfs_data
.pages_in_frames
.lock()
.push(frame_h_page_number as u64);
let is_commit_record = frame_h_db_size > 0;
if is_commit_record {
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
wfs_data.last_checksum = cumulative_checksum;
}
frame_idx += 1;
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
}
cumulative_checksum = calculated_frame_checksum;
wfs_data
.frame_cache
.lock()
.entry(frame_h_page_number as u64)
.or_default()
.push(frame_idx);
wfs_data
.pages_in_frames
.lock()
.push(frame_h_page_number as u64);
let is_commit_record = frame_h_db_size > 0;
if is_commit_record {
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
}
frame_idx += 1;
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
}
wfs_data.last_checksum = cumulative_checksum;
wfs_data.nbackfills.store(0, Ordering::SeqCst);
wfs_data.loaded.store(true, Ordering::SeqCst);
});
let c = Completion::new_read(buf_for_pread, complete);

View File

@@ -3,7 +3,7 @@ use crate::common::{compare_string, do_flush, TempDatabase};
use log::debug;
use std::io::{Read, Seek, Write};
use std::sync::Arc;
use turso_core::{Connection, Database, Row, Statement, StepResult, Value};
use turso_core::{Connection, Database, LimboError, Row, Statement, StepResult, Value};
const WAL_HEADER_SIZE: usize = 32;
const WAL_FRAME_HEADER_SIZE: usize = 24;
@@ -696,7 +696,7 @@ fn test_wal_bad_frame() -> anyhow::Result<()> {
db_path
};
{
let result = std::panic::catch_unwind(|| {
let result = {
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
let db = Database::open_file_with_flags(
io.clone(),
@@ -716,19 +716,20 @@ fn test_wal_bad_frame() -> anyhow::Result<()> {
let x = row.get::<i64>(0).unwrap();
assert_eq!(x, 0);
})
});
};
match result {
Err(panic_info) => {
let panic_msg = panic_info
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic_info.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic message");
Err(error) => {
dbg!(&error);
let panic_msg = error.downcast_ref::<LimboError>().unwrap();
let msg = match panic_msg {
LimboError::ParseError(message) => message,
_ => panic!("Unexpected panic message: {panic_msg}"),
};
assert!(
panic_msg.contains("WAL frame checksum mismatch."),
"Expected panic message not found. Got: {panic_msg}"
msg.contains("no such table: t2"),
"Expected panic message not found. Got: {msg}"
);
}
Ok(_) => panic!("Expected query to panic, but it succeeded"),
@@ -784,8 +785,8 @@ pub fn run_query_core(
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
) -> anyhow::Result<()> {
match conn.query(query) {
Ok(Some(ref mut rows)) => loop {
if let Some(ref mut rows) = conn.query(query)? {
loop {
match rows.step()? {
StepResult::IO => {
rows.run_once()?;
@@ -799,10 +800,6 @@ pub fn run_query_core(
}
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{err}");
}
};
Ok(())