mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 10:14:21 +01:00
Add Full checkpoint mode in WAL
This commit is contained in:
@@ -375,6 +375,11 @@ struct OngoingCheckpoint {
|
||||
max_frame: u64,
|
||||
current_page: u64,
|
||||
}
|
||||
impl OngoingCheckpoint {
|
||||
fn has_work(&self) -> bool {
|
||||
self.min_frame < self.max_frame
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct PendingFlush {
|
||||
// page ids to clear
|
||||
@@ -598,9 +603,6 @@ impl CheckpointLocks {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
match mode {
|
||||
CheckpointMode::Full => Err(LimboError::InternalError(
|
||||
"Full checkpoint mode is not yet supported".into(),
|
||||
)),
|
||||
// Passive mode is the only mode not requiring a write lock, as it doesn't block
|
||||
// readers or writers. It acquires the checkpoint lock to ensure that no other
|
||||
// concurrent checkpoint happens, and acquires the exclusive read lock 0
|
||||
@@ -615,6 +617,22 @@ impl CheckpointLocks {
|
||||
}
|
||||
Ok(Self::Read0 { ptr })
|
||||
}
|
||||
CheckpointMode::Full => {
|
||||
// Full blocks writers and holds read0 exclusively (readers may still use >0 slots)
|
||||
let read0 = &mut shared.read_locks[0];
|
||||
if !read0.write() {
|
||||
shared.checkpoint_lock.unlock();
|
||||
tracing::trace!("CheckpointGuard: read0 lock failed (Full), Busy");
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
if !shared.write_lock.write() {
|
||||
read0.unlock();
|
||||
shared.checkpoint_lock.unlock();
|
||||
tracing::trace!("CheckpointGuard: write lock failed (Full), Busy");
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
Ok(Self::Writer { ptr })
|
||||
}
|
||||
CheckpointMode::Restart | CheckpointMode::Truncate => {
|
||||
// like all modes, we must acquire an exclusive checkpoint lock and lock on read 0
|
||||
// to prevent a reader from reading a partially checkpointed db file.
|
||||
@@ -1093,11 +1111,6 @@ impl Wal for WalFile {
|
||||
_write_counter: Rc<RefCell<usize>>,
|
||||
mode: CheckpointMode,
|
||||
) -> Result<IOResult<CheckpointResult>> {
|
||||
if matches!(mode, CheckpointMode::Full) {
|
||||
return Err(LimboError::InternalError(
|
||||
"Full checkpoint mode is not implemented yet".into(),
|
||||
));
|
||||
}
|
||||
self.checkpoint_inner(pager, _write_counter, mode)
|
||||
.inspect_err(|_| {
|
||||
let _ = self.checkpoint_guard.take();
|
||||
@@ -1369,10 +1382,20 @@ impl WalFile {
|
||||
}
|
||||
// acquire the appropriate exclusive locks depending on the checkpoint mode
|
||||
self.acquire_proper_checkpoint_guard(mode)?;
|
||||
self.ongoing_checkpoint.max_frame = self.determine_max_safe_checkpoint_frame();
|
||||
self.ongoing_checkpoint.max_frame =
|
||||
match self.determine_max_safe_checkpoint_frame(mode) {
|
||||
Err(_) => return Err(LimboError::Busy),
|
||||
Ok(res) => res,
|
||||
};
|
||||
self.ongoing_checkpoint.state = if matches!(mode, CheckpointMode::Full)
|
||||
&& !self.ongoing_checkpoint.has_work()
|
||||
{
|
||||
CheckpointState::Done
|
||||
} else {
|
||||
CheckpointState::ReadFrame
|
||||
};
|
||||
self.ongoing_checkpoint.min_frame = nbackfills + 1;
|
||||
self.ongoing_checkpoint.current_page = 0;
|
||||
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
|
||||
tracing::trace!(
|
||||
"checkpoint_start(min_frame={}, max_frame={})",
|
||||
self.ongoing_checkpoint.min_frame,
|
||||
@@ -1614,29 +1637,36 @@ impl WalFile {
|
||||
/// Others: lower `mxSafeFrame` and continue scanning.
|
||||
///
|
||||
/// We never modify slot values while a reader holds that slot.
|
||||
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
|
||||
fn determine_max_safe_checkpoint_frame(&self, mode: CheckpointMode) -> Result<u64> {
|
||||
let shared = self.get_shared();
|
||||
let mut max_safe_frame = shared.max_frame.load(Ordering::Acquire);
|
||||
let shared_max = shared.max_frame.load(Ordering::Acquire);
|
||||
|
||||
for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let this_mark = read_lock.get_value();
|
||||
if this_mark < max_safe_frame as u32 {
|
||||
let busy = !read_lock.write();
|
||||
if !busy {
|
||||
let val = if read_lock_idx == 1 {
|
||||
// store the shared max_frame for the default read slot 1
|
||||
max_safe_frame as u32
|
||||
} else {
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
read_lock.set_value_exclusive(val);
|
||||
read_lock.unlock();
|
||||
// Start optimistic: we want to advance everyone to shared_max
|
||||
for (idx, lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
|
||||
let mark = lock.get_value();
|
||||
if mark == READMARK_NOT_USED || mark >= shared_max as u32 {
|
||||
continue;
|
||||
}
|
||||
// Try to bump this slot to shared_max (requires exclusive on the slot)
|
||||
if lock.write() {
|
||||
// Slot is free to edit, bump to shared_max (slot 1 keeps a real mark, others can be cleaned)
|
||||
let val = if idx == 1 {
|
||||
shared_max as u32
|
||||
} else {
|
||||
max_safe_frame = this_mark as u64;
|
||||
}
|
||||
READMARK_NOT_USED
|
||||
};
|
||||
lock.set_value_exclusive(val);
|
||||
lock.unlock();
|
||||
} else {
|
||||
// Reader is using this slot.
|
||||
return match mode {
|
||||
// clamp down for passive
|
||||
CheckpointMode::Passive => Ok(mark as u64),
|
||||
_ => Err(LimboError::Busy), // all others must wait
|
||||
};
|
||||
}
|
||||
}
|
||||
max_safe_frame
|
||||
Ok(shared_max)
|
||||
}
|
||||
|
||||
/// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode.
|
||||
|
||||
Reference in New Issue
Block a user