diff --git a/core/result.rs b/core/result.rs index 3056528ce..a13754b32 100644 --- a/core/result.rs +++ b/core/result.rs @@ -1,4 +1,5 @@ /// Common results that different functions can return in limbo. +#[derive(Debug)] pub enum LimboResult { /// Couldn't acquire a lock Busy, diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c8b5a8876..e8b87a49d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -39,6 +39,8 @@ pub const NO_LOCK: u32 = 0; pub const SHARED_LOCK: u32 = 1; pub const WRITE_LOCK: u32 = 2; +const NO_LOCK_HELD: usize = usize::MAX; + #[derive(Debug, Clone, Default)] pub struct CheckpointResult { /// number of frames in WAL @@ -427,10 +429,11 @@ pub struct WalFile { shared: Arc>, ongoing_checkpoint: OngoingCheckpoint, checkpoint_threshold: usize, + has_snapshot: Cell, // min and max frames for this connection /// This is the index to the read_lock in WalFileShared that we are holding. This lock contains /// the max frame for this connection. - max_frame_read_lock_index: usize, + max_frame_read_lock_index: Cell, /// Max frame allowed to lookup range=(minframe..max_frame) max_frame: u64, /// Start of range to look for frames range=(minframe..max_frame) @@ -662,22 +665,31 @@ impl Wal for WalFile { fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> { let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst); let nbackfills = self.get_shared().nbackfills.load(Ordering::SeqCst); - let db_has_changed = max_frame_in_wal > self.max_frame; - // Check if we can use slot 0 (bypass WAL) - // If we can obtain a shared lock on slot 0 and max_frame == nBackfills, we know every WAL frame has already - // been copied into the main database file, so we can skip mapping the WAL entirely. + // WAL is already fully back‑filled into the main DB image + // (mxFrame == nBackfill). Readers can therefore ignore the + // WAL and fetch pages directly from the DB file. We do this + // by taking read‑lock 0. if max_frame_in_wal == nbackfills { - let shared = self.get_shared(); - let lock = &mut shared.read_locks[0]; - if lock.read() { - self.max_frame_read_lock_index = 0; - self.max_frame = max_frame_in_wal; - self.min_frame = max_frame_in_wal + 1; // Ignore WAL; - return Ok((LimboResult::Ok, db_has_changed)); + let lock0 = &mut self.get_shared().read_locks[0]; + if !lock0.read() { + return Ok((LimboResult::Busy, db_has_changed)); } + self.max_frame = max_frame_in_wal; + // we need to keep self.max_frame set to the appropriate + // max frame in the wal at the time this transaction starts. + // but here we set min_frame=max_frame + 1 to keep an empty snapshot window, + // to demonstrate that we do not care about any frames, + // while still capturing a snapshot that we may need if we ever want to upgrade + // to a write transaction. + self.min_frame = max_frame_in_wal + 1; + self.max_frame_read_lock_index.set(0); + self.has_snapshot.set(true); + self.last_checksum = self.get_shared().last_checksum; + return Ok((LimboResult::Ok, db_has_changed)); } + let mut max_read_mark_index = -1; let mut max_read_mark = 0; // Find the largest mark we can find, ignore frames that are impossible to be in range and @@ -724,15 +736,17 @@ impl Wal for WalFile { ) }; self.min_frame = min_frame; - self.max_frame_read_lock_index = max_read_mark_index as usize; + self.max_frame_read_lock_index + .set(max_read_mark_index as usize); self.max_frame = max_read_mark as u64; self.last_checksum = last_checksum; + self.has_snapshot.set(true); self.start_pages_in_frames = start_pages_in_frames; tracing::debug!( "begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})", self.min_frame, self.max_frame, - self.max_frame_read_lock_index, + max_read_mark_index, max_frame_in_wal ); Ok((LimboResult::Ok, db_has_changed)) @@ -742,9 +756,18 @@ impl Wal for WalFile { #[inline(always)] #[instrument(skip_all, level = Level::DEBUG)] fn end_read_tx(&self) { - tracing::debug!("end_read_tx(lock={})", self.max_frame_read_lock_index); - let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index]; - read_lock.unlock(); + let held = self.max_frame_read_lock_index.get(); + turso_assert!( + held != NO_LOCK_HELD, + "We must have a read lock held to end a read transaction" + ); + tracing::debug!("end_read_tx(lock={})", held); + { + let read_lock = &mut self.get_shared().read_locks[held]; + read_lock.unlock(); + } + self.has_snapshot.set(false); + self.max_frame_read_lock_index.set(NO_LOCK_HELD); } /// Begin a write transaction @@ -753,16 +776,24 @@ impl Wal for WalFile { if !self.get_shared().write_lock.write() { return Ok(LimboResult::Busy); } - // If the max frame is not the same as the one in the shared state, it means another - // transaction wrote to the WAL after we started our read transaction. This means our - // snapshot is not consistent with the one in the shared state and we need to start another - // one. - let shared = self.get_shared(); - if self.max_frame != shared.max_frame.load(Ordering::SeqCst) { - shared.write_lock.unlock(); - return Ok(LimboResult::Busy); + let shared_max = self.get_shared().max_frame.load(Ordering::SeqCst); + + // If we have a snapshot and self.max_frame == shared.max_frame, + // then the snapshot is still valid and it's safe to promote to write tx. + // It is also valid if we do not yet have a snapshot. + if !self.has_snapshot.get() || self.max_frame == shared_max { + // Both cases mean we can safely use the shared state. + self.max_frame = shared_max; + self.last_checksum = self.get_shared().last_checksum; + self.min_frame = self.get_shared().nbackfills.load(Ordering::SeqCst) + 1; + self.has_snapshot.set(true); + return Ok(LimboResult::Ok); } - Ok(LimboResult::Ok) + // Otherwise, another transaction wrote to the WAL after we started our read transaction. + // This means our snapshot is not consistent with the one in the shared state and we need to start over. + let shared = self.get_shared(); + shared.write_lock.unlock(); + return Ok(LimboResult::Busy); } /// End a write transaction @@ -1118,12 +1149,13 @@ impl WalFile { max_frame: 0, current_page: 0, }, + has_snapshot: false.into(), checkpoint_threshold: 1000, buffer_pool, syncing: Rc::new(Cell::new(false)), sync_state: Cell::new(SyncState::NotSyncing), min_frame: 0, - max_frame_read_lock_index: 0, + max_frame_read_lock_index: NO_LOCK_HELD.into(), last_checksum, prev_checkpoint: CheckpointResult::default(), checkpoint_guard: None, @@ -1313,15 +1345,18 @@ impl WalFile { current_mx == self.ongoing_checkpoint.max_frame, ) }; + // we will just overwrite nbackfills with 0 if we are resetting self.get_shared() .nbackfills .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst); - if everything_backfilled - && matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) - { - self.restart_log(mode)?; + if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) { + if everything_backfilled { + self.restart_log(mode)?; + } else { + return Err(LimboError::Busy); + } } // store a copy of the checkpoint result to return in the future if pragma @@ -2145,7 +2180,8 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let mut wal = pager.wal.borrow_mut(); - assert!(matches!(wal.begin_write_tx().unwrap(), LimboResult::Ok)); + let res = wal.begin_write_tx().unwrap(); + assert!(matches!(res, LimboResult::Ok), "result: {res:?}"); } // should fail because writer lock is held