diff --git a/core/lib.rs b/core/lib.rs index 6c0b7b6c2..038fc6218 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -61,6 +61,7 @@ use std::{ use storage::btree::{btree_init_page, BTreePageInner}; #[cfg(feature = "fs")] use storage::database::DatabaseFile; +pub use storage::pager::PagerCacheflushStatus; pub use storage::{ buffer_pool::BufferPool, database::DatabaseStorage, @@ -216,7 +217,7 @@ impl Database { let pager = Rc::new(Pager::finish_open( self.header.clone(), self.db_file.clone(), - Some(wal), + wal, self.io.clone(), Arc::new(RwLock::new(DumbLruPageCache::default())), buffer_pool, @@ -503,7 +504,11 @@ impl Connection { self.pager.wal_frame_count() } - pub fn cacheflush(&self) -> Result { + /// Flush dirty pages to disk. + /// This will write the dirty pages to the WAL and then fsync the WAL. + /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to + /// the database file and then fsync the database file. + pub fn cacheflush(&self) -> Result { self.pager.cacheflush() } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 7485cbdf3..57e88802a 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6308,7 +6308,7 @@ mod tests { let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(2000))); let pager = { let db_header = Arc::new(SpinLock::new(db_header.clone())); - Pager::finish_open(db_header, db_file, Some(wal), io, page_cache, buffer_pool).unwrap() + Pager::finish_open(db_header, db_file, wal, io, page_cache, buffer_pool).unwrap() }; let pager = Rc::new(pager); // FIXME: handle page cache is full @@ -6486,8 +6486,8 @@ mod tests { .unwrap(); loop { match pager.end_tx().unwrap() { - crate::CheckpointStatus::Done(_) => break, - crate::CheckpointStatus::IO => { + crate::PagerCacheflushStatus::Done(_) => break, + crate::PagerCacheflushStatus::IO => { pager.io.run_once().unwrap(); } } @@ -6600,8 +6600,8 @@ mod tests { cursor.move_to_root(); loop { match pager.end_tx().unwrap() { - crate::CheckpointStatus::Done(_) => break, - crate::CheckpointStatus::IO => { + crate::PagerCacheflushStatus::Done(_) => break, + crate::PagerCacheflushStatus::IO => { pager.io.run_once().unwrap(); } } @@ -6790,7 +6790,7 @@ mod tests { Pager::finish_open( db_header.clone(), db_file, - Some(wal), + wal, io, Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))), buffer_pool, diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e289e8281..d297c2692 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -4,7 +4,7 @@ use crate::storage::btree::BTreePageInner; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType}; -use crate::storage::wal::{CheckpointResult, Wal}; +use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus}; use crate::{Buffer, LimboError, Result}; use parking_lot::RwLock; use std::cell::{RefCell, UnsafeCell}; @@ -136,12 +136,19 @@ impl Page { } #[derive(Clone, Copy, Debug)] +/// The state of the current pager cache flush. enum FlushState { + /// Idle. Start, + /// Waiting for all in-flight writes to the on-disk WAL to complete. WaitAppendFrames, + /// Fsync the on-disk WAL. SyncWal, + /// Checkpoint the WAL to the database file (if needed). Checkpoint, + /// Fsync the database file. SyncDbFile, + /// Waiting for the database file to be fsynced. WaitSyncDbFile, } @@ -167,7 +174,7 @@ pub struct Pager { /// Source of the database pages. 
pub db_file: Arc, /// The write-ahead log (WAL) for the database. - wal: Option>>, + wal: Rc>, /// A page cache for the database. page_cache: Arc>, /// Buffer pool for temporary data storage. @@ -183,6 +190,24 @@ pub struct Pager { syncing: Rc>, } +#[derive(Debug, Copy, Clone)] +/// The status of the current cache flush. +/// A Done state means that the WAL was committed to disk and fsynced, +/// plus potentially checkpointed to the DB (and the DB then fsynced). +pub enum PagerCacheflushStatus { + Done(PagerCacheflushResult), + IO, +} + +#[derive(Debug, Copy, Clone)] +pub enum PagerCacheflushResult { + /// The WAL was written to disk and fsynced. + WalWritten, + /// The WAL was written, fsynced, and a checkpoint was performed. + /// The database file was then also fsynced. + Checkpointed(CheckpointResult), +} + impl Pager { /// Begins opening a database by reading the database header. pub fn begin_open(db_file: Arc) -> Result>> { @@ -193,7 +218,7 @@ impl Pager { pub fn finish_open( db_header_ref: Arc>, db_file: Arc, - wal: Option>>, + wal: Rc>, io: Arc, page_cache: Arc>, buffer_pool: Rc, @@ -271,42 +296,28 @@ impl Pager { #[inline(always)] pub fn begin_read_tx(&self) -> Result { - if let Some(wal) = &self.wal { - return wal.borrow_mut().begin_read_tx(); - } - - Ok(LimboResult::Ok) + self.wal.borrow_mut().begin_read_tx() } #[inline(always)] pub fn begin_write_tx(&self) -> Result { - if let Some(wal) = &self.wal { - return wal.borrow_mut().begin_write_tx(); - } - - Ok(LimboResult::Ok) + self.wal.borrow_mut().begin_write_tx() } - pub fn end_tx(&self) -> Result { - if let Some(wal) = &self.wal { - let checkpoint_status = self.cacheflush()?; - return match checkpoint_status { - CheckpointStatus::IO => Ok(checkpoint_status), - CheckpointStatus::Done(_) => { - wal.borrow().end_write_tx()?; - wal.borrow().end_read_tx()?; - Ok(checkpoint_status) - } - }; - } - - Ok(CheckpointStatus::Done(CheckpointResult::default())) + pub fn end_tx(&self) -> Result { + let cacheflush_status = self.cacheflush()?; + return match cacheflush_status { + PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO), + PagerCacheflushStatus::Done(_) => { + self.wal.borrow().end_write_tx()?; + self.wal.borrow().end_read_tx()?; + Ok(cacheflush_status) + } + }; } pub fn end_read_tx(&self) -> Result<()> { - if let Some(wal) = &self.wal { - wal.borrow().end_read_tx()?; - } + self.wal.borrow().end_read_tx()?; Ok(()) } @@ -314,10 +325,7 @@ impl Pager { pub fn read_page(&self, page_idx: usize) -> Result { tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); - let max_frame = match &self.wal { - Some(wal) => wal.borrow().get_max_frame(), - None => 0, - }; + let max_frame = self.wal.borrow().get_max_frame(); let page_key = PageCacheKey::new(page_idx, Some(max_frame)); if let Some(page) = page_cache.get(&page_key) { tracing::trace!("read_page(page_idx = {}) = cached", page_idx); @@ -326,31 +334,31 @@ impl Pager { let page = Arc::new(Page::new(page_idx)); page.set_locked(); - if let Some(wal) = &self.wal { - if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? 
{ - wal.borrow() - .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; - { - page.set_uptodate(); - } - // TODO(pere) should probably first insert to page cache, and if successful, - // read frame or page - match page_cache.insert(page_key, page.clone()) { - Ok(_) => {} - Err(CacheError::Full) => return Err(LimboError::CacheFull), - Err(CacheError::KeyExists) => { - unreachable!("Page should not exist in cache after get() miss") - } - Err(e) => { - return Err(LimboError::InternalError(format!( - "Failed to insert page into cache: {:?}", - e - ))) - } - } - return Ok(page); + if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { + self.wal + .borrow() + .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; + { + page.set_uptodate(); } + // TODO(pere) should probably first insert to page cache, and if successful, + // read frame or page + match page_cache.insert(page_key, page.clone()) { + Ok(_) => {} + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(CacheError::KeyExists) => { + unreachable!("Page should not exist in cache after get() miss") + } + Err(e) => { + return Err(LimboError::InternalError(format!( + "Failed to insert page into cache: {:?}", + e + ))) + } + } + return Ok(page); } + sqlite3_ondisk::begin_read_page( self.db_file.clone(), self.buffer_pool.clone(), @@ -391,15 +399,14 @@ impl Pager { } pub fn wal_frame_count(&self) -> Result { - let mut frame_count = 0; - let wal = self.wal.clone(); - if let Some(wal) = &wal { - frame_count = wal.borrow().get_max_frame_in_wal(); - } - Ok(frame_count) + Ok(self.wal.borrow().get_max_frame_in_wal()) } - pub fn cacheflush(&self) -> Result { + /// Flush dirty pages to disk. + /// In the base case, it will write the dirty pages to the WAL and then fsync the WAL. + /// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to + /// the database file and then fsync the database file. + pub fn cacheflush(&self) -> Result { let mut checkpoint_result = CheckpointResult::default(); loop { let state = self.flush_info.borrow().state; @@ -407,23 +414,18 @@ impl Pager { match state { FlushState::Start => { let db_size = self.db_header.lock().database_size; - let max_frame = match &self.wal { - Some(wal) => wal.borrow().get_max_frame(), - None => 0, - }; + let max_frame = self.wal.borrow().get_max_frame(); for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.write(); let page_key = PageCacheKey::new(*page_id, Some(max_frame)); let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - if let Some(wal) = &self.wal { - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); - wal.borrow_mut().append_frame( - page.clone(), - db_size, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - } + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!("cacheflush(page={}, page_type={:?}", page_id, page_type); + self.wal.borrow_mut().append_frame( + page.clone(), + db_size, + self.flush_info.borrow().in_flight_writes.clone(), + )?; page.clear_dirty(); } // This is okay assuming we use shared cache by default. 
@@ -433,33 +435,28 @@ impl Pager { } self.dirty_pages.borrow_mut().clear(); self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames; - return Ok(CheckpointStatus::IO); + return Ok(PagerCacheflushStatus::IO); } FlushState::WaitAppendFrames => { let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); if in_flight == 0 { self.flush_info.borrow_mut().state = FlushState::SyncWal; } else { - return Ok(CheckpointStatus::IO); + return Ok(PagerCacheflushStatus::IO); } } FlushState::SyncWal => { - let wal = self.wal.clone().ok_or(LimboError::InternalError( - "SyncWal was called without a existing wal".to_string(), - ))?; - match wal.borrow_mut().sync() { - Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), - Ok(CheckpointStatus::Done(res)) => checkpoint_result = res, - Err(e) => return Err(e), + if WalFsyncStatus::IO == self.wal.borrow_mut().sync()? { + return Ok(PagerCacheflushStatus::IO); } - let should_checkpoint = wal.borrow().should_checkpoint(); - if should_checkpoint { - self.flush_info.borrow_mut().state = FlushState::Checkpoint; - } else { + if !self.wal.borrow().should_checkpoint() { self.flush_info.borrow_mut().state = FlushState::Start; - break; + return Ok(PagerCacheflushStatus::Done( + PagerCacheflushResult::WalWritten, + )); } + self.flush_info.borrow_mut().state = FlushState::Checkpoint; } FlushState::Checkpoint => { match self.checkpoint()? { @@ -467,7 +464,7 @@ impl Pager { checkpoint_result = res; self.flush_info.borrow_mut().state = FlushState::SyncDbFile; } - CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO), }; } FlushState::SyncDbFile => { @@ -476,7 +473,7 @@ impl Pager { } FlushState::WaitSyncDbFile => { if *self.syncing.borrow() { - return Ok(CheckpointStatus::IO); + return Ok(PagerCacheflushStatus::IO); } else { self.flush_info.borrow_mut().state = FlushState::Start; break; @@ -484,7 +481,9 @@ impl Pager { } } } - Ok(CheckpointStatus::Done(checkpoint_result)) + Ok(PagerCacheflushStatus::Done( + PagerCacheflushResult::Checkpointed(checkpoint_result), + )) } pub fn checkpoint(&self) -> Result { @@ -495,13 +494,11 @@ impl Pager { match state { CheckpointState::Checkpoint => { let in_flight = self.checkpoint_inflight.clone(); - let wal = self.wal.clone().ok_or(LimboError::InternalError( - "Checkpoint was called without a existing wal".to_string(), - ))?; - match wal - .borrow_mut() - .checkpoint(self, in_flight, CheckpointMode::Passive)? - { + match self.wal.borrow_mut().checkpoint( + self, + in_flight, + CheckpointMode::Passive, + )? 
{ CheckpointStatus::IO => return Ok(CheckpointStatus::IO), CheckpointStatus::Done(res) => { checkpoint_result = res; @@ -538,7 +535,7 @@ impl Pager { pub fn clear_page_cache(&self) -> CheckpointResult { let checkpoint_result: CheckpointResult; loop { - match self.wal.clone().unwrap().borrow_mut().checkpoint( + match self.wal.borrow_mut().checkpoint( self, Rc::new(RefCell::new(0)), CheckpointMode::Passive, @@ -667,10 +664,7 @@ impl Pager { // setup page and add to cache page.set_dirty(); self.add_dirty(page.get().id); - let max_frame = match &self.wal { - Some(wal) => wal.borrow().get_max_frame(), - None => 0, - }; + let max_frame = self.wal.borrow().get_max_frame(); let page_key = PageCacheKey::new(page.get().id, Some(max_frame)); let mut cache = self.page_cache.write(); @@ -692,10 +686,7 @@ impl Pager { page: PageRef, ) -> Result<(), LimboError> { let mut cache = self.page_cache.write(); - let max_frame = match &self.wal { - Some(wal) => wal.borrow().get_max_frame(), - None => 0, - }; + let max_frame = self.wal.borrow().get_max_frame(); let page_key = PageCacheKey::new(id, Some(max_frame)); // FIXME: use specific page key for writer instead of max frame, this will make readers not conflict diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e9c1e36f9..760507ec3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -190,12 +190,89 @@ pub trait Wal { write_counter: Rc>, mode: CheckpointMode, ) -> Result; - fn sync(&mut self) -> Result; + fn sync(&mut self) -> Result; fn get_max_frame_in_wal(&self) -> u64; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; } +/// A dummy WAL implementation that does nothing. +/// This is used for ephemeral indexes where a WAL is not really +/// needed, and is preferable to passing an Option around +/// everywhere. +pub struct DummyWAL; + +impl Wal for DummyWAL { + fn begin_read_tx(&mut self) -> Result { + Ok(LimboResult::Ok) + } + + fn end_read_tx(&self) -> Result { + Ok(LimboResult::Ok) + } + + fn begin_write_tx(&mut self) -> Result { + Ok(LimboResult::Ok) + } + + fn end_write_tx(&self) -> Result { + Ok(LimboResult::Ok) + } + + fn find_frame(&self, _page_id: u64) -> Result> { + Ok(None) + } + + fn read_frame( + &self, + _frame_id: u64, + _page: crate::PageRef, + _buffer_pool: Rc, + ) -> Result<()> { + Ok(()) + } + + fn append_frame( + &mut self, + _page: crate::PageRef, + _db_size: u32, + _write_counter: Rc>, + ) -> Result<()> { + Ok(()) + } + + fn should_checkpoint(&self) -> bool { + false + } + + fn checkpoint( + &mut self, + _pager: &Pager, + _write_counter: Rc>, + _mode: crate::CheckpointMode, + ) -> Result { + Ok(crate::CheckpointStatus::Done( + crate::CheckpointResult::default(), + )) + } + + fn sync(&mut self) -> Result { + Ok(crate::storage::wal::WalFsyncStatus::Done) + } + + fn get_max_frame_in_wal(&self) -> u64 { + 0 + } + + fn get_max_frame(&self) -> u64 { + 0 + } + + fn get_min_frame(&self) -> u64 { + 0 + } +} + // Syncing requires a state machine because we need to schedule a sync and then wait until it is // finished. If we don't wait there will be undefined behaviour that no one wants to debug. 
#[derive(Copy, Clone, Debug)] @@ -214,6 +291,12 @@ pub enum CheckpointState { Done, } +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum WalFsyncStatus { + Done, + IO, +} + #[derive(Debug, Copy, Clone)] pub enum CheckpointStatus { Done(CheckpointResult), @@ -646,7 +729,7 @@ impl Wal for WalFile { } } - fn sync(&mut self) -> Result { + fn sync(&mut self) -> Result { let state = *self.sync_state.borrow(); match state { SyncState::NotSyncing => { @@ -664,18 +747,14 @@ impl Wal for WalFile { shared.file.sync(completion)?; } self.sync_state.replace(SyncState::Syncing); - Ok(CheckpointStatus::IO) + Ok(WalFsyncStatus::IO) } SyncState::Syncing => { if *self.syncing.borrow() { - Ok(CheckpointStatus::IO) + Ok(WalFsyncStatus::IO) } else { self.sync_state.replace(SyncState::NotSyncing); - let checkpoint_result = CheckpointResult { - num_wal_frames: self.max_frame, - num_checkpointed_frames: self.ongoing_checkpoint.max_frame, - }; - Ok(CheckpointStatus::Done(checkpoint_result)) + Ok(WalFsyncStatus::Done) } } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 00bf09ec7..41400470c 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4,6 +4,7 @@ use crate::schema::Schema; use crate::storage::database::FileMemoryStorage; use crate::storage::page_cache::DumbLruPageCache; use crate::storage::pager::CreateBTreeFlags; +use crate::storage::wal::DummyWAL; use crate::types::ImmutableRecord; use crate::{ error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY}, @@ -49,7 +50,7 @@ use crate::{ use super::{ insn::{Cookie, RegisterOrLiteral}, - HaltState, + CommitState, }; use parking_lot::RwLock; use rand::thread_rng; @@ -1651,7 +1652,7 @@ pub fn op_halt( ))); } } - match program.halt(pager.clone(), state, mv_store)? { + match program.commit_txn(pager.clone(), state, mv_store)? { StepResult::Done => Ok(InsnFunctionStepResult::Done), StepResult::IO => Ok(InsnFunctionStepResult::IO), StepResult::Row => Ok(InsnFunctionStepResult::Row), @@ -1726,8 +1727,8 @@ pub fn op_auto_commit( unreachable!("unexpected Insn {:?}", insn) }; let conn = program.connection.upgrade().unwrap(); - if matches!(state.halt_state, Some(HaltState::Checkpointing)) { - return match program.halt(pager.clone(), state, mv_store)? { + if state.commit_state == CommitState::Committing { + return match program.commit_txn(pager.clone(), state, mv_store)? { super::StepResult::Done => Ok(InsnFunctionStepResult::Done), super::StepResult::IO => Ok(InsnFunctionStepResult::IO), super::StepResult::Row => Ok(InsnFunctionStepResult::Row), @@ -1755,7 +1756,7 @@ pub fn op_auto_commit( "cannot commit - no transaction is active".to_string(), )); } - return match program.halt(pager.clone(), state, mv_store)? { + return match program.commit_txn(pager.clone(), state, mv_store)? 
{ super::StepResult::Done => Ok(InsnFunctionStepResult::Done), super::StepResult::IO => Ok(InsnFunctionStepResult::IO), super::StepResult::Row => Ok(InsnFunctionStepResult::Row), @@ -4701,7 +4702,7 @@ pub fn op_open_ephemeral( let pager = Rc::new(Pager::finish_open( db_header, db_file, - None, + Rc::new(RefCell::new(DummyWAL)), io, page_cache, buffer_pool, diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 5c8765492..f9ce85acd 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,7 +28,7 @@ use crate::{ error::LimboError, fast_lock::SpinLock, function::{AggFunc, FuncCtx}, - storage::sqlite3_ondisk::SmallVec, + storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec}, }; use crate::{ @@ -38,8 +38,6 @@ use crate::{ vdbe::{builder::CursorType, insn::Insn}, }; -use crate::CheckpointStatus; - #[cfg(feature = "json")] use crate::json::JsonCacheCell; use crate::{Connection, MvStore, Result, TransactionState}; @@ -234,9 +232,16 @@ impl Drop for VTabOpaqueCursor { } } -#[derive(Copy, Clone)] -enum HaltState { - Checkpointing, +#[derive(Copy, Clone, PartialEq, Eq)] +/// The commit state of the program. +/// There are two states: +/// - Ready: The program is ready to run the next instruction, or has shut down after +/// the last instruction. +/// - Committing: The program is committing a write transaction. It is waiting for the pager to finish flushing the cache to disk, +/// primarily to the WAL, but also possibly checkpointing the WAL to the database file. +enum CommitState { + Ready, + Committing, } #[derive(Debug, Clone)] @@ -269,7 +274,7 @@ pub struct ProgramState { pub(crate) mv_tx_id: Option, interrupted: bool, parameters: HashMap, Value>, - halt_state: Option, + commit_state: CommitState, #[cfg(feature = "json")] json_cache: JsonCacheCell, op_idx_delete_state: Option, @@ -293,7 +298,7 @@ impl ProgramState { mv_tx_id: None, interrupted: false, parameters: HashMap::new(), - halt_state: None, + commit_state: CommitState::Ready, #[cfg(feature = "json")] json_cache: JsonCacheCell::new(), op_idx_delete_state: None, @@ -417,7 +422,7 @@ impl Program { } } - pub fn halt( + pub fn commit_txn( &self, pager: Rc, program_state: &mut ProgramState, @@ -441,18 +446,14 @@ impl Program { .expect("only weak ref to connection?"); let auto_commit = connection.auto_commit.get(); tracing::trace!("Halt auto_commit {}", auto_commit); - assert!( - program_state.halt_state.is_none() - || (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing)) - ); - if program_state.halt_state.is_some() { - self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref()) + if program_state.commit_state == CommitState::Committing { + self.step_end_write_txn(&pager, &mut program_state.commit_state, connection.deref()) } else if auto_commit { let current_state = connection.transaction_state.get(); match current_state { TransactionState::Write => self.step_end_write_txn( &pager, - &mut program_state.halt_state, + &mut program_state.commit_state, connection.deref(), ), TransactionState::Read => { @@ -476,23 +477,23 @@ impl Program { fn step_end_write_txn( &self, pager: &Rc, - halt_state: &mut Option, + commit_state: &mut CommitState, connection: &Connection, ) -> Result { - let checkpoint_status = pager.end_tx()?; - match checkpoint_status { - CheckpointStatus::Done(_) => { + let cacheflush_status = pager.end_tx()?; + match cacheflush_status { + PagerCacheflushStatus::Done(_) => { if self.change_cnt_on { if let Some(conn) = self.connection.upgrade() { 
conn.set_changes(self.n_change.get()); } } connection.transaction_state.replace(TransactionState::None); - let _ = halt_state.take(); + *commit_state = CommitState::Ready; } - CheckpointStatus::IO => { - tracing::trace!("Checkpointing IO"); - *halt_state = Some(HaltState::Checkpointing); + PagerCacheflushStatus::IO => { + tracing::trace!("Cacheflush IO"); + *commit_state = CommitState::Committing; return Ok(StepResult::IO); } } diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 88046546f..d688d14c6 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -1,4 +1,4 @@ -use limbo_core::{CheckpointStatus, Connection, Database, IO}; +use limbo_core::{Connection, Database, PagerCacheflushStatus, IO}; use rand::{rng, RngCore}; use rusqlite::params; use std::path::{Path, PathBuf}; @@ -86,10 +86,10 @@ impl TempDatabase { pub(crate) fn do_flush(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { loop { match conn.cacheflush()? { - CheckpointStatus::Done(_) => { + PagerCacheflushStatus::Done(_) => { break; } - CheckpointStatus::IO => { + PagerCacheflushStatus::IO => { tmp_db.io.run_once()?; } }
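
Note on consuming the new status type: the loops above (`end_tx()` in the btree tests, `do_flush` in the integration helpers) simply poll until `Done` and discard the payload. Below is a minimal sketch of a crate-internal caller that also distinguishes the two `Done` outcomes. The helper name is hypothetical, and the paths assume `PagerCacheflushResult` stays crate-visible as declared in this diff.

```rust
use crate::storage::pager::{Pager, PagerCacheflushResult, PagerCacheflushStatus};
use crate::Result;

/// Hypothetical helper: drive the cacheflush state machine to completion and
/// report whether the flush stopped at the WAL or also checkpointed.
fn flush_until_done(pager: &Pager) -> Result<PagerCacheflushResult> {
    loop {
        match pager.cacheflush()? {
            // Work is still in flight (WAL appends, the WAL fsync, checkpoint
            // writes, or the database-file fsync): run the IO loop and poll again.
            PagerCacheflushStatus::IO => pager.io.run_once()?,
            PagerCacheflushStatus::Done(result) => return Ok(result),
        }
    }
}
```

Callers that only care about durability (like `do_flush`) can keep ignoring the payload; callers that want checkpoint statistics can match the result against `PagerCacheflushResult::Checkpointed(CheckpointResult)` versus `PagerCacheflushResult::WalWritten`.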
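On the `Option`-to-`DummyWAL` change: with `DummyWAL` installed, `find_frame` always misses (reads fall through to the database file), `should_checkpoint` is always false, and `sync` completes immediately, so a flush on an ephemeral pager finishes with `WalWritten` and never reaches the checkpoint states. For reference, here is a sketch of the ephemeral construction mirroring `op_open_ephemeral` above, with the generic parameters spelled out; they are elided in the signatures shown in this diff, so the concrete types here are inferred from the call sites and should be treated as assumptions.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use crate::fast_lock::SpinLock;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::storage::wal::DummyWAL;
use crate::{Result, IO};

/// Hypothetical helper mirroring op_open_ephemeral: a pager whose WAL is a no-op.
fn open_ephemeral_pager(
    db_header: Arc<SpinLock<DatabaseHeader>>, // inferred: finish_open's db_header_ref
    db_file: Arc<dyn DatabaseStorage>,        // inferred: "source of the database pages"
    io: Arc<dyn IO>,
    buffer_pool: Rc<BufferPool>,
) -> Result<Rc<Pager>> {
    let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10)));
    // DummyWAL stands in for the old `None`: reads miss the WAL and fall through
    // to db_file, and flushes complete without ever scheduling a checkpoint.
    let pager = Pager::finish_open(
        db_header,
        db_file,
        Rc::new(RefCell::new(DummyWAL)),
        io,
        page_cache,
        buffer_pool,
    )?;
    Ok(Rc::new(pager))
}
```

The null-object here trades a tiny amount of dead code for the removal of every `if let Some(wal)` branch in the pager, which is what lets `begin_read_tx`, `read_page`, `cacheflush`, and `checkpoint` above collapse into straight-line calls.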