From 9f966910bc9dc69e89613da179b3fe205c4a057b Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 9 Jun 2025 16:04:47 -0400 Subject: [PATCH 1/4] Add manual wal sync before checkpoint in connection Drop --- core/lib.rs | 18 +++++++----------- core/storage/pager.rs | 6 ++++++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 6a275b383..8368ac661 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -343,6 +343,12 @@ pub struct Connection { cache_size: Cell, } +impl Drop for Connection { + fn drop(&mut self) { + let _ = self.close(); + } +} + impl Connection { #[instrument(skip_all, level = Level::TRACE)] pub fn prepare(self: &Rc, sql: impl AsRef) -> Result { @@ -566,17 +572,7 @@ impl Connection { /// Close a connection and checkpoint. pub fn close(&self) -> Result<()> { - loop { - // TODO: make this async? - match self.pager.checkpoint()? { - CheckpointStatus::Done(_) => { - return Ok(()); - } - CheckpointStatus::IO => { - self.pager.io.run_once()?; - } - }; - } + self.pager.checkpoint_shutdown() } pub fn last_insert_rowid(&self) -> i64 { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index dd77b951c..4121a39b8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -821,6 +821,12 @@ impl Pager { .expect("Failed to clear page cache"); } + pub fn checkpoint_shutdown(&self) -> Result<()> { + self.wal.borrow_mut().sync()?; + self.wal_checkpoint(); + Ok(()) + } + pub fn wal_checkpoint(&self) -> CheckpointResult { let checkpoint_result: CheckpointResult; loop { From eecf6ae6e6be133f3e3b3961921830c05e656fa1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 10 Jun 2025 09:08:34 -0400 Subject: [PATCH 2/4] Wait till we write the page to increment current page in wal checkpoint --- core/storage/wal.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 0e115d321..c1bae113f 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -748,7 +748,6 @@ impl Wal for WalFile { self.buffer_pool.clone(), )?; self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; - self.ongoing_checkpoint.current_page += 1; continue 'checkpoint_loop; } } @@ -778,6 +777,7 @@ impl Wal for WalFile { if (self.ongoing_checkpoint.current_page as usize) < shared.pages_in_frames.lock().len() { + self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { self.ongoing_checkpoint.state = CheckpointState::Done; From e134bd19da8dac8f821089c12d2348ac76b924ea Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 10 Jun 2025 09:16:02 -0400 Subject: [PATCH 3/4] Remove close from Drop impl --- core/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 8368ac661..6beae3585 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -343,12 +343,6 @@ pub struct Connection { cache_size: Cell, } -impl Drop for Connection { - fn drop(&mut self) { - let _ = self.close(); - } -} - impl Connection { #[instrument(skip_all, level = Level::TRACE)] pub fn prepare(self: &Rc, sql: impl AsRef) -> Result { From 33b52bfb84cf4fb1b9e0e0e659059f6667866ded Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 10 Jun 2025 10:21:48 -0400 Subject: [PATCH 4/4] Replace refcel in wal sync, add counter timeout to conn close shutdown --- core/storage/pager.rs | 15 +++++++++++++- core/storage/wal.rs | 46 +++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 4121a39b8..ce266b250 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -822,7 +822,20 @@ impl Pager { } pub fn checkpoint_shutdown(&self) -> Result<()> { - self.wal.borrow_mut().sync()?; + let mut attempts = 0; + { + let mut wal = self.wal.borrow_mut(); + // fsync the wal syncronously before beginning checkpoint + while let Ok(WalFsyncStatus::IO) = wal.sync() { + if attempts >= 10 { + return Err(LimboError::InternalError( + "Failed to fsync WAL before final checkpoint, fd likely closed".into(), + )); + } + self.io.run_once()?; + attempts += 1; + } + } self.wal_checkpoint(); Ok(()) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c1bae113f..9f4b63e74 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -72,7 +72,7 @@ pub enum CheckpointMode { Truncate, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct LimboRwLock { lock: AtomicU32, nreads: AtomicU32, @@ -390,8 +390,8 @@ pub struct WalFile { io: Arc, buffer_pool: Rc, - sync_state: RefCell, - syncing: Rc>, + syncing: Rc>, + sync_state: Cell, page_size: u32, shared: Arc>, @@ -410,8 +410,8 @@ pub struct WalFile { impl fmt::Debug for WalFile { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WalFile") + .field("syncing", &self.syncing.get()) .field("sync_state", &self.sync_state) - .field("syncing", &self.syncing) .field("page_size", &self.page_size) .field("shared", &self.shared) .field("ongoing_checkpoint", &self.ongoing_checkpoint) @@ -828,31 +828,29 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn sync(&mut self) -> Result { - let state = *self.sync_state.borrow(); - match state { + match self.sync_state.get() { SyncState::NotSyncing => { - let shared = self.get_shared(); tracing::debug!("wal_sync"); - { - let syncing = self.syncing.clone(); - *syncing.borrow_mut() = true; - let completion = Completion::Sync(SyncCompletion { - complete: Box::new(move |_| { - tracing::debug!("wal_sync finish"); - *syncing.borrow_mut() = false; - }), - is_completed: Cell::new(false), - }); - shared.file.sync(Arc::new(completion))?; - } - self.sync_state.replace(SyncState::Syncing); + let syncing = self.syncing.clone(); + self.syncing.set(true); + let completion = Completion::Sync(SyncCompletion { + complete: Box::new(move |_| { + tracing::debug!("wal_sync finish"); + syncing.set(false); + }), + is_completed: Cell::new(false), + }); + let shared = self.get_shared(); + shared.file.sync(Arc::new(completion))?; + self.sync_state.set(SyncState::Syncing); Ok(WalFsyncStatus::IO) } SyncState::Syncing => { - if *self.syncing.borrow() { + if self.syncing.get() { + tracing::debug!("wal_sync is already syncing"); Ok(WalFsyncStatus::IO) } else { - self.sync_state.replace(SyncState::NotSyncing); + self.sync_state.set(SyncState::NotSyncing); Ok(WalFsyncStatus::Done) } } @@ -901,11 +899,11 @@ impl WalFile { max_frame: 0, current_page: 0, }, - syncing: Rc::new(RefCell::new(false)), checkpoint_threshold: 1000, page_size, buffer_pool, - sync_state: RefCell::new(SyncState::NotSyncing), + syncing: Rc::new(Cell::new(false)), + sync_state: Cell::new(SyncState::NotSyncing), max_frame: 0, min_frame: 0, max_frame_read_lock_index: 0,