diff --git a/core/lib.rs b/core/lib.rs index 6a275b383..6beae3585 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -566,17 +566,7 @@ impl Connection { /// Close a connection and checkpoint. pub fn close(&self) -> Result<()> { - loop { - // TODO: make this async? - match self.pager.checkpoint()? { - CheckpointStatus::Done(_) => { - return Ok(()); - } - CheckpointStatus::IO => { - self.pager.io.run_once()?; - } - }; - } + self.pager.checkpoint_shutdown() } pub fn last_insert_rowid(&self) -> i64 { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index dd77b951c..ce266b250 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -821,6 +821,25 @@ impl Pager { .expect("Failed to clear page cache"); } + pub fn checkpoint_shutdown(&self) -> Result<()> { + let mut attempts = 0; + { + let mut wal = self.wal.borrow_mut(); + // fsync the wal synchronously before beginning checkpoint + while let Ok(WalFsyncStatus::IO) = wal.sync() { + if attempts >= 10 { + return Err(LimboError::InternalError( + "Failed to fsync WAL before final checkpoint, fd likely closed".into(), + )); + } + self.io.run_once()?; + attempts += 1; + } + } + self.wal_checkpoint(); + Ok(()) + } + pub fn wal_checkpoint(&self) -> CheckpointResult { let checkpoint_result: CheckpointResult; loop { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 0e115d321..9f4b63e74 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -72,7 +72,7 @@ pub enum CheckpointMode { Truncate, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct LimboRwLock { lock: AtomicU32, nreads: AtomicU32, @@ -390,8 +390,8 @@ pub struct WalFile { io: Arc, buffer_pool: Rc, - sync_state: RefCell, - syncing: Rc>, + syncing: Rc>, + sync_state: Cell, page_size: u32, shared: Arc>, @@ -410,8 +410,8 @@ pub struct WalFile { impl fmt::Debug for WalFile { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WalFile") + .field("syncing", &self.syncing.get()) .field("sync_state", &self.sync_state) - 
.field("syncing", &self.syncing) .field("page_size", &self.page_size) .field("shared", &self.shared) .field("ongoing_checkpoint", &self.ongoing_checkpoint) @@ -748,7 +748,6 @@ impl Wal for WalFile { self.buffer_pool.clone(), )?; self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; - self.ongoing_checkpoint.current_page += 1; continue 'checkpoint_loop; } } @@ -778,6 +777,7 @@ impl Wal for WalFile { if (self.ongoing_checkpoint.current_page as usize) < shared.pages_in_frames.lock().len() { + self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { self.ongoing_checkpoint.state = CheckpointState::Done; @@ -828,31 +828,29 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn sync(&mut self) -> Result { - let state = *self.sync_state.borrow(); - match state { + match self.sync_state.get() { SyncState::NotSyncing => { - let shared = self.get_shared(); tracing::debug!("wal_sync"); - { - let syncing = self.syncing.clone(); - *syncing.borrow_mut() = true; - let completion = Completion::Sync(SyncCompletion { - complete: Box::new(move |_| { - tracing::debug!("wal_sync finish"); - *syncing.borrow_mut() = false; - }), - is_completed: Cell::new(false), - }); - shared.file.sync(Arc::new(completion))?; - } - self.sync_state.replace(SyncState::Syncing); + let syncing = self.syncing.clone(); + self.syncing.set(true); + let completion = Completion::Sync(SyncCompletion { + complete: Box::new(move |_| { + tracing::debug!("wal_sync finish"); + syncing.set(false); + }), + is_completed: Cell::new(false), + }); + let shared = self.get_shared(); + shared.file.sync(Arc::new(completion))?; + self.sync_state.set(SyncState::Syncing); Ok(WalFsyncStatus::IO) } SyncState::Syncing => { - if *self.syncing.borrow() { + if self.syncing.get() { + tracing::debug!("wal_sync is already syncing"); Ok(WalFsyncStatus::IO) } else { - self.sync_state.replace(SyncState::NotSyncing); + 
self.sync_state.set(SyncState::NotSyncing); Ok(WalFsyncStatus::Done) } } @@ -901,11 +899,11 @@ impl WalFile { max_frame: 0, current_page: 0, }, - syncing: Rc::new(RefCell::new(false)), checkpoint_threshold: 1000, page_size, buffer_pool, - sync_state: RefCell::new(SyncState::NotSyncing), + syncing: Rc::new(Cell::new(false)), + sync_state: Cell::new(SyncState::NotSyncing), max_frame: 0, min_frame: 0, max_frame_read_lock_index: 0,