diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index d2f5e5897..a207d5ad2 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -88,6 +88,10 @@ pub struct CheckpointStateMachine { destroyed_tables: HashSet, /// Result of the checkpoint checkpoint_result: Option, + /// Update connection's transaction state on checkpoint. If checkpoint was called as automatic + /// process in a transaction we don't want to change the state as we assume we are already on a + /// write transaction and any failure will be cleared on vdbe error handling. + update_transaction_state: bool, } #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -109,6 +113,7 @@ impl CheckpointStateMachine { pager: Arc, mvstore: Arc>, connection: Arc, + update_transaction_state: bool, ) -> Self { let checkpoint_lock = mvstore.blocking_checkpoint_lock.clone(); Self { @@ -130,6 +135,7 @@ impl CheckpointStateMachine { cursors: HashMap::new(), destroyed_tables: HashSet::new(), checkpoint_result: None, + update_transaction_state, } } @@ -334,9 +340,11 @@ impl CheckpointStateMachine { return Err(crate::LimboError::Busy); } result?; - *self.connection.transaction_state.write() = TransactionState::Write { - schema_did_change: false, - }; // TODO: schema_did_change?? + if self.update_transaction_state { + *self.connection.transaction_state.write() = TransactionState::Write { + schema_did_change: false, + }; // TODO: schema_did_change?? + } self.lock_states.pager_write_tx = true; self.state = CheckpointState::WriteRow { write_set_index: 0, @@ -546,7 +554,9 @@ impl CheckpointStateMachine { self.state = CheckpointState::TruncateLogicalLog; self.lock_states.pager_read_tx = false; self.lock_states.pager_write_tx = false; - *self.connection.transaction_state.write() = TransactionState::None; + if self.update_transaction_state { + *self.connection.transaction_state.write() = TransactionState::None; + } let header = self .pager .io @@ -637,10 +647,14 @@ impl StateTransition for CheckpointStateMachine { .io .block(|| self.pager.end_tx(rollback, self.connection.as_ref())) .expect("failed to end pager write tx"); - *self.connection.transaction_state.write() = TransactionState::None; + if self.update_transaction_state { + *self.connection.transaction_state.write() = TransactionState::None; + } } else if self.lock_states.pager_read_tx { self.pager.end_read_tx().unwrap(); - *self.connection.transaction_state.write() = TransactionState::None; + if self.update_transaction_state { + *self.connection.transaction_state.write() = TransactionState::None; + } } if self.lock_states.blocking_checkpoint_lock_held { self.checkpoint_lock.unlock(); diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index e9794289d..f00690aff 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -26,6 +26,7 @@ use crate::Value; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; +use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; @@ -309,14 +310,28 @@ impl AtomicTransactionState { } } -#[derive(Debug)] -pub enum CommitState { +#[allow(clippy::large_enum_variant)] +pub enum CommitState { Initial, - Commit { end_ts: u64 }, - BeginCommitLogicalLog { end_ts: u64, log_record: LogRecord }, - EndCommitLogicalLog { end_ts: u64 }, - SyncLogicalLog { end_ts: u64 }, - CommitEnd { end_ts: u64 }, + Commit { + end_ts: u64, + }, + BeginCommitLogicalLog { + end_ts: u64, + log_record: LogRecord, + }, + EndCommitLogicalLog { + end_ts: u64, + }, + SyncLogicalLog { + end_ts: u64, + }, + Checkpoint { + state_machine: RefCell>>, + }, + CommitEnd { + end_ts: u64, + }, } #[derive(Debug)] @@ -334,7 +349,7 @@ struct CommitCoordinator { } pub struct CommitStateMachine { - state: CommitState, + state: CommitState, is_finalized: bool, did_commit_schema_change: bool, tx_id: TxID, @@ -343,6 +358,7 @@ pub struct CommitStateMachine { write_set: Vec, commit_coordinator: Arc, header: Arc>>, + pager: Arc, _phantom: PhantomData, } @@ -380,12 +396,13 @@ pub struct DeleteRowStateMachine { impl CommitStateMachine { fn new( - state: CommitState, + state: CommitState, tx_id: TxID, connection: Arc, commit_coordinator: Arc, header: Arc>>, ) -> Self { + let pager = connection.pager.read().clone(); Self { state, is_finalized: false, @@ -394,6 +411,7 @@ impl CommitStateMachine { connection, write_set: Vec::new(), commit_coordinator, + pager, header, _phantom: PhantomData, } @@ -414,7 +432,7 @@ impl WriteRowStateMachine { } impl StateTransition for CommitStateMachine { - type Context = MvStore; + type Context = Arc>; type SMResult = (); #[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store), level = Level::DEBUG)] @@ -620,6 +638,7 @@ impl StateTransition for CommitStateMachine { Ok(TransitionResult::Io(IOCompletions::Single(c))) } } + CommitState::SyncLogicalLog { end_ts } => { let c = mvcc_store.storage.sync()?; self.state = CommitState::EndCommitLogicalLog { end_ts: *end_ts }; @@ -678,10 +697,31 @@ impl StateTransition for CommitStateMachine { if mvcc_store.is_exclusive_tx(&self.tx_id) { mvcc_store.release_exclusive_tx(&self.tx_id); } + if mvcc_store.storage.should_checkpoint() { + let state_machine = StateMachine::new(CheckpointStateMachine::new( + self.pager.clone(), + mvcc_store.clone(), + self.connection.clone(), + false, + )); + let state_machine = RefCell::new(state_machine); + self.state = CommitState::Checkpoint { state_machine }; + return Ok(TransitionResult::Continue); + } tracing::trace!("logged(tx_id={}, end_ts={})", self.tx_id, *end_ts); self.finalize(mvcc_store)?; Ok(TransitionResult::Done(())) } + CommitState::Checkpoint { state_machine } => { + match state_machine.borrow_mut().step(&())? { + IOResult::Done(_) => {} + IOResult::IO(iocompletions) => { + return Ok(TransitionResult::Io(iocompletions)); + } + } + self.finalize(mvcc_store)?; + return Ok(TransitionResult::Done(())); + } } } @@ -1979,6 +2019,10 @@ impl MvStore { self.commit_load_tx(tx_id); Ok(true) } + + pub fn set_checkpoint_threshold(&self, threshold: u64) { + self.storage.set_checkpoint_threshold(threshold) + } } /// A write-write conflict happens when transaction T_current attempts to update a @@ -2108,3 +2152,29 @@ fn stmt_get_all_rows(stmt: &mut Statement) -> Result>> { } Ok(rows) } + +impl Debug for CommitState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Initial => write!(f, "Initial"), + Self::Commit { end_ts } => f.debug_struct("Commit").field("end_ts", end_ts).finish(), + Self::BeginCommitLogicalLog { end_ts, log_record } => f + .debug_struct("BeginCommitLogicalLog") + .field("end_ts", end_ts) + .field("log_record", log_record) + .finish(), + Self::EndCommitLogicalLog { end_ts } => f + .debug_struct("EndCommitLogicalLog") + .field("end_ts", end_ts) + .finish(), + Self::SyncLogicalLog { end_ts } => f + .debug_struct("SyncLogicalLog") + .field("end_ts", end_ts) + .finish(), + Self::Checkpoint { state_machine: _ } => f.debug_struct("Checkpoint").finish(), + Self::CommitEnd { end_ts } => { + f.debug_struct("CommitEnd").field("end_ts", end_ts).finish() + } + } + } +} diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 1781a5c1d..35a45e728 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1507,3 +1507,32 @@ fn transaction_display() { let output = format!("{tx}"); assert_eq!(output, expected); } + +#[test] +fn test_should_checkpoint() { + let db = MvccTestDbNoConn::new_with_random_db(); + let mv_store = db.get_mvcc_store(); + assert!(!mv_store.storage.should_checkpoint()); + mv_store.set_checkpoint_threshold(0); + assert!(mv_store.storage.should_checkpoint()); +} + +#[test] +fn test_insert_with_checkpoint() { + let db = MvccTestDbNoConn::new_with_random_db(); + let mv_store = db.get_mvcc_store(); + // force checkpoint on every transaction + mv_store.set_checkpoint_threshold(0); + let conn = db.connect(); + conn.execute("CREATE TABLE t(x)").unwrap(); + conn.execute("INSERT INTO t VALUES (1)").unwrap(); + let rows = get_rows(&conn, "SELECT * FROM t"); + assert_eq!(rows.len(), 1); + let row = rows.first().unwrap(); + assert_eq!(row.len(), 1); + let value = row.first().unwrap(); + match value { + Value::Integer(i) => assert_eq!(*i, 1), + _ => unreachable!(), + } +} diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index 591dd866f..ee572f225 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -12,9 +12,17 @@ use std::sync::{Arc, RwLock}; use crate::File; +pub const DEFAULT_LOG_CHECKPOINT_THRESHOLD: u64 = 1024 * 1024 * 8; // 8 MiB as default to mimic + // 2000 pages in sqlite which is + // pretty much equal to + // 8MiB if page_size == + // 4096 bytes + pub struct LogicalLog { pub file: Arc, pub offset: u64, + /// Size at which we start performing a checkpoint on the logical log. + checkpoint_threshold: u64, } /// Log's Header, this will be the 64 bytes in any logical log file. @@ -140,7 +148,11 @@ impl LogRecordType { impl LogicalLog { pub fn new(file: Arc) -> Self { - Self { file, offset: 0 } + Self { + file, + offset: 0, + checkpoint_threshold: DEFAULT_LOG_CHECKPOINT_THRESHOLD, + } } pub fn log_tx(&mut self, tx: &LogRecord) -> Result { @@ -215,6 +227,14 @@ impl LogicalLog { self.offset = 0; Ok(c) } + + pub fn should_checkpoint(&self) -> bool { + self.offset >= self.checkpoint_threshold + } + + pub fn set_checkpoint_threshold(&mut self, threshold: u64) { + self.checkpoint_threshold = threshold; + } } pub enum StreamingResult { diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index cafcb3230..1cc8d0c2b 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -38,6 +38,17 @@ impl Storage { pub fn get_logical_log_file(&self) -> Arc { self.logical_log.write().unwrap().file.clone() } + + pub fn should_checkpoint(&self) -> bool { + self.logical_log.read().unwrap().should_checkpoint() + } + + pub fn set_checkpoint_threshold(&self, threshold: u64) { + self.logical_log + .write() + .unwrap() + .set_checkpoint_threshold(threshold) + } } impl Debug for Storage { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 764048bf8..87fbaec0a 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -385,6 +385,7 @@ pub fn op_checkpoint_inner( pager.clone(), mv_store.clone(), program.connection.clone(), + true, )); loop { let result = ckpt_sm.step(&())?;