Remove read_entire_wal_dumb in favor of reading chunks

PThorpe92
2025-09-09 16:06:27 -04:00
parent 37ec77eec2
commit 02bebf02a5
2 changed files with 10 additions and 258 deletions
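
For context, the change replaces a single whole-file read of the WAL with chunked reads. A minimal sketch of the two patterns using plain std I/O (CHUNK_SIZE and both helpers are illustrative only, not the crate's actual File/Completion API):

```rust
use std::fs::File;
use std::io::{self, Read};

// Hypothetical chunk size; the real reader chooses its own chunking strategy.
const CHUNK_SIZE: usize = 1024 * 1024;

// Old approach (simplified): read the entire WAL into one allocation, which
// grows with the size of the WAL file.
fn read_whole(path: &str) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    File::open(path)?.read_to_end(&mut buf)?;
    Ok(buf)
}

// New approach (simplified): process the WAL one fixed-size chunk at a time,
// keeping memory usage bounded regardless of WAL size.
fn read_chunked(path: &str, mut process: impl FnMut(&[u8])) -> io::Result<()> {
    let mut file = File::open(path)?;
    let mut chunk = vec![0u8; CHUNK_SIZE];
    loop {
        let n = file.read(&mut chunk)?;
        if n == 0 {
            break;
        }
        process(&chunk[..n]);
    }
    Ok(())
}
```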

@@ -1622,240 +1622,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
payload.extend_from_slice(&varint[0..n]);
}
/// We need to read the WAL file on open to reconstruct the WAL frame cache.
pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileShared>>> {
let size = file.size()?;
#[allow(clippy::arc_with_non_send_sync)]
let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize));
let header = Arc::new(SpinLock::new(WalHeader::default()));
let read_locks = std::array::from_fn(|_| TursoRwLock::new());
for (i, l) in read_locks.iter().enumerate() {
l.write();
l.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
l.unlock();
}
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared_ret = Arc::new(RwLock::new(WalFileShared {
enabled: AtomicBool::new(true),
wal_header: header.clone(),
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: (0, 0),
file: Some(file.clone()),
read_locks,
write_lock: TursoRwLock::new(),
loaded: AtomicBool::new(false),
checkpoint_lock: TursoRwLock::new(),
initialized: AtomicBool::new(false),
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
let complete: Box<ReadComplete> = Box::new(move |res: Result<(Arc<Buffer>, i32), _>| {
let Ok((buf, bytes_read)) = res else {
return;
};
let buf_slice = buf.as_slice();
turso_assert!(
bytes_read == buf_slice.len() as i32,
"read({bytes_read}) != expected({})",
buf_slice.len()
);
let mut header_locked = header.lock();
// Read header
header_locked.magic =
u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]);
header_locked.file_format =
u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]);
header_locked.page_size =
u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]);
header_locked.checkpoint_seq =
u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]);
header_locked.salt_1 =
u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]);
header_locked.salt_2 =
u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]);
header_locked.checksum_1 =
u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
header_locked.checksum_2 =
u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
tracing::debug!("read_entire_wal_dumb(header={:?})", *header_locked);
// Read frames into frame_cache and pages_in_frames
if buf_slice.len() < WAL_HEADER_SIZE {
panic!("WAL file too small for header");
}
let use_native_endian_checksum =
cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0);
let calculated_header_checksum = checksum_wal(
&buf_slice[0..24],
&header_locked,
(0, 0),
use_native_endian_checksum,
);
let checksum_header_failed = if calculated_header_checksum
!= (header_locked.checksum_1, header_locked.checksum_2)
{
tracing::error!(
"WAL header checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
header_locked.checksum_1,
header_locked.checksum_2,
calculated_header_checksum.0,
calculated_header_checksum.1,
0
);
true
} else {
false
};
let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
let page_size_u32 = header_locked.page_size;
if PageSize::new(page_size_u32).is_none() {
panic!("Invalid page size in WAL header: {page_size_u32}");
}
let page_size = page_size_u32 as usize;
let mut current_offset = WAL_HEADER_SIZE;
let mut frame_idx = 1_u64;
let mut wfs_data = wal_file_shared_for_completion.write();
if !checksum_header_failed {
while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
let frame_header_slice =
&buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
let frame_h_page_number =
u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
let frame_h_db_size =
u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
let frame_h_salt_1 =
u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
let frame_h_salt_2 =
u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
let frame_h_checksum_1 =
u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
let frame_h_checksum_2 =
u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
if frame_h_page_number == 0 {
tracing::trace!(
"WAL frame with page number 0. Ignoring frames starting from frame {}",
frame_idx
);
break;
}
// The WAL contains further frames whose salt values don't match the header, which means they're leftovers from a previous checkpoint
if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2
{
tracing::trace!(
"WAL frame salt mismatch: expected ({}, {}), got ({}, {}). Ignoring frames starting from frame {}",
header_locked.salt_1,
header_locked.salt_2,
frame_h_salt_1,
frame_h_salt_2,
frame_idx
);
break;
}
let checksum_after_fh_meta = checksum_wal(
&frame_header_slice[0..8],
&header_locked,
cumulative_checksum,
use_native_endian_checksum,
);
let calculated_frame_checksum = checksum_wal(
page_data_slice,
&header_locked,
checksum_after_fh_meta,
use_native_endian_checksum,
);
tracing::debug!(
"read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1
);
if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
tracing::error!(
"WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
frame_h_checksum_1,
frame_h_checksum_2,
calculated_frame_checksum.0,
calculated_frame_checksum.1,
frame_idx
);
break;
}
cumulative_checksum = calculated_frame_checksum;
wfs_data
.frame_cache
.lock()
.entry(frame_h_page_number as u64)
.or_default()
.push(frame_idx);
let is_commit_record = frame_h_db_size > 0;
if is_commit_record {
wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
wfs_data.last_checksum = cumulative_checksum;
}
frame_idx += 1;
current_offset += WAL_FRAME_HEADER_SIZE + page_size;
}
}
let max_frame = wfs_data.max_frame.load(Ordering::SeqCst);
// cleanup in-memory index from tail frames which was written after the last committed frame
let mut frame_cache = wfs_data.frame_cache.lock();
for (page, frames) in frame_cache.iter_mut() {
// remove any frame IDs > max_frame
let original_len = frames.len();
frames.retain(|&frame_id| frame_id <= max_frame);
if frames.len() < original_len {
tracing::debug!(
"removed {} frame(s) from page {} from the in-memory WAL index because they were written after the last committed frame {}",
original_len - frames.len(),
page,
max_frame
);
}
}
// also remove any pages that now have no frames
frame_cache.retain(|_page, frames| !frames.is_empty());
wfs_data.nbackfills.store(0, Ordering::SeqCst);
wfs_data.loaded.store(true, Ordering::SeqCst);
if size >= WAL_HEADER_SIZE as u64 {
wfs_data.initialized.store(true, Ordering::SeqCst);
}
});
let c = Completion::new_read(buf_for_pread, complete);
let _c = file.pread(0, c)?;
Ok(wal_file_shared_ret)
}
/// Reads a WAL file in streaming fashion to avoid OOM on large files.
///
/// Streams through frames in chunks, building frame_cache incrementally,
/// and tracks the last valid commit frame for consistency.
pub fn read_wal_streaming(
pub fn build_shared_wal(
file: &Arc<dyn File>,
io: &Arc<dyn crate::IO>,
) -> Result<Arc<RwLock<WalFileShared>>> {
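
The body of build_shared_wal is not shown in this hunk; the streaming idea it documents boils down to visiting one frame at a time and remembering the last commit frame. A rough sketch with plain std I/O, assuming frame headers follow the WAL layout parsed by the removed function above (the real implementation also verifies salts and rolling checksums, and goes through the crate's async File/Completion types):

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};

const WAL_HEADER_SIZE: u64 = 32;
const WAL_FRAME_HEADER_SIZE: usize = 24;

// Sketch: scan frames one at a time instead of buffering the whole WAL, and
// remember the last frame that ends a commit. In a WAL frame header, bytes
// 4..8 hold the database size, which is non-zero only on commit frames.
// In practice, page_size comes from the WAL header rather than a parameter.
fn last_commit_frame(path: &str, page_size: usize) -> io::Result<Option<u64>> {
    let mut file = File::open(path)?;
    file.seek(SeekFrom::Start(WAL_HEADER_SIZE))?;
    let mut frame = vec![0u8; WAL_FRAME_HEADER_SIZE + page_size];
    let mut frame_idx = 0u64;
    let mut last_commit = None;
    loop {
        // Stop at a short read: the tail is either EOF or a torn frame.
        match file.read_exact(&mut frame) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        frame_idx += 1;
        // The real reader also checks salts and cumulative checksums here
        // and stops at the first mismatch; that part is omitted.
        let db_size = u32::from_be_bytes(frame[4..8].try_into().unwrap());
        if db_size > 0 {
            last_commit = Some(frame_idx);
        }
    }
    Ok(last_commit)
}
```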

@@ -2231,39 +2231,22 @@ impl WalFile {
}
}
/// 32MB maximum WAL file size for reading the whole file into one buffer
const WAL_SIZE_LIMIT: u64 = 1024 * 1024 * 32;
impl WalFileShared {
pub fn open_shared_if_exists(
io: &Arc<dyn IO>,
path: &str,
) -> Result<Arc<RwLock<WalFileShared>>> {
let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
let wal_file_shared = match file.size()? {
0 => return WalFileShared::new_noop(),
n if n <= WAL_SIZE_LIMIT => sqlite3_ondisk::read_entire_wal_dumb(&file)?,
_ => {
tracing::info!(
"WAL file is large (>{WAL_SIZE_LIMIT} bytes), using streaming reader"
);
sqlite3_ondisk::read_wal_streaming(&file, io)?
}
};
let mut remaining: usize = 100_000;
loop {
io.run_once()?;
if wal_file_shared
.try_read()
.is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire))
{
break;
}
remaining = remaining.checked_sub(1).ok_or_else(|| {
LimboError::InternalError("Timed out waiting for WAL to load".into())
})?;
if file.size()? == 0 {
return WalFileShared::new_noop();
}
let wal_file_shared = sqlite3_ondisk::build_shared_wal(&file, io)?;
turso_assert!(
wal_file_shared
.try_read()
.is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire)),
"Unable to read WAL shared state"
);
Ok(wal_file_shared)
}