From fc45e0ec0d198ec324c6dcc7261e9df8022d62ba Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Sat, 24 May 2025 18:09:01 +0300
Subject: [PATCH] Reconstruct WAL frame cache when WAL is opened

Currently we are simply unable to read any WAL frames from disk once a
fresh Limbo process is opened, since we never try to read anything from
disk unless it is already in our in-memory frame cache.

This commit implements a crude way of reading the entire WAL into memory
as a single buffer and reconstructing the frame cache from it.
---
 core/fast_lock.rs              |   4 +
 core/storage/sqlite3_ondisk.rs | 201 +++++++++++++++++++++++++++------
 core/storage/wal.rs            |  56 +++++----
 3 files changed, 209 insertions(+), 52 deletions(-)

diff --git a/core/fast_lock.rs b/core/fast_lock.rs
index 33933e772..55d766222 100644
--- a/core/fast_lock.rs
+++ b/core/fast_lock.rs
@@ -51,6 +51,10 @@ impl<T> SpinLock<T> {
         }
         SpinLockGuard { lock: self }
     }
+
+    pub fn into_inner(self) -> UnsafeCell<T> {
+        self.value
+    }
 }
 
 #[cfg(test)]
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 06a9569ab..af1f782d4 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -43,22 +43,25 @@
 use crate::error::LimboError;
 use crate::fast_lock::SpinLock;
-use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
+use crate::io::{Buffer, Complete, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
 use crate::storage::buffer_pool::BufferPool;
 use crate::storage::database::DatabaseStorage;
 use crate::storage::pager::Pager;
 use crate::types::{
     ImmutableRecord, RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype,
 };
-use crate::{File, Result};
-use std::cell::RefCell;
+use crate::{File, Result, WalFileShared};
+use std::cell::{RefCell, UnsafeCell};
+use std::collections::HashMap;
 use std::mem::MaybeUninit;
 use std::pin::Pin;
 use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::sync::Arc;
 use tracing::trace;
 
 use super::pager::PageRef;
+use super::wal::LimboRwLock;
 
 /// The size of the database header in bytes.
 pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -1354,37 +1357,171 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
     payload.extend_from_slice(&varint[0..n]);
 }
 
-pub fn begin_read_wal_header(io: &Arc<dyn File>) -> Result<Arc<SpinLock<WalHeader>>> {
+/// We need to read the WAL file on open to reconstruct the WAL frame cache.
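+/// The WAL file layout is a 32-byte header followed by zero or more frames,
+/// each of which is a 24-byte frame header followed by exactly one page of
+/// data. Frame checksums are cumulative (each frame chains from the previous
+/// one, and the first frame chains from the header checksum), so the file
+/// has to be scanned sequentially from the start anyway.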
+pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFileShared>>> {
     let drop_fn = Rc::new(|_buf| {});
+    let size = file.size()?;
     #[allow(clippy::arc_with_non_send_sync)]
-    let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
-    let result = Arc::new(SpinLock::new(WalHeader::default()));
-    let header = result.clone();
-    let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
-        let header = header.clone();
-        finish_read_wal_header(buf, header).unwrap();
-    });
-    let c = Completion::Read(ReadCompletion::new(buf, complete));
-    io.pread(0, c)?;
-    Ok(result)
-}
+    let buf_for_pread = Arc::new(RefCell::new(Buffer::allocate(size as usize, drop_fn)));
+    let header = Arc::new(SpinLock::new(WalHeader::default()));
+    #[allow(clippy::arc_with_non_send_sync)]
+    let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared {
+        wal_header: header.clone(),
+        min_frame: AtomicU64::new(0),
+        max_frame: AtomicU64::new(0),
+        nbackfills: AtomicU64::new(0),
+        frame_cache: Arc::new(SpinLock::new(HashMap::new())),
+        pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
+        last_checksum: (0, 0),
+        file: file.clone(),
+        read_locks: [
+            LimboRwLock::new(),
+            LimboRwLock::new(),
+            LimboRwLock::new(),
+            LimboRwLock::new(),
+            LimboRwLock::new(),
+        ],
+        write_lock: LimboRwLock::new(),
+        loaded: AtomicBool::new(false),
+    }));
+    let wal_file_shared_for_completion = wal_file_shared_ret.clone();
 
-fn finish_read_wal_header(
-    buf: Arc<RefCell<Buffer>>,
-    header: Arc<SpinLock<WalHeader>>,
-) -> Result<()> {
-    let buf = buf.borrow();
-    let buf = buf.as_slice();
-    let mut header = header.lock();
-    header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
-    header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
-    header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
-    header.checkpoint_seq = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
-    header.salt_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]);
-    header.salt_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]);
-    header.checksum_1 = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]);
-    header.checksum_2 = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]);
-    Ok(())
+    let complete: Box<Complete> = Box::new(move |buf: Arc<RefCell<Buffer>>| {
+        let buf = buf.borrow();
+        let buf_slice = buf.as_slice();
+        // Bail out before indexing if the file cannot even hold the header.
+        if buf_slice.len() < WAL_HEADER_SIZE {
+            panic!("WAL file too small for header");
+        }
+        let mut header_locked = header.lock();
+        // Read the 32-byte WAL header.
+        header_locked.magic =
+            u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]);
+        header_locked.file_format =
+            u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]);
+        header_locked.page_size =
+            u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]);
+        header_locked.checkpoint_seq =
+            u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]);
+        header_locked.salt_1 =
+            u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]);
+        header_locked.salt_2 =
+            u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]);
+        header_locked.checksum_1 =
+            u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
+        header_locked.checksum_2 =
+            u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
+
+        let use_native_endian_checksum =
+            cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0);
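+        // The stored header checksum (bytes 24..32) covers the first 24 bytes
+        // of the header and also seeds the cumulative checksum chain that all
+        // subsequent frame checksums build on.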
+        let calculated_header_checksum = checksum_wal(
+            &buf_slice[0..24],
+            &*header_locked,
+            (0, 0),
+            use_native_endian_checksum,
+        );
+
+        if calculated_header_checksum != (header_locked.checksum_1, header_locked.checksum_2) {
+            panic!(
+                "WAL header checksum mismatch. Expected ({}, {}), Got ({}, {})",
+                header_locked.checksum_1,
+                header_locked.checksum_2,
+                calculated_header_checksum.0,
+                calculated_header_checksum.1
+            );
+        }
+
+        let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
+        let page_size_u32 = header_locked.page_size;
+
+        if page_size_u32 < MIN_PAGE_SIZE
+            || page_size_u32 > MAX_PAGE_SIZE
+            || page_size_u32.count_ones() != 1
+        {
+            panic!("Invalid page size in WAL header: {}", page_size_u32);
+        }
+        let page_size = page_size_u32 as usize;
+
+        let mut current_offset = WAL_HEADER_SIZE;
+        let mut frame_idx = 1_u64;
+
+        let wfs_data = unsafe { &mut *wal_file_shared_for_completion.get() };
+
+        // Read frames into frame_cache and pages_in_frames. Walk complete
+        // frames only; a torn final frame at the end of the file is ignored.
+        while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
+            let frame_header_slice =
+                &buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
+            let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
+                ..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
+
+            let frame_h_page_number =
+                u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
+            let _frame_h_db_size = u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
+            let frame_h_salt_1 = u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
+            let frame_h_salt_2 = u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
+            let frame_h_checksum_1 =
+                u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
+            let frame_h_checksum_2 =
+                u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
+
+            if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2 {
+                panic!(
+                    "WAL frame salt mismatch. Expected ({}, {}), Got ({}, {})",
+                    header_locked.salt_1, header_locked.salt_2, frame_h_salt_1, frame_h_salt_2
+                );
+            }
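+            // Per the WAL format, a frame's checksum covers the first 8 bytes
+            // of its frame header (page number + db size) plus the page data,
+            // chained onto the running checksum of everything before it.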
+            let checksum_after_fh_meta = checksum_wal(
+                &frame_header_slice[0..8],
+                &*header_locked,
+                cumulative_checksum,
+                use_native_endian_checksum,
+            );
+            let calculated_frame_checksum = checksum_wal(
+                page_data_slice,
+                &*header_locked,
+                checksum_after_fh_meta,
+                use_native_endian_checksum,
+            );
+
+            if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
+                panic!(
+                    "WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {})",
+                    frame_h_checksum_1,
+                    frame_h_checksum_2,
+                    calculated_frame_checksum.0,
+                    calculated_frame_checksum.1
+                );
+            }
+
+            cumulative_checksum = calculated_frame_checksum;
+
+            wfs_data
+                .frame_cache
+                .lock()
+                .entry(frame_h_page_number as u64)
+                .or_default()
+                .push(frame_idx);
+            wfs_data
+                .pages_in_frames
+                .lock()
+                .push(frame_h_page_number as u64);
+
+            frame_idx += 1;
+            current_offset += WAL_FRAME_HEADER_SIZE + page_size;
+        }
+
+        // frame_idx is the index of the *next* frame, so the last valid frame
+        // is frame_idx - 1 (0 when the WAL contains no frames).
+        wfs_data.max_frame.store(frame_idx - 1, Ordering::SeqCst);
+        wfs_data.last_checksum = cumulative_checksum;
+        wfs_data.loaded.store(true, Ordering::SeqCst);
+    });
+    let c = Completion::Read(ReadCompletion::new(buf_for_pread, complete));
+    file.pread(0, c)?;
+
+    Ok(wal_file_shared_ret)
 }
 
 pub fn begin_read_wal_frame(
@@ -1463,7 +1600,7 @@ pub fn begin_write_wal_frame(
     let expects_be = wal_header.magic & 1;
     let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be;
 
-    let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian); // Only 8 bytes
+    let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian);
     let final_checksum = checksum_wal(
         &buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + page_size as usize],
         wal_header,
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index fd41af51b..e5ae80110 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use tracing::{debug, trace};
 
 use std::fmt::Formatter;
-use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
 use std::{cell::RefCell, fmt, rc::Rc, sync::Arc};
 
 use crate::fast_lock::SpinLock;
@@ -12,7 +12,7 @@ use crate::result::LimboResult;
 use crate::storage::sqlite3_ondisk::{
     begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
 };
-use crate::{Buffer, LimboError, Result};
+use crate::{Buffer, Result};
 use crate::{Completion, Page};
 
 use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
@@ -59,13 +59,21 @@ pub enum CheckpointMode {
 }
 
 #[derive(Debug)]
-struct LimboRwLock {
+pub struct LimboRwLock {
     lock: AtomicU32,
     nreads: AtomicU32,
     value: AtomicU32,
 }
 
 impl LimboRwLock {
+    pub fn new() -> Self {
+        Self {
+            lock: AtomicU32::new(NO_LOCK),
+            nreads: AtomicU32::new(0),
+            value: AtomicU32::new(READMARK_NOT_USED),
+        }
+    }
+
     /// Shared lock. Returns true if it was successful, false if it couldn't lock it
     pub fn read(&mut self) -> bool {
         let lock = self.lock.load(Ordering::SeqCst);
@@ -283,29 +291,30 @@ impl fmt::Debug for WalFile {
 /// that needs to be communicated between threads so this struct does the job.
 #[allow(dead_code)]
 pub struct WalFileShared {
-    wal_header: Arc<SpinLock<WalHeader>>,
-    min_frame: AtomicU64,
-    max_frame: AtomicU64,
-    nbackfills: AtomicU64,
+    pub wal_header: Arc<SpinLock<WalHeader>>,
+    pub min_frame: AtomicU64,
+    pub max_frame: AtomicU64,
+    pub nbackfills: AtomicU64,
     // Frame cache maps a Page to all the frames it has stored in WAL in ascending order.
     // This is to easily find the frame it must checkpoint each connection if a checkpoint is
    // necessary.
     // One difference between SQLite and limbo is that we will never support multi process, meaning
     // we don't need WAL's index file. So we can do stuff like this without shared memory.
     // TODO: this will need refactoring because this is incredibly memory inefficient.
-    frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
+    pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
     // Another memory inefficient array made to just keep track of pages that are in frame_cache.
-    pages_in_frames: Arc<SpinLock<Vec<u64>>>,
-    last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
-    file: Arc<dyn File>,
+    pub pages_in_frames: Arc<SpinLock<Vec<u64>>>,
+    pub last_checksum: (u32, u32), // Checksum of the last frame in the WAL; this is a cumulative checksum over all frames in the WAL
+    pub file: Arc<dyn File>,
     /// read_locks is a list of read locks that can coexist with the max_frame number stored in
     /// value. There is a limited amount because an unbounded amount of connections could be
     /// fatal. Therefore, for now we copy how SQLite behaves, with a limited amount of read max
     /// frames that is equal to 5.
-    read_locks: [LimboRwLock; 5],
+    pub read_locks: [LimboRwLock; 5],
     /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only
     /// one used.
-    write_lock: LimboRwLock,
+    pub write_lock: LimboRwLock,
+    pub loaded: AtomicBool,
 }
 
 impl fmt::Debug for WalFileShared {
@@ -747,14 +756,20 @@ impl WalFileShared {
     ) -> Result<Arc<UnsafeCell<WalFileShared>>> {
         let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
         let header = if file.size()? > 0 {
-            let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
-                Ok(header) => header,
-                Err(err) => return Err(LimboError::ParseError(err.to_string())),
-            };
-            tracing::info!("recover not implemented yet");
+            let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
             // TODO: Return a completion instead.
-            io.run_once()?;
-            wal_header
+            let mut max_loops = 1000;
+            while !unsafe { &*wal_file_shared.get() }
+                .loaded
+                .load(Ordering::SeqCst)
+            {
+                io.run_once()?;
+                max_loops -= 1;
+                if max_loops == 0 {
+                    panic!("WAL file not loaded");
+                }
+            }
+            return Ok(wal_file_shared);
         } else {
             let magic = if cfg!(target_endian = "big") {
                 WAL_MAGIC_BE
@@ -832,6 +847,7 @@ impl WalFileShared {
                 nreads: AtomicU32::new(0),
                 value: AtomicU32::new(READMARK_NOT_USED),
             },
+            loaded: AtomicBool::new(true),
         };
         Ok(Arc::new(UnsafeCell::new(shared)))
     }
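
---

Note for readers unfamiliar with the WAL layout being parsed above: the
reconstruction is a sequential scan of the WAL image (a 32-byte header, then
frames of a 24-byte frame header plus one page of data). Below is a minimal,
self-contained sketch of that same walk over an in-memory buffer.
`parse_wal_frames` is a hypothetical name, not part of Limbo, and salt and
checksum verification are deliberately omitted; this illustrates the layout,
it is not the actual implementation.

use std::collections::HashMap;

const WAL_HEADER_SIZE: usize = 32;
const WAL_FRAME_HEADER_SIZE: usize = 24;

/// Hypothetical helper: walk a complete WAL image and return the frame cache
/// (page number -> frame indices, ascending) plus the max frame index.
fn parse_wal_frames(wal: &[u8]) -> Option<(HashMap<u64, Vec<u64>>, u64)> {
    if wal.len() < WAL_HEADER_SIZE {
        return None; // Too small to even hold the 32-byte WAL header.
    }
    // The page size is stored big-endian at byte offset 8 of the header.
    let page_size = u32::from_be_bytes(wal[8..12].try_into().ok()?) as usize;
    if !(512..=65536).contains(&page_size) || !page_size.is_power_of_two() {
        return None;
    }
    let mut frame_cache: HashMap<u64, Vec<u64>> = HashMap::new();
    let mut offset = WAL_HEADER_SIZE;
    let mut frame_idx = 0u64;
    // Each frame is a 24-byte frame header followed by one page of data;
    // a torn final frame is ignored, exactly as in the patch above.
    while offset + WAL_FRAME_HEADER_SIZE + page_size <= wal.len() {
        let page_number = u32::from_be_bytes(wal[offset..offset + 4].try_into().ok()?);
        frame_idx += 1;
        frame_cache.entry(page_number as u64).or_default().push(frame_idx);
        offset += WAL_FRAME_HEADER_SIZE + page_size;
    }
    Some((frame_cache, frame_idx))
}

fn main() {
    // A header-only WAL image (no frames) parses to an empty cache.
    let mut wal = vec![0u8; WAL_HEADER_SIZE];
    wal[8..12].copy_from_slice(&4096u32.to_be_bytes());
    let (cache, max_frame) = parse_wal_frames(&wal).unwrap();
    assert!(cache.is_empty());
    assert_eq!(max_frame, 0);
}

Reading the whole file into one buffer keeps this logic trivial at the cost
of memory proportional to the WAL size, which matches the "crude"/"dumb"
framing in the commit message.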