mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-15 13:04:20 +01:00
update max frame in case we got a read lock with outdated max frame
This commit is contained in:
@@ -176,6 +176,7 @@ pub trait Wal {
|
||||
mode: CheckpointMode,
|
||||
) -> Result<CheckpointStatus>;
|
||||
fn sync(&mut self) -> Result<CheckpointStatus>;
|
||||
fn get_max_frame_in_wal(&self) -> u64;
|
||||
fn get_max_frame(&self) -> u64;
|
||||
fn get_min_frame(&self) -> u64;
|
||||
}
|
||||
@@ -333,8 +334,8 @@ impl Wal for WalFile {
|
||||
}
|
||||
}
|
||||
|
||||
// If we didn't find any mark, then let's add a new one
|
||||
if max_read_mark_index == -1 {
|
||||
// If we didn't find any mark or we can update, let's update them
|
||||
if (max_read_mark as u64) < max_frame_in_wal || max_read_mark_index == -1 {
|
||||
for (index, lock) in shared.read_locks.iter_mut().enumerate() {
|
||||
let busy = !lock.write();
|
||||
if !busy {
|
||||
@@ -361,10 +362,11 @@ impl Wal for WalFile {
|
||||
self.max_frame = max_read_mark as u64;
|
||||
self.min_frame = shared.nbackfills + 1;
|
||||
tracing::debug!(
|
||||
"begin_read_tx(min_frame={}, max_frame={}, lock={})",
|
||||
"begin_read_tx(min_frame={}, max_frame={}, lock={}, max_frame_in_wal={})",
|
||||
self.min_frame,
|
||||
self.max_frame,
|
||||
self.max_frame_read_lock_index
|
||||
self.max_frame_read_lock_index,
|
||||
max_frame_in_wal
|
||||
);
|
||||
Ok(LimboResult::Ok)
|
||||
}
|
||||
@@ -500,14 +502,18 @@ impl Wal for WalFile {
|
||||
// TODO(pere): check what frames are safe to checkpoint between many readers!
|
||||
self.ongoing_checkpoint.min_frame = self.min_frame;
|
||||
let mut shared = self.shared.write();
|
||||
let max_frame_in_wal = shared.max_frame as u32;
|
||||
let mut max_safe_frame = shared.max_frame;
|
||||
for read_lock in shared.read_locks.iter_mut() {
|
||||
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate() {
|
||||
let this_mark = read_lock.value.load(Ordering::SeqCst);
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
read_lock.value.store(max_frame_in_wal, Ordering::SeqCst);
|
||||
let new_mark = if read_lock_idx == 0 {
|
||||
max_safe_frame as u32
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
read_lock.value.store(new_mark, Ordering::SeqCst);
|
||||
read_lock.unlock();
|
||||
} else {
|
||||
max_safe_frame = this_mark as u64;
|
||||
@@ -613,6 +619,7 @@ impl Wal for WalFile {
|
||||
shared.pages_in_frames.clear();
|
||||
shared.max_frame = 0;
|
||||
shared.nbackfills = 0;
|
||||
// TODO(pere): truncate wal file here.
|
||||
} else {
|
||||
shared.nbackfills = self.ongoing_checkpoint.max_frame;
|
||||
}
|
||||
@@ -658,6 +665,10 @@ impl Wal for WalFile {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_max_frame_in_wal(&self) -> u64 {
|
||||
self.shared.read().max_frame
|
||||
}
|
||||
|
||||
fn get_max_frame(&self) -> u64 {
|
||||
self.max_frame
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user