core/wal: introduce transaction_count, same as iChange in sqlite

This commit is contained in:
Pere Diaz Bou
2025-10-03 12:36:38 +02:00
parent c98bf9b593
commit 8f103f7c35
2 changed files with 35 additions and 19 deletions

View File

@@ -1619,6 +1619,7 @@ pub fn build_shared_wal(
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
transaction_count: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: (0, 0),
file: Some(file.clone()),

View File

@@ -577,6 +577,7 @@ pub struct WalFile {
/// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
last_checksum: (u32, u32),
checkpoint_seq: AtomicU32,
transaction_count: AtomicU64,
/// Count of possible pages to checkpoint, and number of backfilled
prev_checkpoint: CheckpointResult,
@@ -670,6 +671,7 @@ pub struct WalFileShared {
pub min_frame: AtomicU64,
pub max_frame: AtomicU64,
pub nbackfills: AtomicU64,
pub transaction_count: AtomicU64,
// Frame cache maps a Page to all the frames it has stored in WAL in ascending order.
// This is to easily find the frame it must checkpoint each connection if a checkpoint is
// necessary.
@@ -819,17 +821,16 @@ impl Wal for WalFile {
self.max_frame_read_lock_index.load(Ordering::Acquire),
NO_LOCK_HELD
);
let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
let (shared_max, nbackfills, last_checksum, checkpoint_seq, transaction_count) = {
let shared = self.get_shared();
let mx = shared.max_frame.load(Ordering::Acquire);
let nb = shared.nbackfills.load(Ordering::Acquire);
let ck = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(mx, nb, ck, checkpoint_seq)
let transaction_count = shared.transaction_count.load(Ordering::Acquire);
(mx, nb, ck, checkpoint_seq, transaction_count)
};
let db_changed = shared_max != self.max_frame.load(Ordering::Acquire)
|| last_checksum != self.last_checksum
|| checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire);
let db_changed = self.db_changed(&self.get_shared());
// WAL is already fully backfilled into the main DB image
// (mxFrame == nBackfill). Readers can therefore ignore the
@@ -850,6 +851,8 @@ impl Wal for WalFile {
self.min_frame.store(nbackfills + 1, Ordering::Release);
self.last_checksum = last_checksum;
self.checkpoint_seq.store(checkpoint_seq, Ordering::Release);
self.transaction_count
.store(transaction_count, Ordering::Release);
return Ok(db_changed);
}
@@ -945,6 +948,8 @@ impl Wal for WalFile {
.store(best_idx as usize, Ordering::Release);
self.last_checksum = last_checksum;
self.checkpoint_seq.store(checkpoint_seq, Ordering::Release);
self.transaction_count
.store(transaction_count, Ordering::Release);
tracing::debug!(
"begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})",
self.min_frame.load(Ordering::Acquire),
@@ -986,26 +991,14 @@ impl Wal for WalFile {
if !shared.write_lock.write() {
return Err(LimboError::Busy);
}
let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
let mx = shared.max_frame.load(Ordering::Acquire);
let nb = shared.nbackfills.load(Ordering::Acquire);
let ck = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(mx, nb, ck, checkpoint_seq)
};
let db_changed = shared_max != self.max_frame.load(Ordering::Acquire)
|| last_checksum != self.last_checksum
|| checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire);
let db_changed = self.db_changed(&shared);
if !db_changed {
// Snapshot still valid; adopt counters
drop(shared);
self.last_checksum = last_checksum;
self.min_frame.store(nbackfills + 1, Ordering::Release);
return Ok(());
}
// Snapshot is stale, give up and let caller retry from scratch
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared_max);
tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared.max_frame.load(Ordering::Acquire));
shared.write_lock.unlock();
Err(LimboError::Busy)
}
@@ -1386,6 +1379,11 @@ impl Wal for WalFile {
.store(self.max_frame.load(Ordering::Acquire), Ordering::Release);
tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum);
shared.last_checksum = self.last_checksum;
self.transaction_count.fetch_add(1, Ordering::Release);
shared.transaction_count.store(
self.transaction_count.load(Ordering::Acquire),
Ordering::Release,
);
Ok(())
}
@@ -1636,6 +1634,7 @@ impl WalFile {
checkpoint_seq: AtomicU32::new(0),
syncing: Arc::new(AtomicBool::new(false)),
min_frame: AtomicU64::new(0),
transaction_count: AtomicU64::new(0),
max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD),
last_checksum,
prev_checkpoint: CheckpointResult::default(),
@@ -2260,6 +2259,20 @@ impl WalFile {
buf: buf_slot,
})
}
fn db_changed(&self, shared: &WalFileShared) -> bool {
let shared_max = shared.max_frame.load(Ordering::Acquire);
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
let last_checksum = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
let transaction_count = shared.transaction_count.load(Ordering::Acquire);
shared_max != self.max_frame.load(Ordering::Acquire)
|| last_checksum != self.last_checksum
|| checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire)
|| transaction_count != self.transaction_count.load(Ordering::Acquire)
|| nbackfills + 1 != self.min_frame.load(Ordering::Acquire)
}
}
impl WalFileShared {
@@ -2308,6 +2321,7 @@ impl WalFileShared {
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
transaction_count: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: (0, 0),
file: None,
@@ -2352,6 +2366,7 @@ impl WalFileShared {
min_frame: AtomicU64::new(0),
max_frame: AtomicU64::new(0),
nbackfills: AtomicU64::new(0),
transaction_count: AtomicU64::new(0),
frame_cache: Arc::new(SpinLock::new(HashMap::new())),
last_checksum: (0, 0),
file: Some(file),