Merge 'Add MVCC checkpoint threshold pragma' from bit-aloo

closes: #3575

Closes #3604
This commit is contained in:
Pekka Enberg
2025-10-07 08:46:41 +03:00
committed by GitHub
7 changed files with 52 additions and 0 deletions

View File

@@ -2360,6 +2360,23 @@ impl Connection {
pub(crate) fn get_mv_tx(&self) -> Option<(u64, TransactionMode)> {
*self.mv_tx.read()
}
pub(crate) fn set_mvcc_checkpoint_threshold(&self, threshold: u64) -> Result<()> {
match self.db.mv_store.as_ref() {
Some(mv_store) => {
mv_store.set_checkpoint_threshold(threshold);
Ok(())
}
None => Err(LimboError::InternalError("MVCC not enabled".into())),
}
}
pub(crate) fn mvcc_checkpoint_threshold(&self) -> Result<u64> {
match self.db.mv_store.as_ref() {
Some(mv_store) => Ok(mv_store.checkpoint_threshold()),
None => Err(LimboError::InternalError("MVCC not enabled".into())),
}
}
}
#[derive(Debug)]

View File

@@ -2035,6 +2035,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn set_checkpoint_threshold(&self, threshold: u64) {
self.storage.set_checkpoint_threshold(threshold)
}
pub fn checkpoint_threshold(&self) -> u64 {
self.storage.checkpoint_threshold()
}
}
/// A write-write conflict happens when transaction T_current attempts to update a

View File

@@ -235,6 +235,10 @@ impl LogicalLog {
pub fn set_checkpoint_threshold(&mut self, threshold: u64) {
self.checkpoint_threshold = threshold;
}
pub fn checkpoint_threshold(&self) -> u64 {
self.checkpoint_threshold
}
}
pub enum StreamingResult {

View File

@@ -49,6 +49,10 @@ impl Storage {
.unwrap()
.set_checkpoint_threshold(threshold)
}
pub fn checkpoint_threshold(&self) -> u64 {
self.logical_log.read().unwrap().checkpoint_threshold()
}
}
impl Debug for Storage {

View File

@@ -127,6 +127,10 @@ pub fn pragma_for(pragma: &PragmaName) -> Pragma {
PragmaFlags::Result0 | PragmaFlags::SchemaReq | PragmaFlags::NoColumns1,
&["cipher"],
),
PragmaName::MvccCheckpointThreshold => Pragma::new(
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
&["mvcc_checkpoint_threshold"],
),
}
}

View File

@@ -378,6 +378,15 @@ fn update_pragma(
connection.set_data_sync_retry(retry_enabled);
Ok((program, TransactionMode::None))
}
PragmaName::MvccCheckpointThreshold => {
let threshold = match parse_signed_number(&value)? {
Value::Integer(size) if size > 0 => size as u64,
_ => bail_parse_error!("mvcc_checkpoint_threshold must be a positive integer"),
};
connection.set_mvcc_checkpoint_threshold(threshold)?;
Ok((program, TransactionMode::None))
}
}
}
@@ -687,6 +696,14 @@ fn query_pragma(
program.add_pragma_result_column(pragma.to_string());
Ok((program, TransactionMode::None))
}
PragmaName::MvccCheckpointThreshold => {
let threshold = connection.mvcc_checkpoint_threshold()?;
let register = program.alloc_register();
program.emit_int(threshold as i64, register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
Ok((program, TransactionMode::None))
}
}
}

View File

@@ -1449,6 +1449,8 @@ pub enum PragmaName {
UserVersion,
/// trigger a checkpoint to run on database(s) if WAL is enabled
WalCheckpoint,
/// Sets or queries the threshold (in bytes) at which MVCC triggers an automatic checkpoint.
MvccCheckpointThreshold,
}
/// `CREATE TRIGGER` time