core/mvcc: introduce with_header for MVCC header update tracking

Currently, header changes are tracked through the pager by reading page 1.
MVCC has its own layer to track changes during a txn, so this commit makes
each txn track its header changes separately.

On commit we update the _global_ header, which is then used to refresh
`database_size` in the pager's header, since pager commits require it to
be up to date. This also makes it _simpler_ to keep track of header
updates and propagate them to the pager's header accordingly.
Pere Diaz Bou
2025-09-16 20:21:52 +02:00
parent e6822d26ab
commit 64616dc2ca
4 changed files with 179 additions and 39 deletions
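
Conceptually, header access now dispatches on whether an MVCC transaction is active: inside a txn we read the txn-local header snapshot, otherwise we fall back to the last committed global header. Below is a minimal, self-contained sketch of that dispatch, with simplified hypothetical types (the real ones are DatabaseHeader, Transaction, and MvStore in the diff that follows):

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone, Copy, Debug, Default)]
struct Header {
    database_size: u32,
    schema_cookie: u32,
}

struct Txn {
    // Each transaction snapshots the header at begin and mutates its own copy.
    header: RefCell<Header>,
}

struct Store {
    txs: HashMap<u64, Txn>,
    // Published on commit; pager commits read database_size from here.
    global_header: RwLock<Option<Header>>,
}

impl Store {
    fn with_header<T>(&self, tx_id: Option<u64>, f: impl Fn(&Header) -> T) -> T {
        match tx_id {
            // Inside a transaction: use the txn-local snapshot.
            Some(id) => f(&self.txs[&id].header.borrow()),
            // Outside a transaction: use the last committed global header.
            None => f(self.global_header.read().unwrap().as_ref().unwrap()),
        }
    }
}

fn main() {
    let mut txs = HashMap::new();
    txs.insert(1, Txn { header: RefCell::new(Header { database_size: 2, schema_cookie: 7 }) });
    let store = Store { txs, global_header: RwLock::new(Some(Header::default())) };
    // The txn sees its own snapshot, not the global header.
    assert_eq!(store.with_header(Some(1), |h| h.schema_cookie), 7);
    assert_eq!(store.with_header(None, |h| h.schema_cookie), 0);
}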


@@ -19,6 +19,7 @@ use crate::Result;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -116,16 +117,19 @@ pub struct Transaction {
write_set: SkipSet<RowID>,
/// The transaction read set.
read_set: SkipSet<RowID>,
/// The transaction header.
header: RefCell<DatabaseHeader>,
}
impl Transaction {
fn new(tx_id: u64, begin_ts: u64) -> Transaction {
fn new(tx_id: u64, begin_ts: u64, header: DatabaseHeader) -> Transaction {
Transaction {
state: TransactionState::Active.into(),
tx_id,
begin_ts,
write_set: SkipSet::new(),
read_set: SkipSet::new(),
header: RefCell::new(header),
}
}
@@ -370,6 +374,34 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
_phantom: PhantomData,
}
}
/// We need to update the pager's header to account for changes made by other transactions.
fn update_pager_header(&self, mvcc_store: &MvStore<Clock>) -> Result<()> {
let header = self.header.read();
let last_commited_header = header.as_ref().expect("Header not found");
self.pager.io.block(|| self.pager.maybe_allocate_page1())?;
let _ = self.pager.io.block(|| {
self.pager.with_header_mut(|header_in_pager| {
let header_in_transaction = mvcc_store.get_transaction_database_header(&self.tx_id);
tracing::debug!("update header here {}", header_in_transaction.schema_cookie);
// database_size is only updated at commit time, so it is safe to assume last_commited_header holds the correct database_size
header_in_pager.database_size = last_commited_header.database_size;
if header_in_transaction.schema_cookie < last_commited_header.schema_cookie {
tracing::error!("txn's schema cookie went back in time, aborting");
return Err(LimboError::SchemaUpdated);
}
assert!(
header_in_transaction.schema_cookie >= last_commited_header.schema_cookie,
"txn's schema cookie went back in time"
);
header_in_pager.schema_cookie = header_in_transaction.schema_cookie;
// TODO: deal with other fields
Ok(())
})
})?;
Ok(())
}
}
impl WriteRowStateMachine {
@@ -518,6 +550,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// If this is the exclusive transaction, we already acquired a write transaction
// on the pager in begin_exclusive_tx() and don't need to acquire it.
if mvcc_store.is_exclusive_tx(&self.tx_id) {
self.update_pager_header(mvcc_store)?;
self.state = CommitState::WriteRow {
end_ts,
write_set_index: 0,
@@ -545,22 +578,15 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
Completion::new_dummy(),
)));
}
self.update_pager_header(mvcc_store)?;
{
let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
// we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
wal.update_max_frame();
}
// TODO: Force updated header?
{
if let Some(last_commited_header) = self.header.read().as_ref() {
self.pager.io.block(|| {
self.pager.with_header_mut(|header_in_pager| {
header_in_pager.database_size = last_commited_header.database_size;
// TODO: deal with other fields
})
})?;
}
}
// We started a pager read transaction at the beginning of the MV transaction, because
// any reads we do from the database file and WAL must uphold snapshot isolation.
// However, now we must end and immediately restart the read transaction before committing.
@@ -740,11 +766,9 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
match result {
IOResult::Done(_) => {
// FIXME: hack for now to keep database header updated for pager commit
self.pager.io.block(|| {
self.pager.with_header(|header| {
self.header.write().replace(*header);
})
})?;
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.borrow());
self.commit_coordinator.pager_commit_lock.unlock();
// TODO: here mark we are ready for a batch
self.state = CommitState::Commit { end_ts };
@@ -1011,7 +1035,7 @@ pub struct MvStore<Clock: LogicalClock> {
/// exclusive transactions to support single-writer semantics for compatibility with SQLite.
exclusive_tx: RwLock<Option<TxID>>,
commit_coordinator: Arc<CommitCoordinator>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
global_header: Arc<RwLock<Option<DatabaseHeader>>>,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -1030,7 +1054,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pager_commit_lock: Arc::new(TursoRwLock::new()),
commits_waiting: Arc::new(AtomicU64::new(0)),
}),
header: Arc::new(RwLock::new(None)),
global_header: Arc::new(RwLock::new(None)),
}
}
@@ -1352,6 +1376,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pager.end_read_tx()?;
return Err(LimboError::Busy);
}
let header = self.get_new_transaction_database_header(&pager);
// Try to acquire the pager write lock
let begin_w_tx_res = pager.begin_write_tx();
if let Err(LimboError::Busy) = begin_w_tx_res {
@@ -1368,7 +1393,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
return Err(LimboError::Busy);
}
return_if_io!(begin_w_tx_res);
let tx = Transaction::new(tx_id, begin_ts);
let tx = Transaction::new(tx_id, begin_ts, header);
tracing::trace!(
"begin_exclusive_tx(tx_id={}) - exclusive write transaction",
tx_id
@@ -1387,16 +1412,84 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn begin_tx(&self, pager: Arc<Pager>) -> Result<TxID> {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("begin_tx(tx_id={})", tx_id);
self.txs.insert(tx_id, tx);
// TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read
// pages from WAL/DB read from a consistent state to maintain snapshot isolation.
pager.begin_read_tx()?;
// Set txn's header to the global header
let header = self.get_new_transaction_database_header(&pager);
let tx = Transaction::new(tx_id, begin_ts, header);
tracing::trace!("begin_tx(tx_id={})", tx_id);
self.txs.insert(tx_id, tx);
Ok(tx_id)
}
fn get_new_transaction_database_header(&self, pager: &Rc<Pager>) -> DatabaseHeader {
if self.global_header.read().is_none() {
pager.io.block(|| pager.maybe_allocate_page1()).unwrap();
let header = pager
.io
.block(|| pager.with_header(|header| *header))
.unwrap();
// TODO: We initialize header here, maybe this needs more careful handling
self.global_header.write().replace(header);
tracing::debug!(
"get_transaction_database_header create: header={:?}",
header
);
header
} else {
let header = self.global_header.read().unwrap();
tracing::debug!("get_transaction_database_header read: header={:?}", header);
header
}
}
pub fn get_transaction_database_header(&self, tx_id: &TxID) -> DatabaseHeader {
let tx = self.txs.get(tx_id).unwrap();
let header = tx.value();
let header = header.header.borrow();
tracing::debug!("get_transaction_database_header read: header={:?}", header);
*header
}
pub fn with_header<T, F>(&self, f: F, tx_id: Option<&TxID>) -> Result<T>
where
F: Fn(&DatabaseHeader) -> T,
{
if let Some(tx_id) = tx_id {
let tx = self.txs.get(tx_id).unwrap();
let header = tx.value();
let header = header.header.borrow();
tracing::debug!("with_header read: header={:?}", header);
Ok(f(&header))
} else {
let header = self.global_header.read();
tracing::debug!("with_header read: header={:?}", header);
Ok(f(header.as_ref().unwrap()))
}
}
pub fn with_header_mut<T, F>(&self, f: F, tx_id: Option<&TxID>) -> Result<T>
where
F: Fn(&mut DatabaseHeader) -> T,
{
if let Some(tx_id) = tx_id {
let tx = self.txs.get(tx_id).unwrap();
let header = tx.value();
let mut header = header.header.borrow_mut();
tracing::debug!("with_header_mut read: header={:?}", header);
Ok(f(&mut header))
} else {
let mut header = self.global_header.write();
let header = header.as_mut().unwrap();
tracing::debug!("with_header_mut write: header={:?}", header);
Ok(f(header))
}
}
/// Commits a transaction with the specified transaction ID.
///
/// This function commits the changes made within the specified transaction and finalizes the
@@ -1419,7 +1512,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tx_id,
connection.clone(),
self.commit_coordinator.clone(),
self.header.clone(),
self.global_header.clone(),
));
Ok(state_machine)
}
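
The commit-time handoff above boils down to: validate that the txn's schema cookie did not move backwards, then publish the txn header as the new global header so the next pager commit reads a current database_size. A hedged, self-contained sketch of that invariant (hypothetical names, not the actual CommitStateMachine API):

use std::sync::RwLock;

#[derive(Clone, Copy, Debug, Default)]
struct Header {
    database_size: u32,
    schema_cookie: u32,
}

// Simplified stand-in for the publication done in update_pager_header and
// the CommitStateMachine above.
fn publish_on_commit(txn: Header, global: &RwLock<Option<Header>>) -> Result<(), &'static str> {
    let last = (*global.read().unwrap()).unwrap_or_default();
    // A txn's schema cookie must never move backwards; the real code
    // aborts with LimboError::SchemaUpdated in that case.
    if txn.schema_cookie < last.schema_cookie {
        return Err("txn's schema cookie went back in time");
    }
    // Publish the txn header so the next pager commit reads an
    // up-to-date database_size from the global header.
    global.write().unwrap().replace(txn);
    Ok(())
}

fn main() {
    let global = RwLock::new(Some(Header { database_size: 4, schema_cookie: 3 }));
    assert!(publish_on_commit(Header { database_size: 5, schema_cookie: 3 }, &global).is_ok());
    assert!(publish_on_commit(Header { database_size: 5, schema_cookie: 2 }, &global).is_err());
}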


@@ -1,6 +1,8 @@
use super::*;
use crate::io::PlatformIO;
use crate::mvcc::clock::LocalClock;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use std::cell::RefCell;
pub(crate) struct MvccTestDbNoConn {
pub(crate) db: Option<Arc<Database>>,
@@ -1077,6 +1079,7 @@ fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction {
begin_ts,
write_set: SkipSet::new(),
read_set: SkipSet::new(),
header: RefCell::new(DatabaseHeader::default()),
}
}
@@ -1488,6 +1491,7 @@ fn transaction_display() {
begin_ts,
write_set,
read_set,
header: RefCell::new(DatabaseHeader::default()),
};
let expected = "{ state: Preparing, id: 42, begin_ts: 20250914, write_set: [RowID { table_id: 1, row_id: 11 }, RowID { table_id: 1, row_id: 13 }], read_set: [RowID { table_id: 2, row_id: 17 }, RowID { table_id: 2, row_id: 19 }] }";


@@ -835,8 +835,10 @@ impl Wal for WalFile {
// WAL and fetch pages directly from the DB file. We do this
// by taking readlock 0, and capturing the latest state.
if shared_max == nbackfills {
tracing::debug!("begin_read_tx: WAL is already fully backfilled into the main DB image, shared_max={}, nbackfills={}", shared_max, nbackfills);
let lock_0_idx = 0;
if !self.get_shared().read_locks[lock_0_idx].read() {
tracing::debug!("begin_read_tx: read lock 0 is already held, returning Busy");
return Err(LimboError::Busy);
}
// we need to keep self.max_frame set to the appropriate


@@ -8,7 +8,7 @@ use crate::storage::btree::{
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::PageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
use crate::storage::sqlite3_ondisk::read_varint;
use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader};
use crate::translate::collate::CollationSeq;
use crate::types::{
compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord,
@@ -2154,7 +2154,6 @@ pub fn op_transaction(
// for both.
if program.connection.mv_tx.get().is_none() {
// We allocate the first page lazily in the first transaction.
return_if_io!(pager.maybe_allocate_page1());
// TODO: when we fix MVCC enable schema cookie detection for reprepare statements
// let header_schema_cookie = pager
// .io
@@ -2242,9 +2241,11 @@ pub fn op_transaction(
// Can only read the header if page 1 has already been allocated;
// begin_write_tx ensures that, but begin_read_tx does not
// TODO: this is a hack to make the pager run the IO loop
let res = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie.get()));
let res = pager.io.block(|| {
with_header(&pager, mv_store, program, |header| {
header.schema_cookie.get()
})
});
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
@@ -6643,7 +6644,9 @@ pub fn op_page_count(
// TODO: implement temp databases
todo!("temp databases not implemented yet");
}
let count = match pager.with_header(|header| header.database_size.get()) {
let count = match with_header(pager, mv_store, program, |header| {
header.database_size.get()
}) {
Err(_) => 0.into(),
Ok(IOResult::Done(v)) => v.into(),
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
@@ -6802,7 +6805,7 @@ pub fn op_read_cookie(
todo!("temp databases not implemented yet");
}
let cookie_value = match pager.with_header(|header| match cookie {
let cookie_value = match with_header(pager, mv_store, program, |header| match cookie {
Cookie::ApplicationId => header.application_id.get().into(),
Cookie::UserVersion => header.user_version.get().into(),
Cookie::SchemaVersion => header.schema_cookie.get().into(),
@@ -6839,16 +6842,14 @@ pub fn op_set_cookie(
todo!("temp databases not implemented yet");
}
return_if_io!(pager.with_header_mut(|header| {
return_if_io!(with_header_mut(pager, mv_store, program, |header| {
match cookie {
Cookie::ApplicationId => header.application_id = (*value).into(),
Cookie::UserVersion => header.user_version = (*value).into(),
Cookie::LargestRootPageNumber => {
header.vacuum_mode_largest_root_page = (*value as u32).into();
}
Cookie::IncrementalVacuum => {
header.incremental_vacuum_enabled = (*value as u32).into()
}
Cookie::IncrementalVacuum => header.incremental_vacuum_enabled = (*value as u32).into(),
Cookie::SchemaVersion => {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
@@ -7110,7 +7111,7 @@ pub fn op_open_ephemeral(
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))?
.block(|| with_header(pager, mv_store, program, |header| header.page_size))?
.get();
let buffer_pool = program.connection._db.buffer_pool.clone();
@@ -7128,7 +7129,7 @@ pub fn op_open_ephemeral(
let page_size = pager
.io
.block(|| pager.with_header(|header| header.page_size))
.block(|| with_header(&pager, mv_store, program, |header| header.page_size))
.unwrap_or_default();
pager.page_size.set(Some(page_size));
@@ -7486,14 +7487,18 @@ pub fn op_integrity_check(
match &mut state.op_integrity_check_state {
OpIntegrityCheckState::Start => {
let freelist_trunk_page =
return_if_io!(pager.with_header(|header| header.freelist_trunk_page.get()));
return_if_io!(with_header(pager, mv_store, program, |header| header
.freelist_trunk_page
.get()));
let mut errors = Vec::new();
let mut integrity_check_state = IntegrityCheckState::new();
let mut current_root_idx = 0;
// check freelist pages first, if there are any for database
if freelist_trunk_page > 0 {
let expected_freelist_count =
return_if_io!(pager.with_header(|header| header.freelist_pages.get()));
return_if_io!(with_header(pager, mv_store, program, |header| header
.freelist_pages
.get()));
integrity_check_state.set_expected_freelist_count(expected_freelist_count as usize);
integrity_check_state.start(
freelist_trunk_page as usize,
@@ -9249,6 +9254,42 @@ pub fn op_journal_mode(
Ok(InsnFunctionStepResult::Step)
}
fn with_header<T, F>(
pager: &Rc<Pager>,
mv_store: Option<&Arc<MvStore>>,
program: &Program,
f: F,
) -> Result<IOResult<T>>
where
F: Fn(&DatabaseHeader) -> T,
{
if let Some(mv_store) = mv_store {
let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id);
mv_store.with_header(f, tx_id.as_ref()).map(IOResult::Done)
} else {
pager.with_header(&f)
}
}
fn with_header_mut<T, F>(
pager: &Rc<Pager>,
mv_store: Option<&Arc<MvStore>>,
program: &Program,
f: F,
) -> Result<IOResult<T>>
where
F: Fn(&mut DatabaseHeader) -> T,
{
if let Some(mv_store) = mv_store {
let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id);
mv_store
.with_header_mut(f, tx_id.as_ref())
.map(IOResult::Done)
} else {
pager.with_header_mut(&f)
}
}
#[cfg(test)]
mod tests {
use super::*;