From 56905fc340ae2182d0da9182bc97ee2e7aa74b13 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Tue, 12 Aug 2025 14:25:23 -0300 Subject: [PATCH] refactor `checkpoint` in pager --- core/storage/pager.rs | 384 ++++++++++++------------------------------ core/storage/wal.rs | 42 +++-- 2 files changed, 135 insertions(+), 291 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 849705d77..56463296d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -262,29 +262,30 @@ enum CacheFlushState { #[derive(Clone, Copy, Debug)] /// The state of the current pager cache commit. enum CommitState { - /// Idle. + /// Appends all frames to the WAL. Start, - /// Append a single frame to the WAL. - AppendFrame { current_page_to_append_idx: usize }, - /// Wait for append frame to complete. - /// If the current page is the last page to append, sync wal and clear dirty pages and cache. - WaitAppendFrame { current_page_to_append_idx: usize }, /// Fsync the on-disk WAL. SyncWal, + /// After Fsync the on-disk WAL. + AfterSyncWal, /// Checkpoint the WAL to the database file (if needed). Checkpoint, /// Fsync the database file. SyncDbFile, - /// Waiting for the database file to be fsynced. - WaitSyncDbFile, + /// After database file is fsynced. + AfterSyncDbFile, } -#[derive(Clone, Debug, Copy)] +#[derive(Clone, Debug, Default)] enum CheckpointState { + #[default] Checkpoint, - SyncDbFile, - WaitSyncDbFile, - CheckpointDone, + SyncDbFile { + res: CheckpointResult, + }, + CheckpointDone { + res: CheckpointResult, + }, } /// The mode of allocating a btree page. @@ -304,17 +305,6 @@ pub enum BtreePageAllocMode { /// This will keep track of the state of current cache commit in order to not repeat work struct CommitInfo { state: CommitState, - /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. - in_flight_writes: Rc>, - /// Dirty pages to be flushed. - dirty_pages: Vec, -} - -/// This will keep track of the state of current cache flush in order to not repeat work -struct FlushInfo { - state: CacheFlushState, - /// Number of writes taking place. - in_flight_writes: Rc>, /// Dirty pages to be flushed. dirty_pages: Vec, } @@ -430,7 +420,6 @@ pub struct Pager { dirty_pages: Rc>>>, commit_info: RefCell, - flush_info: RefCell, checkpoint_state: RefCell, checkpoint_inflight: Rc>, syncing: Rc>, @@ -545,7 +534,6 @@ impl Pager { ))), commit_info: RefCell::new(CommitInfo { state: CommitState::Start, - in_flight_writes: Rc::new(RefCell::new(0)), dirty_pages: Vec::new(), }), syncing: Rc::new(RefCell::new(false)), @@ -558,11 +546,6 @@ impl Pager { allocate_page1_state, page_size: Cell::new(None), reserved_space: OnceCell::new(), - flush_info: RefCell::new(FlushInfo { - state: CacheFlushState::Start, - in_flight_writes: Rc::new(RefCell::new(0)), - dirty_pages: Vec::new(), - }), free_page_state: RefCell::new(FreePageState::Start), allocate_page_state: RefCell::new(AllocatePageState::Start), max_page_count: Cell::new(DEFAULT_MAX_PAGE_COUNT), @@ -1160,7 +1143,7 @@ impl Pager { /// Flush all dirty pages to disk. /// Unlike commit_dirty_pages, this function does not commit, checkpoint now sync the WAL/Database. #[instrument(skip_all, level = Level::INFO)] - pub fn cacheflush(&self) -> Result> { + pub fn cacheflush(&self) -> Result> { let Some(wal) = self.wal.as_ref() else { // TODO: when ephemeral table spills to disk, it should cacheflush pages directly to the temporary database file. // This handling is not yet implemented, but it should be when spilling is implemented. @@ -1168,92 +1151,32 @@ impl Pager { "cacheflush() called on database without WAL".to_string(), )); }; - let state = self.flush_info.borrow().state; - trace!(?state); - match state { - CacheFlushState::Start => { - let dirty_pages = self - .dirty_pages - .borrow() - .iter() - .copied() - .collect::>(); - let mut flush_info = self.flush_info.borrow_mut(); - if dirty_pages.is_empty() { - Ok(IOResult::Done(())) - } else { - flush_info.dirty_pages = dirty_pages; - flush_info.state = CacheFlushState::AppendFrame { - current_page_to_append_idx: 0, - }; - Ok(IOResult::IO) - } - } - CacheFlushState::AppendFrame { - current_page_to_append_idx, - } => { - let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - - let _c = wal.borrow_mut().append_frame( - page.clone(), - 0, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - self.flush_info.borrow_mut().state = CacheFlushState::WaitAppendFrame { - current_page_to_append_idx, - }; - Ok(IOResult::IO) - } - CacheFlushState::WaitAppendFrame { - current_page_to_append_idx, - } => { - let in_flight = self.flush_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() > 0 { - return Ok(IOResult::IO); - } - - // Clear dirty now - let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); - // Continue with next page - let is_last_page = - current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1; - if is_last_page { - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = CacheFlushState::Start; - Ok(IOResult::Done(())) - } else { - self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - }; - Ok(IOResult::IO) - } - } + let dirty_pages = self + .dirty_pages + .borrow() + .iter() + .copied() + .collect::>(); + let mut completions = Vec::with_capacity(dirty_pages.len()); + for page_id in dirty_pages { + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + let c = wal.borrow_mut().append_frame(page.clone(), 0)?; + // TODO: invalidade previous completions if this one fails + completions.push(c); } + // Pages are cleared dirty on callback completion + Ok(completions) } /// Flush all dirty pages to disk. @@ -1276,107 +1199,56 @@ impl Pager { trace!(?state); match state { CommitState::Start => { - let dirty_pages = self - .dirty_pages - .borrow() - .iter() - .copied() - .collect::>(); - let mut commit_info = self.commit_info.borrow_mut(); - if dirty_pages.is_empty() { + let db_size = { + self.io + .block(|| self.with_header(|header| header.database_size))? + .get() + }; + let dirty_len = self.dirty_pages.borrow().iter().len(); + let mut completions = Vec::with_capacity(dirty_len); + for (curr_page_idx, page_id) in + self.dirty_pages.borrow().iter().copied().enumerate() + { + let is_last_frame = curr_page_idx == dirty_len - 1; + + let db_size = if is_last_frame { db_size } else { 0 }; + + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { + panic!( + "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" + ) + }); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + + // TODO: invalidade previous completions on error here + let c = wal.borrow_mut().append_frame(page.clone(), db_size)?; + completions.push(c); + } + self.dirty_pages.borrow_mut().clear(); + // Nothing to append + if completions.is_empty() { return Ok(IOResult::Done(PagerCommitResult::WalWritten)); } else { - commit_info.dirty_pages = dirty_pages; - commit_info.state = CommitState::AppendFrame { - current_page_to_append_idx: 0, - }; - } - } - CommitState::AppendFrame { - current_page_to_append_idx, - } => { - let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; - let is_last_frame = current_page_to_append_idx - == self.commit_info.borrow().dirty_pages.len() - 1; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).unwrap_or_else(|| { - panic!( - "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" - ) - }); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - - let db_size = { - let db_size = self - .io - .block(|| self.with_header(|header| header.database_size))? - .get(); - if is_last_frame { - db_size - } else { - 0 - } - }; - let _c = wal.borrow_mut().append_frame( - page.clone(), - db_size, - self.commit_info.borrow().in_flight_writes.clone(), - )?; - self.commit_info.borrow_mut().state = CommitState::WaitAppendFrame { - current_page_to_append_idx, - }; - } - CommitState::WaitAppendFrame { - current_page_to_append_idx, - } => { - let in_flight = self.commit_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() > 0 { - return Ok(IOResult::IO); - } - // First clear dirty - let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).unwrap_or_else(|| { - panic!( - "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" - ) - }); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); - - // Now advance to next page if there are more - let is_last_frame = current_page_to_append_idx - == self.commit_info.borrow().dirty_pages.len() - 1; - if is_last_frame { - self.dirty_pages.borrow_mut().clear(); self.commit_info.borrow_mut().state = CommitState::SyncWal; - } else { - self.commit_info.borrow_mut().state = CommitState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - } + return Ok(IOResult::IO(IOCompletions::Many(completions))); } } CommitState::SyncWal => { - return_if_io!(wal.borrow_mut().sync()); - + self.commit_info.borrow_mut().state = CommitState::AfterSyncWal; + let c = wal.borrow_mut().sync()?; + return Ok(IOResult::IO(IOCompletions::Single(c))); + } + CommitState::AfterSyncWal => { if wal_checkpoint_disabled || !wal.borrow().should_checkpoint() { self.commit_info.borrow_mut().state = CommitState::Start; break PagerCommitResult::WalWritten; @@ -1388,17 +1260,17 @@ impl Pager { self.commit_info.borrow_mut().state = CommitState::SyncDbFile; } CommitState::SyncDbFile => { - let _c = - sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; - self.commit_info.borrow_mut().state = CommitState::WaitSyncDbFile; + let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + self.commit_info.borrow_mut().state = CommitState::AfterSyncDbFile; + return Ok(IOResult::IO(IOCompletions::Single(c))); } - CommitState::WaitSyncDbFile => { - if *self.syncing.borrow() { - return Ok(IOResult::IO); - } else { - self.commit_info.borrow_mut().state = CommitState::Start; - break PagerCommitResult::Checkpointed(checkpoint_result); - } + CommitState::AfterSyncDbFile => { + turso_assert!( + self.syncing.borrow().clone(), + "should have finished syncing" + ); + self.commit_info.borrow_mut().state = CommitState::Start; + break PagerCommitResult::Checkpointed(checkpoint_result); } } }; @@ -1471,42 +1343,26 @@ impl Pager { "checkpoint() called on database without WAL".to_string(), )); }; - let mut checkpoint_result = CheckpointResult::default(); loop { - let state = *self.checkpoint_state.borrow(); + let state = std::mem::take(&mut *self.checkpoint_state.borrow_mut()); trace!(?state); match state { CheckpointState::Checkpoint => { - let in_flight = self.checkpoint_inflight.clone(); - let res = return_if_io!(wal.borrow_mut().checkpoint( - self, - in_flight, - CheckpointMode::Passive - )); - checkpoint_result = res; - self.checkpoint_state.replace(CheckpointState::SyncDbFile); - } - CheckpointState::SyncDbFile => { - let _c = - sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + let res = + return_if_io!(wal.borrow_mut().checkpoint(self, CheckpointMode::Passive)); self.checkpoint_state - .replace(CheckpointState::WaitSyncDbFile); + .replace(CheckpointState::SyncDbFile { res }); } - CheckpointState::WaitSyncDbFile => { - if *self.syncing.borrow() { - return Ok(IOResult::IO); - } else { - self.checkpoint_state - .replace(CheckpointState::CheckpointDone); - } + CheckpointState::SyncDbFile { res } => { + let c = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone())?; + self.checkpoint_state + .replace(CheckpointState::CheckpointDone { res }); + return Ok(IOResult::IO(IOCompletions::Single(c))); } - CheckpointState::CheckpointDone => { - return if *self.checkpoint_inflight.borrow() > 0 { - Ok(IOResult::IO) - } else { - self.checkpoint_state.replace(CheckpointState::Checkpoint); - Ok(IOResult::Done(checkpoint_result)) - }; + CheckpointState::CheckpointDone { res } => { + turso_assert!(self.syncing.borrow().clone(), "syncing should be done"); + self.checkpoint_state.replace(CheckpointState::Checkpoint); + return Ok(IOResult::Done(res)); } } } @@ -1534,18 +1390,10 @@ impl Pager { }; let mut wal = wal.borrow_mut(); // fsync the wal syncronously before beginning checkpoint - while let Ok(IOResult::IO) = wal.sync() { - // TODO: for now forget about timeouts as they fail regularly in SIM - // need to think of a better way to do this - - // if attempts >= 1000 { - // return Err(LimboError::InternalError( - // "Failed to fsync WAL before final checkpoint, fd likely closed".into(), - // )); - // } - self.io.run_once()?; - _attempts += 1; - } + // TODO: for now forget about timeouts as they fail regularly in SIM + // need to think of a better way to do this + let c = wal.sync()?; + self.io.wait_for_completion(c)?; } self.wal_checkpoint(wal_checkpoint_disabled, CheckpointMode::Passive)?; Ok(()) @@ -1566,11 +1414,7 @@ impl Pager { return Ok(CheckpointResult::default()); } - let write_counter = Rc::new(RefCell::new(0)); - let mut checkpoint_result = self.io.block(|| { - wal.borrow_mut() - .checkpoint(self, write_counter.clone(), mode) - })?; + let mut checkpoint_result = self.io.block(|| wal.borrow_mut().checkpoint(self, mode))?; if checkpoint_result.everything_backfilled() && checkpoint_result.num_backfilled != 0 { let db_size = self @@ -2105,14 +1949,8 @@ impl Pager { self.checkpoint_state.replace(CheckpointState::Checkpoint); self.checkpoint_inflight.replace(0); self.syncing.replace(false); - self.flush_info.replace(FlushInfo { - state: CacheFlushState::Start, - in_flight_writes: Rc::new(RefCell::new(0)), - dirty_pages: Vec::new(), - }); self.commit_info.replace(CommitInfo { state: CommitState::Start, - in_flight_writes: Rc::new(RefCell::new(0)), dirty_pages: Vec::new(), }); self.allocate_page_state.replace(AllocatePageState::Start); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b70bfe583..2132e397f 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1997,9 +1997,10 @@ pub mod test { conn.execute("create table test(id integer primary key, value text)") .unwrap(); bulk_inserts(&conn, 20, 3); - db.io - .block(|| conn.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = conn.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } // Snapshot header & counters before the RESTART checkpoint. let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); @@ -2091,9 +2092,10 @@ pub mod test { .execute("create table test(id integer primary key, value text)") .unwrap(); bulk_inserts(&conn1.clone(), 15, 2); - db.io - .block(|| conn1.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = conn1.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } // Force a read transaction that will freeze a lower read mark let readmark = { @@ -2105,9 +2107,10 @@ pub mod test { // generate more frames that the reader will not see. bulk_inserts(&conn1.clone(), 15, 2); - db.io - .block(|| conn1.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = conn1.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } // Run passive checkpoint, expect partial let (res1, max_before) = { @@ -2766,9 +2769,10 @@ pub mod test { bulk_inserts(&conn, 8, 4); // Ensure frames are flushed to the WAL - db.io - .block(|| conn.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = conn.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } // Snapshot the current mxFrame before running FULL let wal_shared = db.maybe_shared_wal.read().as_ref().unwrap().clone(); @@ -2798,9 +2802,10 @@ pub mod test { // First commit some data and flush (reader will snapshot here) bulk_inserts(&writer, 2, 3); - db.io - .block(|| writer.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = writer.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } // Start a read transaction pinned at the current snapshot { @@ -2817,9 +2822,10 @@ pub mod test { // Advance WAL beyond the reader's snapshot bulk_inserts(&writer, 3, 4); - db.io - .block(|| writer.pager.borrow_mut().cacheflush()) - .unwrap(); + let completions = writer.pager.borrow_mut().cacheflush().unwrap(); + for c in completions { + db.io.wait_for_completion(c).unwrap(); + } let mx_now = unsafe { (&*db.maybe_shared_wal.read().as_ref().unwrap().get()) .max_frame