Small cleanups to pager/wal/vdbe - mostly naming

- Instead of using a confusing CheckpointStatus for many different things,
  introduce the following statuses:
    * PagerCacheflushStatus - cacheflush can result in either:
      - the WAL being written to disk and fsynced
      - but also a checkpoint to the main BD file, and fsyncing the main DB file

      Reflect this in the type.
    * WalFsyncStatus - previously CheckpointStatus was also used for this, even
      though fsyncing the WAL doesn't checkpoint.
    * CheckpointStatus/CheckpointResult is now used only for actual checkpointing.

- Rename HaltState to CommitState (program.halt_state -> program.commit_state)
- Make WAL a non-optional property in Pager
  * This gets rid of a lot of if let Some(...) boilerplate
  * For ephemeral indexes, provide a DummyWAL implementation that does nothing.
- Rename program.halt() to program.commit_txn()
- Add some documentation comments to structs and functions
This commit is contained in:
Jussi Saurio
2025-05-26 10:37:34 +03:00
parent be89809335
commit 3ba9f2ab97
7 changed files with 237 additions and 160 deletions

View File

@@ -61,6 +61,7 @@ use std::{
use storage::btree::{btree_init_page, BTreePageInner};
#[cfg(feature = "fs")]
use storage::database::DatabaseFile;
pub use storage::pager::PagerCacheflushStatus;
pub use storage::{
buffer_pool::BufferPool,
database::DatabaseStorage,
@@ -216,7 +217,7 @@ impl Database {
let pager = Rc::new(Pager::finish_open(
self.header.clone(),
self.db_file.clone(),
Some(wal),
wal,
self.io.clone(),
Arc::new(RwLock::new(DumbLruPageCache::default())),
buffer_pool,
@@ -503,7 +504,11 @@ impl Connection {
self.pager.wal_frame_count()
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
/// Flush dirty pages to disk.
/// This will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
self.pager.cacheflush()
}

View File

@@ -6308,7 +6308,7 @@ mod tests {
let page_cache = Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(2000)));
let pager = {
let db_header = Arc::new(SpinLock::new(db_header.clone()));
Pager::finish_open(db_header, db_file, Some(wal), io, page_cache, buffer_pool).unwrap()
Pager::finish_open(db_header, db_file, wal, io, page_cache, buffer_pool).unwrap()
};
let pager = Rc::new(pager);
// FIXME: handle page cache is full
@@ -6486,8 +6486,8 @@ mod tests {
.unwrap();
loop {
match pager.end_tx().unwrap() {
crate::CheckpointStatus::Done(_) => break,
crate::CheckpointStatus::IO => {
crate::PagerCacheflushStatus::Done(_) => break,
crate::PagerCacheflushStatus::IO => {
pager.io.run_once().unwrap();
}
}
@@ -6600,8 +6600,8 @@ mod tests {
cursor.move_to_root();
loop {
match pager.end_tx().unwrap() {
crate::CheckpointStatus::Done(_) => break,
crate::CheckpointStatus::IO => {
crate::PagerCacheflushStatus::Done(_) => break,
crate::PagerCacheflushStatus::IO => {
pager.io.run_once().unwrap();
}
}
@@ -6790,7 +6790,7 @@ mod tests {
Pager::finish_open(
db_header.clone(),
db_file,
Some(wal),
wal,
io,
Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
buffer_pool,

View File

@@ -4,7 +4,7 @@ use crate::storage::btree::BTreePageInner;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent, PageType};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::storage::wal::{CheckpointResult, Wal, WalFsyncStatus};
use crate::{Buffer, LimboError, Result};
use parking_lot::RwLock;
use std::cell::{RefCell, UnsafeCell};
@@ -136,12 +136,19 @@ impl Page {
}
#[derive(Clone, Copy, Debug)]
/// The state of the current pager cache flush.
enum FlushState {
/// Idle.
Start,
/// Waiting for all in-flight writes to the on-disk WAL to complete.
WaitAppendFrames,
/// Fsync the on-disk WAL.
SyncWal,
/// Checkpoint the WAL to the database file (if needed).
Checkpoint,
/// Fsync the database file.
SyncDbFile,
/// Waiting for the database file to be fsynced.
WaitSyncDbFile,
}
@@ -167,7 +174,7 @@ pub struct Pager {
/// Source of the database pages.
pub db_file: Arc<dyn DatabaseStorage>,
/// The write-ahead log (WAL) for the database.
wal: Option<Rc<RefCell<dyn Wal>>>,
wal: Rc<RefCell<dyn Wal>>,
/// A page cache for the database.
page_cache: Arc<RwLock<DumbLruPageCache>>,
/// Buffer pool for temporary data storage.
@@ -183,6 +190,24 @@ pub struct Pager {
syncing: Rc<RefCell<bool>>,
}
#[derive(Debug, Copy, Clone)]
/// The status of the current cache flush.
/// A Done state means that the WAL was committed to disk and fsynced,
/// plus potentially checkpointed to the DB (and the DB then fsynced).
pub enum PagerCacheflushStatus {
Done(PagerCacheflushResult),
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum PagerCacheflushResult {
/// The WAL was written to disk and fsynced.
WalWritten,
/// The WAL was written, fsynced, and a checkpoint was performed.
/// The database file was then also fsynced.
Checkpointed(CheckpointResult),
}
impl Pager {
/// Begins opening a database by reading the database header.
pub fn begin_open(db_file: Arc<dyn DatabaseStorage>) -> Result<Arc<SpinLock<DatabaseHeader>>> {
@@ -193,7 +218,7 @@ impl Pager {
pub fn finish_open(
db_header_ref: Arc<SpinLock<DatabaseHeader>>,
db_file: Arc<dyn DatabaseStorage>,
wal: Option<Rc<RefCell<dyn Wal>>>,
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
page_cache: Arc<RwLock<DumbLruPageCache>>,
buffer_pool: Rc<BufferPool>,
@@ -271,42 +296,28 @@ impl Pager {
#[inline(always)]
pub fn begin_read_tx(&self) -> Result<LimboResult> {
if let Some(wal) = &self.wal {
return wal.borrow_mut().begin_read_tx();
}
Ok(LimboResult::Ok)
self.wal.borrow_mut().begin_read_tx()
}
#[inline(always)]
pub fn begin_write_tx(&self) -> Result<LimboResult> {
if let Some(wal) = &self.wal {
return wal.borrow_mut().begin_write_tx();
}
Ok(LimboResult::Ok)
self.wal.borrow_mut().begin_write_tx()
}
pub fn end_tx(&self) -> Result<CheckpointStatus> {
if let Some(wal) = &self.wal {
let checkpoint_status = self.cacheflush()?;
return match checkpoint_status {
CheckpointStatus::IO => Ok(checkpoint_status),
CheckpointStatus::Done(_) => {
wal.borrow().end_write_tx()?;
wal.borrow().end_read_tx()?;
Ok(checkpoint_status)
}
};
}
Ok(CheckpointStatus::Done(CheckpointResult::default()))
pub fn end_tx(&self) -> Result<PagerCacheflushStatus> {
let cacheflush_status = self.cacheflush()?;
return match cacheflush_status {
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
PagerCacheflushStatus::Done(_) => {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
Ok(cacheflush_status)
}
};
}
pub fn end_read_tx(&self) -> Result<()> {
if let Some(wal) = &self.wal {
wal.borrow().end_read_tx()?;
}
self.wal.borrow().end_read_tx()?;
Ok(())
}
@@ -314,10 +325,7 @@ impl Pager {
pub fn read_page(&self, page_idx: usize) -> Result<PageRef, LimboError> {
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let max_frame = match &self.wal {
Some(wal) => wal.borrow().get_max_frame(),
None => 0,
};
let max_frame = self.wal.borrow().get_max_frame();
let page_key = PageCacheKey::new(page_idx, Some(max_frame));
if let Some(page) = page_cache.get(&page_key) {
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
@@ -326,31 +334,31 @@ impl Pager {
let page = Arc::new(Page::new(page_idx));
page.set_locked();
if let Some(wal) = &self.wal {
if let Some(frame_id) = wal.borrow().find_frame(page_idx as u64)? {
wal.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
page.set_uptodate();
}
// TODO(pere) should probably first insert to page cache, and if successful,
// read frame or page
match page_cache.insert(page_key, page.clone()) {
Ok(_) => {}
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(CacheError::KeyExists) => {
unreachable!("Page should not exist in cache after get() miss")
}
Err(e) => {
return Err(LimboError::InternalError(format!(
"Failed to insert page into cache: {:?}",
e
)))
}
}
return Ok(page);
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
page.set_uptodate();
}
// TODO(pere) should probably first insert to page cache, and if successful,
// read frame or page
match page_cache.insert(page_key, page.clone()) {
Ok(_) => {}
Err(CacheError::Full) => return Err(LimboError::CacheFull),
Err(CacheError::KeyExists) => {
unreachable!("Page should not exist in cache after get() miss")
}
Err(e) => {
return Err(LimboError::InternalError(format!(
"Failed to insert page into cache: {:?}",
e
)))
}
}
return Ok(page);
}
sqlite3_ondisk::begin_read_page(
self.db_file.clone(),
self.buffer_pool.clone(),
@@ -391,15 +399,14 @@ impl Pager {
}
pub fn wal_frame_count(&self) -> Result<u64> {
let mut frame_count = 0;
let wal = self.wal.clone();
if let Some(wal) = &wal {
frame_count = wal.borrow().get_max_frame_in_wal();
}
Ok(frame_count)
Ok(self.wal.borrow().get_max_frame_in_wal())
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
/// Flush dirty pages to disk.
/// In the base case, it will write the dirty pages to the WAL and then fsync the WAL.
/// If the WAL size is over the checkpoint threshold, it will checkpoint the WAL to
/// the database file and then fsync the database file.
pub fn cacheflush(&self) -> Result<PagerCacheflushStatus> {
let mut checkpoint_result = CheckpointResult::default();
loop {
let state = self.flush_info.borrow().state;
@@ -407,23 +414,18 @@ impl Pager {
match state {
FlushState::Start => {
let db_size = self.db_header.lock().database_size;
let max_frame = match &self.wal {
Some(wal) => wal.borrow().get_max_frame(),
None => 0,
};
let max_frame = self.wal.borrow().get_max_frame();
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.write();
let page_key = PageCacheKey::new(*page_id, Some(max_frame));
let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
if let Some(wal) = &self.wal {
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
}
let page_type = page.get().contents.as_ref().unwrap().maybe_page_type();
trace!("cacheflush(page={}, page_type={:?}", page_id, page_type);
self.wal.borrow_mut().append_frame(
page.clone(),
db_size,
self.flush_info.borrow().in_flight_writes.clone(),
)?;
page.clear_dirty();
}
// This is okay assuming we use shared cache by default.
@@ -433,33 +435,28 @@ impl Pager {
}
self.dirty_pages.borrow_mut().clear();
self.flush_info.borrow_mut().state = FlushState::WaitAppendFrames;
return Ok(CheckpointStatus::IO);
return Ok(PagerCacheflushStatus::IO);
}
FlushState::WaitAppendFrames => {
let in_flight = *self.flush_info.borrow().in_flight_writes.borrow();
if in_flight == 0 {
self.flush_info.borrow_mut().state = FlushState::SyncWal;
} else {
return Ok(CheckpointStatus::IO);
return Ok(PagerCacheflushStatus::IO);
}
}
FlushState::SyncWal => {
let wal = self.wal.clone().ok_or(LimboError::InternalError(
"SyncWal was called without a existing wal".to_string(),
))?;
match wal.borrow_mut().sync() {
Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO),
Ok(CheckpointStatus::Done(res)) => checkpoint_result = res,
Err(e) => return Err(e),
if WalFsyncStatus::IO == self.wal.borrow_mut().sync()? {
return Ok(PagerCacheflushStatus::IO);
}
let should_checkpoint = wal.borrow().should_checkpoint();
if should_checkpoint {
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
} else {
if !self.wal.borrow().should_checkpoint() {
self.flush_info.borrow_mut().state = FlushState::Start;
break;
return Ok(PagerCacheflushStatus::Done(
PagerCacheflushResult::WalWritten,
));
}
self.flush_info.borrow_mut().state = FlushState::Checkpoint;
}
FlushState::Checkpoint => {
match self.checkpoint()? {
@@ -467,7 +464,7 @@ impl Pager {
checkpoint_result = res;
self.flush_info.borrow_mut().state = FlushState::SyncDbFile;
}
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::IO => return Ok(PagerCacheflushStatus::IO),
};
}
FlushState::SyncDbFile => {
@@ -476,7 +473,7 @@ impl Pager {
}
FlushState::WaitSyncDbFile => {
if *self.syncing.borrow() {
return Ok(CheckpointStatus::IO);
return Ok(PagerCacheflushStatus::IO);
} else {
self.flush_info.borrow_mut().state = FlushState::Start;
break;
@@ -484,7 +481,9 @@ impl Pager {
}
}
}
Ok(CheckpointStatus::Done(checkpoint_result))
Ok(PagerCacheflushStatus::Done(
PagerCacheflushResult::Checkpointed(checkpoint_result),
))
}
pub fn checkpoint(&self) -> Result<CheckpointStatus> {
@@ -495,13 +494,11 @@ impl Pager {
match state {
CheckpointState::Checkpoint => {
let in_flight = self.checkpoint_inflight.clone();
let wal = self.wal.clone().ok_or(LimboError::InternalError(
"Checkpoint was called without a existing wal".to_string(),
))?;
match wal
.borrow_mut()
.checkpoint(self, in_flight, CheckpointMode::Passive)?
{
match self.wal.borrow_mut().checkpoint(
self,
in_flight,
CheckpointMode::Passive,
)? {
CheckpointStatus::IO => return Ok(CheckpointStatus::IO),
CheckpointStatus::Done(res) => {
checkpoint_result = res;
@@ -538,7 +535,7 @@ impl Pager {
pub fn clear_page_cache(&self) -> CheckpointResult {
let checkpoint_result: CheckpointResult;
loop {
match self.wal.clone().unwrap().borrow_mut().checkpoint(
match self.wal.borrow_mut().checkpoint(
self,
Rc::new(RefCell::new(0)),
CheckpointMode::Passive,
@@ -667,10 +664,7 @@ impl Pager {
// setup page and add to cache
page.set_dirty();
self.add_dirty(page.get().id);
let max_frame = match &self.wal {
Some(wal) => wal.borrow().get_max_frame(),
None => 0,
};
let max_frame = self.wal.borrow().get_max_frame();
let page_key = PageCacheKey::new(page.get().id, Some(max_frame));
let mut cache = self.page_cache.write();
@@ -692,10 +686,7 @@ impl Pager {
page: PageRef,
) -> Result<(), LimboError> {
let mut cache = self.page_cache.write();
let max_frame = match &self.wal {
Some(wal) => wal.borrow().get_max_frame(),
None => 0,
};
let max_frame = self.wal.borrow().get_max_frame();
let page_key = PageCacheKey::new(id, Some(max_frame));
// FIXME: use specific page key for writer instead of max frame, this will make readers not conflict

View File

@@ -190,12 +190,89 @@ pub trait Wal {
write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<CheckpointStatus>;
fn sync(&mut self) -> Result<WalFsyncStatus>;
fn get_max_frame_in_wal(&self) -> u64;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
}
/// A dummy WAL implementation that does nothing.
/// This is used for ephemeral indexes where a WAL is not really
/// needed, and is preferable to passing an Option<dyn Wal> around
/// everywhere.
pub struct DummyWAL;
impl Wal for DummyWAL {
fn begin_read_tx(&mut self) -> Result<LimboResult> {
Ok(LimboResult::Ok)
}
fn end_read_tx(&self) -> Result<LimboResult> {
Ok(LimboResult::Ok)
}
fn begin_write_tx(&mut self) -> Result<LimboResult> {
Ok(LimboResult::Ok)
}
fn end_write_tx(&self) -> Result<LimboResult> {
Ok(LimboResult::Ok)
}
fn find_frame(&self, _page_id: u64) -> Result<Option<u64>> {
Ok(None)
}
fn read_frame(
&self,
_frame_id: u64,
_page: crate::PageRef,
_buffer_pool: Rc<BufferPool>,
) -> Result<()> {
Ok(())
}
fn append_frame(
&mut self,
_page: crate::PageRef,
_db_size: u32,
_write_counter: Rc<RefCell<usize>>,
) -> Result<()> {
Ok(())
}
fn should_checkpoint(&self) -> bool {
false
}
fn checkpoint(
&mut self,
_pager: &Pager,
_write_counter: Rc<RefCell<usize>>,
_mode: crate::CheckpointMode,
) -> Result<crate::CheckpointStatus> {
Ok(crate::CheckpointStatus::Done(
crate::CheckpointResult::default(),
))
}
fn sync(&mut self) -> Result<crate::storage::wal::WalFsyncStatus> {
Ok(crate::storage::wal::WalFsyncStatus::Done)
}
fn get_max_frame_in_wal(&self) -> u64 {
0
}
fn get_max_frame(&self) -> u64 {
0
}
fn get_min_frame(&self) -> u64 {
0
}
}
// Syncing requires a state machine because we need to schedule a sync and then wait until it is
// finished. If we don't wait there will be undefined behaviour that no one wants to debug.
#[derive(Copy, Clone, Debug)]
@@ -214,6 +291,12 @@ pub enum CheckpointState {
Done,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum WalFsyncStatus {
Done,
IO,
}
#[derive(Debug, Copy, Clone)]
pub enum CheckpointStatus {
Done(CheckpointResult),
@@ -646,7 +729,7 @@ impl Wal for WalFile {
}
}
fn sync(&mut self) -> Result<CheckpointStatus> {
fn sync(&mut self) -> Result<WalFsyncStatus> {
let state = *self.sync_state.borrow();
match state {
SyncState::NotSyncing => {
@@ -664,18 +747,14 @@ impl Wal for WalFile {
shared.file.sync(completion)?;
}
self.sync_state.replace(SyncState::Syncing);
Ok(CheckpointStatus::IO)
Ok(WalFsyncStatus::IO)
}
SyncState::Syncing => {
if *self.syncing.borrow() {
Ok(CheckpointStatus::IO)
Ok(WalFsyncStatus::IO)
} else {
self.sync_state.replace(SyncState::NotSyncing);
let checkpoint_result = CheckpointResult {
num_wal_frames: self.max_frame,
num_checkpointed_frames: self.ongoing_checkpoint.max_frame,
};
Ok(CheckpointStatus::Done(checkpoint_result))
Ok(WalFsyncStatus::Done)
}
}
}

View File

@@ -4,6 +4,7 @@ use crate::schema::Schema;
use crate::storage::database::FileMemoryStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::CreateBTreeFlags;
use crate::storage::wal::DummyWAL;
use crate::types::ImmutableRecord;
use crate::{
error::{LimboError, SQLITE_CONSTRAINT, SQLITE_CONSTRAINT_PRIMARYKEY},
@@ -49,7 +50,7 @@ use crate::{
use super::{
insn::{Cookie, RegisterOrLiteral},
HaltState,
CommitState,
};
use parking_lot::RwLock;
use rand::thread_rng;
@@ -1651,7 +1652,7 @@ pub fn op_halt(
)));
}
}
match program.halt(pager.clone(), state, mv_store)? {
match program.commit_txn(pager.clone(), state, mv_store)? {
StepResult::Done => Ok(InsnFunctionStepResult::Done),
StepResult::IO => Ok(InsnFunctionStepResult::IO),
StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1726,8 +1727,8 @@ pub fn op_auto_commit(
unreachable!("unexpected Insn {:?}", insn)
};
let conn = program.connection.upgrade().unwrap();
if matches!(state.halt_state, Some(HaltState::Checkpointing)) {
return match program.halt(pager.clone(), state, mv_store)? {
if state.commit_state == CommitState::Committing {
return match program.commit_txn(pager.clone(), state, mv_store)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1755,7 +1756,7 @@ pub fn op_auto_commit(
"cannot commit - no transaction is active".to_string(),
));
}
return match program.halt(pager.clone(), state, mv_store)? {
return match program.commit_txn(pager.clone(), state, mv_store)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -4701,7 +4702,7 @@ pub fn op_open_ephemeral(
let pager = Rc::new(Pager::finish_open(
db_header,
db_file,
None,
Rc::new(RefCell::new(DummyWAL)),
io,
page_cache,
buffer_pool,

View File

@@ -28,7 +28,7 @@ use crate::{
error::LimboError,
fast_lock::SpinLock,
function::{AggFunc, FuncCtx},
storage::sqlite3_ondisk::SmallVec,
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
};
use crate::{
@@ -38,8 +38,6 @@ use crate::{
vdbe::{builder::CursorType, insn::Insn},
};
use crate::CheckpointStatus;
#[cfg(feature = "json")]
use crate::json::JsonCacheCell;
use crate::{Connection, MvStore, Result, TransactionState};
@@ -234,9 +232,16 @@ impl Drop for VTabOpaqueCursor {
}
}
#[derive(Copy, Clone)]
enum HaltState {
Checkpointing,
#[derive(Copy, Clone, PartialEq, Eq)]
/// The commit state of the program.
/// There are two states:
/// - Ready: The program is ready to run the next instruction, or has shut down after
/// the last instruction.
/// - Committing: The program is committing a write transaction. It is waiting for the pager to finish flushing the cache to disk,
/// primarily to the WAL, but also possibly checkpointing the WAL to the database file.
enum CommitState {
Ready,
Committing,
}
#[derive(Debug, Clone)]
@@ -269,7 +274,7 @@ pub struct ProgramState {
pub(crate) mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
parameters: HashMap<NonZero<usize>, Value>,
halt_state: Option<HaltState>,
commit_state: CommitState,
#[cfg(feature = "json")]
json_cache: JsonCacheCell,
op_idx_delete_state: Option<OpIdxDeleteState>,
@@ -293,7 +298,7 @@ impl ProgramState {
mv_tx_id: None,
interrupted: false,
parameters: HashMap::new(),
halt_state: None,
commit_state: CommitState::Ready,
#[cfg(feature = "json")]
json_cache: JsonCacheCell::new(),
op_idx_delete_state: None,
@@ -417,7 +422,7 @@ impl Program {
}
}
pub fn halt(
pub fn commit_txn(
&self,
pager: Rc<Pager>,
program_state: &mut ProgramState,
@@ -441,18 +446,14 @@ impl Program {
.expect("only weak ref to connection?");
let auto_commit = connection.auto_commit.get();
tracing::trace!("Halt auto_commit {}", auto_commit);
assert!(
program_state.halt_state.is_none()
|| (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing))
);
if program_state.halt_state.is_some() {
self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref())
if program_state.commit_state == CommitState::Committing {
self.step_end_write_txn(&pager, &mut program_state.commit_state, connection.deref())
} else if auto_commit {
let current_state = connection.transaction_state.get();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.halt_state,
&mut program_state.commit_state,
connection.deref(),
),
TransactionState::Read => {
@@ -476,23 +477,23 @@ impl Program {
fn step_end_write_txn(
&self,
pager: &Rc<Pager>,
halt_state: &mut Option<HaltState>,
commit_state: &mut CommitState,
connection: &Connection,
) -> Result<StepResult> {
let checkpoint_status = pager.end_tx()?;
match checkpoint_status {
CheckpointStatus::Done(_) => {
let cacheflush_status = pager.end_tx()?;
match cacheflush_status {
PagerCacheflushStatus::Done(_) => {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
connection.transaction_state.replace(TransactionState::None);
let _ = halt_state.take();
*commit_state = CommitState::Ready;
}
CheckpointStatus::IO => {
tracing::trace!("Checkpointing IO");
*halt_state = Some(HaltState::Checkpointing);
PagerCacheflushStatus::IO => {
tracing::trace!("Cacheflush IO");
*commit_state = CommitState::Committing;
return Ok(StepResult::IO);
}
}

View File

@@ -1,4 +1,4 @@
use limbo_core::{CheckpointStatus, Connection, Database, IO};
use limbo_core::{Connection, Database, PagerCacheflushStatus, IO};
use rand::{rng, RngCore};
use rusqlite::params;
use std::path::{Path, PathBuf};
@@ -86,10 +86,10 @@ impl TempDatabase {
pub(crate) fn do_flush(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
loop {
match conn.cacheflush()? {
CheckpointStatus::Done(_) => {
PagerCacheflushStatus::Done(_) => {
break;
}
CheckpointStatus::IO => {
PagerCacheflushStatus::IO => {
tmp_db.io.run_once()?;
}
}