mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-19 09:34:18 +01:00
properly set last_checksum after recovering wal
We store `last_checksum` to do cumulative checksumming. After reading wal for recovery, we didn't set last checksum properly in case there were no frames so this cause us to not initialize last_checksum properly.
This commit is contained in:
@@ -1359,6 +1359,7 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
|
u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
|
||||||
header_locked.checksum_2 =
|
header_locked.checksum_2 =
|
||||||
u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
|
u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
|
||||||
|
tracing::debug!("read_entire_wal_dumb(header={:?})", *header_locked);
|
||||||
|
|
||||||
// Read frames into frame_cache and pages_in_frames
|
// Read frames into frame_cache and pages_in_frames
|
||||||
if buf_slice.len() < WAL_HEADER_SIZE {
|
if buf_slice.len() < WAL_HEADER_SIZE {
|
||||||
@@ -1441,6 +1442,13 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
use_native_endian_checksum,
|
use_native_endian_checksum,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
"read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
|
||||||
|
frame_h_checksum_1,
|
||||||
|
frame_h_checksum_2,
|
||||||
|
calculated_frame_checksum.0,
|
||||||
|
calculated_frame_checksum.1
|
||||||
|
);
|
||||||
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
|
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
|
||||||
panic!(
|
panic!(
|
||||||
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {})",
|
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {})",
|
||||||
@@ -1467,13 +1475,13 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
|
|||||||
let is_commit_record = frame_h_db_size > 0;
|
let is_commit_record = frame_h_db_size > 0;
|
||||||
if is_commit_record {
|
if is_commit_record {
|
||||||
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
|
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
|
||||||
wfs_data.last_checksum = cumulative_checksum;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
frame_idx += 1;
|
frame_idx += 1;
|
||||||
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
|
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wfs_data.last_checksum = cumulative_checksum;
|
||||||
wfs_data.loaded.store(true, Ordering::SeqCst);
|
wfs_data.loaded.store(true, Ordering::SeqCst);
|
||||||
});
|
});
|
||||||
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
|
let c = Completion::new(CompletionType::Read(ReadCompletion::new(
|
||||||
@@ -1563,6 +1571,11 @@ pub fn begin_write_wal_frame(
|
|||||||
);
|
);
|
||||||
header.checksum_1 = final_checksum.0;
|
header.checksum_1 = final_checksum.0;
|
||||||
header.checksum_2 = final_checksum.1;
|
header.checksum_2 = final_checksum.1;
|
||||||
|
tracing::trace!(
|
||||||
|
"begin_write_wal_frame(checksum=({}, {}))",
|
||||||
|
header.checksum_1,
|
||||||
|
header.checksum_2
|
||||||
|
);
|
||||||
|
|
||||||
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
|
buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes());
|
||||||
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
|
buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes());
|
||||||
@@ -1599,6 +1612,7 @@ pub fn begin_write_wal_frame(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<()> {
|
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<()> {
|
||||||
|
tracing::trace!("begin_write_wal_header");
|
||||||
let buffer = {
|
let buffer = {
|
||||||
let drop_fn = Rc::new(|_buf| {});
|
let drop_fn = Rc::new(|_buf| {});
|
||||||
|
|
||||||
|
|||||||
@@ -977,6 +977,7 @@ impl WalFile {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
|
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
|
||||||
|
let last_checksum = unsafe { (*shared.get()).last_checksum };
|
||||||
Self {
|
Self {
|
||||||
io,
|
io,
|
||||||
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
|
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
|
||||||
@@ -995,7 +996,7 @@ impl WalFile {
|
|||||||
sync_state: Cell::new(SyncState::NotSyncing),
|
sync_state: Cell::new(SyncState::NotSyncing),
|
||||||
min_frame: 0,
|
min_frame: 0,
|
||||||
max_frame_read_lock_index: 0,
|
max_frame_read_lock_index: 0,
|
||||||
last_checksum: (0, 0),
|
last_checksum,
|
||||||
start_pages_in_frames: 0,
|
start_pages_in_frames: 0,
|
||||||
header: *header,
|
header: *header,
|
||||||
}
|
}
|
||||||
@@ -1083,6 +1084,7 @@ impl WalFileShared {
|
|||||||
let checksum = header.lock();
|
let checksum = header.lock();
|
||||||
(checksum.checksum_1, checksum.checksum_2)
|
(checksum.checksum_1, checksum.checksum_2)
|
||||||
};
|
};
|
||||||
|
tracing::debug!("new_shared(header={:?})", header);
|
||||||
let shared = WalFileShared {
|
let shared = WalFileShared {
|
||||||
wal_header: header,
|
wal_header: header,
|
||||||
min_frame: AtomicU64::new(0),
|
min_frame: AtomicU64::new(0),
|
||||||
|
|||||||
@@ -734,6 +734,34 @@ fn test_wal_bad_frame() -> anyhow::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_read_wal_dumb_no_frames() -> anyhow::Result<()> {
|
||||||
|
maybe_setup_tracing();
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
let db_path = {
|
||||||
|
let tmp_db = TempDatabase::new_empty(false);
|
||||||
|
let conn = tmp_db.connect_limbo();
|
||||||
|
conn.close()?;
|
||||||
|
let db_path = tmp_db.path.clone();
|
||||||
|
db_path
|
||||||
|
};
|
||||||
|
// Second connection must recover from the WAL file. Last checksum should be filled correctly.
|
||||||
|
{
|
||||||
|
let tmp_db = TempDatabase::new_with_existent(&db_path, false);
|
||||||
|
let conn = tmp_db.connect_limbo();
|
||||||
|
conn.execute("CREATE TABLE t0(x)")?;
|
||||||
|
conn.close()?;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let tmp_db = TempDatabase::new_with_existent(&db_path, false);
|
||||||
|
let conn = tmp_db.connect_limbo();
|
||||||
|
conn.execute("INSERT INTO t0(x) VALUES (1)")?;
|
||||||
|
conn.close()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
|
fn run_query(tmp_db: &TempDatabase, conn: &Arc<Connection>, query: &str) -> anyhow::Result<()> {
|
||||||
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
|
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user