Fix read_entire_wal_dumb to prefer streaming read if over 32mb wal file

This commit is contained in:
PThorpe92
2025-09-09 13:06:01 -04:00
parent cad8be1ed6
commit 37ec77eec2
2 changed files with 362 additions and 14 deletions

View File

@@ -1851,6 +1851,342 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileSh
Ok(wal_file_shared_ret)
}
/// Reads a WAL file in streaming fashion to avoid OOM on large files.
///
/// Stream through frames in chunks, building frame_cache incrementally
/// Track last valid commit frame for consistency
pub fn read_wal_streaming(
file: &Arc<dyn File>,
io: &Arc<dyn crate::IO>,
) -> Result<Arc<RwLock<WalFileShared>>> {
let size = file.size()?;
let header = Arc::new(SpinLock::new(WalHeader::default()));
let read_locks = std::array::from_fn(|_| TursoRwLock::new());
for (i, l) in read_locks.iter().enumerate() {
l.write();
l.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
l.unlock();
}
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared = Arc::new(RwLock::new(WalFileShared {
enabled: AtomicBool::new(true),
wal_header: header.clone(),
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: (0, 0),
file: Some(file.clone()),
read_locks,
write_lock: TursoRwLock::new(),
loaded: AtomicBool::new(false),
checkpoint_lock: TursoRwLock::new(),
initialized: AtomicBool::new(false),
}));
if size < WAL_HEADER_SIZE as u64 {
wal_file_shared.write().loaded.store(true, Ordering::SeqCst);
return Ok(wal_file_shared);
}
let reader = Arc::new(StreamingWalReader::new(
file.clone(),
wal_file_shared.clone(),
header.clone(),
size,
));
let h = reader.clone().read_header()?;
io.wait_for_completion(h)?;
loop {
if reader.done.load(Ordering::Acquire) {
break;
}
let offset = reader.off_atomic.load(Ordering::Acquire);
if offset >= size {
reader.finalize_loading();
break;
}
let (_read_size, c) = reader.clone().submit_one_chunk(offset)?;
io.wait_for_completion(c)?;
let new_off = reader.off_atomic.load(Ordering::Acquire);
if new_off <= offset {
reader.finalize_loading();
break;
}
}
Ok(wal_file_shared)
}
pub(super) struct StreamingWalReader {
file: Arc<dyn File>,
wal_shared: Arc<RwLock<WalFileShared>>,
header: Arc<SpinLock<WalHeader>>,
file_size: u64,
state: RwLock<StreamingState>,
off_atomic: AtomicU64,
page_atomic: AtomicU64,
pub(super) done: AtomicBool,
}
/// Mutable state for streaming reader
struct StreamingState {
frame_idx: u64,
cumulative_checksum: (u32, u32),
last_valid_frame: u64,
pending_frames: HashMap<u64, Vec<u64>>,
page_size: usize,
use_native_endian: bool,
header_valid: bool,
}
impl StreamingWalReader {
fn new(
file: Arc<dyn File>,
wal_shared: Arc<RwLock<WalFileShared>>,
header: Arc<SpinLock<WalHeader>>,
file_size: u64,
) -> Self {
Self {
file,
wal_shared,
header,
file_size,
off_atomic: AtomicU64::new(0),
page_atomic: AtomicU64::new(0),
done: AtomicBool::new(false),
state: RwLock::new(StreamingState {
frame_idx: 1,
cumulative_checksum: (0, 0),
last_valid_frame: 0,
pending_frames: HashMap::new(),
page_size: 0,
use_native_endian: false,
header_valid: false,
}),
}
}
fn read_header(self: Arc<Self>) -> crate::Result<Completion> {
let header_buf = Arc::new(Buffer::new_temporary(WAL_HEADER_SIZE));
let reader = self.clone();
let completion: Box<ReadComplete> = Box::new(move |res| {
let _reader = reader.clone();
_reader.handle_header_read(res);
});
let c = Completion::new_read(header_buf, completion);
self.file.pread(0, c)
}
fn submit_one_chunk(self: Arc<Self>, offset: u64) -> crate::Result<(usize, Completion)> {
let page_size = self.page_atomic.load(Ordering::Acquire) as usize;
if page_size == 0 {
return Err(crate::LimboError::InternalError(
"page size not initialized".into(),
));
}
let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
if frame_size == 0 {
return Err(crate::LimboError::InternalError(
"invalid frame size".into(),
));
}
const BASE: usize = 16 * 1024 * 1024;
let aligned = (BASE / frame_size) * frame_size;
let read_size = aligned
.max(frame_size)
.min((self.file_size - offset) as usize);
if read_size == 0 {
// end-of-file; let caller finalize
return Ok((0, Completion::new_dummy()));
}
let buf = Arc::new(Buffer::new_temporary(read_size));
let me = self.clone();
let completion: Box<ReadComplete> = Box::new(move |res| {
tracing::debug!("WAL chunk read complete");
let reader = me.clone();
reader.handle_chunk_read(res);
});
let c = Completion::new_read(buf, completion);
let guard = self.file.pread(offset, c)?;
Ok((read_size, guard))
}
fn handle_header_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
let Ok((buf, bytes_read)) = res else {
self.finalize_loading();
return;
};
if bytes_read != WAL_HEADER_SIZE as i32 {
self.finalize_loading();
return;
}
let (page_sz, c1, c2, use_native, ok) = {
let mut h = self.header.lock();
let s = buf.as_slice();
h.magic = u32::from_be_bytes(s[0..4].try_into().unwrap());
h.file_format = u32::from_be_bytes(s[4..8].try_into().unwrap());
h.page_size = u32::from_be_bytes(s[8..12].try_into().unwrap());
h.checkpoint_seq = u32::from_be_bytes(s[12..16].try_into().unwrap());
h.salt_1 = u32::from_be_bytes(s[16..20].try_into().unwrap());
h.salt_2 = u32::from_be_bytes(s[20..24].try_into().unwrap());
h.checksum_1 = u32::from_be_bytes(s[24..28].try_into().unwrap());
h.checksum_2 = u32::from_be_bytes(s[28..32].try_into().unwrap());
tracing::debug!("WAL header: {:?}", *h);
let use_native = cfg!(target_endian = "big") == ((h.magic & 1) != 0);
let calc = checksum_wal(&s[0..24], &h, (0, 0), use_native);
(
h.page_size,
h.checksum_1,
h.checksum_2,
use_native,
calc == (h.checksum_1, h.checksum_2),
)
};
if PageSize::new(page_sz).is_none() || !ok {
self.finalize_loading();
return;
}
{
let mut st = self.state.write();
st.page_size = page_sz as usize;
st.use_native_endian = use_native;
st.cumulative_checksum = (c1, c2);
st.header_valid = true;
}
self.off_atomic
.store(WAL_HEADER_SIZE as u64, Ordering::Release);
self.page_atomic.store(page_sz as u64, Ordering::Release);
}
fn handle_chunk_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
let Ok((buf, bytes_read)) = res else {
self.finalize_loading();
return;
};
let buf_slice = &buf.as_slice()[..bytes_read as usize];
// Snapshot salts/endianness once to avoid per-frame header locks
let (header_copy, use_native) = {
let st = self.state.read();
let h = self.header.lock();
(*h, st.use_native_endian)
};
let consumed = self.process_frames(buf_slice, &header_copy, use_native);
self.off_atomic.fetch_add(consumed as u64, Ordering::AcqRel);
// If we didnt consume the full chunk, we hit a stop condition
if consumed < buf_slice.len() || self.off_atomic.load(Ordering::Acquire) >= self.file_size {
self.finalize_loading();
}
}
// Processes frames from a buffer, returns bytes processed
fn process_frames(&self, buf: &[u8], header: &WalHeader, use_native: bool) -> usize {
let mut st = self.state.write();
let page_size = st.page_size;
let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
let mut pos = 0;
while pos + frame_size <= buf.len() {
let fh = &buf[pos..pos + WAL_FRAME_HEADER_SIZE];
let page = &buf[pos + WAL_FRAME_HEADER_SIZE..pos + frame_size];
let page_number = u32::from_be_bytes(fh[0..4].try_into().unwrap());
let db_size = u32::from_be_bytes(fh[4..8].try_into().unwrap());
let s1 = u32::from_be_bytes(fh[8..12].try_into().unwrap());
let s2 = u32::from_be_bytes(fh[12..16].try_into().unwrap());
let c1 = u32::from_be_bytes(fh[16..20].try_into().unwrap());
let c2 = u32::from_be_bytes(fh[20..24].try_into().unwrap());
if page_number == 0 {
break;
}
if s1 != header.salt_1 || s2 != header.salt_2 {
break;
}
let seed = checksum_wal(&fh[0..8], header, st.cumulative_checksum, use_native);
let calc = checksum_wal(page, header, seed, use_native);
if calc != (c1, c2) {
break;
}
st.cumulative_checksum = calc;
let frame_idx = st.frame_idx;
st.pending_frames
.entry(page_number as u64)
.or_default()
.push(frame_idx);
if db_size > 0 {
st.last_valid_frame = st.frame_idx;
self.flush_pending_frames(&mut st);
}
st.frame_idx += 1;
pos += frame_size;
}
pos
}
fn flush_pending_frames(&self, state: &mut StreamingState) {
if state.pending_frames.is_empty() {
return;
}
let wfs = self.wal_shared.read();
{
let mut frame_cache = wfs.frame_cache.lock();
for (page, mut frames) in state.pending_frames.drain() {
// Only include frames up to last valid commit
frames.retain(|&f| f <= state.last_valid_frame);
if !frames.is_empty() {
frame_cache.entry(page).or_default().extend(frames);
}
}
}
wfs.max_frame
.store(state.last_valid_frame, Ordering::Release);
}
/// Finalizes the loading process
fn finalize_loading(&self) {
let mut wfs = self.wal_shared.write();
let st = self.state.read();
let max_frame = st.last_valid_frame;
if max_frame > 0 {
let mut frame_cache = wfs.frame_cache.lock();
for frames in frame_cache.values_mut() {
frames.retain(|&f| f <= max_frame);
}
frame_cache.retain(|_, frames| !frames.is_empty());
}
wfs.max_frame.store(max_frame, Ordering::SeqCst);
wfs.last_checksum = st.cumulative_checksum;
if st.header_valid {
wfs.initialized.store(true, Ordering::SeqCst);
}
wfs.nbackfills.store(0, Ordering::SeqCst);
wfs.loaded.store(true, Ordering::SeqCst);
self.done.store(true, Ordering::Release);
tracing::info!(
"WAL loading complete: {} frames processed, last commit at frame {}",
st.frame_idx - 1,
max_frame
);
}
}
pub fn begin_read_wal_frame_raw(
buffer_pool: &Arc<BufferPool>,
io: &Arc<dyn File>,

View File

@@ -676,7 +676,6 @@ pub struct WalFileShared {
pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
pub last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
pub file: Option<Arc<dyn File>>,
/// Read locks advertise the maximum WAL frame a reader may access.
/// Slot0 is special, when it is held (shared) the reader bypasses the WAL and uses the main DB file.
/// When checkpointing, we must acquire the exclusive read lock 0 to ensure that no readers read
@@ -2232,27 +2231,40 @@ impl WalFile {
}
}
/// 32MB maximum WAL file size to read whole file into one buffer
const WAL_SIZE_LIMIT: u64 = 1024 * 1024 * 32;
impl WalFileShared {
pub fn open_shared_if_exists(
io: &Arc<dyn IO>,
path: &str,
) -> Result<Arc<RwLock<WalFileShared>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
if file.size()? > 0 {
let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
// TODO: Return a completion instead.
let mut max_loops = 100_000;
while !wal_file_shared.read().loaded.load(Ordering::Acquire) {
io.run_once()?;
max_loops -= 1;
if max_loops == 0 {
panic!("WAL file not loaded");
}
let wal_file_shared = match file.size()? {
0 => return WalFileShared::new_noop(),
n if n <= WAL_SIZE_LIMIT => sqlite3_ondisk::read_entire_wal_dumb(&file)?,
_ => {
tracing::info!(
"WAL file is large (>{WAL_SIZE_LIMIT} bytes), using streaming reader"
);
sqlite3_ondisk::read_wal_streaming(&file, io)?
}
Ok(wal_file_shared)
} else {
WalFileShared::new_noop()
};
let mut remaining: usize = 100_000;
loop {
io.run_once()?;
if wal_file_shared
.try_read()
.is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire))
{
break;
}
remaining = remaining.checked_sub(1).ok_or_else(|| {
LimboError::InternalError("Timed out waiting for WAL to load".into())
})?;
}
Ok(wal_file_shared)
}
pub fn is_initialized(&self) -> Result<bool> {