refactor checkpoint in pager

This commit is contained in:
pedrocarlo
2025-08-12 14:25:23 -03:00
committed by Jussi Saurio
parent a7f5912e7d
commit 56905fc340
2 changed files with 135 additions and 291 deletions

View File

@@ -262,29 +262,30 @@ enum CacheFlushState {
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache commit.
enum CommitState {
/// Idle.
/// Appends all frames to the WAL.
Start,
/// Append a single frame to the WAL.
AppendFrame { current_page_to_append_idx: usize },
/// Wait for append frame to complete.
/// If the current page is the last page to append, sync wal and clear dirty pages and cache.
WaitAppendFrame { current_page_to_append_idx: usize },
/// Fsync the on-disk WAL.
SyncWal,
/// After Fsync the on-disk WAL.
AfterSyncWal,
/// Checkpoint the WAL to the database file (if needed).
Checkpoint,
/// Fsync the database file.
SyncDbFile,
/// Waiting for the database file to be fsynced.
WaitSyncDbFile,
/// After database file is fsynced.
AfterSyncDbFile,
}
#[derive(Clone, Debug, Copy)]
#[derive(Clone, Debug, Default)]
enum CheckpointState {
#[default]
Checkpoint,
SyncDbFile,
WaitSyncDbFile,
CheckpointDone,
SyncDbFile {
res: CheckpointResult,
},
CheckpointDone {
res: CheckpointResult,
},
}
/// The mode of allocating a btree page.
@@ -304,17 +305,6 @@ pub enum BtreePageAllocMode {
/// This will keep track of the state of current cache commit in order to not repeat work
struct CommitInfo {
state: CommitState,
/// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync.
in_flight_writes: Rc<RefCell<usize>>,
/// Dirty pages to be flushed.
dirty_pages: Vec<usize>,
}
/// This will keep track of the state of current cache flush in order to not repeat work
struct FlushInfo {
state: CacheFlushState,
/// Number of writes taking place.
in_flight_writes: Rc<RefCell<usize>>,
/// Dirty pages to be flushed.
dirty_pages: Vec<usize>,
}
@@ -430,7 +420,6 @@ pub struct Pager {
dirty_pages: Rc<RefCell<HashSet<usize, hash::BuildHasherDefault<hash::DefaultHasher>>>>,
commit_info: RefCell<CommitInfo>,
flush_info: RefCell<FlushInfo>,
checkpoint_state: RefCell<CheckpointState>,
checkpoint_inflight: Rc<RefCell<usize>>,
syncing: Rc<RefCell<bool>>,
@@ -545,7 +534,6 @@ impl Pager {
))),
commit_info: RefCell::new(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
}),
syncing: Rc::new(RefCell::new(false)),
@@ -558,11 +546,6 @@ impl Pager {
allocate_page1_state,
page_size: Cell::new(None),
reserved_space: OnceCell::new(),
flush_info: RefCell::new(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
}),
free_page_state: RefCell::new(FreePageState::Start),
allocate_page_state: RefCell::new(AllocatePageState::Start),
max_page_count: Cell::new(DEFAULT_MAX_PAGE_COUNT),
@@ -1160,7 +1143,7 @@ impl Pager {
/// Flush all dirty pages to disk.
/// Unlike commit_dirty_pages, this function does not commit, checkpoint, or sync the WAL/database.
#[instrument(skip_all, level = Level::INFO)]
pub fn cacheflush(&self) -> Result<IOResult<()>> {
pub fn cacheflush(&self) -> Result<Vec<Completion>> {
let Some(wal) = self.wal.as_ref() else {
// TODO: when ephemeral table spills to disk, it should cacheflush pages directly to the temporary database file.
// This handling is not yet implemented, but it should be when spilling is implemented.
@@ -1168,92 +1151,32 @@ impl Pager {
"cacheflush() called on database without WAL".to_string(),
));
};
let state = self.flush_info.borrow().state;
trace!(?state);
match state {
CacheFlushState::Start => {
let dirty_pages = self
.dirty_pages
.borrow()
.iter()
.copied()
.collect::<Vec<usize>>();
let mut flush_info = self.flush_info.borrow_mut();
if dirty_pages.is_empty() {
Ok(IOResult::Done(()))
} else {
flush_info.dirty_pages = dirty_pages;
flush_info.state = CacheFlushState::AppendFrame {
current_page_to_append_idx: 0,
};
Ok(IOResult::IO)
}
}
CacheFlushState::AppendFrame {
current_page_to_append_idx,
} => {
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
let _c = wal.borrow_mut().append_frame(
page.clone(),
0,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame {
current_page_to_append_idx,
};
Ok(IOResult::IO)
}
CacheFlushState::WaitAppendFrame {
current_page_to_append_idx,
} => {
let in_flight = self.flush_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// Clear dirty now
let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Continue with next page
let is_last_page =
current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1;
if is_last_page {
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = CacheFlushState::Start;
Ok(IOResult::Done(()))
} else {
self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
};
Ok(IOResult::IO)
}
}
let dirty_pages = self
.dirty_pages
.borrow()
.iter()
.copied()
.collect::<Vec<usize>>();
let mut completions = Vec::with_capacity(dirty_pages.len());
for page_id in dirty_pages {
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
let c = wal.borrow_mut().append_frame(page.clone(), 0)?;
// TODO: invalidate previous completions if this one fails
completions.push(c);
}
// Pages are cleared dirty on callback completion
Ok(completions)
}
/// Flush all dirty pages to disk.
@@ -1276,107 +1199,56 @@ impl Pager {
trace!(?state);
match state {
CommitState::Start => {
let dirty_pages = self
.dirty_pages
.borrow()
.iter()
.copied()
.collect::<Vec<usize>>();
let mut commit_info = self.commit_info.borrow_mut();
if dirty_pages.is_empty() {
let db_size = {
self.io
.block(|| self.with_header(|header| header.database_size))?
.get()
};
let dirty_len = self.dirty_pages.borrow().iter().len();
let mut completions = Vec::with_capacity(dirty_len);
for (curr_page_idx, page_id) in
self.dirty_pages.borrow().iter().copied().enumerate()
{
let is_last_frame = curr_page_idx == dirty_len - 1;
let db_size = if is_last_frame { db_size } else { 0 };
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
// TODO: invalidate previous completions on error here
let c = wal.borrow_mut().append_frame(page.clone(), db_size)?;
completions.push(c);
}
self.dirty_pages.borrow_mut().clear();
// Nothing to append
if completions.is_empty() {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
} else {
commit_info.dirty_pages = dirty_pages;
commit_info.state = CommitState::AppendFrame {
current_page_to_append_idx: 0,
};
}
}
CommitState::AppendFrame {
current_page_to_append_idx,
} => {
let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
let db_size = {
let db_size = self
.io
.block(|| self.with_header(|header| header.database_size))?
.get();
if is_last_frame {
db_size
} else {
0
}
};
let _c = wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.commit_info.borrow().in_flight_writes.clone(),
)?;
self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame {
current_page_to_append_idx,
};
}
CommitState::WaitAppendFrame {
current_page_to_append_idx,
} => {
let in_flight = self.commit_info.borrow().in_flight_writes.clone();
if *in_flight.borrow() > 0 {
return Ok(IOResult::IO);
}
// First clear dirty
let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx];
let page = {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_id);
let page = cache.get(&page_key).unwrap_or_else(|| {
panic!(
"we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}"
)
});
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!(
"commit_dirty_pages(page={}, page_type={:?}",
page_id,
page_type
);
page
};
page.clear_dirty();
// Now advance to next page if there are more
let is_last_frame = current_page_to_append_idx
== self.commit_info.borrow().dirty_pages.len() - 1;
if is_last_frame {
self.dirty_pages.borrow_mut().clear();
self.commit_info.borrow_mut().state = CommitState::SyncWal;
} else {
self.commit_info.borrow_mut().state = CommitState::AppendFrame {
current_page_to_append_idx: current_page_to_append_idx + 1,
}
return Ok(IOResult::IO(IOCompletions::Many(completions)));
}
}
CommitState::SyncWal => {
return_if_io!(wal.borrow_mut().sync());
self.commit_info.borrow_mut().state = CommitState::AfterSyncWal;
let c = wal.borrow_mut().sync()?;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
CommitState::AfterSyncWal => {
if wal_checkpoint_disabled || !wal.borrow().should_checkpoint() {
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::WalWritten;
@@ -1388,17 +1260,17 @@ impl Pager {
self.commit_info.borrow_mut().state = CommitState::SyncDbFile;
}
CommitState::SyncDbFile => {
let _c =
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile;
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.commit_info.borrow_mut().state = CommitState::AfterSyncDbFile;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
CommitState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(IOResult::IO);
} else {
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::Checkpointed(checkpoint_result);
}
CommitState::AfterSyncDbFile => {
turso_assert!(
self.syncing.borrow().clone(),
"should have finished syncing"
);
self.commit_info.borrow_mut().state = CommitState::Start;
break PagerCommitResult::Checkpointed(checkpoint_result);
}
}
};
@@ -1471,42 +1343,26 @@ impl Pager {
"checkpoint() called on database without WAL".to_string(),
));
};
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = *self.checkpoint_state.borrow();
let state = std::mem::take(&mut *self.checkpoint_state.borrow_mut());
trace!(?state);
match state {
CheckpointState::Checkpoint => {
let in_flight = self.checkpoint_inflight.clone();
let res = return_if_io!(wal.borrow_mut().checkpoint(
self,
in_flight,
CheckpointMode::Passive
));
checkpoint_result = res;
self.checkpoint_state.replace(CheckpointState::SyncDbFile);
}
CheckpointState::SyncDbFile => {
let _c =
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
let res =
return_if_io!(wal.borrow_mut().checkpoint(self, CheckpointMode::Passive));
self.checkpoint_state
.replace(CheckpointState::WaitSyncDbFile);
.replace(CheckpointState::SyncDbFile { res });
}
CheckpointState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(IOResult::IO);
} else {
self.checkpoint_state
.replace(CheckpointState::CheckpointDone);
}
CheckpointState::SyncDbFile { res } => {
let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?;
self.checkpoint_state
.replace(CheckpointState::CheckpointDone { res });
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
CheckpointState::CheckpointDone => {
return if *self.checkpoint_inflight.borrow() > 0 {
Ok(IOResult::IO)
} else {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
Ok(IOResult::Done(checkpoint_result))
};
CheckpointState::CheckpointDone { res } => {
turso_assert!(self.syncing.borrow().clone(), "syncing should be done");
self.checkpoint_state.replace(CheckpointState::Checkpoint);
return Ok(IOResult::Done(res));
}
}
}
@@ -1534,18 +1390,10 @@ impl Pager {
};
let mut wal = wal.borrow_mut();
// fsync the WAL synchronously before beginning checkpoint
while let Ok(IOResult::IO) = wal.sync() {
// TODO: for now forget about timeouts as they fail regularly in SIM
// need to think of a better way to do this
// if attempts >= 1000 {
// return Err(LimboError::InternalError(
// "Failed to fsync WAL before final checkpoint, fd likely closed".into(),
// ));
// }
self.io.run_once()?;
_attempts += 1;
}
// TODO: for now forget about timeouts as they fail regularly in SIM
// need to think of a better way to do this
let c = wal.sync()?;
self.io.wait_for_completion(c)?;
}
self.wal_checkpoint(wal_checkpoint_disabled, CheckpointMode::Passive)?;
Ok(())
@@ -1566,11 +1414,7 @@ impl Pager {
return Ok(CheckpointResult::default());
}
let write_counter = Rc::new(RefCell::new(0));
let mut checkpoint_result = self.io.block(|| {
wal.borrow_mut()
.checkpoint(self, write_counter.clone(), mode)
})?;
let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?;
if checkpoint_result.everything_backfilled() && checkpoint_result.num_backfilled != 0 {
let db_size = self
@@ -2105,14 +1949,8 @@ impl Pager {
self.checkpoint_state.replace(CheckpointState::Checkpoint);
self.checkpoint_inflight.replace(0);
self.syncing.replace(false);
self.flush_info.replace(FlushInfo {
state: CacheFlushState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
});
self.commit_info.replace(CommitInfo {
state: CommitState::Start,
in_flight_writes: Rc::new(RefCell::new(0)),
dirty_pages: Vec::new(),
});
self.allocate_page_state.replace(AllocatePageState::Start);

View File

@@ -1997,9 +1997,10 @@ pub mod test {
conn.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn, 20, 3);
db.io
.block(|| conn.pager.borrow_mut().cacheflush())
.unwrap();
let completions = conn.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
// Snapshot header & counters before the RESTART checkpoint.
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
@@ -2091,9 +2092,10 @@ pub mod test {
.execute("create table test(id integer primary key, value text)")
.unwrap();
bulk_inserts(&conn1.clone(), 15, 2);
db.io
.block(|| conn1.pager.borrow_mut().cacheflush())
.unwrap();
let completions = conn1.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
// Force a read transaction that will freeze a lower read mark
let readmark = {
@@ -2105,9 +2107,10 @@ pub mod test {
// generate more frames that the reader will not see.
bulk_inserts(&conn1.clone(), 15, 2);
db.io
.block(|| conn1.pager.borrow_mut().cacheflush())
.unwrap();
let completions = conn1.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
// Run passive checkpoint, expect partial
let (res1, max_before) = {
@@ -2766,9 +2769,10 @@ pub mod test {
bulk_inserts(&conn, 8, 4);
// Ensure frames are flushed to the WAL
db.io
.block(|| conn.pager.borrow_mut().cacheflush())
.unwrap();
let completions = conn.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
// Snapshot the current mxFrame before running FULL
let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone();
@@ -2798,9 +2802,10 @@ pub mod test {
// First commit some data and flush (reader will snapshot here)
bulk_inserts(&writer, 2, 3);
db.io
.block(|| writer.pager.borrow_mut().cacheflush())
.unwrap();
let completions = writer.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
// Start a read transaction pinned at the current snapshot
{
@@ -2817,9 +2822,10 @@ pub mod test {
// Advance WAL beyond the reader's snapshot
bulk_inserts(&writer, 3, 4);
db.io
.block(|| writer.pager.borrow_mut().cacheflush())
.unwrap();
let completions = writer.pager.borrow_mut().cacheflush().unwrap();
for c in completions {
db.io.wait_for_completion(c).unwrap();
}
let mx_now = unsafe {
(&*db.maybe_shared_wal.read().as_ref().unwrap().get())
.max_frame