diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 01a814c31..ff40c4ab7 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -346,7 +346,7 @@ impl Pager { } pub fn cacheflush(&self) -> Result { - let mut checkpoint_result = CheckpointResult::new(); + let mut checkpoint_result = CheckpointResult::default(); loop { let state = self.flush_info.borrow().state; trace!("cacheflush {:?}", state); @@ -424,7 +424,7 @@ impl Pager { } pub fn checkpoint(&self) -> Result { - let mut checkpoint_result = CheckpointResult::new(); + let mut checkpoint_result = CheckpointResult::default(); loop { let state = *self.checkpoint_state.borrow(); trace!("pager_checkpoint(state={:?})", state); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index f7406008a..e37c9ae7a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1354,8 +1354,8 @@ pub fn begin_write_wal_frame( let mut header = WalFrameHeader { page_number: page_id as u32, db_size, - salt_1: 0, - salt_2: 0, + salt_1: wal_header.salt_1, + salt_2: wal_header.salt_2, checksum_1: 0, checksum_2: 0, }; @@ -1371,28 +1371,34 @@ pub fn begin_write_wal_frame( let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.page_number.to_be_bytes()); buf[4..8].copy_from_slice(&header.db_size.to_be_bytes()); - - { - let contents_buf = contents.as_ptr(); - let expects_be = wal_header.magic & 1; // LSB is set on big endian checksums - let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be; // check if checksum - // type and native type is the same so that we know when to swap bytes - let checksums = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian); - let checksums = checksum_wal(contents_buf, wal_header, checksums, use_native_endian); - header.checksum_1 = checksums.0; - header.checksum_2 = checksums.1; - header.salt_1 = wal_header.salt_1; - header.salt_2 = wal_header.salt_2; - } - buf[8..12].copy_from_slice(&header.salt_1.to_be_bytes()); buf[12..16].copy_from_slice(&header.salt_2.to_be_bytes()); + + let contents_buf = contents.as_ptr(); + let content_len = contents_buf.len(); + buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + content_len] + .copy_from_slice(contents_buf); + if content_len < 4096 { + buf[WAL_FRAME_HEADER_SIZE + content_len..WAL_FRAME_HEADER_SIZE + 4096].fill(0); + } + + let expects_be = wal_header.magic & 1; + let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be; + let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian); // Only 8 bytes + let final_checksum = checksum_wal( + &buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + 4096], + wal_header, + header_checksum, + use_native_endian, + ); + header.checksum_1 = final_checksum.0; + header.checksum_2 = final_checksum.1; + buf[16..20].copy_from_slice(&header.checksum_1.to_be_bytes()); buf[20..24].copy_from_slice(&header.checksum_2.to_be_bytes()); - buf[WAL_FRAME_HEADER_SIZE..].copy_from_slice(contents.as_ptr()); #[allow(clippy::arc_with_non_send_sync)] - (Arc::new(RefCell::new(buffer)), checksums) + (Arc::new(RefCell::new(buffer)), final_checksum) }; *write_counter.borrow_mut() += 1; @@ -1411,6 +1417,7 @@ pub fn begin_write_wal_frame( }; let c = Completion::Write(WriteCompletion::new(write_complete)); io.pwrite(offset, buffer.clone(), c)?; + trace!("Frame written and synced at offset={offset}"); Ok(checksums) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b14de5e25..a54632bda 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -11,7 +11,7 @@ use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::{Buffer, Result}; +use crate::{Buffer, LimboError, Result}; use crate::{Completion, Page}; use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -34,6 +34,12 @@ pub struct CheckpointResult { pub num_checkpointed_frames: u64, } +impl Default for CheckpointResult { + fn default() -> Self { + Self::new() + } +} + impl CheckpointResult { pub fn new() -> Self { Self { @@ -443,7 +449,7 @@ impl Wal for WalFile { let frame_id = if shared.max_frame == 0 { 1 } else { - shared.max_frame + shared.max_frame + 1 }; let offset = self.frame_offset(frame_id); trace!( @@ -465,7 +471,7 @@ impl Wal for WalFile { checksums, )?; shared.last_checksum = checksums; - shared.max_frame = frame_id + 1; + shared.max_frame = frame_id; { let frames = shared.frame_cache.get_mut(&(page_id as u64)); match frames { @@ -721,8 +727,9 @@ impl WalFile { } fn frame_offset(&self, frame_id: u64) -> usize { + assert!(frame_id > 0, "Frame ID must be 1-based"); let page_size = self.page_size; - let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); + let page_offset = (frame_id - 1) * (page_size + WAL_FRAME_HEADER_SIZE) as u64; let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } @@ -738,7 +745,7 @@ impl WalFileShared { let header = if file.size()? > 0 { let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { Ok(header) => header, - Err(err) => panic!("Couldn't read header page: {:?}", err), + Err(err) => return Err(LimboError::ParseError(err.to_string())), }; tracing::info!("recover not implemented yet"); // TODO: Return a completion instead. @@ -755,8 +762,8 @@ impl WalFileShared { file_format: 3007000, page_size: page_size as u32, checkpoint_seq: 0, // TODO implement sequence number - salt_1: 0, // TODO implement salt - salt_2: 0, + salt_1: io.generate_random_number() as u32, + salt_2: io.generate_random_number() as u32, checksum_1: 0, checksum_2: 0, };