Fix transaction read0 shortcut in WAL and track whether we have snapshot

This commit is contained in:
PThorpe92
2025-07-23 22:41:30 -04:00
committed by Jussi Saurio
parent ff1987a45c
commit 7640535ba4
2 changed files with 69 additions and 32 deletions

View File

@@ -1,4 +1,5 @@
/// Common results that different functions can return in limbo.
#[derive(Debug)]
pub enum LimboResult {
/// Couldn't acquire a lock
Busy,

View File

@@ -39,6 +39,8 @@ pub const NO_LOCK: u32 = 0;
pub const SHARED_LOCK: u32 = 1;
pub const WRITE_LOCK: u32 = 2;
const NO_LOCK_HELD: usize = usize::MAX;
#[derive(Debug, Clone, Default)]
pub struct CheckpointResult {
/// number of frames in WAL
@@ -427,10 +429,11 @@ pub struct WalFile {
shared: Arc<UnsafeCell<WalFileShared>>,
ongoing_checkpoint: OngoingCheckpoint,
checkpoint_threshold: usize,
has_snapshot: Cell<bool>,
// min and max frames for this connection
/// This is the index to the read_lock in WalFileShared that we are holding. This lock contains
/// the max frame for this connection.
max_frame_read_lock_index: usize,
max_frame_read_lock_index: Cell<usize>,
/// Max frame allowed to lookup range=(minframe..max_frame)
max_frame: u64,
/// Start of range to look for frames range=(minframe..max_frame)
@@ -662,22 +665,31 @@ impl Wal for WalFile {
fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> {
let max_frame_in_wal = self.get_shared().max_frame.load(Ordering::SeqCst);
let nbackfills = self.get_shared().nbackfills.load(Ordering::SeqCst);
let db_has_changed = max_frame_in_wal > self.max_frame;
// Check if we can use slot 0 (bypass WAL)
// If we can obtain a shared lock on slot0 and max_frame == nBackfills, we know every WAL frame has already
// been copied into the main database file, so we can skip mapping the WAL entirely.
// WAL is already fully backfilled into the main DB image
// (mxFrame == nBackfill). Readers can therefore ignore the
// WAL and fetch pages directly from the DB file. We do this
// by taking readlock 0.
if max_frame_in_wal == nbackfills {
let shared = self.get_shared();
let lock = &mut shared.read_locks[0];
if lock.read() {
self.max_frame_read_lock_index = 0;
self.max_frame = max_frame_in_wal;
self.min_frame = max_frame_in_wal + 1; // Ignore WAL;
return Ok((LimboResult::Ok, db_has_changed));
let lock0 = &mut self.get_shared().read_locks[0];
if !lock0.read() {
return Ok((LimboResult::Busy, db_has_changed));
}
self.max_frame = max_frame_in_wal;
// we need to keep self.max_frame set to the appropriate
// max frame in the wal at the time this transaction starts.
// but here we set min_frame=max_frame + 1 to keep an empty snapshot window,
// to demonstrate that we do not care about any frames,
// while still capturing a snapshot that we may need if we ever want to upgrade
// to a write transaction.
self.min_frame = max_frame_in_wal + 1;
self.max_frame_read_lock_index.set(0);
self.has_snapshot.set(true);
self.last_checksum = self.get_shared().last_checksum;
return Ok((LimboResult::Ok, db_has_changed));
}
let mut max_read_mark_index = -1;
let mut max_read_mark = 0;
// Find the largest mark we can find, ignore frames that are impossible to be in range and
@@ -724,15 +736,17 @@ impl Wal for WalFile {
)
};
self.min_frame = min_frame;
self.max_frame_read_lock_index = max_read_mark_index as usize;
self.max_frame_read_lock_index
.set(max_read_mark_index as usize);
self.max_frame = max_read_mark as u64;
self.last_checksum = last_checksum;
self.has_snapshot.set(true);
self.start_pages_in_frames = start_pages_in_frames;
tracing::debug!(
"begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
self.min_frame,
self.max_frame,
self.max_frame_read_lock_index,
max_read_mark_index,
max_frame_in_wal
);
Ok((LimboResult::Ok, db_has_changed))
@@ -742,9 +756,18 @@ impl Wal for WalFile {
#[inline(always)]
#[instrument(skip_all, level = Level::DEBUG)]
fn end_read_tx(&self) {
tracing::debug!("end_read_tx(lock={})", self.max_frame_read_lock_index);
let read_lock = &mut self.get_shared().read_locks[self.max_frame_read_lock_index];
read_lock.unlock();
let held = self.max_frame_read_lock_index.get();
turso_assert!(
held != NO_LOCK_HELD,
"We must have a read lock held to end a read transaction"
);
tracing::debug!("end_read_tx(lock={})", held);
{
let read_lock = &mut self.get_shared().read_locks[held];
read_lock.unlock();
}
self.has_snapshot.set(false);
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
}
/// Begin a write transaction
@@ -753,16 +776,24 @@ impl Wal for WalFile {
if !self.get_shared().write_lock.write() {
return Ok(LimboResult::Busy);
}
// If the max frame is not the same as the one in the shared state, it means another
// transaction wrote to the WAL after we started our read transaction. This means our
// snapshot is not consistent with the one in the shared state and we need to start another
// one.
let shared = self.get_shared();
if self.max_frame != shared.max_frame.load(Ordering::SeqCst) {
shared.write_lock.unlock();
return Ok(LimboResult::Busy);
let shared_max = self.get_shared().max_frame.load(Ordering::SeqCst);
// If we have a snapshot and self.max_frame == shared.max_frame,
// then the snapshot is still valid and it's safe to promote to write tx.
// It is also valid if we do not yet have a snapshot.
if !self.has_snapshot.get() || self.max_frame == shared_max {
// Both cases mean we can safely use the shared state.
self.max_frame = shared_max;
self.last_checksum = self.get_shared().last_checksum;
self.min_frame = self.get_shared().nbackfills.load(Ordering::SeqCst) + 1;
self.has_snapshot.set(true);
return Ok(LimboResult::Ok);
}
Ok(LimboResult::Ok)
// Otherwise, another transaction wrote to the WAL after we started our read transaction.
// This means our snapshot is not consistent with the one in the shared state and we need to start over.
let shared = self.get_shared();
shared.write_lock.unlock();
return Ok(LimboResult::Busy);
}
/// End a write transaction
@@ -1118,12 +1149,13 @@ impl WalFile {
max_frame: 0,
current_page: 0,
},
has_snapshot: false.into(),
checkpoint_threshold: 1000,
buffer_pool,
syncing: Rc::new(Cell::new(false)),
sync_state: Cell::new(SyncState::NotSyncing),
min_frame: 0,
max_frame_read_lock_index: 0,
max_frame_read_lock_index: NO_LOCK_HELD.into(),
last_checksum,
prev_checkpoint: CheckpointResult::default(),
checkpoint_guard: None,
@@ -1313,15 +1345,18 @@ impl WalFile {
current_mx == self.ongoing_checkpoint.max_frame,
)
};
// we will just overwrite nbackfills with 0 if we are resetting
self.get_shared()
.nbackfills
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
if everything_backfilled
&& matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate)
{
self.restart_log(mode)?;
if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
if everything_backfilled {
self.restart_log(mode)?;
} else {
return Err(LimboError::Busy);
}
}
// store a copy of the checkpoint result to return in the future if pragma
@@ -2145,7 +2180,8 @@ pub mod test {
{
let pager = conn2.pager.borrow_mut();
let mut wal = pager.wal.borrow_mut();
assert!(matches!(wal.begin_write_tx().unwrap(), LimboResult::Ok));
let res = wal.begin_write_tx().unwrap();
assert!(matches!(res, LimboResult::Ok), "result: {res:?}");
}
// should fail because writer lock is held