make checkpoint async

This commit is contained in:
Nikita Sivukhin
2025-09-17 16:36:08 +04:00
parent c4c9d409c9
commit 2c09d17dfe
2 changed files with 151 additions and 81 deletions

View File

@@ -1675,15 +1675,20 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
pub fn wal_checkpoint_start(&self, mode: CheckpointMode) -> Result<IOResult<CheckpointResult>> {
let Some(wal) = self.wal.as_ref() else {
return Err(LimboError::InternalError(
"wal_checkpoint() called on database without WAL".to_string(),
));
};
let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?;
wal.borrow_mut().checkpoint(self, mode)
}
pub fn wal_checkpoint_finish(
&self,
checkpoint_result: &mut CheckpointResult,
) -> Result<IOResult<()>> {
'ensure_sync: {
if checkpoint_result.num_backfilled != 0 {
if checkpoint_result.everything_backfilled() {
@@ -1694,27 +1699,35 @@ impl Pager {
let page_size = self.page_size.get().unwrap_or_default();
let expected = (db_size * page_size.get()) as u64;
if expected < self.db_file.size()? {
self.io.wait_for_completion(self.db_file.truncate(
expected as usize,
Completion::new_trunc(move |_| {
tracing::trace!(
"Database file truncated to expected size: {} bytes",
expected
);
}),
)?)?;
self.io
.wait_for_completion(self.db_file.sync(Completion::new_sync(
move |_| {
tracing::trace!("Database file syncd after truncation");
},
))?)?;
if !checkpoint_result.db_truncate_sent {
let c = self.db_file.truncate(
expected as usize,
Completion::new_trunc(move |_| {
tracing::trace!(
"Database file truncated to expected size: {} bytes",
expected
);
}),
)?;
checkpoint_result.db_truncate_sent = true;
io_yield_one!(c);
}
if !checkpoint_result.db_sync_sent {
let c = self.db_file.sync(Completion::new_sync(move |_| {
tracing::trace!("Database file syncd after truncation");
}))?;
checkpoint_result.db_sync_sent = true;
io_yield_one!(c);
}
break 'ensure_sync;
}
}
// if we backfilled at all, we have to sync the db-file here
self.io
.wait_for_completion(self.db_file.sync(Completion::new_sync(move |_| {}))?)?;
if !checkpoint_result.db_sync_sent {
// if we backfilled at all, we have to sync the db-file here
let c = self.db_file.sync(Completion::new_sync(move |_| {}))?;
checkpoint_result.db_sync_sent = true;
io_yield_one!(c);
}
}
}
checkpoint_result.release_guard();
@@ -1723,7 +1736,14 @@ impl Pager {
.write()
.clear()
.map_err(|e| LimboError::InternalError(format!("Failed to clear page cache: {e:?}")))?;
Ok(checkpoint_result)
Ok(IOResult::Done(()))
}
/// Synchronous convenience wrapper around the two-phase async checkpoint API.
///
/// Drives `wal_checkpoint_start` (which performs the WAL backfill and yields
/// on pending IO) and then `wal_checkpoint_finish` (db-file truncate/sync and
/// page-cache cleanup) to completion via `self.io.block`, so callers that do
/// not participate in the IO loop still get a blocking checkpoint.
///
/// Returns the `CheckpointResult` produced by the start phase, after the
/// finish phase has mutated it (truncate/sync bookkeeping, guard release).
#[instrument(skip_all, level = Level::DEBUG)]
pub fn wal_checkpoint(&self, mode: CheckpointMode) -> Result<CheckpointResult> {
let mut result = self.io.block(|| self.wal_checkpoint_start(mode))?;
self.io.block(|| self.wal_checkpoint_finish(&mut result))?;
Ok(result)
}
pub fn freepage_list(&self) -> u32 {

View File

@@ -24,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{
};
use crate::types::{IOCompletions, IOResult};
use crate::{
bail_corrupt_error, io_yield_many, turso_assert, Buffer, Completion, CompletionError,
IOContext, LimboError, Result,
bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer,
Completion, CompletionError, IOContext, LimboError, Result,
};
#[derive(Debug, Clone, Default)]
@@ -38,6 +38,8 @@ pub struct CheckpointResult {
/// In the case of everything backfilled, we need to hold the locks until the db
/// file is truncated.
maybe_guard: Option<CheckpointLocks>,
pub db_truncate_sent: bool,
pub db_sync_sent: bool,
}
impl Drop for CheckpointResult {
@@ -53,6 +55,8 @@ impl CheckpointResult {
num_backfilled: n_ckpt,
max_frame,
maybe_guard: None,
db_sync_sent: false,
db_truncate_sent: false,
}
}
@@ -314,11 +318,16 @@ pub trait Wal: Debug {
fn as_any(&self) -> &dyn std::any::Any;
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
pub enum CheckpointState {
Start,
Processing,
Done,
Finalize,
Truncate {
checkpoint_result: Option<CheckpointResult>,
truncate_sent: bool,
sync_sent: bool,
},
}
/// IOV_MAX is 1024 on most systems, lets use 512 to be safe
@@ -1694,7 +1703,7 @@ impl WalFile {
mode: CheckpointMode,
) -> Result<IOResult<CheckpointResult>> {
loop {
let state = self.ongoing_checkpoint.state;
let state = &mut self.ongoing_checkpoint.state;
tracing::debug!(?state);
match state {
// Acquire the relevant exclusive locks and checkpoint_lock
@@ -1716,6 +1725,8 @@ impl WalFile {
num_backfilled: self.prev_checkpoint.num_backfilled,
max_frame: nbackfills,
maybe_guard: None,
db_sync_sent: false,
db_truncate_sent: false,
}));
}
// acquire the appropriate exclusive locks depending on the checkpoint mode
@@ -1868,19 +1879,19 @@ impl WalFile {
if !completions.is_empty() {
io_yield_many!(completions);
} else if self.ongoing_checkpoint.complete() {
self.ongoing_checkpoint.state = CheckpointState::Done;
self.ongoing_checkpoint.state = CheckpointState::Finalize;
}
}
// All eligible frames copied to the db file
// Update nBackfills
// In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
// Release all locks and return the current num of wal frames and the amount we backfilled
CheckpointState::Done => {
CheckpointState::Finalize => {
turso_assert!(
self.ongoing_checkpoint.complete(),
"checkpoint pending flush must have finished"
);
let mut checkpoint_result = {
let checkpoint_result = {
let shared = self.get_shared();
let current_mx = shared.max_frame.load(Ordering::Acquire);
let nbackfills = shared.nbackfills.load(Ordering::Acquire);
@@ -1925,9 +1936,24 @@ impl WalFile {
if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() {
return Err(LimboError::Busy);
}
if mode.should_restart_log() {
self.restart_log(mode)?;
}
self.restart_log_if_needed(mode)?;
self.ongoing_checkpoint.state = CheckpointState::Truncate {
checkpoint_result: Some(checkpoint_result),
truncate_sent: false,
sync_sent: false,
};
}
CheckpointState::Truncate { .. } => {
return_if_io!(self.truncate_log_if_needed(mode));
let mut checkpoint_result = {
let CheckpointState::Truncate {
checkpoint_result, ..
} = &mut self.ongoing_checkpoint.state
else {
panic!("unxpected state");
};
checkpoint_result.take().unwrap()
};
// increment wal epoch to ensure no stale pages are used for backfilling
self.get_shared().epoch.fetch_add(1, Ordering::Release);
@@ -2025,11 +2051,10 @@ impl WalFile {
/// Called once the entire WAL has been backfilled in RESTART or TRUNCATE mode.
/// Must be invoked while writer and checkpoint locks are still held.
fn restart_log(&mut self, mode: CheckpointMode) -> Result<()> {
turso_assert!(
mode.should_restart_log(),
"CheckpointMode must be Restart or Truncate"
);
fn restart_log_if_needed(&mut self, mode: CheckpointMode) -> Result<()> {
if !mode.should_restart_log() {
return Ok(());
}
turso_assert!(
matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })),
"We must hold writer and checkpoint locks to restart the log, found: {:?}",
@@ -2054,62 +2079,88 @@ impl WalFile {
}
}
let unlock = |e: Option<&LimboError>| {
// release all read locks we just acquired, the caller will take care of the others
let shared = self.shared.write();
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
if let Some(e) = e {
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
}
};
// reinitialize inmemory state
self.get_shared_mut()
.restart_wal_header(&self.io, mode)
.inspect_err(|e| {
unlock(Some(e));
})?;
self.get_shared_mut().restart_wal_header(&self.io, mode);
let cksm = self.get_shared().last_checksum;
self.last_checksum = cksm;
self.max_frame = 0;
self.min_frame = 0;
self.checkpoint_seq.fetch_add(1, Ordering::Release);
Ok(())
}
// For TRUNCATE mode: shrink the WAL file to 0B
if matches!(mode, CheckpointMode::Truncate { .. }) {
let c = Completion::new_trunc(|_| {
tracing::trace!("WAL file truncated to 0B");
});
fn truncate_log_if_needed(&mut self, mode: CheckpointMode) -> Result<IOResult<()>> {
if !matches!(mode, CheckpointMode::Truncate { .. }) {
Self::unlock(&self.shared, None);
return Ok(IOResult::Done(()));
}
let file = {
let shared = self.get_shared();
// for now at least, lets do all this IO syncronously
assert!(
shared.enabled.load(Ordering::Relaxed),
"WAL must be enabled"
);
let file = shared.file.as_ref().unwrap();
let c = file.truncate(0, c).inspect_err(|e| unlock(Some(e)))?;
shared.initialized.store(false, Ordering::Release);
self.io
.wait_for_completion(c)
.inspect_err(|e| unlock(Some(e)))?;
// fsync after truncation
self.io
.wait_for_completion(
file.sync(Completion::new_sync(|_| {
tracing::trace!("WAL file synced after reset/truncation");
}))
.inspect_err(|e| unlock(Some(e)))?,
)
.inspect_err(|e| unlock(Some(e)))?;
}
shared.file.as_ref().unwrap().clone()
};
// release readlocks 1..4
unlock(None);
Ok(())
let CheckpointState::Truncate {
sync_sent,
truncate_sent,
..
} = &mut self.ongoing_checkpoint.state
else {
panic!("unxpected state");
};
if !*truncate_sent {
// For TRUNCATE mode: shrink the WAL file to 0B
let c = Completion::new_trunc({
let shared = self.shared.clone();
move |result| {
if let Err(err) = result {
Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string())));
} else {
tracing::trace!("WAL file truncated to 0 B");
}
}
});
let c = file
.truncate(0, c)
.inspect_err(|e| Self::unlock(&self.shared, Some(e)))?;
*truncate_sent = true;
io_yield_one!(c);
} else if !*sync_sent {
let shared = self.shared.clone();
let c = file
.sync(Completion::new_sync(move |result| {
if let Err(err) = result {
Self::unlock(&shared, Some(&LimboError::InternalError(err.to_string())));
} else {
tracing::trace!("WAL file synced after reset/truncation");
}
}))
.inspect_err(|e| Self::unlock(&self.shared, Some(e)))?;
*sync_sent = true;
io_yield_one!(c);
} else {
Self::unlock(&self.shared, None);
}
Ok(IOResult::Done(()))
}
/// Release WAL read locks 1..N on the shared state; lock 0 is deliberately
/// left alone (the caller manages it). If `e` is `Some`, the failure is
/// logged before returning — the locks are released either way, which makes
/// this helper safe to call from IO-completion error paths.
///
/// Associated (not `&self`) so it can be invoked from `move` completion
/// closures that capture only a clone of the `Arc`.
fn unlock(shared: &Arc<RwLock<WalFileShared>>, e: Option<&LimboError>) {
// release all read locks we just acquired, the caller will take care of the others
let shared = shared.write();
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
if let Some(e) = e {
// NOTE(review): this message says "restart WAL header", but the helper is
// also called from the WAL-file truncate/sync error paths — consider a
// more generic message.
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
}
}
fn acquire_proper_checkpoint_guard(&mut self, mode: CheckpointMode) -> Result<()> {
@@ -2327,7 +2378,7 @@ impl WalFileShared {
/// This function updates the shared-memory structures so that the next
/// client to write to the database (which may be this one) does so by
/// writing frames into the start of the log file.
fn restart_wal_header(&mut self, io: &Arc<dyn IO>, mode: CheckpointMode) -> Result<()> {
fn restart_wal_header(&mut self, io: &Arc<dyn IO>, mode: CheckpointMode) {
turso_assert!(
matches!(
mode,
@@ -2354,7 +2405,6 @@ impl WalFileShared {
for lock in &self.read_locks[2..] {
lock.set_value_exclusive(READMARK_NOT_USED);
}
Ok(())
}
}