From 02bebf02a54064640299ef903ba8efb4b6bffc87 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 9 Sep 2025 16:06:27 -0400 Subject: [PATCH] Remove read_entire_wal_dumb in favor of reading chunks --- core/storage/sqlite3_ondisk.rs | 233 +-------------------------------- core/storage/wal.rs | 35 ++--- 2 files changed, 10 insertions(+), 258 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 0cc2ce5dd..29ab3d6d3 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1622,240 +1622,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { payload.extend_from_slice(&varint[0..n]); } -/// We need to read the WAL file on open to reconstruct the WAL frame cache. -pub fn read_entire_wal_dumb(file: &Arc) -> Result>> { - let size = file.size()?; - #[allow(clippy::arc_with_non_send_sync)] - let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize)); - let header = Arc::new(SpinLock::new(WalHeader::default())); - let read_locks = std::array::from_fn(|_| TursoRwLock::new()); - for (i, l) in read_locks.iter().enumerate() { - l.write(); - l.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED }); - l.unlock(); - } - #[allow(clippy::arc_with_non_send_sync)] - let wal_file_shared_ret = Arc::new(RwLock::new(WalFileShared { - enabled: AtomicBool::new(true), - wal_header: header.clone(), - min_frame: AtomicU64::new(0), - max_frame: AtomicU64::new(0), - nbackfills: AtomicU64::new(0), - frame_cache: Arc::new(SpinLock::new(HashMap::new())), - last_checksum: (0, 0), - file: Some(file.clone()), - read_locks, - write_lock: TursoRwLock::new(), - loaded: AtomicBool::new(false), - checkpoint_lock: TursoRwLock::new(), - initialized: AtomicBool::new(false), - })); - let wal_file_shared_for_completion = wal_file_shared_ret.clone(); - let complete: Box = Box::new(move |res: Result<(Arc, i32), _>| { - let Ok((buf, bytes_read)) = res else { - return; - }; - let buf_slice = buf.as_slice(); - turso_assert!( - bytes_read == buf_slice.len() as i32, - "read({bytes_read}) != expected({})", - buf_slice.len() - ); - let mut header_locked = header.lock(); - // Read header - header_locked.magic = - u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]); - header_locked.file_format = - u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]); - header_locked.page_size = - u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]); - header_locked.checkpoint_seq = - u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]); - header_locked.salt_1 = - u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]); - header_locked.salt_2 = - u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]); - header_locked.checksum_1 = - u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]); - header_locked.checksum_2 = - u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]); - tracing::debug!("read_entire_wal_dumb(header={:?})", *header_locked); - - // Read frames into frame_cache and pages_in_frames - if buf_slice.len() < WAL_HEADER_SIZE { - panic!("WAL file too small for header"); - } - - let use_native_endian_checksum = - cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0); - - let calculated_header_checksum = checksum_wal( - &buf_slice[0..24], - &header_locked, - (0, 0), - use_native_endian_checksum, - ); - - let checksum_header_failed = if calculated_header_checksum - != (header_locked.checksum_1, header_locked.checksum_2) - { - tracing::error!( - "WAL header checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}", - header_locked.checksum_1, - header_locked.checksum_2, - calculated_header_checksum.0, - calculated_header_checksum.1, - 0 - - ); - true - } else { - false - }; - - let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2); - let page_size_u32 = header_locked.page_size; - - if PageSize::new(page_size_u32).is_none() { - panic!("Invalid page size in WAL header: {page_size_u32}"); - } - let page_size = page_size_u32 as usize; - - let mut current_offset = WAL_HEADER_SIZE; - let mut frame_idx = 1_u64; - - let mut wfs_data = wal_file_shared_for_completion.write(); - - if !checksum_header_failed { - while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() { - let frame_header_slice = - &buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE]; - let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE - ..current_offset + WAL_FRAME_HEADER_SIZE + page_size]; - - let frame_h_page_number = - u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap()); - let frame_h_db_size = - u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap()); - let frame_h_salt_1 = - u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap()); - let frame_h_salt_2 = - u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap()); - let frame_h_checksum_1 = - u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap()); - let frame_h_checksum_2 = - u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap()); - - if frame_h_page_number == 0 { - tracing::trace!( - "WAL frame with page number 0. Ignoring frames starting from frame {}", - frame_idx - ); - break; - } - // It contains more frames with mismatched SALT values, which means they're leftovers from previous checkpoints - if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2 - { - tracing::trace!( - "WAL frame salt mismatch: expected ({}, {}), got ({}, {}). Ignoring frames starting from frame {}", - header_locked.salt_1, - header_locked.salt_2, - frame_h_salt_1, - frame_h_salt_2, - frame_idx - ); - break; - } - - let checksum_after_fh_meta = checksum_wal( - &frame_header_slice[0..8], - &header_locked, - cumulative_checksum, - use_native_endian_checksum, - ); - let calculated_frame_checksum = checksum_wal( - page_data_slice, - &header_locked, - checksum_after_fh_meta, - use_native_endian_checksum, - ); - tracing::debug!( - "read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))", - frame_h_checksum_1, - frame_h_checksum_2, - calculated_frame_checksum.0, - calculated_frame_checksum.1 - ); - - if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) { - tracing::error!( - "WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}", - frame_h_checksum_1, - frame_h_checksum_2, - calculated_frame_checksum.0, - calculated_frame_checksum.1, - frame_idx - ); - break; - } - - cumulative_checksum = calculated_frame_checksum; - - wfs_data - .frame_cache - .lock() - .entry(frame_h_page_number as u64) - .or_default() - .push(frame_idx); - - let is_commit_record = frame_h_db_size > 0; - if is_commit_record { - wfs_data.max_frame.store(frame_idx, Ordering::SeqCst); - wfs_data.last_checksum = cumulative_checksum; - } - - frame_idx += 1; - current_offset += WAL_FRAME_HEADER_SIZE + page_size; - } - } - - let max_frame = wfs_data.max_frame.load(Ordering::SeqCst); - - // cleanup in-memory index from tail frames which was written after the last committed frame - let mut frame_cache = wfs_data.frame_cache.lock(); - for (page, frames) in frame_cache.iter_mut() { - // remove any frame IDs > max_frame - let original_len = frames.len(); - frames.retain(|&frame_id| frame_id <= max_frame); - if frames.len() < original_len { - tracing::debug!( - "removed {} frame(s) from page {} from the in-memory WAL index because they were written after the last committed frame {}", - original_len - frames.len(), - page, - max_frame - ); - } - } - // also remove any pages that now have no frames - frame_cache.retain(|_page, frames| !frames.is_empty()); - - wfs_data.nbackfills.store(0, Ordering::SeqCst); - wfs_data.loaded.store(true, Ordering::SeqCst); - if size >= WAL_HEADER_SIZE as u64 { - wfs_data.initialized.store(true, Ordering::SeqCst); - } - }); - let c = Completion::new_read(buf_for_pread, complete); - let _c = file.pread(0, c)?; - - Ok(wal_file_shared_ret) -} - -/// Reads a WAL file in streaming fashion to avoid OOM on large files. -/// /// Stream through frames in chunks, building frame_cache incrementally /// Track last valid commit frame for consistency -pub fn read_wal_streaming( +pub fn build_shared_wal( file: &Arc, io: &Arc, ) -> Result>> { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 940c4ce96..323f50e29 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -2231,39 +2231,22 @@ impl WalFile { } } -/// 32MB maximum WAL file size to read whole file into one buffer -const WAL_SIZE_LIMIT: u64 = 1024 * 1024 * 32; - impl WalFileShared { pub fn open_shared_if_exists( io: &Arc, path: &str, ) -> Result>> { let file = io.open_file(path, crate::io::OpenFlags::Create, false)?; - let wal_file_shared = match file.size()? { - 0 => return WalFileShared::new_noop(), - n if n <= WAL_SIZE_LIMIT => sqlite3_ondisk::read_entire_wal_dumb(&file)?, - _ => { - tracing::info!( - "WAL file is large (>{WAL_SIZE_LIMIT} bytes), using streaming reader" - ); - sqlite3_ondisk::read_wal_streaming(&file, io)? - } - }; - - let mut remaining: usize = 100_000; - loop { - io.run_once()?; - if wal_file_shared - .try_read() - .is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire)) - { - break; - } - remaining = remaining.checked_sub(1).ok_or_else(|| { - LimboError::InternalError("Timed out waiting for WAL to load".into()) - })?; + if file.size()? == 0 { + return WalFileShared::new_noop(); } + let wal_file_shared = sqlite3_ondisk::build_shared_wal(&file, io)?; + turso_assert!( + wal_file_shared + .try_read() + .is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire)), + "Unable to read WAL shared state" + ); Ok(wal_file_shared) }