diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 4307345a2..899da558a 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -571,9 +571,9 @@ pub struct WalFile { /// the max frame for this connection. max_frame_read_lock_index: AtomicUsize, /// Max frame allowed to lookup range=(minframe..max_frame) - max_frame: u64, + max_frame: AtomicU64, /// Start of range to look for frames range=(minframe..max_frame) - min_frame: u64, + min_frame: AtomicU64, /// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL last_checksum: (u32, u32), checkpoint_seq: AtomicU32, @@ -827,7 +827,7 @@ impl Wal for WalFile { let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; (mx, nb, ck, checkpoint_seq) }; - let db_changed = shared_max != self.max_frame + let db_changed = shared_max != self.max_frame.load(Ordering::Acquire) || last_checksum != self.last_checksum || checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire); @@ -844,10 +844,10 @@ impl Wal for WalFile { } // we need to keep self.max_frame set to the appropriate // max frame in the wal at the time this transaction starts. - self.max_frame = shared_max; + self.max_frame.store(shared_max, Ordering::Release); self.max_frame_read_lock_index .store(lock_0_idx, Ordering::Release); - self.min_frame = nbackfills + 1; + self.min_frame.store(nbackfills + 1, Ordering::Release); self.last_checksum = last_checksum; return Ok(db_changed); } @@ -938,14 +938,14 @@ impl Wal for WalFile { { return Err(LimboError::Busy); } - self.min_frame = nb2 + 1; - self.max_frame = best_mark as u64; + self.min_frame.store(nb2 + 1, Ordering::Release); + self.max_frame.store(best_mark as u64, Ordering::Release); self.max_frame_read_lock_index .store(best_idx as usize, Ordering::Release); tracing::debug!( "begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})", - self.min_frame, - self.max_frame, + self.min_frame.load(Ordering::Acquire), + self.max_frame.load(Ordering::Acquire), best_idx, shared_max ); @@ -988,16 +988,16 @@ impl Wal for WalFile { shared.nbackfills.load(Ordering::Acquire), shared.last_checksum, ); - if self.max_frame == shared_max { + if self.max_frame.load(Ordering::Acquire) == shared_max { // Snapshot still valid; adopt counters drop(shared); self.last_checksum = last_checksum; - self.min_frame = nbackfills + 1; + self.min_frame.store(nbackfills + 1, Ordering::Release); return Ok(()); } // Snapshot is stale, give up and let caller retry from scratch - tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max); + tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame.load(Ordering::Acquire), shared_max); shared.write_lock.unlock(); Err(LimboError::Busy) } @@ -1019,7 +1019,7 @@ impl Wal for WalFile { ); turso_assert!( - frame_watermark.unwrap_or(0) <= self.max_frame, + frame_watermark.unwrap_or(0) <= self.max_frame.load(Ordering::Acquire), "frame_watermark must be <= than current WAL max_frame value" ); @@ -1039,7 +1039,7 @@ impl Wal for WalFile { // // by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db if self.max_frame_read_lock_index.load(Ordering::Acquire) == 0 - && self.max_frame < self.min_frame + && self.max_frame.load(Ordering::Acquire) < self.min_frame.load(Ordering::Acquire) { tracing::debug!( "find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file", @@ -1050,15 +1050,15 @@ impl Wal for WalFile { } let shared = self.get_shared(); let frames = shared.frame_cache.lock(); - let range = frame_watermark - .map(|x| 0..=x) - .unwrap_or(self.min_frame..=self.max_frame); + let range = frame_watermark.map(|x| 0..=x).unwrap_or( + self.min_frame.load(Ordering::Acquire)..=self.max_frame.load(Ordering::Acquire), + ); tracing::debug!( "find_frame(page_id={}, frame_watermark={:?}): min_frame={}, max_frame={}", page_id, frame_watermark, - self.min_frame, - self.max_frame + self.min_frame.load(Ordering::Acquire), + self.max_frame.load(Ordering::Acquire) ); if let Some(list) = frames.get(&page_id) { if let Some(f) = list.iter().rfind(|&&f| range.contains(&f)) { @@ -1211,14 +1211,15 @@ impl Wal for WalFile { self.page_size(), ))); } - if frame_id > self.max_frame + 1 { + if frame_id > self.max_frame.load(Ordering::Acquire) + 1 { // attempt to write frame out of sequential order - error out return Err(LimboError::InvalidArgument(format!( "frame_id is beyond next frame in the WAL: frame_id={}, max_frame={}", - frame_id, self.max_frame + frame_id, + self.max_frame.load(Ordering::Acquire) ))); } - if frame_id <= self.max_frame { + if frame_id <= self.max_frame.load(Ordering::Acquire) { // just validate if page content from the frame matches frame in the WAL let offset = self.frame_offset(frame_id); let conflict = Arc::new(Cell::new(false)); @@ -1341,11 +1342,11 @@ impl Wal for WalFile { } fn get_max_frame(&self) -> u64 { - self.max_frame + self.max_frame.load(Ordering::Acquire) } fn get_min_frame(&self) -> u64 { - self.min_frame + self.min_frame.load(Ordering::Acquire) } #[instrument(err, skip_all, level = Level::DEBUG)] @@ -1364,7 +1365,7 @@ impl Wal for WalFile { (max_frame, shared.last_checksum) }; self.last_checksum = last_checksum; - self.max_frame = max_frame; + self.max_frame.store(max_frame, Ordering::Release); self.reset_internal_states(); Ok(()) } @@ -1372,8 +1373,10 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn finish_append_frames_commit(&mut self) -> Result<()> { let mut shared = self.get_shared_mut(); - shared.max_frame.store(self.max_frame, Ordering::Release); - tracing::trace!(self.max_frame, ?self.last_checksum); + shared + .max_frame + .store(self.max_frame.load(Ordering::Acquire), Ordering::Release); + tracing::trace!(max_frame = self.max_frame.load(Ordering::Acquire), ?self.last_checksum); shared.last_checksum = self.last_checksum; Ok(()) } @@ -1435,7 +1438,7 @@ impl Wal for WalFile { checksum }; - self.max_frame = 0; + self.max_frame.store(0, Ordering::Release); let shared = self.get_shared(); assert!(shared.enabled.load(Ordering::SeqCst), "WAL must be enabled"); let file = shared.file.as_ref().unwrap(); @@ -1492,7 +1495,7 @@ impl Wal for WalFile { // Rolling checksum input to each frame build let mut rolling_checksum: (u32, u32) = self.last_checksum; - let mut next_frame_id = self.max_frame + 1; + let mut next_frame_id = self.max_frame.load(Ordering::Acquire) + 1; // Build every frame in order, updating the rolling checksum for (idx, page) in pages.iter().enumerate() { let page_id = page.get().id; @@ -1538,7 +1541,7 @@ impl Wal for WalFile { next_frame_id += 1; } - let first_frame_id = self.max_frame + 1; + let first_frame_id = self.max_frame.load(Ordering::Acquire) + 1; let start_off = self.frame_offset(first_frame_id); // pre-advance in-memory WAL state @@ -1584,7 +1587,7 @@ impl Wal for WalFile { fn update_max_frame(&mut self) { let new_max_frame = self.get_shared().max_frame.load(Ordering::Acquire); - self.max_frame = new_max_frame; + self.max_frame.store(new_max_frame, Ordering::Release); } } @@ -1607,7 +1610,7 @@ impl WalFile { Self { io, // default to max frame in WAL, so that when we read schema we can read from WAL too if it's there. - max_frame, + max_frame: AtomicU64::new(max_frame), shared, ongoing_checkpoint: OngoingCheckpoint { time: now, @@ -1624,7 +1627,7 @@ impl WalFile { buffer_pool, checkpoint_seq: AtomicU32::new(0), syncing: Arc::new(AtomicBool::new(false)), - min_frame: 0, + min_frame: AtomicU64::new(0), max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD), last_checksum, prev_checkpoint: CheckpointResult::default(), @@ -1684,7 +1687,7 @@ impl WalFile { fn complete_append_frame(&mut self, page_id: u64, frame_id: u64, checksums: (u32, u32)) { self.last_checksum = checksums; - self.max_frame = frame_id; + self.max_frame.store(frame_id, Ordering::Release); let shared = self.get_shared(); { let mut frame_cache = shared.frame_cache.lock(); @@ -2112,8 +2115,8 @@ impl WalFile { self.get_shared_mut().restart_wal_header(&self.io, mode); let cksm = self.get_shared().last_checksum; self.last_checksum = cksm; - self.max_frame = 0; - self.min_frame = 0; + self.max_frame.store(0, Ordering::Release); + self.min_frame.store(0, Ordering::Release); self.checkpoint_seq.fetch_add(1, Ordering::Release); Ok(()) }