Use a single packed u64 for LimboRwLock and relax memory ordering

PThorpe92
2025-08-09 14:08:25 -04:00
parent 54613cceec
commit f81bc3236a
2 changed files with 177 additions and 191 deletions

View File

@@ -48,7 +48,7 @@ use pack1::{I32BE, U16BE, U32BE};
use tracing::{instrument, Level};
use super::pager::PageRef;
use super::wal::LimboRwLock;
use super::wal::TursoRwLock;
use crate::error::LimboError;
use crate::fast_lock::SpinLock;
use crate::io::{Buffer, Complete, Completion};
@@ -60,7 +60,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::pager::Pager;
use crate::storage::wal::PendingFlush;
use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
use crate::{turso_assert, File, Result, WalFileShared};
use std::cell::{RefCell, UnsafeCell};
@@ -1557,6 +1557,12 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
#[allow(clippy::arc_with_non_send_sync)]
let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize));
let header = Arc::new(SpinLock::new(WalHeader::default()));
let read_locks = std::array::from_fn(|_| TursoRwLock::new());
for (i, l) in read_locks.iter().enumerate() {
l.write();
l.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
l.unlock();
}
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared {
wal_header: header.clone(),
@@ -1567,16 +1573,10 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
last_checksum: (0, 0),
file: file.clone(),
read_locks: [
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
LimboRwLock::new(),
],
write_lock: LimboRwLock::new(),
read_locks,
write_lock: TursoRwLock::new(),
loaded: AtomicBool::new(false),
checkpoint_lock: LimboRwLock::new(),
checkpoint_lock: TursoRwLock::new(),
}));
let wal_file_shared_for_completion = wal_file_shared_ret.clone();
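
One note on the initialization loop above: `set_value_exclusive` asserts that the writer bit is held, so each slot is taken through a write/set/unlock cycle rather than a bare store. A minimal sketch of the packed words this produces, with the bit layout and constant copied from the `TursoRwLock` docs in the second file purely for illustration:

```rust
const READMARK_NOT_USED: u32 = 0xffff_ffff;
const VALUE_SHIFT: u32 = 32;

fn main() {
    // After the loop above, each slot's packed u64 holds only its value:
    // slots 0 and 1 read back 0, slots 2..5 read back READMARK_NOT_USED.
    for i in 0..5u64 {
        let value = if i < 2 { 0 } else { READMARK_NOT_USED };
        let packed = (value as u64) << VALUE_SHIFT; // writer bit and reader count are clear
        assert_eq!((packed >> VALUE_SHIFT) as u32, value); // value bits [63:32]
        assert_eq!(packed & 0xffff_ffff, 0); // lock bits [31:0]
    }
}
```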

View File

@@ -8,7 +8,7 @@ use strum::EnumString;
use tracing::{instrument, Level};
use std::fmt::Formatter;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::{
cell::{Cell, RefCell},
fmt,
@@ -33,14 +33,6 @@ use super::buffer_pool::BufferPool;
use super::pager::{PageRef, Pager};
use super::sqlite3_ondisk::{self, WalHeader};
pub const READMARK_NOT_USED: u32 = 0xffffffff;
pub const NO_LOCK: u32 = 0;
pub const SHARED_LOCK: u32 = 1;
pub const WRITE_LOCK: u32 = 2;
const NO_LOCK_HELD: usize = usize::MAX;
#[derive(Debug, Clone, Default)]
pub struct CheckpointResult {
/// number of frames in WAL
@@ -89,122 +81,129 @@ pub enum CheckpointMode {
Truncate,
}
#[repr(transparent)]
#[derive(Debug, Default)]
pub struct LimboRwLock {
lock: AtomicU32,
nreads: AtomicU32,
value: AtomicU32,
}
/// A 64-bit read-write lock with embedded 32-bit value storage.
/// Using a single atomic allows the reader count and lock state to be updated
/// together atomically while sitting in a single CPU cache line.
///
/// # Memory Layout:
/// ```text
/// [63:32] Value bits - 32 bits for stored value
/// [31:1] Reader count - 31 bits for reader count
/// [0] Writer bit - 1 bit indicating exclusive write lock
/// ```
///
/// # Synchronization Guarantees:
/// - Acquire semantics on lock acquisition ensure visibility of all writes
/// made by the previous lock holder
/// - Release semantics on unlock ensure all writes made while holding the
/// lock are visible to the next acquirer
/// - The embedded value can be atomically read without holding any lock
pub struct TursoRwLock(AtomicU64);
impl LimboRwLock {
pub fn new() -> Self {
Self {
lock: AtomicU32::new(NO_LOCK),
nreads: AtomicU32::new(0),
value: AtomicU32::new(READMARK_NOT_USED),
pub const READMARK_NOT_USED: u32 = 0xffffffff;
const NO_LOCK_HELD: usize = usize::MAX;
impl TursoRwLock {
/// Bit 0: Writer flag
const WRITER: u64 = 0b1;
/// Reader increment value (bit 1)
const READER_INC: u64 = 0b10;
/// Reader count starts at bit 1
const READER_SHIFT: u32 = 1;
/// Mask for 31 reader bits [31:1]
const READER_COUNT_MASK: u64 = 0x7fff_ffffu64 << Self::READER_SHIFT;
/// Value starts at bit 32
const VALUE_SHIFT: u32 = 32;
/// Mask for 32 value bits [63:32]
const VALUE_MASK: u64 = 0xffff_ffffu64 << Self::VALUE_SHIFT;
#[inline]
pub const fn new() -> Self {
Self(AtomicU64::new(0))
}
const fn has_writer(val: u64) -> bool {
val & Self::WRITER != 0
}
const fn has_readers(val: u64) -> bool {
val & Self::READER_COUNT_MASK != 0
}
#[inline]
/// Try to acquire a shared read lock.
pub fn read(&self) -> bool {
let cur = self.0.load(Ordering::Acquire);
// If a writer is present we cannot proceed.
if Self::has_writer(cur) {
return false;
}
// 2 billion readers is a high enough ceiling that we skip the overflow branch
// and assume that we are not overflowing :)
let desired = cur.wrapping_add(Self::READER_INC);
// For success, Acquire establishes a happens-before relationship with the previous Release from unlock;
// for failure we only care about reading the value for the next attempt, so we can use Relaxed.
self.0
.compare_exchange_weak(cur, desired, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
/// Try to take an exclusive lock. Succeeds if no readers and no writer.
#[inline]
pub fn write(&self) -> bool {
let cur = self.0.load(Ordering::Acquire);
// exclusive lock, so require no readers and no writer
if Self::has_writer(cur) || Self::has_readers(cur) {
return false;
}
let desired = cur | Self::WRITER;
self.0
.compare_exchange(cur, desired, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
#[inline]
/// Unlock whatever lock is currently held.
/// For write lock: clear writer bit
/// For read lock: decrement reader count
pub fn unlock(&self) {
let cur = self.0.load(Ordering::Acquire);
if (cur & Self::WRITER) != 0 {
// Clear writer bit, preserve everything else (including value)
// Release ordering ensures all our writes are visible to next acquirer
let cur = self.0.fetch_and(!Self::WRITER, Ordering::Release);
turso_assert!(!Self::has_readers(cur), "write lock was held with readers");
} else {
// drop one reader, last reader leaves value intact
let prev = self.0.fetch_sub(Self::READER_INC, Ordering::Release);
turso_assert!(
(prev & Self::READER_COUNT_MASK) >= Self::READER_INC,
"unlock called with no readers"
);
}
}
/// Shared lock. Returns true if it was successful, false if it couldn't lock it
pub fn read(&mut self) -> bool {
let lock = self.lock.load(Ordering::SeqCst);
let ok = match lock {
NO_LOCK => {
let res = self.lock.compare_exchange(
lock,
SHARED_LOCK,
Ordering::SeqCst,
Ordering::SeqCst,
);
let ok = res.is_ok();
if ok {
self.nreads.fetch_add(1, Ordering::SeqCst);
}
ok
}
SHARED_LOCK => {
// There is this race condition where we could've unlocked after loading lock ==
// SHARED_LOCK.
self.nreads.fetch_add(1, Ordering::SeqCst);
let lock_after_load = self.lock.load(Ordering::SeqCst);
if lock_after_load != lock {
// try to lock it again
let res = self.lock.compare_exchange(
lock_after_load,
SHARED_LOCK,
Ordering::SeqCst,
Ordering::SeqCst,
);
let ok = res.is_ok();
if ok {
// we were able to acquire it back
true
} else {
// we couldn't acquire it back, reduce number again
self.nreads.fetch_sub(1, Ordering::SeqCst);
false
}
} else {
true
}
}
WRITE_LOCK => false,
_ => unreachable!(),
};
tracing::trace!("read_lock({})", ok);
ok
#[inline]
/// Read the embedded 32-bit value atomically regardless of slot occupancy.
pub fn get_value(&self) -> u32 {
(self.0.load(Ordering::Acquire) >> Self::VALUE_SHIFT) as u32
}
/// Locks exclusively. Returns true if it was successful, false if it couldn't lock it
pub fn write(&mut self) -> bool {
let lock = self.lock.load(Ordering::SeqCst);
let ok = match lock {
NO_LOCK => {
let res = self.lock.compare_exchange(
lock,
WRITE_LOCK,
Ordering::SeqCst,
Ordering::SeqCst,
);
res.is_ok()
}
SHARED_LOCK => {
// no op
false
}
WRITE_LOCK => false,
_ => unreachable!(),
};
tracing::trace!("write_lock({})", ok);
ok
}
/// Unlock the current held lock.
pub fn unlock(&mut self) {
let lock = self.lock.load(Ordering::SeqCst);
tracing::trace!("unlock(value={})", lock);
match lock {
NO_LOCK => {}
SHARED_LOCK => {
let prev = self.nreads.fetch_sub(1, Ordering::SeqCst);
if prev == 1 {
let res = self.lock.compare_exchange(
lock,
NO_LOCK,
Ordering::SeqCst,
Ordering::SeqCst,
);
assert!(res.is_ok());
}
}
WRITE_LOCK => {
let res =
self.lock
.compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst);
assert!(res.is_ok());
}
_ => unreachable!(),
}
#[inline]
/// Set the embedded value while holding the write lock.
pub fn set_value_exclusive(&self, v: u32) {
// Must be called only while WRITER bit is set
let cur = self.0.load(Ordering::Relaxed);
turso_assert!(Self::has_writer(cur), "must hold exclusive lock");
let desired = (cur & !Self::VALUE_MASK) | ((v as u64) << Self::VALUE_SHIFT);
self.0.store(desired, Ordering::Relaxed);
}
}
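
To make the protocol above concrete, here is a self-contained sketch that mirrors the packed-word scheme; the names follow the diff, but this is an illustration under the documented layout, not the crate's implementation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative stand-in for TursoRwLock following the layout documented
/// above; a sketch, not the crate's implementation.
struct PackedRwLock(AtomicU64);

impl PackedRwLock {
    const WRITER: u64 = 0b1;
    const READER_INC: u64 = 0b10;
    const READER_COUNT_MASK: u64 = 0x7fff_ffffu64 << 1;
    const VALUE_SHIFT: u32 = 32;
    const VALUE_MASK: u64 = 0xffff_ffffu64 << 32;

    fn read(&self) -> bool {
        let cur = self.0.load(Ordering::Acquire);
        if cur & Self::WRITER != 0 {
            return false; // writer present
        }
        // A single CAS bumps the reader count; the value bits ride along untouched.
        self.0
            .compare_exchange_weak(
                cur,
                cur.wrapping_add(Self::READER_INC),
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
    }

    fn write(&self) -> bool {
        let cur = self.0.load(Ordering::Acquire);
        if cur & (Self::WRITER | Self::READER_COUNT_MASK) != 0 {
            return false; // readers or another writer present
        }
        self.0
            .compare_exchange(cur, cur | Self::WRITER, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    fn unlock(&self) {
        let cur = self.0.load(Ordering::Acquire);
        if cur & Self::WRITER != 0 {
            self.0.fetch_and(!Self::WRITER, Ordering::Release); // clear writer bit
        } else {
            self.0.fetch_sub(Self::READER_INC, Ordering::Release); // drop one reader
        }
    }

    fn get_value(&self) -> u32 {
        (self.0.load(Ordering::Acquire) >> Self::VALUE_SHIFT) as u32
    }

    fn set_value_exclusive(&self, v: u32) {
        let cur = self.0.load(Ordering::Relaxed);
        assert!(cur & Self::WRITER != 0, "must hold exclusive lock");
        let desired = (cur & !Self::VALUE_MASK) | ((v as u64) << Self::VALUE_SHIFT);
        self.0.store(desired, Ordering::Relaxed);
    }
}

fn main() {
    let lock = PackedRwLock(AtomicU64::new(0));
    assert!(lock.write()); // exclusive: sets bit 0
    lock.set_value_exclusive(42); // value lives in bits [63:32]
    lock.unlock();
    assert!(lock.read()); // shared: bumps reader count in bits [31:1]
    assert!(!lock.write()); // writers are excluded while a reader holds the lock
    assert_eq!(lock.get_value(), 42); // value is readable without any lock
    lock.unlock();
}
```

Packing state and value into one word is what lets `unlock` drop the writer bit while leaving the read mark intact in the same atomic operation.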
@@ -343,7 +342,7 @@ impl Batch {
let inner = &mut *scratch.inner.get();
inner.contents = Some(PageContent::new(0, new_buf));
// reset flags on scratch so it won't be cleared later with the real page
inner.flags.store(0, Ordering::SeqCst);
inner.flags.store(0, Ordering::Release);
}
}
}
@@ -550,13 +549,13 @@ pub struct WalFileShared {
/// from a partially checkpointed db file.
/// Slots 1-4 carry a frame number in value and may be shared by many readers. Slot 1 is the
/// default read lock and contains the max_frame in WAL.
pub read_locks: [LimboRwLock; 5],
pub read_locks: [TursoRwLock; 5],
/// Only one writer is allowed in WAL mode. This lock takes care of ensuring there is
/// only one in use.
pub write_lock: LimboRwLock,
pub write_lock: TursoRwLock,
/// Serialises checkpointer threads; only one checkpoint can be in flight at any time. Blocking and exclusive only.
pub checkpoint_lock: LimboRwLock,
pub checkpoint_lock: TursoRwLock,
pub loaded: AtomicBool,
}
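
As a hypothetical illustration of the slot convention above (not a function in the crate), the default-reader selection that appears in the hunks below boils down to picking the largest published mark that does not exceed the shared max frame:

```rust
// Hypothetical sketch of read-slot selection, following the slot convention
// above: scan slots 1..5 for the largest mark <= shared_max; marks equal to
// READMARK_NOT_USED denote free slots.
const READMARK_NOT_USED: u32 = 0xffff_ffff;

fn best_read_slot(marks: &[u32; 5], shared_max: u32) -> Option<usize> {
    let mut best: Option<(usize, u32)> = None;
    for (idx, &m) in marks.iter().enumerate().skip(1) {
        if m != READMARK_NOT_USED && m <= shared_max && best.map_or(true, |(_, b)| m > b) {
            best = Some((idx, m));
        }
    }
    best.map(|(idx, _)| idx)
}

fn main() {
    // Slot 1 holds the default mark; slots 3 and 4 are unused here.
    let marks = [0, 7, 5, READMARK_NOT_USED, READMARK_NOT_USED];
    assert_eq!(best_read_slot(&marks, 7), Some(1)); // largest usable mark wins
    assert_eq!(best_read_slot(&marks, 6), Some(2)); // marks above shared_max are skipped
}
```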
@@ -671,8 +670,8 @@ impl Wal for WalFile {
);
let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
let shared = self.get_shared();
let mx = shared.max_frame.load(Ordering::SeqCst);
let nb = shared.nbackfills.load(Ordering::SeqCst);
let mx = shared.max_frame.load(Ordering::Acquire);
let nb = shared.nbackfills.load(Ordering::Acquire);
let ck = shared.last_checksum;
let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
(mx, nb, ck, checkpoint_seq)
@@ -707,7 +706,7 @@ impl Wal for WalFile {
let mut best_idx: i64 = -1;
let mut best_mark: u32 = 0;
for (idx, lock) in self.get_shared().read_locks.iter().enumerate().skip(1) {
let m = lock.value.load(Ordering::SeqCst);
let m = lock.get_value();
if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark {
best_mark = m;
best_idx = idx as i64;
@@ -721,7 +720,7 @@ impl Wal for WalFile {
continue; // busy slot
}
// claim or bump this slot
lock.value.store(shared_max as u32, Ordering::SeqCst);
lock.set_value_exclusive(shared_max as u32);
best_idx = idx as i64;
best_mark = shared_max as u32;
lock.unlock();
@@ -742,8 +741,8 @@ impl Wal for WalFile {
return Ok((LimboResult::Busy, db_changed));
}
(
shared.max_frame.load(Ordering::SeqCst),
shared.nbackfills.load(Ordering::SeqCst),
shared.max_frame.load(Ordering::Acquire),
shared.nbackfills.load(Ordering::Acquire),
shared.last_checksum,
shared.wal_header.lock().checkpoint_seq,
)
@@ -823,8 +822,8 @@ impl Wal for WalFile {
let (shared_max, nbackfills, last_checksum) = {
let shared = self.get_shared();
(
shared.max_frame.load(Ordering::SeqCst),
shared.nbackfills.load(Ordering::SeqCst),
shared.max_frame.load(Ordering::Acquire),
shared.nbackfills.load(Ordering::Acquire),
shared.last_checksum,
)
};
@@ -866,8 +865,8 @@ impl Wal for WalFile {
// if it's not, then pages from the WAL range [frame_watermark..nBackfill] are already in the DB file,
// and if the page's first occurrence in the WAL was after frame_watermark, we will be unable to read the proper previous version of the page
turso_assert!(
frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::SeqCst),
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::SeqCst)
frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::Acquire),
"frame_watermark must be >= than current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire)
);
// if we are holding read_lock 0, skip and read right from db file.
@@ -1030,7 +1029,7 @@ impl Wal for WalFile {
write_counter: Rc<RefCell<usize>>,
) -> Result<Completion> {
let shared = self.get_shared();
if shared.max_frame.load(Ordering::SeqCst).eq(&0) {
if shared.max_frame.load(Ordering::Acquire).eq(&0) {
self.ensure_header_if_needed()?;
}
let page_id = page.get().id;
@@ -1083,8 +1082,8 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
let frame_id = shared.max_frame.load(Ordering::Acquire) as usize;
let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize;
frame_id > self.checkpoint_threshold + nbackfills
}
@@ -1136,7 +1135,7 @@ impl Wal for WalFile {
}
fn get_max_frame_in_wal(&self) -> u64 {
self.get_shared().max_frame.load(Ordering::SeqCst)
self.get_shared().max_frame.load(Ordering::Acquire)
}
fn get_max_frame(&self) -> u64 {
@@ -1154,7 +1153,7 @@ impl Wal for WalFile {
{
// TODO(pere): implement proper hashmap, this sucks :).
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::SeqCst);
let max_frame = shared.max_frame.load(Ordering::Acquire);
tracing::debug!(to_max_frame = max_frame);
{
let mut frame_cache = shared.frame_cache.lock();
@@ -1182,7 +1181,7 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn finish_append_frames_commit(&mut self) -> Result<()> {
let shared = self.get_shared();
shared.max_frame.store(self.max_frame, Ordering::SeqCst);
shared.max_frame.store(self.max_frame, Ordering::Release);
tracing::trace!(self.max_frame, ?self.last_checksum);
shared.last_checksum = self.last_checksum;
Ok(())
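
This store is the publish half of the Release/Acquire pairing that justifies relaxing the orderings from SeqCst throughout this file: frame state is written first, then `max_frame` is stored with Release, and any reader that observes the new value through an Acquire load also observes the frames behind it. A minimal sketch of the pattern, with illustrative names (`frame_data` stands in for the WAL contents):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;

fn main() {
    let frame_data = Arc::new(AtomicU64::new(0));
    let max_frame = Arc::new(AtomicU64::new(0));

    let (fd, mf) = (frame_data.clone(), max_frame.clone());
    let writer = thread::spawn(move || {
        fd.store(0xdead_beef, Ordering::Relaxed); // write the frame first...
        mf.store(1, Ordering::Release); // ...then publish it with Release
    });

    let (fd, mf) = (frame_data.clone(), max_frame.clone());
    let reader = thread::spawn(move || {
        // If the Acquire load observes the Release store, it also observes
        // every write sequenced before that store in the writer thread, so
        // the Relaxed read below can never see a stale zero.
        if mf.load(Ordering::Acquire) == 1 {
            assert_eq!(fd.load(Ordering::Relaxed), 0xdead_beef);
        }
    });

    writer.join().unwrap();
    reader.join().unwrap();
}
```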
@@ -1228,7 +1227,7 @@ impl WalFile {
Self {
io,
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) },
max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) },
shared,
ongoing_checkpoint: OngoingCheckpoint {
scratch_page: checkpoint_page,
@@ -1308,7 +1307,7 @@ impl WalFile {
tracing::debug!("ensure_header_if_needed");
self.last_checksum = {
let shared = self.get_shared();
if shared.max_frame.load(Ordering::SeqCst) != 0 {
if shared.max_frame.load(Ordering::Acquire) != 0 {
return Ok(());
}
if shared.file.size()? >= WAL_HEADER_SIZE as u64 {
@@ -1359,8 +1358,8 @@ impl WalFile {
CheckpointState::Start => {
let (max_frame, nbackfills) = {
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::SeqCst);
let n_backfills = shared.nbackfills.load(Ordering::SeqCst);
let max_frame = shared.max_frame.load(Ordering::Acquire);
let n_backfills = shared.nbackfills.load(Ordering::Acquire);
(max_frame, n_backfills)
};
let needs_backfill = max_frame > nbackfills;
@@ -1473,7 +1472,7 @@ impl WalFile {
}
CheckpointState::WaitFlush => {
match self.ongoing_checkpoint.pending_flush.as_ref() {
Some(pf) if pf.done.load(Ordering::SeqCst) => {
Some(pf) if pf.done.load(Ordering::Acquire) => {
// flush is done, we can continue
tracing::trace!("checkpoint backfilling batch done");
}
@@ -1512,8 +1511,8 @@ impl WalFile {
}
let mut checkpoint_result = {
let shared = self.get_shared();
let current_mx = shared.max_frame.load(Ordering::SeqCst);
let nbackfills = shared.nbackfills.load(Ordering::SeqCst);
let current_mx = shared.max_frame.load(Ordering::Acquire);
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
// Record two num pages fields to return as checkpoint result to caller.
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
@@ -1545,7 +1544,7 @@ impl WalFile {
// store the max frame we were able to successfully checkpoint.
self.get_shared()
.nbackfills
.store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
.store(self.ongoing_checkpoint.max_frame, Ordering::Release);
if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
if checkpoint_result.everything_backfilled() {
@@ -1618,10 +1617,10 @@ impl WalFile {
/// We never modify slot values while a reader holds that slot.
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
let shared = self.get_shared();
let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
let mut max_safe_frame = shared.max_frame.load(Ordering::Acquire);
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
let this_mark = read_lock.value.load(Ordering::SeqCst);
let this_mark = read_lock.get_value();
if this_mark < max_safe_frame as u32 {
let busy = !read_lock.write();
if !busy {
@@ -1631,7 +1630,7 @@ impl WalFile {
} else {
READMARK_NOT_USED
};
read_lock.value.store(val, Ordering::SeqCst);
read_lock.set_value_exclusive(val);
read_lock.unlock();
} else {
max_safe_frame = this_mark as u64;
@@ -1668,7 +1667,7 @@ impl WalFile {
return Err(LimboError::Busy);
}
// after the log is reset, we must set all secondary marks to READMARK_NOT_USED so the next reader selects a fresh slot
lock.value.store(READMARK_NOT_USED, Ordering::SeqCst);
lock.set_value_exclusive(READMARK_NOT_USED);
}
}
@@ -1759,7 +1758,7 @@ impl WalFileShared {
let mut max_loops = 100_000;
while !unsafe { &*wal_file_shared.get() }
.loaded
.load(Ordering::SeqCst)
.load(Ordering::Acquire)
{
io.run_once()?;
max_loops -= 1;
@@ -1814,18 +1813,15 @@ impl WalFileShared {
};
io.wait_for_completion(c)?;
tracing::debug!("new_shared(header={:?})", header);
let read_locks = array::from_fn(|_| LimboRwLock {
lock: AtomicU32::new(NO_LOCK),
nreads: AtomicU32::new(0),
value: AtomicU32::new(READMARK_NOT_USED),
});
let read_locks = array::from_fn(|_| TursoRwLock::new());
// slot zero is always zero as it signifies that reads can be done from the db file
// directly, and slot 1 is the default read mark containing the max frame. in this case
// our max frame is zero so both slots 0 and 1 begin at 0
read_locks[0].value.store(0, Ordering::SeqCst);
read_locks[1].value.store(0, Ordering::SeqCst);
for (i, lock) in read_locks.iter().enumerate() {
lock.write();
lock.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
lock.unlock();
}
let shared = WalFileShared {
wal_header: Arc::new(SpinLock::new(wal_header)),
min_frame: AtomicU64::new(0),
@@ -1836,12 +1832,8 @@ impl WalFileShared {
file,
pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
read_locks,
write_lock: LimboRwLock {
lock: AtomicU32::new(NO_LOCK),
nreads: AtomicU32::new(0),
value: AtomicU32::new(READMARK_NOT_USED),
},
checkpoint_lock: LimboRwLock::new(),
write_lock: TursoRwLock::new(),
checkpoint_lock: TursoRwLock::new(),
loaded: AtomicBool::new(true),
};
Ok(Arc::new(UnsafeCell::new(shared)))
@@ -1878,8 +1870,8 @@ impl WalFileShared {
hdr.salt_1 = hdr.salt_1.wrapping_add(1);
hdr.salt_2 = io.generate_random_number() as u32;
self.max_frame.store(0, Ordering::SeqCst);
self.nbackfills.store(0, Ordering::SeqCst);
self.max_frame.store(0, Ordering::Release);
self.nbackfills.store(0, Ordering::Release);
self.last_checksum = (hdr.checksum_1, hdr.checksum_2);
}
@@ -1887,10 +1879,10 @@ impl WalFileShared {
self.pages_in_frames.lock().clear();
// read-marks
self.read_locks[0].value.store(0, Ordering::SeqCst);
self.read_locks[1].value.store(0, Ordering::SeqCst);
for l in &self.read_locks[2..] {
l.value.store(READMARK_NOT_USED, Ordering::SeqCst);
self.read_locks[0].set_value_exclusive(0);
self.read_locks[1].set_value_exclusive(0);
for lock in &self.read_locks[2..] {
lock.set_value_exclusive(READMARK_NOT_USED);
}
Ok(())
}
@@ -2302,9 +2294,7 @@ pub mod test {
// Verify read marks after restart
let read_marks_after: Vec<_> = unsafe {
let s = &*wal_shared.get();
(0..5)
.map(|i| s.read_locks[i].value.load(Ordering::SeqCst))
.collect()
(0..5).map(|i| s.read_locks[i].get_value()).collect()
};
assert_eq!(read_marks_after[0], 0, "Slot 0 should remain 0");
@@ -2402,11 +2392,7 @@ pub mod test {
}
// check that read mark 1 (default reader) was updated to max_frame
let read_mark_1 = unsafe {
(*wal_shared.get()).read_locks[1]
.value
.load(Ordering::SeqCst)
};
let read_mark_1 = unsafe { (*wal_shared.get()).read_locks[1].get_value() };
assert_eq!(
read_mark_1 as u64, max_frame_before,