diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e9ad03cdb..ad62aa43d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -70,6 +70,7 @@ impl CheckpointResult { #[strum(ascii_case_insensitive)] pub enum CheckpointMode { /// Checkpoint as many frames as possible without waiting for any database readers or writers to finish, then sync the database file if all frames in the log were checkpointed. + /// Passive never blocks readers or writers, only ensures (like all modes do) that there are no other checkpointers. Passive, /// This mode blocks until there is no database writer and all readers are reading from the most recent database snapshot. It then checkpoints all frames in the log file and syncs the database file. This mode blocks new database writers while it is pending, but new database readers are allowed to continue unimpeded. Full, @@ -459,6 +460,61 @@ impl fmt::Debug for WalFile { } } +/* +* sqlite3/src/wal.c +* +** nBackfill is the number of frames in the WAL that have been written +** back into the database. (We call the act of moving content from WAL to +** database "backfilling".) The nBackfill number is never greater than +** WalIndexHdr.mxFrame. nBackfill can only be increased by threads +** holding the WAL_CKPT_LOCK lock (which includes a recovery thread). +** However, a WAL_WRITE_LOCK thread can move the value of nBackfill from +** mxFrame back to zero when the WAL is reset. +** +** nBackfillAttempted is the largest value of nBackfill that a checkpoint +** has attempted to achieve. Normally nBackfill==nBackfillAtempted, however +** the nBackfillAttempted is set before any backfilling is done and the +** nBackfill is only set after all backfilling completes. So if a checkpoint +** crashes, nBackfillAttempted might be larger than nBackfill. The +** WalIndexHdr.mxFrame must never be less than nBackfillAttempted. +** +** The aLock[] field is a set of bytes used for locking. These bytes should +** never be read or written. +** +** There is one entry in aReadMark[] for each reader lock. If a reader +** holds read-lock K, then the value in aReadMark[K] is no greater than +** the mxFrame for that reader. The value READMARK_NOT_USED (0xffffffff) +** for any aReadMark[] means that entry is unused. aReadMark[0] is +** a special case; its value is never used and it exists as a place-holder +** to avoid having to offset aReadMark[] indexes by one. Readers holding +** WAL_READ_LOCK(0) always ignore the entire WAL and read all content +** directly from the database. +** +** The value of aReadMark[K] may only be changed by a thread that +** is holding an exclusive lock on WAL_READ_LOCK(K). Thus, the value of +** aReadMark[K] cannot changed while there is a reader is using that mark +** since the reader will be holding a shared lock on WAL_READ_LOCK(K). +** +** The checkpointer may only transfer frames from WAL to database where +** the frame numbers are less than or equal to every aReadMark[] that is +** in use (that is, every aReadMark[j] for which there is a corresponding +** WAL_READ_LOCK(j)). New readers (usually) pick the aReadMark[] with the +** largest value and will increase an unused aReadMark[] to mxFrame if there +** is not already an aReadMark[] equal to mxFrame. 
The exception to the +** previous sentence is when nBackfill equals mxFrame (meaning that everything +** in the WAL has been backfilled into the database) then new readers +** will choose aReadMark[0] which has value 0 and hence such reader will +** get all their all content directly from the database file and ignore +** the WAL. +** +** Writers normally append new frames to the end of the WAL. However, +** if nBackfill equals mxFrame (meaning that all WAL content has been +** written back into the database) and if no readers are using the WAL +** (in other words, if there are no WAL_READ_LOCK(i) where i>0) then +** the writer will first "reset" the WAL back to the beginning and start +** writing new content beginning at frame 1. +*/ + // TODO(pere): lock only important parts + pin WalFileShared /// WalFileShared is the part of a WAL that will be shared between threads. A wal has information /// that needs to be communicated between threads so this struct does the job. @@ -479,14 +535,19 @@ pub struct WalFileShared { pub pages_in_frames: Arc>>, pub last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL pub file: Arc, - /// read_locks is a list of read locks that can coexist with the max_frame number stored in - /// value. There is a limited amount because and unbounded amount of connections could be - /// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max - /// frames that is equal to 5 + + /// Read locks advertise the maximum WAL frame a reader may access. + /// Slot 0 is special: when it is held (shared), the reader bypasses the WAL and reads directly from the main DB file. + /// When checkpointing, we must acquire the exclusive read lock 0 to ensure that no readers read + /// from a partially checkpointed db file. + /// Slots 1-4 carry a frame number in value and may be shared by many readers. Slot 1 is the + /// default read lock and is expected to contain the current max_frame of the WAL. pub read_locks: [LimboRwLock; 5], /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only /// one used. pub write_lock: LimboRwLock, + + /// Serialises checkpointer threads: only one checkpoint can be in flight at any time. This lock is blocking and exclusive only. pub checkpoint_lock: LimboRwLock, pub loaded: AtomicBool, } @@ -514,6 +575,13 @@ enum CheckpointGuard { Read0 { ptr: Arc> }, } +/// A database checkpointer takes the following locks, in order: +/// The exclusive CHECKPOINTER lock. +/// The exclusive WRITER lock (FULL, RESTART and TRUNCATE only). +/// Exclusive lock on read-mark slots 1-N. These are immediately released after being taken. +/// Exclusive lock on read-mark 0. +/// Exclusive lock on read-mark slots 1-N again. These are immediately released after being taken (RESTART and TRUNCATE only). +/// All of the above use blocking locks. impl CheckpointGuard { fn new(ptr: Arc>, mode: CheckpointMode) -> Result { let shared = &mut unsafe { &mut *ptr.get() }; @@ -523,47 +591,52 @@ impl CheckpointGuard { return Err(LimboError::Busy); } match mode { + CheckpointMode::Full => Err(LimboError::InternalError( + "Full checkpoint mode is not yet supported".into(), + )), + // Passive mode is the only mode not requiring a write lock, as it doesn't block + // readers or writers. It acquires the checkpoint lock to ensure that no other + // concurrent checkpoint happens, and acquires the exclusive read lock 0 + // to ensure that no readers read from a partially checkpointed db file. 
CheckpointMode::Passive => { let read0 = &mut shared.read_locks[0]; if !read0.write() { shared.checkpoint_lock.unlock(); - tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy"); + tracing::trace!("CheckpointGuard: read0 lock failed, returning Busy"); // for passive and full we need to hold the read0 lock return Err(LimboError::Busy); } Ok(Self::Read0 { ptr }) } CheckpointMode::Restart | CheckpointMode::Truncate => { + // like all modes, we must acquire an exclusive checkpoint lock and lock on read 0 + // to prevent a reader from reading a partially checkpointed db file. + let read0 = &mut shared.read_locks[0]; + if !read0.write() { + shared.checkpoint_lock.unlock(); + tracing::trace!("CheckpointGuard: read0 lock failed, returning Busy"); + return Err(LimboError::Busy); + } // if we are resetting the log we must hold the write lock for the duration. + // ensures no writer can append frames while we reset the log. if !shared.write_lock.write() { shared.checkpoint_lock.unlock(); - tracing::trace!("CheckpointGuard::new: read0 lock failed, returning Busy"); + read0.unlock(); + tracing::trace!("CheckpointGuard: write lock failed, returning Busy"); return Err(LimboError::Busy); } Ok(Self::Writer { ptr }) } - _ => todo!("not implemented yet"), } } } -/// macro to remove the writer lock in the event that a checkpoint errors out and would -/// otherwise leak the lock. Only used during checkpointing. -macro_rules! ensure_unlock { - ($self:ident, $fun:expr) => { - $fun.inspect_err(|e| { - tracing::error!( - "CheckpointGuard::ensure_unlock: error occurred, releasing held locks: {e}" - ); - let _ = $self.checkpoint_guard.take(); - })?; - }; -} impl Drop for CheckpointGuard { fn drop(&mut self) { match self { CheckpointGuard::Writer { ptr: shared } => unsafe { (*shared.get()).write_lock.unlock(); + (*shared.get()).read_locks[0].unlock(); (*shared.get()).checkpoint_lock.unlock(); }, CheckpointGuard::Read0 { ptr: shared } => unsafe { @@ -584,11 +657,11 @@ impl Wal for WalFile { let db_has_changed = max_frame_in_wal > self.max_frame; // Check if we can use slot 0 (bypass WAL) + // If we can obtain a shared lock on slot 0 and max_frame == nBackfills, we know every WAL frame has already + // been copied into the main database file, so we can skip mapping the WAL entirely. if max_frame_in_wal == nbackfills { let shared = self.get_shared(); let lock = &mut shared.read_locks[0]; - // if wal is fully checkpointed, we can use read mark 0 to read directly - // from the db file if lock.read() { self.max_frame_read_lock_index = 0; self.max_frame = max_frame_in_wal; @@ -668,8 +741,9 @@ impl Wal for WalFile { /// Begin a write transaction #[instrument(skip_all, level = Level::DEBUG)] fn begin_write_tx(&mut self) -> Result { - if !self.get_shared().write_lock.write() { - tracing::debug!("begin_write_transaction(busy=true)"); + let busy = !self.get_shared().write_lock.write(); + tracing::debug!("begin_write_transaction(busy={busy})"); + if busy { return Ok(LimboResult::Busy); } // If the max frame is not the same as the one in the shared state, it means another @@ -918,174 +992,10 @@ impl Wal for WalFile { "Full checkpoint mode is not implemented yet".into(), )); } - 'checkpoint_loop: loop { - let state = self.ongoing_checkpoint.state; - tracing::debug!(?state); - match state { - // Acquire the relevant exclusive lock (slot‑0 or WRITER) - // and checkpoint_lock so no other checkpointer can run. - // fsync WAL if there are unapplied frames. - // Decide the largest frame we are allowed to back‑fill. 
- CheckpointState::Start => { - // TODO(pere): check what frames are safe to checkpoint between many readers! - let (shared_max, nbackfills) = { - let shared = self.get_shared(); - ( - shared.max_frame.load(Ordering::SeqCst), - shared.nbackfills.load(Ordering::SeqCst), - ) - }; - let needs_backfill = shared_max > nbackfills; - if !needs_backfill && matches!(mode, CheckpointMode::Passive) { - // there are no frames to copy and we don't need to reset the log so we can - // return early success. - self.prev_checkpoint.release_guard(); // just in case we are still holding - return Ok(IOResult::Done(self.prev_checkpoint.clone())); - } - // acquire the appropriate locks depending on the checkpoint mode - self.acquire_proper_checkpoint_guard(mode)?; - self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); - self.ongoing_checkpoint.min_frame = nbackfills + 1; - self.ongoing_checkpoint.current_page = 0; - self.ongoing_checkpoint.state = CheckpointState::ReadFrame; - tracing::trace!( - "checkpoint_start(min_frame={}, max_frame={})", - self.ongoing_checkpoint.min_frame, - self.ongoing_checkpoint.max_frame, - ); - } - // Find the next page that has a frame in the safe interval and - // schedule a read of that frame. - CheckpointState::ReadFrame => { - let shared = self.get_shared(); - let min_frame = self.ongoing_checkpoint.min_frame; - let max_frame = self.ongoing_checkpoint.max_frame; - let pages_in_frames = shared.pages_in_frames.clone(); - let pages_in_frames = pages_in_frames.lock(); - - let frame_cache = shared.frame_cache.clone(); - let frame_cache = frame_cache.lock(); - assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len()); - if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() { - self.ongoing_checkpoint.state = CheckpointState::Done; - continue 'checkpoint_loop; - } - let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize]; - let frames = frame_cache - .get(&page) - .expect("page must be in frame cache if it's in list"); - - for frame in frames.iter().rev() { - if *frame >= min_frame && *frame <= max_frame { - tracing::debug!( - "checkpoint page(state={:?}, page={}, frame={})", - state, - page, - *frame - ); - self.ongoing_checkpoint.page.get().id = page as usize; - ensure_unlock!( - self, - self.read_frame( - *frame, - self.ongoing_checkpoint.page.clone(), - self.buffer_pool.clone(), - ) - ); - self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; - continue 'checkpoint_loop; - } - } - self.ongoing_checkpoint.current_page += 1; - } - CheckpointState::WaitReadFrame => { - if self.ongoing_checkpoint.page.is_locked() { - return Ok(IOResult::IO); - } else { - self.ongoing_checkpoint.state = CheckpointState::WritePage; - } - } - CheckpointState::WritePage => { - self.ongoing_checkpoint.page.set_dirty(); - let c = begin_write_btree_page( - pager, - &self.ongoing_checkpoint.page, - write_counter.clone(), - )?; - self.ongoing_checkpoint.state = CheckpointState::WaitWritePage; - } - CheckpointState::WaitWritePage => { - if *write_counter.borrow() > 0 { - return Ok(IOResult::IO); - } - // If page was in cache clear it. 
- if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) { - page.clear_dirty(); - } - self.ongoing_checkpoint.page.clear_dirty(); - let shared = self.get_shared(); - if (self.ongoing_checkpoint.current_page as usize) - < shared.pages_in_frames.lock().len() - { - self.ongoing_checkpoint.current_page += 1; - self.ongoing_checkpoint.state = CheckpointState::ReadFrame; - } else { - self.ongoing_checkpoint.state = CheckpointState::Done; - } - } - // All eligible frames copied to the db file - // Update nBackfills - // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file - // Release all locks and return the current num of wal frames and the amount we backfilled - CheckpointState::Done => { - if *write_counter.borrow() > 0 { - return Ok(IOResult::IO); - } - let shared = self.get_shared(); - shared.checkpoint_lock.unlock(); - let max_frame = shared.max_frame.load(Ordering::SeqCst); - let nbackfills = shared.nbackfills.load(Ordering::SeqCst); - - let (mut checkpoint_result, everything_backfilled) = { - // Record two num pages fields to return as checkpoint result to caller. - // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html - - let frames_in_wal = max_frame.saturating_sub(nbackfills); - let frames_checkpointed = self - .ongoing_checkpoint - .max_frame - .saturating_sub(self.ongoing_checkpoint.min_frame - 1); - let checkpoint_result = - CheckpointResult::new(frames_in_wal, frames_checkpointed); - let everything_backfilled = max_frame == self.ongoing_checkpoint.max_frame; - (checkpoint_result, everything_backfilled) - }; - // we will just overwrite nbackfills with 0 if we are resetting - self.get_shared() - .nbackfills - .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); - if everything_backfilled - && matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) - { - ensure_unlock!(self, self.restart_log(mode)); - } - self.prev_checkpoint = checkpoint_result.clone(); - // we cannot truncate the db file here, because we are currently inside a - // mut borrow of pager.wal, and accessing the header will attempt a borrow, - // so the caller will determine if: - // a. the max frame == num wal frames - // b. the db file size != num of db pages * page_size - // and truncate the db file if necessary. - if checkpoint_result.everything_backfilled() { - checkpoint_result.maybe_guard = self.checkpoint_guard.take(); - } else { - let _ = self.checkpoint_guard.take(); - } - self.ongoing_checkpoint.state = CheckpointState::Start; - return Ok(IOResult::Done(checkpoint_result)); - } - } - } + self.checkpoint_inner(pager, write_counter, mode) + .inspect_err(|_| { + let _ = self.checkpoint_guard.take(); + }) } #[instrument(err, skip_all, level = Level::DEBUG)] @@ -1257,7 +1167,181 @@ impl WalFile { self.syncing.set(false); } - // Coordinate what the maximum safe frame is for us to backfill when checkpointing. + fn checkpoint_inner( + &mut self, + pager: &Pager, + write_counter: Rc>, + mode: CheckpointMode, + ) -> Result> { + 'checkpoint_loop: loop { + let state = self.ongoing_checkpoint.state; + tracing::debug!(?state); + match state { + // Acquire the relevant exclusive locks and checkpoint_lock + // so no other checkpointer can run. fsync WAL if there are unapplied frames. + // Decide the largest frame we are allowed to back‑fill. 
+ CheckpointState::Start => { + let (max_frame, nbackfills) = { + let shared = self.get_shared(); + let max_frame = shared.max_frame.load(Ordering::SeqCst); + let n_backfills = shared.nbackfills.load(Ordering::SeqCst); + (max_frame, n_backfills) + }; + let needs_backfill = max_frame > nbackfills; + + if !needs_backfill && matches!(mode, CheckpointMode::Passive) { + // there are no frames to copy over and we don't need to reset + // the log so we can return early success. + return Ok(IOResult::Done(self.prev_checkpoint.clone())); + } + // acquire either the read0 or write lock depending on the checkpoint mode + self.acquire_proper_checkpoint_guard(mode)?; + self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame(); + self.ongoing_checkpoint.min_frame = nbackfills + 1; + self.ongoing_checkpoint.current_page = 0; + self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + tracing::trace!( + "checkpoint_start(min_frame={}, max_frame={})", + self.ongoing_checkpoint.min_frame, + self.ongoing_checkpoint.max_frame, + ); + } + // Find the next page that has a frame in the safe interval and + // schedule a read of that frame. + CheckpointState::ReadFrame => { + let shared = self.get_shared(); + let min_frame = self.ongoing_checkpoint.min_frame; + let max_frame = self.ongoing_checkpoint.max_frame; + let pages_in_frames = shared.pages_in_frames.clone(); + let pages_in_frames = pages_in_frames.lock(); + + let frame_cache = shared.frame_cache.clone(); + let frame_cache = frame_cache.lock(); + assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len()); + if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() { + self.ongoing_checkpoint.state = CheckpointState::Done; + continue 'checkpoint_loop; + } + let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize]; + let frames = frame_cache + .get(&page) + .expect("page must be in frame cache if it's in list"); + + for frame in frames.iter().rev() { + if *frame >= min_frame && *frame <= max_frame { + tracing::debug!( + "checkpoint page(state={:?}, page={}, frame={})", + state, + page, + *frame + ); + self.ongoing_checkpoint.page.get().id = page as usize; + self.read_frame( + *frame, + self.ongoing_checkpoint.page.clone(), + self.buffer_pool.clone(), + )?; + self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; + continue 'checkpoint_loop; + } + } + self.ongoing_checkpoint.current_page += 1; + } + CheckpointState::WaitReadFrame => { + if self.ongoing_checkpoint.page.is_locked() { + return Ok(IOResult::IO); + } else { + self.ongoing_checkpoint.state = CheckpointState::WritePage; + } + } + CheckpointState::WritePage => { + self.ongoing_checkpoint.page.set_dirty(); + begin_write_btree_page( + pager, + &self.ongoing_checkpoint.page, + write_counter.clone(), + )?; + self.ongoing_checkpoint.state = CheckpointState::WaitWritePage; + } + CheckpointState::WaitWritePage => { + if *write_counter.borrow() > 0 { + return Ok(IOResult::IO); + } + // If page was in cache clear it. 
+ if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) { + page.clear_dirty(); + } + self.ongoing_checkpoint.page.clear_dirty(); + let shared = self.get_shared(); + if (self.ongoing_checkpoint.current_page as usize) + < shared.pages_in_frames.lock().len() + { + self.ongoing_checkpoint.current_page += 1; + self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + } else { + self.ongoing_checkpoint.state = CheckpointState::Done; + } + } + // All eligible frames copied to the db file + // Update nBackfills + // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file + // Release all locks and return the current num of wal frames and the amount we backfilled + CheckpointState::Done => { + if *write_counter.borrow() > 0 { + return Ok(IOResult::IO); + } + let (mut checkpoint_result, everything_backfilled) = { + let shared = self.get_shared(); + let current_mx = shared.max_frame.load(Ordering::SeqCst); + let nbackfills = shared.nbackfills.load(Ordering::SeqCst); + + let frames_in_wal = current_mx.saturating_sub(nbackfills); + let frames_checkpointed = current_mx.saturating_sub(self.ongoing_checkpoint.min_frame - 1); + // Record two num pages fields to return as checkpoint result to caller. + // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html + ( + CheckpointResult::new(frames_in_wal, frames_checkpointed), + // if the current max frame that we read while holding checkpoint_lock is equal to the max frame of the + // ongoing checkpoint, it means that we have backfilled everything + current_mx == self.ongoing_checkpoint.max_frame, + ) + }; + // we will just overwrite nbackfills with 0 if we are resetting + self.get_shared() + .nbackfills + .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); + + if everything_backfilled + && matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) + { + self.restart_log(mode)?; + } + + // store a copy of the checkpoint result to return in the future if pragma + // wal_checkpoint is called and we haven't backfilled again since. + self.prev_checkpoint = checkpoint_result.clone(); + + // we cannot truncate the db file here because we are currently inside a + // mut borrow of pager.wal, and accessing the header will attempt a borrow + // during 'read_page', so the caller will use the result to determine if: + // a. the max frame == num wal frames (everything backfilled) + // b. the physical db file size differs from the expected pages * page_size + // and truncate + sync the db file if necessary. + if checkpoint_result.everything_backfilled() { + checkpoint_result.maybe_guard = self.checkpoint_guard.take(); + } else { + let _ = self.checkpoint_guard.take(); + } + self.ongoing_checkpoint.state = CheckpointState::Start; + return Ok(IOResult::Done(checkpoint_result)); + } + } + } + } + + /// Coordinate what the maximum safe frame is for us to backfill when checkpointing. + /// We can never backfill a frame with a higher number than any reader's max frame, + /// because we might overwrite content the reader is reading from the database file. 
fn determine_max_safe_checkpoint_frame(&self) -> u64 { let shared = self.get_shared(); let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst); @@ -1269,6 +1353,8 @@ impl WalFile { if this_mark < max_safe_frame as u32 { let busy = !read_lock.write(); if !busy { + // readmark 1 is the default reader slot and should always contain the max safe + // frame for new readers, so we bump it up to the current max frame instead of clearing it. let new_mark = if read_lock_idx == 1 { max_safe_frame as u32 } else { @@ -1310,6 +1396,7 @@ impl WalFile { // Reader is active, cannot proceed return Err(LimboError::Busy); } + // after the log is reset, we must set all secondary marks to READMARK_NOT_USED so the next reader selects a fresh slot lock.value.store(READMARK_NOT_USED, Ordering::SeqCst); } } @@ -1338,22 +1425,32 @@ impl WalFile { tracing::trace!("WAL file truncated to 0 B"); }); let shared = self.get_shared(); - shared.file.truncate(0, c.clone()).inspect_err(|e| { - handle_err(e); - })?; + // for now at least, let's do all this IO synchronously + let c = shared.file.truncate(0, c.clone()).inspect_err(handle_err)?; + self.io.wait_for_completion(c).inspect_err(handle_err)?; + let hdr = shared.wal_header.lock(); // sqlite just lets the next writer create it when the first frame is written. // we can write the new header here for simplicity. - sqlite3_ondisk::begin_write_wal_header(&shared.file, &hdr).inspect_err(|e| { - handle_err(e); - })?; - let c = Completion::new_sync(|_| { - tracing::trace!("WAL file synced after truncation"); - }); - // fsync after truncation - shared.file.sync(c).inspect_err(|e| { - handle_err(e); - })?; + self.io + .wait_for_completion( + sqlite3_ondisk::begin_write_wal_header(&shared.file, &hdr) + .inspect_err(handle_err)?, + ) + .inspect_err(handle_err)?; + self.io + .wait_for_completion( + shared + .file + .sync( + Completion::new_sync(|_| { + tracing::trace!("WAL file synced after reset/truncation"); + }) + .into(), + ) + .inspect_err(handle_err)?, + ) + .inspect_err(handle_err)?; } // release read‑locks 1..4 @@ -1463,7 +1560,10 @@ impl WalFileShared { nreads: AtomicU32::new(0), value: AtomicU32::new(READMARK_NOT_USED), }); - // slots 0 and 1 begin at 0 + + // slot zero is always zero as it signifies that reads can be done from the db file + // directly, and slot 1 is the default read mark containing the max frame. In this case + // our max frame is zero, so both slots 0 and 1 begin at 0. read_locks[0].value.store(0, Ordering::SeqCst); read_locks[1].value.store(0, Ordering::SeqCst); @@ -1508,6 +1608,10 @@ impl WalFileShared { /// client to write to the database (which may be this one) does so by /// writing frames into the start of the log file. fn restart_wal_header(&mut self, io: &Arc, mode: CheckpointMode) -> Result<()> { + turso_assert!( + matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate), + "CheckpointMode must be Restart or Truncate" + ); // bump checkpoint sequence let mut hdr = self.wal_header.lock(); hdr.checkpoint_seq = hdr.checkpoint_seq.wrapping_add(1); @@ -1530,7 +1634,7 @@ impl WalFileShared { // for RESTART, we write a new header to the WAL file.
truncate will simply write it in memory and let the following writer append it to the empty WAL file - if !matches!(mode, CheckpointMode::Truncate) { + if matches!(mode, CheckpointMode::Restart) { // if we are truncating the WAL, we don't bother writing a new header let c = sqlite3_ondisk::begin_write_wal_header(&self.file, &hdr)?; io.wait_for_completion(c)?; @@ -1768,11 +1872,12 @@ pub mod test { conn1.pager.borrow_mut().cacheflush().unwrap(); // Force a read transaction that will freeze a lower read mark - { + let readmark = { let pager = conn2.pager.borrow_mut(); let mut wal2 = pager.wal.borrow_mut(); assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok)); - } + wal2.get_max_frame() + }; // generate more frames that the reader will not see. bulk_inserts(&conn1.clone(), 15, 2); @@ -1797,7 +1910,10 @@ pub mod test { res1.num_checkpointed_frames, res1.num_wal_frames ); - + assert_eq!( + res1.num_checkpointed_frames, readmark, + "Checkpointed frames should match read mark" + ); // Release reader { let pager = conn2.pager.borrow_mut(); @@ -1839,16 +1955,13 @@ pub mod test { Ok(IOResult::IO) => { conn1.run_once().unwrap(); } - Ok(IOResult::Done(result)) => { - assert_eq!( - result.num_checkpointed_frames, 0, - "Should not checkpoint any frames (empty)" + e => { + assert!( + matches!(e, Err(LimboError::Busy)), + "reader is holding readmark 0, we should return Busy" ); break; } - Err(e) => { - panic!("Checkpoint failed: {e}"); - } } } drop(w); @@ -1875,7 +1988,10 @@ pub mod test { panic!("Checkpoint should not have succeeded"); } Err(e) => { - assert!(matches!(e, LimboError::Busy), "should block readers"); + assert!( + matches!(e, LimboError::Busy), + "should return busy if we have readers" + ); break; } } @@ -1935,11 +2051,12 @@ pub mod test { let conn_r2 = db.connect().unwrap(); // R1 starts reading - { + let r1_max_frame = { let pager = conn_r1.pager.borrow_mut(); let mut wal = pager.wal.borrow_mut(); assert!(matches!(wal.begin_read_tx().unwrap(), LimboResult::Ok)); - } + wal.get_max_frame() + }; bulk_inserts(&conn_writer, 5, 10); // R2 starts reading, sees more frames than R1 @@ -1961,6 +2078,10 @@ pub mod test { checkpoint_result.num_checkpointed_frames < checkpoint_result.num_wal_frames, "Should not checkpoint all frames when readers are active" ); + assert_eq!( + checkpoint_result.num_checkpointed_frames, r1_max_frame, + "Should have checkpointed up to R1's max frame" + ); // Verify R2 still sees its frames assert_eq!(