diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 696d10d05..0d7fbcee9 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -1622,11 +1622,14 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
     payload.extend_from_slice(&varint[0..n]);
 }
 
-/// We need to read the WAL file on open to reconstruct the WAL frame cache.
-pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileShared>>> {
+/// Stream through the WAL frames in chunks, building `frame_cache` incrementally
+/// and tracking the last valid commit frame for consistency.
+pub fn build_shared_wal(
+    file: &Arc<dyn File>,
+    io: &Arc<dyn IO>,
+) -> Result<Arc<RwLock<WalFileShared>>> {
     let size = file.size()?;
-    #[allow(clippy::arc_with_non_send_sync)]
-    let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize));
+
     let header = Arc::new(SpinLock::new(WalHeader::default()));
     let read_locks = std::array::from_fn(|_| TursoRwLock::new());
     for (i, l) in read_locks.iter().enumerate() {
@@ -1634,8 +1637,8 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileShared>>> {
-    let complete: Box<ReadComplete> = Box::new(move |res: Result<(Arc<Buffer>, i32), _>| {
+
+    if size < WAL_HEADER_SIZE as u64 {
+        wal_file_shared.write().loaded.store(true, Ordering::SeqCst);
+        return Ok(wal_file_shared);
+    }
+
+    let reader = Arc::new(StreamingWalReader::new(
+        file.clone(),
+        wal_file_shared.clone(),
+        header.clone(),
+        size,
+    ));
+
+    let h = reader.clone().read_header()?;
+    io.wait_for_completion(h)?;
+
+    loop {
+        if reader.done.load(Ordering::Acquire) {
+            break;
+        }
+        let offset = reader.off_atomic.load(Ordering::Acquire);
+        if offset >= size {
+            reader.finalize_loading();
+            break;
+        }
+
+        let (_read_size, c) = reader.clone().submit_one_chunk(offset)?;
+        io.wait_for_completion(c)?;
+
+        let new_off = reader.off_atomic.load(Ordering::Acquire);
+        if new_off <= offset {
+            reader.finalize_loading();
+            break;
+        }
+    }
+
+    Ok(wal_file_shared)
+}
+
+pub(super) struct StreamingWalReader {
+    file: Arc<dyn File>,
+    wal_shared: Arc<RwLock<WalFileShared>>,
+    header: Arc<SpinLock<WalHeader>>,
+    file_size: u64,
+    state: RwLock<StreamingState>,
+    off_atomic: AtomicU64,
+    page_atomic: AtomicU64,
+    pub(super) done: AtomicBool,
+}
+
+/// Mutable state for the streaming reader.
+struct StreamingState {
+    frame_idx: u64,
+    cumulative_checksum: (u32, u32),
+    last_valid_frame: u64,
+    pending_frames: HashMap<u64, Vec<u64>>,
+    page_size: usize,
+    use_native_endian: bool,
+    header_valid: bool,
+}
+
+impl StreamingWalReader {
+    fn new(
+        file: Arc<dyn File>,
+        wal_shared: Arc<RwLock<WalFileShared>>,
+        header: Arc<SpinLock<WalHeader>>,
+        file_size: u64,
+    ) -> Self {
+        Self {
+            file,
+            wal_shared,
+            header,
+            file_size,
+            off_atomic: AtomicU64::new(0),
+            page_atomic: AtomicU64::new(0),
+            done: AtomicBool::new(false),
+            state: RwLock::new(StreamingState {
+                frame_idx: 1,
+                cumulative_checksum: (0, 0),
+                last_valid_frame: 0,
+                pending_frames: HashMap::new(),
+                page_size: 0,
+                use_native_endian: false,
+                header_valid: false,
+            }),
+        }
+    }
+
+    fn read_header(self: Arc<Self>) -> crate::Result<Completion> {
+        let header_buf = Arc::new(Buffer::new_temporary(WAL_HEADER_SIZE));
+        let reader = self.clone();
+        let completion: Box<ReadComplete> = Box::new(move |res| {
+            reader.clone().handle_header_read(res);
+        });
+        let c = Completion::new_read(header_buf, completion);
+        self.file.pread(0, c)
+    }
+
+    fn submit_one_chunk(self: Arc<Self>, offset: u64) -> crate::Result<(usize, Completion)> {
+        let page_size = self.page_atomic.load(Ordering::Acquire) as usize;
+        if page_size == 0 {
+            return Err(crate::LimboError::InternalError(
+                "page size not initialized".into(),
+            ));
+        }
+        let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
+        if frame_size == 0 {
+            return Err(crate::LimboError::InternalError(
+                "invalid frame size".into(),
+            ));
+        }
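+        // Chunk sizing: scan the WAL in roughly 16 MiB batches, rounded down
+        // to a whole number of frames so a frame never straddles two reads.
+        // With SQLite's default 4 KiB page size a frame is 24 + 4096 = 4120
+        // bytes, so one chunk covers 4072 frames.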
+        const BASE: usize = 16 * 1024 * 1024;
+        let aligned = (BASE / frame_size) * frame_size;
+        let read_size = aligned
+            .max(frame_size)
+            .min((self.file_size - offset) as usize);
+        if read_size == 0 {
+            // end-of-file; let caller finalize
+            return Ok((0, Completion::new_dummy()));
+        }
+
+        let buf = Arc::new(Buffer::new_temporary(read_size));
+        let me = self.clone();
+        let completion: Box<ReadComplete> = Box::new(move |res| {
+            tracing::debug!("WAL chunk read complete");
+            me.clone().handle_chunk_read(res);
+        });
+        let c = Completion::new_read(buf, completion);
+        let guard = self.file.pread(offset, c)?;
+        Ok((read_size, guard))
+    }
+
+    fn handle_header_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
         let Ok((buf, bytes_read)) = res else {
+            self.finalize_loading();
             return;
         };
-        let buf_slice = buf.as_slice();
-        turso_assert!(
-            bytes_read == buf_slice.len() as i32,
-            "read({bytes_read}) != expected({})",
-            buf_slice.len()
-        );
-        let mut header_locked = header.lock();
-        // Read header
-        header_locked.magic =
-            u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]);
-        header_locked.file_format =
-            u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]);
-        header_locked.page_size =
-            u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]);
-        header_locked.checkpoint_seq =
-            u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]);
-        header_locked.salt_1 =
-            u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]);
-        header_locked.salt_2 =
-            u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]);
-        header_locked.checksum_1 =
-            u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
-        header_locked.checksum_2 =
-            u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
-        tracing::debug!("read_entire_wal_dumb(header={:?})", *header_locked);
-
-        // Read frames into frame_cache and pages_in_frames
-        if buf_slice.len() < WAL_HEADER_SIZE {
-            panic!("WAL file too small for header");
+        if bytes_read != WAL_HEADER_SIZE as i32 {
+            self.finalize_loading();
+            return;
         }
-        let use_native_endian_checksum =
-            cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0);
+        let (page_sz, c1, c2, use_native, ok) = {
+            let mut h = self.header.lock();
+            let s = buf.as_slice();
+            h.magic = u32::from_be_bytes(s[0..4].try_into().unwrap());
+            h.file_format = u32::from_be_bytes(s[4..8].try_into().unwrap());
+            h.page_size = u32::from_be_bytes(s[8..12].try_into().unwrap());
+            h.checkpoint_seq = u32::from_be_bytes(s[12..16].try_into().unwrap());
+            h.salt_1 = u32::from_be_bytes(s[16..20].try_into().unwrap());
+            h.salt_2 = u32::from_be_bytes(s[20..24].try_into().unwrap());
+            h.checksum_1 = u32::from_be_bytes(s[24..28].try_into().unwrap());
+            h.checksum_2 = u32::from_be_bytes(s[28..32].try_into().unwrap());
+            tracing::debug!("WAL header: {:?}", *h);
-        let calculated_header_checksum = checksum_wal(
-            &buf_slice[0..24],
-            &header_locked,
-            (0, 0),
-            use_native_endian_checksum,
-        );
-
-        let checksum_header_failed = if calculated_header_checksum
-            != (header_locked.checksum_1, header_locked.checksum_2)
+            let use_native = cfg!(target_endian = "big") == ((h.magic & 1) != 0);
+            let calc = checksum_wal(&s[0..24], &h, (0, 0), use_native);
+            (
+                h.page_size,
+                h.checksum_1,
+                h.checksum_2,
+                use_native,
+                calc == (h.checksum_1, h.checksum_2),
+            )
+        };
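+        // A failed header checksum or an invalid page size (SQLite accepts
+        // powers of two in 512..=65536) means no frame in the file can be
+        // trusted, so the WAL is treated as empty rather than recovered
+        // partially.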
+        if PageSize::new(page_sz).is_none() || !ok {
+            self.finalize_loading();
+            return;
+        }
         {
-            tracing::error!(
-                "WAL header checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
-                header_locked.checksum_1,
-                header_locked.checksum_2,
-                calculated_header_checksum.0,
-                calculated_header_checksum.1,
-                0
+            let mut st = self.state.write();
+            st.page_size = page_sz as usize;
+            st.use_native_endian = use_native;
+            st.cumulative_checksum = (c1, c2);
+            st.header_valid = true;
+        }
+        self.off_atomic
+            .store(WAL_HEADER_SIZE as u64, Ordering::Release);
+        self.page_atomic.store(page_sz as u64, Ordering::Release);
+    }
-            );
-            true
-        } else {
-            false
+    fn handle_chunk_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
+        let Ok((buf, bytes_read)) = res else {
+            self.finalize_loading();
+            return;
+        };
+        let buf_slice = &buf.as_slice()[..bytes_read as usize];
+        // Snapshot salts/endianness once to avoid per-frame header locks
+        let (header_copy, use_native) = {
+            let st = self.state.read();
+            let h = self.header.lock();
+            (*h, st.use_native_endian)
         };
-        let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
-        let page_size_u32 = header_locked.page_size;
-
-        if PageSize::new(page_size_u32).is_none() {
-            panic!("Invalid page size in WAL header: {page_size_u32}");
+        let consumed = self.process_frames(buf_slice, &header_copy, use_native);
+        self.off_atomic.fetch_add(consumed as u64, Ordering::AcqRel);
+        // If we didn't consume the full chunk, we hit a stop condition
+        if consumed < buf_slice.len() || self.off_atomic.load(Ordering::Acquire) >= self.file_size {
+            self.finalize_loading();
         }
-        let page_size = page_size_u32 as usize;
+    }
 
-        let mut current_offset = WAL_HEADER_SIZE;
-        let mut frame_idx = 1_u64;
+    // Processes frames from a buffer, returns bytes processed
+    fn process_frames(&self, buf: &[u8], header: &WalHeader, use_native: bool) -> usize {
+        let mut st = self.state.write();
+        let page_size = st.page_size;
+        let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
+        let mut pos = 0;
 
-        let mut wfs_data = wal_file_shared_for_completion.write();
+        while pos + frame_size <= buf.len() {
+            let fh = &buf[pos..pos + WAL_FRAME_HEADER_SIZE];
+            let page = &buf[pos + WAL_FRAME_HEADER_SIZE..pos + frame_size];
 
-        if !checksum_header_failed {
-            while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
-                let frame_header_slice =
-                    &buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
-                let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
-                    ..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
+            let page_number = u32::from_be_bytes(fh[0..4].try_into().unwrap());
+            let db_size = u32::from_be_bytes(fh[4..8].try_into().unwrap());
+            let s1 = u32::from_be_bytes(fh[8..12].try_into().unwrap());
+            let s2 = u32::from_be_bytes(fh[12..16].try_into().unwrap());
+            let c1 = u32::from_be_bytes(fh[16..20].try_into().unwrap());
+            let c2 = u32::from_be_bytes(fh[20..24].try_into().unwrap());
-                let frame_h_page_number =
-                    u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
-                let frame_h_db_size =
-                    u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
-                let frame_h_salt_1 =
-                    u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
-                let frame_h_salt_2 =
-                    u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
-                let frame_h_checksum_1 =
-                    u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
-                let frame_h_checksum_2 =
-                    u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
 
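+            // Stop at the first frame that fails validation: a zero page
+            // number (zeroed/truncated tail), a salt mismatch (frames left
+            // over from before an earlier checkpoint reset), or a checksum
+            // mismatch (torn write). Everything after it is ignored.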
+            if page_number == 0 {
+                break;
+            }
+            if s1 != header.salt_1 || s2 != header.salt_2 {
+                break;
+            }
-                if frame_h_page_number == 0 {
-                    tracing::trace!(
-                        "WAL frame with page number 0. Ignoring frames starting from frame {}",
-                        frame_idx
-                    );
-                    break;
+            let seed = checksum_wal(&fh[0..8], header, st.cumulative_checksum, use_native);
+            let calc = checksum_wal(page, header, seed, use_native);
+            if calc != (c1, c2) {
+                break;
+            }
+
+            st.cumulative_checksum = calc;
+            let frame_idx = st.frame_idx;
+            st.pending_frames
+                .entry(page_number as u64)
+                .or_default()
+                .push(frame_idx);
+
+            if db_size > 0 {
+                st.last_valid_frame = st.frame_idx;
+                self.flush_pending_frames(&mut st);
+            }
+            st.frame_idx += 1;
+            pos += frame_size;
+        }
+        pos
+    }
+
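+    /// Publish the frames buffered since the previous commit into the shared
+    /// frame cache. Called whenever a commit frame (db_size > 0) is seen; the
+    /// retain() below is defensive and drops anything recorded past the last
+    /// valid commit.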
+    fn flush_pending_frames(&self, state: &mut StreamingState) {
+        if state.pending_frames.is_empty() {
+            return;
+        }
+        let wfs = self.wal_shared.read();
+        {
+            let mut frame_cache = wfs.frame_cache.lock();
+            for (page, mut frames) in state.pending_frames.drain() {
+                // Only include frames up to last valid commit
+                frames.retain(|&f| f <= state.last_valid_frame);
+                if !frames.is_empty() {
+                    frame_cache.entry(page).or_default().extend(frames);
                 }
-                // It contains more frames with mismatched SALT values, which means they're leftovers from previous checkpoints
-                if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2
-                {
-                    tracing::trace!(
-                        "WAL frame salt mismatch: expected ({}, {}), got ({}, {}). Ignoring frames starting from frame {}",
-                        header_locked.salt_1,
-                        header_locked.salt_2,
-                        frame_h_salt_1,
-                        frame_h_salt_2,
-                        frame_idx
-                    );
-                    break;
-                }
-
-                let checksum_after_fh_meta = checksum_wal(
-                    &frame_header_slice[0..8],
-                    &header_locked,
-                    cumulative_checksum,
-                    use_native_endian_checksum,
-                );
-                let calculated_frame_checksum = checksum_wal(
-                    page_data_slice,
-                    &header_locked,
-                    checksum_after_fh_meta,
-                    use_native_endian_checksum,
-                );
-                tracing::debug!(
-                    "read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
-                    frame_h_checksum_1,
-                    frame_h_checksum_2,
-                    calculated_frame_checksum.0,
-                    calculated_frame_checksum.1
-                );
-
-                if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
-                    tracing::error!(
-                        "WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
-                        frame_h_checksum_1,
-                        frame_h_checksum_2,
-                        calculated_frame_checksum.0,
-                        calculated_frame_checksum.1,
-                        frame_idx
-                    );
-                    break;
-                }
-
-                cumulative_checksum = calculated_frame_checksum;
-
-                wfs_data
-                    .frame_cache
-                    .lock()
-                    .entry(frame_h_page_number as u64)
-                    .or_default()
-                    .push(frame_idx);
-
-                let is_commit_record = frame_h_db_size > 0;
-                if is_commit_record {
-                    wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
-                    wfs_data.last_checksum = cumulative_checksum;
-                }
-
-                frame_idx += 1;
-                current_offset += WAL_FRAME_HEADER_SIZE + page_size;
             }
         }
+        wfs.max_frame
+            .store(state.last_valid_frame, Ordering::Release);
+    }
 
-        let max_frame = wfs_data.max_frame.load(Ordering::SeqCst);
+    /// Finalizes the loading process
+    fn finalize_loading(&self) {
+        let mut wfs = self.wal_shared.write();
+        let st = self.state.read();
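+        // Anything recorded after the last commit frame is an uncommitted
+        // tail: drop those frame ids from the index so readers never observe
+        // them, then drop pages that are left with no frames at all.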
 
-        // cleanup in-memory index from tail frames which was written after the last committed frame
-        let mut frame_cache = wfs_data.frame_cache.lock();
-        for (page, frames) in frame_cache.iter_mut() {
-            // remove any frame IDs > max_frame
-            let original_len = frames.len();
-            frames.retain(|&frame_id| frame_id <= max_frame);
-            if frames.len() < original_len {
-                tracing::debug!(
-                    "removed {} frame(s) from page {} from the in-memory WAL index because they were written after the last committed frame {}",
-                    original_len - frames.len(),
-                    page,
-                    max_frame
-                );
+        let max_frame = st.last_valid_frame;
+        if max_frame > 0 {
+            let mut frame_cache = wfs.frame_cache.lock();
+            for frames in frame_cache.values_mut() {
+                frames.retain(|&f| f <= max_frame);
             }
+            frame_cache.retain(|_, frames| !frames.is_empty());
         }
-        // also remove any pages that now have no frames
-        frame_cache.retain(|_page, frames| !frames.is_empty());
-        wfs_data.nbackfills.store(0, Ordering::SeqCst);
-        wfs_data.loaded.store(true, Ordering::SeqCst);
-        if size >= WAL_HEADER_SIZE as u64 {
-            wfs_data.initialized.store(true, Ordering::SeqCst);
+        wfs.max_frame.store(max_frame, Ordering::SeqCst);
+        wfs.last_checksum = st.cumulative_checksum;
+        if st.header_valid {
+            wfs.initialized.store(true, Ordering::SeqCst);
         }
-    });
-    let c = Completion::new_read(buf_for_pread, complete);
-    let _c = file.pread(0, c)?;
+        wfs.nbackfills.store(0, Ordering::SeqCst);
+        wfs.loaded.store(true, Ordering::SeqCst);
 
-    Ok(wal_file_shared_ret)
+        self.done.store(true, Ordering::Release);
+        tracing::info!(
+            "WAL loading complete: {} frames processed, last commit at frame {}",
+            st.frame_idx - 1,
+            max_frame
+        );
+    }
 }
 
 pub fn begin_read_wal_frame_raw(
@@ -1859,7 +1963,6 @@ pub fn begin_read_wal_frame_raw(
 ) -> Result<Completion> {
     tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
     let buf = Arc::new(buffer_pool.get_wal_frame());
-    #[allow(clippy::arc_with_non_send_sync)]
     let c = Completion::new_read(buf, complete);
     let c = io.pread(offset, c)?;
     Ok(c)
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index f932964fb..d21267bbb 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -553,7 +553,6 @@ impl fmt::Debug for OngoingCheckpoint {
     }
 }
 
-#[allow(dead_code)]
 pub struct WalFile {
     io: Arc<dyn IO>,
     buffer_pool: Arc<BufferPool>,
@@ -660,7 +659,6 @@ impl fmt::Debug for WalFile {
 // TODO(pere): lock only important parts + pin WalFileShared
 /// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
 /// that needs to be communicated between threads so this struct does the job.
-#[allow(dead_code)]
 pub struct WalFileShared {
     pub enabled: AtomicBool,
     pub wal_header: Arc<SpinLock<WalHeader>>,
@@ -676,7 +674,6 @@ pub struct WalFileShared {
     pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
     pub last_checksum: (u32, u32), // Checksum of last frame in WAL, this is a cumulative checksum over all frames in the WAL
     pub file: Option<Arc<dyn File>>,
-
     /// Read locks advertise the maximum WAL frame a reader may access.
     /// Slot 0 is special, when it is held (shared) the reader bypasses the WAL and uses the main DB file.
     /// When checkpointing, we must acquire the exclusive read lock 0 to ensure that no readers read
@@ -2238,21 +2235,17 @@ impl WalFileShared {
         path: &str,
     ) -> Result<Arc<RwLock<WalFileShared>>> {
         let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
-        if file.size()? > 0 {
-            let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
-            // TODO: Return a completion instead.
-            let mut max_loops = 100_000;
-            while !wal_file_shared.read().loaded.load(Ordering::Acquire) {
-                io.run_once()?;
-                max_loops -= 1;
-                if max_loops == 0 {
-                    panic!("WAL file not loaded");
-                }
-            }
-            Ok(wal_file_shared)
-        } else {
-            WalFileShared::new_noop()
+        if file.size()? == 0 {
+            return WalFileShared::new_noop();
         }
+        let wal_file_shared = sqlite3_ondisk::build_shared_wal(&file, io)?;
+        turso_assert!(
+            wal_file_shared
+                .try_read()
+                .is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire)),
+            "Unable to read WAL shared state"
+        );
+        Ok(wal_file_shared)
     }
 
     pub fn is_initialized(&self) -> Result<bool> {