Merge 'Fix read_entire_wal_dumb: incrementally build the frame cache' from Preston Thorpe
closes #2240

Incrementally build the frame cache by reading the WAL file in chunks instead of reading the entire file into memory.

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>
Reviewed-by: Nikita Sivukhin (@sivukhin)

Closes #2986
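In outline: the old path issued a single pread sized to the whole WAL and rebuilt the frame cache inside its completion callback; the new path reads the 32-byte header first, then streams 16 MiB frame-aligned chunks, staging (page, frame) pairs and publishing them only once a commit frame arrives. A minimal synchronous sketch of that staging idea follows — an illustration only, not the patch's code: it uses a plain std::fs::File and a hypothetical build_frame_index helper, and it omits the salt and cumulative-checksum validation the real reader performs.

    use std::collections::HashMap;
    use std::fs::File;
    use std::io::{self, Read, Seek, SeekFrom};

    const WAL_HDR: u64 = 32; // WAL file header size
    const FRAME_HDR: usize = 24; // per-frame header size

    // Hypothetical helper: build a page -> frame-numbers index by streaming
    // the WAL in frame-aligned chunks instead of one whole-file read.
    fn build_frame_index(file: &mut File, page_size: usize) -> io::Result<HashMap<u64, Vec<u64>>> {
        let frame_size = FRAME_HDR + page_size;
        const BASE: usize = 16 * 1024 * 1024;
        // Read ~16 MiB at a time, rounded down to a whole number of frames.
        let chunk = (BASE / frame_size).max(1) * frame_size;
        let mut buf = vec![0u8; chunk];

        let file_len = file.metadata()?.len();
        let mut index: HashMap<u64, Vec<u64>> = HashMap::new(); // page -> frame numbers
        let mut pending: Vec<(u64, u64)> = Vec::new(); // (page, frame) awaiting a commit
        let mut frame_no: u64 = 1;
        let mut off = WAL_HDR;

        file.seek(SeekFrom::Start(off))?;
        while off + frame_size as u64 <= file_len {
            let frames_left = ((file_len - off) as usize) / frame_size;
            let want = chunk.min(frames_left * frame_size);
            file.read_exact(&mut buf[..want])?;
            for f in buf[..want].chunks_exact(frame_size) {
                let page = u32::from_be_bytes(f[0..4].try_into().unwrap()) as u64;
                let db_size = u32::from_be_bytes(f[4..8].try_into().unwrap());
                if page == 0 {
                    return Ok(index); // zeroed or garbage tail: stop scanning
                }
                pending.push((page, frame_no));
                if db_size > 0 {
                    // Commit frame: publish everything staged so far.
                    for (p, n) in pending.drain(..) {
                        index.entry(p).or_default().push(n);
                    }
                }
                frame_no += 1;
            }
            off += want as u64;
        }
        // Frames staged after the last commit are dropped, never published.
        Ok(index)
    }

The diff below applies the same pattern asynchronously: submit_one_chunk issues one frame-aligned pread at a time, process_frames validates and stages frames, and flush_pending_frames publishes them to the shared frame_cache at each commit frame.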
@@ -1622,11 +1622,14 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
     payload.extend_from_slice(&varint[0..n]);
 }
 
-/// We need to read the WAL file on open to reconstruct the WAL frame cache.
-pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileShared>>> {
+/// Stream through frames in chunks, building frame_cache incrementally
+/// Track last valid commit frame for consistency
+pub fn build_shared_wal(
+    file: &Arc<dyn File>,
+    io: &Arc<dyn crate::IO>,
+) -> Result<Arc<RwLock<WalFileShared>>> {
     let size = file.size()?;
     #[allow(clippy::arc_with_non_send_sync)]
-    let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize));
     let header = Arc::new(SpinLock::new(WalHeader::default()));
     let read_locks = std::array::from_fn(|_| TursoRwLock::new());
     for (i, l) in read_locks.iter().enumerate() {
@@ -1634,8 +1637,8 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileSh
         l.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
         l.unlock();
     }
     #[allow(clippy::arc_with_non_send_sync)]
-    let wal_file_shared_ret = Arc::new(RwLock::new(WalFileShared {
+    let wal_file_shared = Arc::new(RwLock::new(WalFileShared {
         enabled: AtomicBool::new(true),
         wal_header: header.clone(),
         min_frame: AtomicU64::new(0),
@@ -1650,205 +1653,306 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<RwLock<WalFileSh
         checkpoint_lock: TursoRwLock::new(),
         initialized: AtomicBool::new(false),
     }));
-    let wal_file_shared_for_completion = wal_file_shared_ret.clone();
-    let complete: Box<ReadComplete> = Box::new(move |res: Result<(Arc<Buffer>, i32), _>| {
+
+    if size < WAL_HEADER_SIZE as u64 {
+        wal_file_shared.write().loaded.store(true, Ordering::SeqCst);
+        return Ok(wal_file_shared);
+    }
+
+    let reader = Arc::new(StreamingWalReader::new(
+        file.clone(),
+        wal_file_shared.clone(),
+        header.clone(),
+        size,
+    ));
+
+    let h = reader.clone().read_header()?;
+    io.wait_for_completion(h)?;
+
+    loop {
+        if reader.done.load(Ordering::Acquire) {
+            break;
+        }
+        let offset = reader.off_atomic.load(Ordering::Acquire);
+        if offset >= size {
+            reader.finalize_loading();
+            break;
+        }
+
+        let (_read_size, c) = reader.clone().submit_one_chunk(offset)?;
+        io.wait_for_completion(c)?;
+
+        let new_off = reader.off_atomic.load(Ordering::Acquire);
+        if new_off <= offset {
+            reader.finalize_loading();
+            break;
+        }
+    }
+
+    Ok(wal_file_shared)
+}
+
+pub(super) struct StreamingWalReader {
+    file: Arc<dyn File>,
+    wal_shared: Arc<RwLock<WalFileShared>>,
+    header: Arc<SpinLock<WalHeader>>,
+    file_size: u64,
+    state: RwLock<StreamingState>,
+    off_atomic: AtomicU64,
+    page_atomic: AtomicU64,
+    pub(super) done: AtomicBool,
+}
+
+/// Mutable state for streaming reader
+struct StreamingState {
+    frame_idx: u64,
+    cumulative_checksum: (u32, u32),
+    last_valid_frame: u64,
+    pending_frames: HashMap<u64, Vec<u64>>,
+    page_size: usize,
+    use_native_endian: bool,
+    header_valid: bool,
+}
+
+impl StreamingWalReader {
+    fn new(
+        file: Arc<dyn File>,
+        wal_shared: Arc<RwLock<WalFileShared>>,
+        header: Arc<SpinLock<WalHeader>>,
+        file_size: u64,
+    ) -> Self {
+        Self {
+            file,
+            wal_shared,
+            header,
+            file_size,
+            off_atomic: AtomicU64::new(0),
+            page_atomic: AtomicU64::new(0),
+            done: AtomicBool::new(false),
+            state: RwLock::new(StreamingState {
+                frame_idx: 1,
+                cumulative_checksum: (0, 0),
+                last_valid_frame: 0,
+                pending_frames: HashMap::new(),
+                page_size: 0,
+                use_native_endian: false,
+                header_valid: false,
+            }),
+        }
+    }
+
+    fn read_header(self: Arc<Self>) -> crate::Result<Completion> {
+        let header_buf = Arc::new(Buffer::new_temporary(WAL_HEADER_SIZE));
+        let reader = self.clone();
+        let completion: Box<ReadComplete> = Box::new(move |res| {
+            let _reader = reader.clone();
+            _reader.handle_header_read(res);
+        });
+        let c = Completion::new_read(header_buf, completion);
+        self.file.pread(0, c)
+    }
+
+    fn submit_one_chunk(self: Arc<Self>, offset: u64) -> crate::Result<(usize, Completion)> {
+        let page_size = self.page_atomic.load(Ordering::Acquire) as usize;
+        if page_size == 0 {
+            return Err(crate::LimboError::InternalError(
+                "page size not initialized".into(),
+            ));
+        }
+        let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
+        if frame_size == 0 {
+            return Err(crate::LimboError::InternalError(
+                "invalid frame size".into(),
+            ));
+        }
+        const BASE: usize = 16 * 1024 * 1024;
+        let aligned = (BASE / frame_size) * frame_size;
+        let read_size = aligned
+            .max(frame_size)
+            .min((self.file_size - offset) as usize);
+        if read_size == 0 {
+            // end-of-file; let caller finalize
+            return Ok((0, Completion::new_dummy()));
+        }
+
+        let buf = Arc::new(Buffer::new_temporary(read_size));
+        let me = self.clone();
+        let completion: Box<ReadComplete> = Box::new(move |res| {
+            tracing::debug!("WAL chunk read complete");
+            let reader = me.clone();
+            reader.handle_chunk_read(res);
+        });
+        let c = Completion::new_read(buf, completion);
+        let guard = self.file.pread(offset, c)?;
+        Ok((read_size, guard))
+    }
+
+    fn handle_header_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
+        let Ok((buf, bytes_read)) = res else {
+            self.finalize_loading();
+            return;
+        };
-        let buf_slice = buf.as_slice();
-        turso_assert!(
-            bytes_read == buf_slice.len() as i32,
-            "read({bytes_read}) != expected({})",
-            buf_slice.len()
-        );
-        let mut header_locked = header.lock();
-        // Read header
-        header_locked.magic =
-            u32::from_be_bytes([buf_slice[0], buf_slice[1], buf_slice[2], buf_slice[3]]);
-        header_locked.file_format =
-            u32::from_be_bytes([buf_slice[4], buf_slice[5], buf_slice[6], buf_slice[7]]);
-        header_locked.page_size =
-            u32::from_be_bytes([buf_slice[8], buf_slice[9], buf_slice[10], buf_slice[11]]);
-        header_locked.checkpoint_seq =
-            u32::from_be_bytes([buf_slice[12], buf_slice[13], buf_slice[14], buf_slice[15]]);
-        header_locked.salt_1 =
-            u32::from_be_bytes([buf_slice[16], buf_slice[17], buf_slice[18], buf_slice[19]]);
-        header_locked.salt_2 =
-            u32::from_be_bytes([buf_slice[20], buf_slice[21], buf_slice[22], buf_slice[23]]);
-        header_locked.checksum_1 =
-            u32::from_be_bytes([buf_slice[24], buf_slice[25], buf_slice[26], buf_slice[27]]);
-        header_locked.checksum_2 =
-            u32::from_be_bytes([buf_slice[28], buf_slice[29], buf_slice[30], buf_slice[31]]);
-        tracing::debug!("read_entire_wal_dumb(header={:?})", *header_locked);
-
-        // Read frames into frame_cache and pages_in_frames
-        if buf_slice.len() < WAL_HEADER_SIZE {
-            panic!("WAL file too small for header");
+        if bytes_read != WAL_HEADER_SIZE as i32 {
+            self.finalize_loading();
+            return;
         }
-
-        let use_native_endian_checksum =
-            cfg!(target_endian = "big") == ((header_locked.magic & 1) != 0);
+        let (page_sz, c1, c2, use_native, ok) = {
+            let mut h = self.header.lock();
+            let s = buf.as_slice();
+            h.magic = u32::from_be_bytes(s[0..4].try_into().unwrap());
+            h.file_format = u32::from_be_bytes(s[4..8].try_into().unwrap());
+            h.page_size = u32::from_be_bytes(s[8..12].try_into().unwrap());
+            h.checkpoint_seq = u32::from_be_bytes(s[12..16].try_into().unwrap());
+            h.salt_1 = u32::from_be_bytes(s[16..20].try_into().unwrap());
+            h.salt_2 = u32::from_be_bytes(s[20..24].try_into().unwrap());
+            h.checksum_1 = u32::from_be_bytes(s[24..28].try_into().unwrap());
+            h.checksum_2 = u32::from_be_bytes(s[28..32].try_into().unwrap());
+            tracing::debug!("WAL header: {:?}", *h);
-
-        let calculated_header_checksum = checksum_wal(
-            &buf_slice[0..24],
-            &header_locked,
-            (0, 0),
-            use_native_endian_checksum,
-        );
-
-        let checksum_header_failed = if calculated_header_checksum
-            != (header_locked.checksum_1, header_locked.checksum_2)
-        {
-            tracing::error!(
-                "WAL header checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
-                header_locked.checksum_1,
-                header_locked.checksum_2,
-                calculated_header_checksum.0,
-                calculated_header_checksum.1,
-                0
-            );
-            true
-        } else {
-            false
-        };
+            let use_native = cfg!(target_endian = "big") == ((h.magic & 1) != 0);
+            let calc = checksum_wal(&s[0..24], &h, (0, 0), use_native);
+            (
+                h.page_size,
+                h.checksum_1,
+                h.checksum_2,
+                use_native,
+                calc == (h.checksum_1, h.checksum_2),
+            )
+        };
+        if PageSize::new(page_sz).is_none() || !ok {
+            self.finalize_loading();
+            return;
+        }
+        {
+            let mut st = self.state.write();
+            st.page_size = page_sz as usize;
+            st.use_native_endian = use_native;
+            st.cumulative_checksum = (c1, c2);
+            st.header_valid = true;
+        }
+        self.off_atomic
+            .store(WAL_HEADER_SIZE as u64, Ordering::Release);
+        self.page_atomic.store(page_sz as u64, Ordering::Release);
+    }
+
+    fn handle_chunk_read(self: Arc<Self>, res: Result<(Arc<Buffer>, i32), CompletionError>) {
+        let Ok((buf, bytes_read)) = res else {
+            self.finalize_loading();
+            return;
+        };
+        let buf_slice = &buf.as_slice()[..bytes_read as usize];
+        // Snapshot salts/endianness once to avoid per-frame header locks
+        let (header_copy, use_native) = {
+            let st = self.state.read();
+            let h = self.header.lock();
+            (*h, st.use_native_endian)
+        };
-
-        let mut cumulative_checksum = (header_locked.checksum_1, header_locked.checksum_2);
-        let page_size_u32 = header_locked.page_size;
-
-        if PageSize::new(page_size_u32).is_none() {
-            panic!("Invalid page size in WAL header: {page_size_u32}");
-        }
-        let page_size = page_size_u32 as usize;
-
-        let mut current_offset = WAL_HEADER_SIZE;
-        let mut frame_idx = 1_u64;
-
-        let mut wfs_data = wal_file_shared_for_completion.write();
+        let consumed = self.process_frames(buf_slice, &header_copy, use_native);
+        self.off_atomic.fetch_add(consumed as u64, Ordering::AcqRel);
+        // If we didn't consume the full chunk, we hit a stop condition
+        if consumed < buf_slice.len() || self.off_atomic.load(Ordering::Acquire) >= self.file_size {
+            self.finalize_loading();
+        }
+    }
+
+    // Processes frames from a buffer, returns bytes processed
+    fn process_frames(&self, buf: &[u8], header: &WalHeader, use_native: bool) -> usize {
+        let mut st = self.state.write();
+        let page_size = st.page_size;
+        let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
+        let mut pos = 0;
-
-        if !checksum_header_failed {
-            while current_offset + WAL_FRAME_HEADER_SIZE + page_size <= buf_slice.len() {
-                let frame_header_slice =
-                    &buf_slice[current_offset..current_offset + WAL_FRAME_HEADER_SIZE];
-                let page_data_slice = &buf_slice[current_offset + WAL_FRAME_HEADER_SIZE
-                    ..current_offset + WAL_FRAME_HEADER_SIZE + page_size];
-
-                let frame_h_page_number =
-                    u32::from_be_bytes(frame_header_slice[0..4].try_into().unwrap());
-                let frame_h_db_size =
-                    u32::from_be_bytes(frame_header_slice[4..8].try_into().unwrap());
-                let frame_h_salt_1 =
-                    u32::from_be_bytes(frame_header_slice[8..12].try_into().unwrap());
-                let frame_h_salt_2 =
-                    u32::from_be_bytes(frame_header_slice[12..16].try_into().unwrap());
-                let frame_h_checksum_1 =
-                    u32::from_be_bytes(frame_header_slice[16..20].try_into().unwrap());
-                let frame_h_checksum_2 =
-                    u32::from_be_bytes(frame_header_slice[20..24].try_into().unwrap());
-
-                if frame_h_page_number == 0 {
-                    tracing::trace!(
-                        "WAL frame with page number 0. Ignoring frames starting from frame {}",
-                        frame_idx
-                    );
-                    break;
-                }
-
-                // It contains more frames with mismatched SALT values, which means they're leftovers from previous checkpoints
-                if frame_h_salt_1 != header_locked.salt_1 || frame_h_salt_2 != header_locked.salt_2
-                {
-                    tracing::trace!(
-                        "WAL frame salt mismatch: expected ({}, {}), got ({}, {}). Ignoring frames starting from frame {}",
-                        header_locked.salt_1,
-                        header_locked.salt_2,
-                        frame_h_salt_1,
-                        frame_h_salt_2,
-                        frame_idx
-                    );
-                    break;
-                }
-
-                let checksum_after_fh_meta = checksum_wal(
-                    &frame_header_slice[0..8],
-                    &header_locked,
-                    cumulative_checksum,
-                    use_native_endian_checksum,
-                );
-                let calculated_frame_checksum = checksum_wal(
-                    page_data_slice,
-                    &header_locked,
-                    checksum_after_fh_meta,
-                    use_native_endian_checksum,
-                );
-                tracing::debug!(
-                    "read_entire_wal_dumb(frame_h_checksum=({}, {}), calculated_frame_checksum=({}, {}))",
-                    frame_h_checksum_1,
-                    frame_h_checksum_2,
-                    calculated_frame_checksum.0,
-                    calculated_frame_checksum.1
-                );
-
-                if calculated_frame_checksum != (frame_h_checksum_1, frame_h_checksum_2) {
-                    tracing::error!(
-                        "WAL frame checksum mismatch. Expected ({}, {}), Got ({}, {}). Ignoring frames starting from frame {}",
-                        frame_h_checksum_1,
-                        frame_h_checksum_2,
-                        calculated_frame_checksum.0,
-                        calculated_frame_checksum.1,
-                        frame_idx
-                    );
-                    break;
-                }
-
-                cumulative_checksum = calculated_frame_checksum;
-
-                wfs_data
-                    .frame_cache
-                    .lock()
-                    .entry(frame_h_page_number as u64)
-                    .or_default()
-                    .push(frame_idx);
-
-                let is_commit_record = frame_h_db_size > 0;
-                if is_commit_record {
-                    wfs_data.max_frame.store(frame_idx, Ordering::SeqCst);
-                    wfs_data.last_checksum = cumulative_checksum;
-                }
-
-                frame_idx += 1;
-                current_offset += WAL_FRAME_HEADER_SIZE + page_size;
-            }
-        }
+        while pos + frame_size <= buf.len() {
+            let fh = &buf[pos..pos + WAL_FRAME_HEADER_SIZE];
+            let page = &buf[pos + WAL_FRAME_HEADER_SIZE..pos + frame_size];
+
+            let page_number = u32::from_be_bytes(fh[0..4].try_into().unwrap());
+            let db_size = u32::from_be_bytes(fh[4..8].try_into().unwrap());
+            let s1 = u32::from_be_bytes(fh[8..12].try_into().unwrap());
+            let s2 = u32::from_be_bytes(fh[12..16].try_into().unwrap());
+            let c1 = u32::from_be_bytes(fh[16..20].try_into().unwrap());
+            let c2 = u32::from_be_bytes(fh[20..24].try_into().unwrap());
+
+            if page_number == 0 {
+                break;
+            }
+            if s1 != header.salt_1 || s2 != header.salt_2 {
+                break;
+            }
+
+            let seed = checksum_wal(&fh[0..8], header, st.cumulative_checksum, use_native);
+            let calc = checksum_wal(page, header, seed, use_native);
+            if calc != (c1, c2) {
+                break;
+            }
+
+            st.cumulative_checksum = calc;
+            let frame_idx = st.frame_idx;
+            st.pending_frames
+                .entry(page_number as u64)
+                .or_default()
+                .push(frame_idx);
+
+            if db_size > 0 {
+                st.last_valid_frame = st.frame_idx;
+                self.flush_pending_frames(&mut st);
+            }
+            st.frame_idx += 1;
+            pos += frame_size;
+        }
+        pos
+    }
+
+    fn flush_pending_frames(&self, state: &mut StreamingState) {
+        if state.pending_frames.is_empty() {
+            return;
+        }
+        let wfs = self.wal_shared.read();
+        {
+            let mut frame_cache = wfs.frame_cache.lock();
+            for (page, mut frames) in state.pending_frames.drain() {
+                // Only include frames up to last valid commit
+                frames.retain(|&f| f <= state.last_valid_frame);
+                if !frames.is_empty() {
+                    frame_cache.entry(page).or_default().extend(frames);
+                }
+            }
+        }
+        wfs.max_frame
+            .store(state.last_valid_frame, Ordering::Release);
+    }
-
-        let max_frame = wfs_data.max_frame.load(Ordering::SeqCst);
-
-        // cleanup in-memory index from tail frames which was written after the last committed frame
-        let mut frame_cache = wfs_data.frame_cache.lock();
-        for (page, frames) in frame_cache.iter_mut() {
-            // remove any frame IDs > max_frame
-            let original_len = frames.len();
-            frames.retain(|&frame_id| frame_id <= max_frame);
-            if frames.len() < original_len {
-                tracing::debug!(
-                    "removed {} frame(s) from page {} from the in-memory WAL index because they were written after the last committed frame {}",
-                    original_len - frames.len(),
-                    page,
-                    max_frame
-                );
-            }
-        }
-        // also remove any pages that now have no frames
-        frame_cache.retain(|_page, frames| !frames.is_empty());
-
-        wfs_data.nbackfills.store(0, Ordering::SeqCst);
-        wfs_data.loaded.store(true, Ordering::SeqCst);
-        if size >= WAL_HEADER_SIZE as u64 {
-            wfs_data.initialized.store(true, Ordering::SeqCst);
-        }
-    });
-    let c = Completion::new_read(buf_for_pread, complete);
-    let _c = file.pread(0, c)?;
-
-    Ok(wal_file_shared_ret)
+
+    /// Finalizes the loading process
+    fn finalize_loading(&self) {
+        let mut wfs = self.wal_shared.write();
+        let st = self.state.read();
+
+        let max_frame = st.last_valid_frame;
+        if max_frame > 0 {
+            let mut frame_cache = wfs.frame_cache.lock();
+            for frames in frame_cache.values_mut() {
+                frames.retain(|&f| f <= max_frame);
+            }
+            frame_cache.retain(|_, frames| !frames.is_empty());
+        }
+        wfs.max_frame.store(max_frame, Ordering::SeqCst);
+        wfs.last_checksum = st.cumulative_checksum;
+        if st.header_valid {
+            wfs.initialized.store(true, Ordering::SeqCst);
+        }
+        wfs.nbackfills.store(0, Ordering::SeqCst);
+        wfs.loaded.store(true, Ordering::SeqCst);
+
+        self.done.store(true, Ordering::Release);
+        tracing::info!(
+            "WAL loading complete: {} frames processed, last commit at frame {}",
+            st.frame_idx - 1,
+            max_frame
+        );
+    }
 }
 
 pub fn begin_read_wal_frame_raw(
@@ -1859,7 +1963,6 @@ pub fn begin_read_wal_frame_raw(
 ) -> Result<Completion> {
     tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
     let buf = Arc::new(buffer_pool.get_wal_frame());
-    #[allow(clippy::arc_with_non_send_sync)]
     let c = Completion::new_read(buf, complete);
     let c = io.pread(offset, c)?;
     Ok(c)
@@ -553,7 +553,6 @@ impl fmt::Debug for OngoingCheckpoint {
     }
 }
 
-#[allow(dead_code)]
 pub struct WalFile {
     io: Arc<dyn IO>,
     buffer_pool: Arc<BufferPool>,
@@ -660,7 +659,6 @@ impl fmt::Debug for WalFile {
 // TODO(pere): lock only important parts + pin WalFileShared
 /// WalFileShared is the part of a WAL that will be shared between threads. A wal has information
 /// that needs to be communicated between threads so this struct does the job.
-#[allow(dead_code)]
 pub struct WalFileShared {
     pub enabled: AtomicBool,
     pub wal_header: Arc<SpinLock<WalHeader>>,
@@ -676,7 +674,6 @@ pub struct WalFileShared {
     pub frame_cache: Arc<SpinLock<HashMap<u64, Vec<u64>>>>,
     pub last_checksum: (u32, u32), // Checksum of the last frame in the WAL; this is a cumulative checksum over all frames in the WAL
     pub file: Option<Arc<dyn File>>,
-
     /// Read locks advertise the maximum WAL frame a reader may access.
     /// Slot 0 is special, when it is held (shared) the reader bypasses the WAL and uses the main DB file.
     /// When checkpointing, we must acquire the exclusive read lock 0 to ensure that no readers read
@@ -2238,21 +2235,17 @@ impl WalFileShared {
         path: &str,
     ) -> Result<Arc<RwLock<WalFileShared>>> {
         let file = io.open_file(path, crate::io::OpenFlags::Create, false)?;
-        if file.size()? > 0 {
-            let wal_file_shared = sqlite3_ondisk::read_entire_wal_dumb(&file)?;
-            // TODO: Return a completion instead.
-            let mut max_loops = 100_000;
-            while !wal_file_shared.read().loaded.load(Ordering::Acquire) {
-                io.run_once()?;
-                max_loops -= 1;
-                if max_loops == 0 {
-                    panic!("WAL file not loaded");
-                }
-            }
-            Ok(wal_file_shared)
-        } else {
-            WalFileShared::new_noop()
+        if file.size()? == 0 {
+            return WalFileShared::new_noop();
         }
+        let wal_file_shared = sqlite3_ondisk::build_shared_wal(&file, io)?;
+        turso_assert!(
+            wal_file_shared
+                .try_read()
+                .is_some_and(|wfs| wfs.loaded.load(Ordering::Acquire)),
+            "Unable to read WAL shared state"
+        );
+        Ok(wal_file_shared)
     }
 
     pub fn is_initialized(&self) -> Result<bool> {