From 8f103f7c351f801c1418ff74afdd191546fce6dd Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 3 Oct 2025 12:36:38 +0200 Subject: [PATCH] core/wal: introduce transaction_count, same as iChange in sqlite --- core/storage/sqlite3_ondisk.rs | 1 + core/storage/wal.rs | 53 ++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 95f1be000..2611d0f4a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1619,6 +1619,7 @@ pub fn build_shared_wal( min_frame: AtomicU64::new(0), max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), + transaction_count: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), last_checksum: (0, 0), file: Some(file.clone()), diff --git a/core/storage/wal.rs b/core/storage/wal.rs index bbba04f92..005b0139c 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -577,6 +577,7 @@ pub struct WalFile { /// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL last_checksum: (u32, u32), checkpoint_seq: AtomicU32, + transaction_count: AtomicU64, /// Count of possible pages to checkpoint, and number of backfilled prev_checkpoint: CheckpointResult, @@ -670,6 +671,7 @@ pub struct WalFileShared { pub min_frame: AtomicU64, pub max_frame: AtomicU64, pub nbackfills: AtomicU64, + pub transaction_count: AtomicU64, // Frame cache maps a Page to all the frames it has stored in WAL in ascending order. // This is to easily find the frame it must checkpoint each connection if a checkpoint is // necessary. @@ -819,17 +821,16 @@ impl Wal for WalFile { self.max_frame_read_lock_index.load(Ordering::Acquire), NO_LOCK_HELD ); - let (shared_max, nbackfills, last_checksum, checkpoint_seq) = { + let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = { let shared = self.get_shared(); let mx = shared.max_frame.load(Ordering::Acquire); let nb = shared.nbackfills.load(Ordering::Acquire); let ck = shared.last_checksum; let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; - (mx, nb, ck, checkpoint_seq) + let transaction_count = shared.transaction_count.load(Ordering::Acquire); + (mx, nb, ck, checkpoint_seq, transaction_count) }; - let db_changed = shared_max != self.max_frame.load(Ordering::Acquire) - || last_checksum != self.last_checksum - || checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire); + let db_changed = self.db_changed(&self.get_shared()); // WAL is already fully back‑filled into the main DB image // (mxFrame == nBackfill). Readers can therefore ignore the @@ -850,6 +851,8 @@ impl Wal for WalFile { self.min_frame.store(nbackfills + 1, Ordering::Release); self.last_checksum = last_checksum; self.checkpoint_seq.store(checkpoint_seq, Ordering::Release); + self.transaction_count + .store(transaction_count, Ordering::Release); return Ok(db_changed); } @@ -945,6 +948,8 @@ impl Wal for WalFile { .store(best_idx as usize, Ordering::Release); self.last_checksum = last_checksum; self.checkpoint_seq.store(checkpoint_seq, Ordering::Release); + self.transaction_count + .store(transaction_count, Ordering::Release); tracing::debug!( "begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})", self.min_frame.load(Ordering::Acquire), @@ -986,26 +991,14 @@ impl Wal for WalFile { if !shared.write_lock.write() { return Err(LimboError::Busy); } - let (shared_max, nbackfills, last_checksum, checkpoint_seq) = { - let mx = shared.max_frame.load(Ordering::Acquire); - let nb = shared.nbackfills.load(Ordering::Acquire); - let ck = shared.last_checksum; - let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; - (mx, nb, ck, checkpoint_seq) - }; - let db_changed = shared_max != self.max_frame.load(Ordering::Acquire) - || last_checksum != self.last_checksum - || checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire); + let db_changed = self.db_changed(&shared); if !db_changed { - // Snapshot still valid; adopt counters drop(shared); - self.last_checksum = last_checksum; - self.min_frame.store(nbackfills + 1, Ordering::Release); return Ok(()); } // Snapshot is stale, give up and let caller retry from scratch - tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared_max); + tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire)); shared.write_lock.unlock(); Err(LimboError::Busy) } @@ -1386,6 +1379,11 @@ impl Wal for WalFile { .store(self.max_frame.load(Ordering::Acquire), Ordering::Release); tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum); shared.last_checksum = self.last_checksum; + self.transaction_count.fetch_add(1, Ordering::Release); + shared.transaction_count.store( + self.transaction_count.load(Ordering::Acquire), + Ordering::Release, + ); Ok(()) } @@ -1636,6 +1634,7 @@ impl WalFile { checkpoint_seq: AtomicU32::new(0), syncing: Arc::new(AtomicBool::new(false)), min_frame: AtomicU64::new(0), + transaction_count: AtomicU64::new(0), max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD), last_checksum, prev_checkpoint: CheckpointResult::default(), @@ -2260,6 +2259,20 @@ impl WalFile { buf: buf_slot, }) } + + fn db_changed(&self, shared: &WalFileShared) -> bool { + let shared_max = shared.max_frame.load(Ordering::Acquire); + let nbackfills = shared.nbackfills.load(Ordering::Acquire); + let last_checksum = shared.last_checksum; + let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; + let transaction_count = shared.transaction_count.load(Ordering::Acquire); + + shared_max != self.max_frame.load(Ordering::Acquire) + || last_checksum != self.last_checksum + || checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire) + || transaction_count != self.transaction_count.load(Ordering::Acquire) + || nbackfills + 1 != self.min_frame.load(Ordering::Acquire) + } } impl WalFileShared { @@ -2308,6 +2321,7 @@ impl WalFileShared { min_frame: AtomicU64::new(0), max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), + transaction_count: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), last_checksum: (0, 0), file: None, @@ -2352,6 +2366,7 @@ impl WalFileShared { min_frame: AtomicU64::new(0), max_frame: AtomicU64::new(0), nbackfills: AtomicU64::new(0), + transaction_count: AtomicU64::new(0), frame_cache: Arc::new(SpinLock::new(HashMap::new())), last_checksum: (0, 0), file: Some(file),