Merge 'core/mvcc: automatic logical log checkpointing on commit' from Pere Diaz Bou

On reaching the 8 MiB checkpoint threshold we perform a blocking checkpoint
on the logical log. These changes modify how transaction_state is
tracked so that, for a regular transaction, the checkpoint's state machine
does not update it.
An open question is whether the checkpoint should run inside the commit's
transaction locks or — as implemented here — immediately after the transaction
commits; in either case it happens within the same query, during halt.

Closes #3565
This commit is contained in:
Pekka Enberg
2025-10-04 11:30:04 +03:00
committed by GitHub
6 changed files with 162 additions and 17 deletions

View File

@@ -88,6 +88,10 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
destroyed_tables: HashSet<MVTableId>,
/// Result of the checkpoint
checkpoint_result: Option<CheckpointResult>,
/// Update connection's transaction state on checkpoint. If checkpoint was called as automatic
/// process in a transaction we don't want to change the state as we assume we are already on a
/// write transaction and any failure will be cleared on vdbe error handling.
update_transaction_state: bool,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -109,6 +113,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
pager: Arc<Pager>,
mvstore: Arc<MvStore<Clock>>,
connection: Arc<Connection>,
update_transaction_state: bool,
) -> Self {
let checkpoint_lock = mvstore.blocking_checkpoint_lock.clone();
Self {
@@ -130,6 +135,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
cursors: HashMap::new(),
destroyed_tables: HashSet::new(),
checkpoint_result: None,
update_transaction_state,
}
}
@@ -334,9 +340,11 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
return Err(crate::LimboError::Busy);
}
result?;
*self.connection.transaction_state.write() = TransactionState::Write {
schema_did_change: false,
}; // TODO: schema_did_change??
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::Write {
schema_did_change: false,
}; // TODO: schema_did_change??
}
self.lock_states.pager_write_tx = true;
self.state = CheckpointState::WriteRow {
write_set_index: 0,
@@ -546,7 +554,9 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
self.state = CheckpointState::TruncateLogicalLog;
self.lock_states.pager_read_tx = false;
self.lock_states.pager_write_tx = false;
*self.connection.transaction_state.write() = TransactionState::None;
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::None;
}
let header = self
.pager
.io
@@ -637,10 +647,14 @@ impl<Clock: LogicalClock> StateTransition for CheckpointStateMachine<Clock> {
.io
.block(|| self.pager.end_tx(rollback, self.connection.as_ref()))
.expect("failed to end pager write tx");
*self.connection.transaction_state.write() = TransactionState::None;
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::None;
}
} else if self.lock_states.pager_read_tx {
self.pager.end_read_tx().unwrap();
*self.connection.transaction_state.write() = TransactionState::None;
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::None;
}
}
if self.lock_states.blocking_checkpoint_lock_held {
self.checkpoint_lock.unlock();

View File

@@ -26,6 +26,7 @@ use crate::Value;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
@@ -309,14 +310,28 @@ impl AtomicTransactionState {
}
}
#[derive(Debug)]
pub enum CommitState {
#[allow(clippy::large_enum_variant)]
pub enum CommitState<Clock: LogicalClock> {
Initial,
Commit { end_ts: u64 },
BeginCommitLogicalLog { end_ts: u64, log_record: LogRecord },
EndCommitLogicalLog { end_ts: u64 },
SyncLogicalLog { end_ts: u64 },
CommitEnd { end_ts: u64 },
Commit {
end_ts: u64,
},
BeginCommitLogicalLog {
end_ts: u64,
log_record: LogRecord,
},
EndCommitLogicalLog {
end_ts: u64,
},
SyncLogicalLog {
end_ts: u64,
},
Checkpoint {
state_machine: RefCell<StateMachine<CheckpointStateMachine<Clock>>>,
},
CommitEnd {
end_ts: u64,
},
}
#[derive(Debug)]
@@ -334,7 +349,7 @@ struct CommitCoordinator {
}
pub struct CommitStateMachine<Clock: LogicalClock> {
state: CommitState,
state: CommitState<Clock>,
is_finalized: bool,
did_commit_schema_change: bool,
tx_id: TxID,
@@ -343,6 +358,7 @@ pub struct CommitStateMachine<Clock: LogicalClock> {
write_set: Vec<RowID>,
commit_coordinator: Arc<CommitCoordinator>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
pager: Arc<Pager>,
_phantom: PhantomData<Clock>,
}
@@ -380,12 +396,13 @@ pub struct DeleteRowStateMachine {
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
fn new(
state: CommitState,
state: CommitState<Clock>,
tx_id: TxID,
connection: Arc<Connection>,
commit_coordinator: Arc<CommitCoordinator>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
) -> Self {
let pager = connection.pager.read().clone();
Self {
state,
is_finalized: false,
@@ -394,6 +411,7 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
connection,
write_set: Vec::new(),
commit_coordinator,
pager,
header,
_phantom: PhantomData,
}
@@ -414,7 +432,7 @@ impl WriteRowStateMachine {
}
impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
type Context = MvStore<Clock>;
type Context = Arc<MvStore<Clock>>;
type SMResult = ();
#[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store), level = Level::DEBUG)]
@@ -620,6 +638,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
Ok(TransitionResult::Io(IOCompletions::Single(c)))
}
}
CommitState::SyncLogicalLog { end_ts } => {
let c = mvcc_store.storage.sync()?;
self.state = CommitState::EndCommitLogicalLog { end_ts: *end_ts };
@@ -678,10 +697,31 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
if mvcc_store.is_exclusive_tx(&self.tx_id) {
mvcc_store.release_exclusive_tx(&self.tx_id);
}
if mvcc_store.storage.should_checkpoint() {
let state_machine = StateMachine::new(CheckpointStateMachine::new(
self.pager.clone(),
mvcc_store.clone(),
self.connection.clone(),
false,
));
let state_machine = RefCell::new(state_machine);
self.state = CommitState::Checkpoint { state_machine };
return Ok(TransitionResult::Continue);
}
tracing::trace!("logged(tx_id={}, end_ts={})", self.tx_id, *end_ts);
self.finalize(mvcc_store)?;
Ok(TransitionResult::Done(()))
}
CommitState::Checkpoint { state_machine } => {
match state_machine.borrow_mut().step(&())? {
IOResult::Done(_) => {}
IOResult::IO(iocompletions) => {
return Ok(TransitionResult::Io(iocompletions));
}
}
self.finalize(mvcc_store)?;
return Ok(TransitionResult::Done(()));
}
}
}
@@ -1979,6 +2019,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
self.commit_load_tx(tx_id);
Ok(true)
}
pub fn set_checkpoint_threshold(&self, threshold: u64) {
self.storage.set_checkpoint_threshold(threshold)
}
}
/// A write-write conflict happens when transaction T_current attempts to update a
@@ -2108,3 +2152,29 @@ fn stmt_get_all_rows(stmt: &mut Statement) -> Result<Vec<Vec<Value>>> {
}
Ok(rows)
}
/// Hand-rolled `Debug` for `CommitState`: every variant prints the same way a
/// derived impl would, except `Checkpoint`, whose inner state machine is
/// deliberately omitted from the output.
impl<Clock: LogicalClock> Debug for CommitState<Clock> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Shared formatter for the variants that carry only an `end_ts`.
        fn ts_variant(
            f: &mut std::fmt::Formatter<'_>,
            name: &str,
            end_ts: &u64,
        ) -> std::fmt::Result {
            f.debug_struct(name).field("end_ts", end_ts).finish()
        }
        match self {
            Self::Initial => f.write_str("Initial"),
            Self::Commit { end_ts } => ts_variant(f, "Commit", end_ts),
            Self::BeginCommitLogicalLog { end_ts, log_record } => f
                .debug_struct("BeginCommitLogicalLog")
                .field("end_ts", end_ts)
                .field("log_record", log_record)
                .finish(),
            Self::EndCommitLogicalLog { end_ts } => ts_variant(f, "EndCommitLogicalLog", end_ts),
            Self::SyncLogicalLog { end_ts } => ts_variant(f, "SyncLogicalLog", end_ts),
            Self::Checkpoint { .. } => f.debug_struct("Checkpoint").finish(),
            Self::CommitEnd { end_ts } => ts_variant(f, "CommitEnd", end_ts),
        }
    }
}

View File

@@ -1507,3 +1507,32 @@ fn transaction_display() {
let output = format!("{tx}");
assert_eq!(output, expected);
}
#[test]
fn test_should_checkpoint() {
    // A freshly created database has an empty logical log, so it sits well
    // below the default 8 MiB checkpoint threshold.
    let db = MvccTestDbNoConn::new_with_random_db();
    let store = db.get_mvcc_store();
    assert!(!store.storage.should_checkpoint());
    // Dropping the threshold to zero makes any log size (even 0) qualify.
    store.set_checkpoint_threshold(0);
    assert!(store.storage.should_checkpoint());
}
#[test]
fn test_insert_with_checkpoint() {
    let db = MvccTestDbNoConn::new_with_random_db();
    let store = db.get_mvcc_store();
    // A zero threshold forces an automatic checkpoint after every commit.
    store.set_checkpoint_threshold(0);
    let conn = db.connect();
    conn.execute("CREATE TABLE t(x)").unwrap();
    conn.execute("INSERT INTO t VALUES (1)").unwrap();
    // The inserted row must still be visible after the checkpoint that the
    // commit triggered.
    let rows = get_rows(&conn, "SELECT * FROM t");
    assert_eq!(rows.len(), 1);
    let row = &rows[0];
    assert_eq!(row.len(), 1);
    match &row[0] {
        Value::Integer(i) => assert_eq!(*i, 1),
        _ => unreachable!(),
    }
}

View File

@@ -12,9 +12,17 @@ use std::sync::{Arc, RwLock};
use crate::File;
pub const DEFAULT_LOG_CHECKPOINT_THRESHOLD: u64 = 1024 * 1024 * 8; // 8 MiB as default to mimic
// SQLite's default of 2000 pages, which is
// roughly 8 MiB when page_size == 4096 bytes.
pub struct LogicalLog {
pub file: Arc<dyn File>,
pub offset: u64,
/// Size at which we start performing a checkpoint on the logical log.
checkpoint_threshold: u64,
}
/// Log's Header, this will be the 64 bytes in any logical log file.
@@ -140,7 +148,11 @@ impl LogRecordType {
impl LogicalLog {
pub fn new(file: Arc<dyn File>) -> Self {
Self { file, offset: 0 }
Self {
file,
offset: 0,
checkpoint_threshold: DEFAULT_LOG_CHECKPOINT_THRESHOLD,
}
}
pub fn log_tx(&mut self, tx: &LogRecord) -> Result<Completion> {
@@ -215,6 +227,14 @@ impl LogicalLog {
self.offset = 0;
Ok(c)
}
/// Returns true once the logical log has grown to at least
/// `checkpoint_threshold` bytes, signalling that a blocking checkpoint
/// of the log should be performed.
pub fn should_checkpoint(&self) -> bool {
self.offset >= self.checkpoint_threshold
}
/// Overrides the size (in bytes) at which `should_checkpoint` starts
/// returning true. A threshold of 0 makes every transaction trigger a
/// checkpoint, which is useful in tests.
pub fn set_checkpoint_threshold(&mut self, threshold: u64) {
self.checkpoint_threshold = threshold;
}
}
pub enum StreamingResult {

View File

@@ -38,6 +38,17 @@ impl Storage {
pub fn get_logical_log_file(&self) -> Arc<dyn File> {
self.logical_log.write().unwrap().file.clone()
}
/// Reports whether the logical log has reached its checkpoint threshold.
/// NOTE(review): `.unwrap()` panics if the `RwLock` is poisoned — assumes no
/// other thread panicked while holding the lock.
pub fn should_checkpoint(&self) -> bool {
self.logical_log.read().unwrap().should_checkpoint()
}
/// Sets the logical-log size (in bytes) at which checkpointing kicks in,
/// taking the write lock on the log to do so.
/// NOTE(review): `.unwrap()` panics if the `RwLock` is poisoned.
pub fn set_checkpoint_threshold(&self, threshold: u64) {
self.logical_log
.write()
.unwrap()
.set_checkpoint_threshold(threshold)
}
}
impl Debug for Storage {

View File

@@ -385,6 +385,7 @@ pub fn op_checkpoint_inner(
pager.clone(),
mv_store.clone(),
program.connection.clone(),
true,
));
loop {
let result = ckpt_sm.step(&())?;