mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-20 07:25:14 +01:00
Hold and ensure release of proper locks if we trunc the db file post-checkpoint
This commit is contained in:
@@ -351,7 +351,7 @@ pub struct Pager {
|
||||
free_page_state: RefCell<FreePageState>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
/// The status of the current cache flush.
|
||||
pub enum PagerCommitResult {
|
||||
/// The WAL was written to disk and fsynced.
|
||||
@@ -1297,7 +1297,7 @@ impl Pager {
|
||||
return Ok(CheckpointResult::default());
|
||||
}
|
||||
|
||||
let checkpoint_result = self.io.block(|| {
|
||||
let mut checkpoint_result = self.io.block(|| {
|
||||
self.wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, Rc::new(RefCell::new(0)), mode)
|
||||
@@ -1323,6 +1323,7 @@ impl Pager {
|
||||
tracing::trace!("Database file syncd after truncation");
|
||||
}))?;
|
||||
}
|
||||
checkpoint_result.release_guard();
|
||||
}
|
||||
|
||||
// TODO: only clear cache of things that are really invalidated
|
||||
|
||||
@@ -39,12 +39,14 @@ pub const NO_LOCK: u32 = 0;
|
||||
pub const SHARED_LOCK: u32 = 1;
|
||||
pub const WRITE_LOCK: u32 = 2;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Default)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct CheckpointResult {
|
||||
/// number of frames in WAL
|
||||
pub num_wal_frames: u64,
|
||||
/// number of frames moved successfully from WAL to db file after checkpoint
|
||||
pub num_checkpointed_frames: u64,
|
||||
/// will drop automatically and free any remaining locks we need to hold
|
||||
maybe_guard: Option<CheckpointGuard>,
|
||||
}
|
||||
|
||||
impl CheckpointResult {
|
||||
@@ -52,11 +54,16 @@ impl CheckpointResult {
|
||||
Self {
|
||||
num_wal_frames: n_frames,
|
||||
num_checkpointed_frames: n_ckpt,
|
||||
maybe_guard: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn everything_backfilled(&self) -> bool {
|
||||
self.num_wal_frames > 0 && self.num_wal_frames == self.num_checkpointed_frames
|
||||
}
|
||||
pub fn release_guard(&mut self) {
|
||||
let _ = self.maybe_guard.take();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, EnumString)]
|
||||
@@ -502,6 +509,7 @@ impl fmt::Debug for WalFileShared {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum CheckpointGuard {
|
||||
Writer { ptr: Arc<UnsafeCell<WalFileShared>> },
|
||||
Read0 { ptr: Arc<UnsafeCell<WalFileShared>> },
|
||||
@@ -912,7 +920,8 @@ impl Wal for WalFile {
|
||||
if shared_max <= nbackfills {
|
||||
// if there's nothing to do and we are fully back-filled, to match sqlite
|
||||
// we return the previous number of backfilled pages from last checkpoint.
|
||||
return Ok(IOResult::Done(self.prev_checkpoint));
|
||||
self.prev_checkpoint.release_guard(); // just in case we are still holding
|
||||
return Ok(IOResult::Done(self.prev_checkpoint.clone()));
|
||||
}
|
||||
let shared = self.get_shared();
|
||||
let busy = !shared.checkpoint_lock.write();
|
||||
@@ -1026,9 +1035,10 @@ impl Wal for WalFile {
|
||||
let max_frame = shared.max_frame.load(Ordering::SeqCst);
|
||||
let nbackfills = shared.nbackfills.load(Ordering::SeqCst);
|
||||
|
||||
let (checkpoint_result, everything_backfilled) = {
|
||||
let (mut checkpoint_result, everything_backfilled) = {
|
||||
// Record two num pages fields to return as checkpoint result to caller.
|
||||
// Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
|
||||
|
||||
let frames_in_wal = max_frame.saturating_sub(nbackfills);
|
||||
let frames_checkpointed = self
|
||||
.ongoing_checkpoint
|
||||
@@ -1036,8 +1046,7 @@ impl Wal for WalFile {
|
||||
.saturating_sub(self.ongoing_checkpoint.min_frame - 1);
|
||||
let checkpoint_result =
|
||||
CheckpointResult::new(frames_in_wal, frames_checkpointed);
|
||||
let everything_backfilled = shared.max_frame.load(Ordering::SeqCst)
|
||||
== self.ongoing_checkpoint.max_frame;
|
||||
let everything_backfilled = max_frame == self.ongoing_checkpoint.max_frame;
|
||||
(checkpoint_result, everything_backfilled)
|
||||
};
|
||||
// we will just overwrite nbackfills with 0 if we are resetting
|
||||
@@ -1056,8 +1065,15 @@ impl Wal for WalFile {
|
||||
// a. the max frame == num wal frames
|
||||
// b. the db file size != num of db pages * page_size
|
||||
// and truncate the db file if necessary.
|
||||
if checkpoint_result.everything_backfilled() {
|
||||
// temporarily hold the locks in the case that the db file must be truncated
|
||||
checkpoint_result.maybe_guard = self.checkpoint_guard.take();
|
||||
} else {
|
||||
// we can drop them now
|
||||
let _ = self.checkpoint_guard.take();
|
||||
}
|
||||
self.prev_checkpoint = checkpoint_result.clone();
|
||||
self.ongoing_checkpoint.state = CheckpointState::Start;
|
||||
let _ = self.checkpoint_guard.take();
|
||||
return Ok(IOResult::Done(checkpoint_result));
|
||||
}
|
||||
}
|
||||
@@ -1233,8 +1249,7 @@ impl WalFile {
|
||||
self.syncing.set(false);
|
||||
}
|
||||
|
||||
// coordinate among many readers what the maximum safe frame is for us
|
||||
// to backfill when checkpointing.
|
||||
// Coordinate what the maximum safe frame is for us to backfill when checkpointing.
|
||||
fn determine_max_safe_checkpoint_frame(&self) -> u64 {
|
||||
let shared = self.get_shared();
|
||||
let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
|
||||
@@ -1307,7 +1322,8 @@ impl WalFile {
|
||||
.inspect_err(|e| {
|
||||
handle_err(e);
|
||||
})?;
|
||||
// For TRUNCATE: physically shrink the WAL to 0 B
|
||||
|
||||
// For TRUNCATE mode: shrink the WAL file to 0 B
|
||||
if matches!(mode, CheckpointMode::Truncate) {
|
||||
let c = Completion::new_trunc(|_| {
|
||||
tracing::trace!("WAL file truncated to 0 B");
|
||||
|
||||
@@ -336,6 +336,7 @@ pub fn op_checkpoint(
|
||||
Ok(CheckpointResult {
|
||||
num_wal_frames: num_wal_pages,
|
||||
num_checkpointed_frames: num_checkpointed_pages,
|
||||
..
|
||||
}) => {
|
||||
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
|
||||
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
|
||||
|
||||
Reference in New Issue
Block a user