Reconstruct WAL frame cache when WAL is opened

Currently we are unable to read any WAL frames from disk
when a fresh Limbo process opens the database, since we never read
a frame from disk unless it is already indexed in our in-memory
frame cache.

This commit implements a crude approach: read the entire WAL into memory
as a single buffer and reconstruct the frame cache from it.
Jussi Saurio
2025-05-24 18:09:01 +03:00
parent 02e7726249
commit fc45e0ec0d
3 changed files with 209 additions and 52 deletions


@@ -51,6 +51,10 @@ impl<T> SpinLock<T> {
}
SpinLockGuard { lock: self }
}
pub fn into_inner(self) -> UnsafeCell<T> {
self.value
}
}
#[cfg(test)]


@@ -43,22 +43,25 @@
use crate::error::LimboError;
use crate::fast_lock::SpinLock;
use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
use crate::io::{Buffer, Complete, Completion, ReadCompletion, SyncCompletion, WriteCompletion};
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::pager::Pager;
use crate::types::{
ImmutableRecord, RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype,
};
use crate::{File, Result};
use std::cell::RefCell;
use crate::{File, Result, WalFileShared};
use std::cell::{RefCell, UnsafeCell};
use std::collections::HashMap;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tracing::trace;
use super::pager::PageRef;
use super::wal::LimboRwLock;
/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
@@ -1354,37 +1357,171 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
payload.extend_from_slice(&varint[0..n]);
}
pub fn begin_read_wal_header(io: &Arc<dyn File>) -> Result<Arc<SpinLock<WalHeader>>> {
/// We need to read the WAL file on open to reconstruct the WAL frame cache.
pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFileShared>>> {
let drop_fn = Rc::new(|_buf| {});
let size = file.size()?;
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Arc::new(SpinLock::new(WalHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let header = header.clone();
finish_read_wal_header(buf, header).unwrap();
});
let c = Completion::Read(ReadCompletion::new(buf, complete));
io.pread(0, c)?;
Ok(result)
}
let buf_for_pread = Arc::new(RefCell::new(Buffer::allocate(size as usize, drop_fn)));
let header = Arc::new(SpinLock::new(WalHeader::default()));
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared {
wal_header: header.clone(),
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
last_checksum: (0, 0),
file: file.clone(),
read_locks: [
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
],
write_lock: LimboRwLock::new(),
loaded: AtomicBool::new(false),
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
fn finish_read_wal_header(
buf: Arc<RefCell<Buffer>>,
header: Arc<SpinLock<WalHeader>>,
) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = header.lock();
header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
header.checkpoint_seq = u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]);
header.salt_1 = u32::from_be_bytes([buf[16], buf[17], buf[18], buf[19]]);
header.salt_2 = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]);
header.checksum_1 = u32::from_be_bytes([buf[24], buf[25], buf[26], buf[27]]);
header.checksum_2 = u32::from_be_bytes([buf[28], buf[29], buf[30], buf[31]]);
Ok(())
let complete: Box<Complete> = Box::new(move |buf: Arc<RefCell<Buffer>>| {
let buf = buf.borrow();
let buf_slice = buf.as_slice();
let mut header_locked = header.lock();
// Read header
header_locked.magic =
u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]);
header_locked.file_format =
u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]);
header_locked.page_size =
u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]);
header_locked.checkpoint_seq =
u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]);
header_locked.salt_1 =
u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]);
header_locked.salt_2 =
u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]);
header_locked.checksum_1 =
u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
header_locked.checksum_2 =
u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
// Read frames into frame_cache and pages_in_frames
if buf_slice.len() < WAL_HEADER_SIZE {
panic!("WAL file too small for header");
}
let use_native_endian_checksum =
cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0);
let calculated_header_checksum = checksum_wal(
&buf_slice[0..24],
&*header_locked,
(0, 0),
use_native_endian_checksum,
);
if calculated_header_checksum != (header_locked.checksum_1, header_locked.checksum_2) {
panic!(
"WAL header checksum mismatch. Expected ({}, {}), Got ({}, {})",
header_locked.checksum_1,
header_locked.checksum_2,
calculated_header_checksum.0,
calculated_header_checksum.1
);
}
let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
let page_size_u32 = header_locked.page_size;
if page_size_u32 < MIN_PAGE_SIZE
|| page_size_u32 > MAX_PAGE_SIZE
|| page_size_u32.count_ones() != 1
{
panic!("Invalid page size in WAL header: {}", page_size_u32);
}
let page_size = page_size_u32 as usize;
let mut current_offset = WAL_HEADER_SIZE;
let mut frame_idx = 1_u64;
let wfs_data = unsafe { &mut *wal_file_shared_for_completion.get() };
while current_offset + WAL_FRAME_HEADER_SIZE <= buf_slice.len()
&& current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len()
{
let frame_header_slice =
&buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
let frame_h_page_number =
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
let _frame_h_db_size = u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
let frame_h_salt_1 = u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
let frame_h_salt_2 = u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
let frame_h_checksum_1 =
u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
let frame_h_checksum_2 =
u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2 {
panic!(
"WAL frame salt mismatch. Expected ({}, {}), Got ({}, {})",
header_locked.salt_1, header_locked.salt_2, frame_h_salt_1, frame_h_salt_2
);
}
let checksum_after_fh_meta = checksum_wal(
&frame_header_slice[0..8],
&*header_locked,
cumulative_checksum,
use_native_endian_checksum,
);
let calculated_frame_checksum = checksum_wal(
page_data_slice,
&*header_locked,
checksum_after_fh_meta,
use_native_endian_checksum,
);
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
panic!(
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {})",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1
);
}
cumulative_checksum = calculated_frame_checksum;
wfs_data
.frame_cache
.lock()
.entry(frame_h_page_number as u64)
.or_default()
.push(frame_idx);
wfs_data
.pages_in_frames
.lock()
.push(frame_h_page_number as u64);
frame_idx += 1;
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
}
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
wfs_data.last_checksum = cumulative_checksum;
wfs_data.loaded.store(true, Ordering::SeqCst);
});
let c = Completion::Read(ReadCompletion::new(buf_for_pread, complete));
file.pread(0, c)?;
Ok(wal_file_shared_ret)
}
pub fn begin_read_wal_frame(
@@ -1463,7 +1600,7 @@ pub fn begin_write_wal_frame(
let expects_be = wal_header.magic & 1;
let use_native_endian = cfg!(target_endian = "big") as u32 == expects_be;
let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian); // Only 8 bytes
let header_checksum = checksum_wal(&buf[0..8], wal_header, checksums, use_native_endian);
let final_checksum = checksum_wal(
&buf[WAL_FRAME_HEADER_SIZE..WAL_FRAME_HEADER_SIZE + page_size as usize],
wal_header,


@@ -3,7 +3,7 @@ use std::collections::HashMap;
use tracing::{debug, trace};
use std::fmt::Formatter;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::{cell::RefCell, fmt, rc::Rc, sync::Arc};
use crate::fast_lock::SpinLock;
@@ -12,7 +12,7 @@ use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::{Buffer, LimboError, Result};
use crate::{Buffer, Result};
use crate::{Completion, Page};
use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE};
@@ -59,13 +59,21 @@ pub enum CheckpointMode {
}
#[derive(Debug)]
struct LimboRwLock {
pub struct LimboRwLock {
lock: AtomicU32,
nreads: AtomicU32,
value: AtomicU32,
}
impl LimboRwLock {
pub fn new() -> Self {
Self {
lock: AtomicU32::new(NO_LOCK),
nreads: AtomicU32::new(0),
value: AtomicU32::new(READMARK_NOT_USED),
}
}
/// Shared lock. Returns true if it was successful, false if it couldn't lock it
pub fn read(&mut self) -> bool {
let lock = self.lock.load(Ordering::SeqCst);
@@ -283,29 +291,30 @@ impl fmt::Debug for WalFile {
/// that needs to be communicated between threads so this struct does the job.
#[allow(dead_code)]
pub struct WalFileShared {
wal_header: Arc<SpinLock<WalHeader>>,
min_frame: AtomicU64,
max_frame: AtomicU64,
nbackfills: AtomicU64,
pub wal_header: Arc<SpinLock<WalHeader>>,
pub min_frame: AtomicU64,
pub max_frame: AtomicU64,
pub nbackfills: AtomicU64,
// Frame cache maps a page to all the frames it has stored in the WAL, in ascending order.
// This makes it easy to find which frames each connection must checkpoint when a checkpoint is
// necessary.
// One difference between SQLite and Limbo is that we will never support multi-process access, meaning
// we don't need WAL's index file. So we can do stuff like this without shared memory.
// TODO: this will need refactoring because it is incredibly memory inefficient.
frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
// Another memory inefficient array made to just keep track of pages that are in frame_cache.
pages_in_frames: Arc<SpinLock<Vec<u64>>>,
last_checksum: (u32, u32), // Checksum of the last frame in the WAL; this is a cumulative checksum over all frames in the WAL
file: Arc<dyn File>,
pub pages_in_frames: Arc<SpinLock<Vec<u64>>>,
pub last_checksum: (u32, u32), // Checksum of the last frame in the WAL; this is a cumulative checksum over all frames in the WAL
pub file: Arc<dyn File>,
/// read_locks is a list of read locks that can coexist with the max_frame number stored in
/// value. The number is limited because an unbounded number of connections could be
/// fatal. Therefore, for now we copy how SQLite behaves, with a limited number of read max
/// frames equal to 5
read_locks: [LimboRwLock; 5],
pub read_locks: [LimboRwLock; 5],
/// There is only one writer allowed in WAL mode. This lock takes care of ensuring there is
/// only one in use.
write_lock: LimboRwLock,
pub write_lock: LimboRwLock,
pub loaded: AtomicBool,
}
impl fmt::Debug for WalFileShared {
@@ -747,14 +756,20 @@ impl WalFileShared {
) -> Result<Arc<UnsafeCell<WalFileShared>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
let header = if file.size()? > 0 {
let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) {
Ok(header) => header,
Err(err) => return Err(LimboError::ParseError(err.to_string())),
};
tracing::info!("recover not implemented yet");
let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
// TODO: Return a completion instead.
io.run_once()?;
wal_header
let mut max_loops = 1000;
while !unsafe { &*wal_file_shared.get() }
.loaded
.load(Ordering::SeqCst)
{
io.run_once()?;
max_loops -= 1;
if max_loops == 0 {
panic!("WAL file not loaded");
}
}
return Ok(wal_file_shared);
} else {
let magic = if cfg!(target_endian = "big") {
WAL_MAGIC_BE
@@ -832,6 +847,7 @@ impl WalFileShared {
nreads: AtomicU32::new(0),
value: AtomicU32::new(READMARK_NOT_USED),
},
loaded: AtomicBool::new(true),
};
Ok(Arc::new(UnsafeCell::new(shared)))
}