From 64616dc2ca852f1aa1ff9b3152d573ef1e49640c Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Tue, 16 Sep 2025 20:21:52 +0200
Subject: [PATCH] core/mvcc: introduce with_header for MVCC header update
 tracking

Currently, header changes are tracked through the pager by reading page 1.
MVCC has its own layer to track changes during a txn, so this commit makes
each txn track its header separately. On commit we update the _global_
header, which is used to keep `database_size` up to date because pager
commits require it. This also makes it _simpler_ to track header updates
and update the pager's header accordingly.
---
 core/mvcc/database/mod.rs   | 141 ++++++++++++++++++++++++++++++------
 core/mvcc/database/tests.rs |   4 +
 core/storage/wal.rs         |   2 +
 core/vdbe/execute.rs        |  71 ++++++++++++++----
 4 files changed, 179 insertions(+), 39 deletions(-)

diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs
index 9285fe150..0df58feb4 100644
--- a/core/mvcc/database/mod.rs
+++ b/core/mvcc/database/mod.rs
@@ -19,6 +19,7 @@ use crate::Result;
 use crate::{Connection, Pager};
 use crossbeam_skiplist::{SkipMap, SkipSet};
 use parking_lot::RwLock;
+use std::cell::RefCell;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
@@ -116,16 +117,19 @@ pub struct Transaction {
     write_set: SkipSet<RowID>,
     /// The transaction read set.
     read_set: SkipSet<RowID>,
+    /// The transaction header.
+    header: RefCell<DatabaseHeader>,
 }
 
 impl Transaction {
-    fn new(tx_id: u64, begin_ts: u64) -> Transaction {
+    fn new(tx_id: u64, begin_ts: u64, header: DatabaseHeader) -> Transaction {
         Transaction {
             state: TransactionState::Active.into(),
             tx_id,
             begin_ts,
             write_set: SkipSet::new(),
             read_set: SkipSet::new(),
+            header: RefCell::new(header),
         }
     }
 
@@ -370,6 +374,34 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
             _phantom: PhantomData,
         }
     }
+
+    /// We need to update the pager's header to account for changes made by other transactions.
+    fn update_pager_header(&self, mvcc_store: &MvStore<Clock>) -> Result<()> {
+        let header = self.header.read();
+        let last_commited_header = header.as_ref().expect("Header not found");
+        self.pager.io.block(|| self.pager.maybe_allocate_page1())?;
+        let _ = self.pager.io.block(|| {
+            self.pager.with_header_mut(|header_in_pager| {
+                let header_in_transaction = mvcc_store.get_transaction_database_header(&self.tx_id);
+                tracing::debug!("update_pager_header: schema_cookie={}", header_in_transaction.schema_cookie);
+                // database_size is only updated on commit, so it is safe to assume last_commited_header holds the correct database_size
+                header_in_pager.database_size = last_commited_header.database_size;
+                if header_in_transaction.schema_cookie < last_commited_header.schema_cookie {
+                    tracing::error!("txn's schema cookie went back in time, aborting");
+                    return Err(LimboError::SchemaUpdated);
+                }
+
+                assert!(
+                    header_in_transaction.schema_cookie >= last_commited_header.schema_cookie,
+                    "txn's schema cookie went back in time"
+                );
+                header_in_pager.schema_cookie = header_in_transaction.schema_cookie;
+                // TODO: deal with other fields
+                Ok(())
+            })
+        })?;
+        Ok(())
+    }
 }
 
 impl WriteRowStateMachine {
@@ -518,6 +550,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
             // If this is the exclusive transaction, we already acquired a write transaction
             // on the pager in begin_exclusive_tx() and don't need to acquire it.
             if mvcc_store.is_exclusive_tx(&self.tx_id) {
+                self.update_pager_header(mvcc_store)?;
                 self.state = CommitState::WriteRow {
                     end_ts,
                     write_set_index: 0,
@@ -545,22 +578,15 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
                         Completion::new_dummy(),
                     )));
                 }
+
+                self.update_pager_header(mvcc_store)?;
+
                 {
                     let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
                     // we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
                     wal.update_max_frame();
                 }
-                // TODO: Force updated header?
-                {
-                    if let Some(last_commited_header) = self.header.read().as_ref() {
-                        self.pager.io.block(|| {
-                            self.pager.with_header_mut(|header_in_pager| {
-                                header_in_pager.database_size = last_commited_header.database_size;
-                                // TODO: deal with other fields
-                            })
-                        })?;
-                    }
-                }
+
                 // We started a pager read transaction at the beginning of the MV transaction, because
                 // any reads we do from the database file and WAL must uphold snapshot isolation.
                 // However, now we must end and immediately restart the read transaction before committing.
@@ -740,11 +766,9 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
                 match result {
                     IOResult::Done(_) => {
                         // FIXME: hack for now to keep the database header updated for pager commit
-                        self.pager.io.block(|| {
-                            self.pager.with_header(|header| {
-                                self.header.write().replace(*header);
-                            })
-                        })?;
+                        let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
+                        let tx_unlocked = tx.value();
+                        self.header.write().replace(*tx_unlocked.header.borrow());
                         self.commit_coordinator.pager_commit_lock.unlock();
                         // TODO: here mark we are ready for a batch
                         self.state = CommitState::Commit { end_ts };
@@ -1011,7 +1035,7 @@ pub struct MvStore<Clock: LogicalClock> {
     /// exclusive transactions to support single-writer semantics for compatibility with SQLite.
     exclusive_tx: RwLock<Option<TxID>>,
     commit_coordinator: Arc<CommitCoordinator>,
-    header: Arc<RwLock<Option<DatabaseHeader>>>,
+    global_header: Arc<RwLock<Option<DatabaseHeader>>>,
 }
 
 impl<Clock: LogicalClock> MvStore<Clock> {
@@ -1030,7 +1054,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
                 pager_commit_lock: Arc::new(TursoRwLock::new()),
                 commits_waiting: Arc::new(AtomicU64::new(0)),
             }),
-            header: Arc::new(RwLock::new(None)),
+            global_header: Arc::new(RwLock::new(None)),
         }
     }
 
@@ -1352,6 +1376,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             pager.end_read_tx()?;
             return Err(LimboError::Busy);
         }
+        let header = self.get_new_transaction_database_header(&pager);
         // Try to acquire the pager write lock
         let begin_w_tx_res = pager.begin_write_tx();
         if let Err(LimboError::Busy) = begin_w_tx_res {
@@ -1368,7 +1393,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             return Err(LimboError::Busy);
         }
         return_if_io!(begin_w_tx_res);
-        let tx = Transaction::new(tx_id, begin_ts);
+        let tx = Transaction::new(tx_id, begin_ts, header);
         tracing::trace!(
             "begin_exclusive_tx(tx_id={}) - exclusive write transaction",
             tx_id
@@ -1387,16 +1412,84 @@ impl<Clock: LogicalClock> MvStore<Clock> {
     pub fn begin_tx(&self, pager: Arc<Pager>) -> Result<TxID> {
         let tx_id = self.get_tx_id();
         let begin_ts = self.get_timestamp();
-        let tx = Transaction::new(tx_id, begin_ts);
-        tracing::trace!("begin_tx(tx_id={})", tx_id);
-        self.txs.insert(tx_id, tx);
         // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read
         // pages from WAL/DB read from a consistent state to maintain snapshot isolation.
         pager.begin_read_tx()?;
+
+        // Set the txn's header to the global header
+        let header = self.get_new_transaction_database_header(&pager);
+        let tx = Transaction::new(tx_id, begin_ts, header);
+        tracing::trace!("begin_tx(tx_id={})", tx_id);
+        self.txs.insert(tx_id, tx);
+
         Ok(tx_id)
     }
 
+    fn get_new_transaction_database_header(&self, pager: &Rc<Pager>) -> DatabaseHeader {
+        if self.global_header.read().is_none() {
+            pager.io.block(|| pager.maybe_allocate_page1()).unwrap();
+            let header = pager
+                .io
+                .block(|| pager.with_header(|header| *header))
+                .unwrap();
+            // TODO: We initialize the header here; maybe this needs more careful handling
+            self.global_header.write().replace(header);
+            tracing::debug!(
+                "get_new_transaction_database_header create: header={:?}",
+                header
+            );
+            header
+        } else {
+            let header = self.global_header.read().unwrap();
+            tracing::debug!("get_new_transaction_database_header read: header={:?}", header);
+            header
+        }
+    }
+
+    pub fn get_transaction_database_header(&self, tx_id: &TxID) -> DatabaseHeader {
+        let tx = self.txs.get(tx_id).unwrap();
+        let header = tx.value();
+        let header = header.header.borrow();
+        tracing::debug!("get_transaction_database_header read: header={:?}", header);
+        *header
+    }
+
+    pub fn with_header<T, F>(&self, f: F, tx_id: Option<&TxID>) -> Result<T>
+    where
+        F: Fn(&DatabaseHeader) -> T,
+    {
+        if let Some(tx_id) = tx_id {
+            let tx = self.txs.get(tx_id).unwrap();
+            let header = tx.value();
+            let header = header.header.borrow();
+            tracing::debug!("with_header read: header={:?}", header);
+            Ok(f(&header))
+        } else {
+            let header = self.global_header.read();
+            tracing::debug!("with_header read: header={:?}", header);
+            Ok(f(header.as_ref().unwrap()))
+        }
+    }
+
+    pub fn with_header_mut<T, F>(&self, f: F, tx_id: Option<&TxID>) -> Result<T>
+    where
+        F: Fn(&mut DatabaseHeader) -> T,
+    {
+        if let Some(tx_id) = tx_id {
+            let tx = self.txs.get(tx_id).unwrap();
+            let header = tx.value();
+            let mut header = header.header.borrow_mut();
+            tracing::debug!("with_header_mut borrow_mut: header={:?}", header);
+            Ok(f(&mut header))
+        } else {
+            let mut header = self.global_header.write();
+            let header = header.as_mut().unwrap();
+            tracing::debug!("with_header_mut write: header={:?}", header);
+            Ok(f(header))
+        }
+    }
+
     /// Commits a transaction with the specified transaction ID.
     ///
     /// This function commits the changes made within the specified transaction and finalizes the
@@ -1419,7 +1512,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             tx_id,
             connection.clone(),
             self.commit_coordinator.clone(),
-            self.header.clone(),
+            self.global_header.clone(),
         ));
         Ok(state_machine)
     }
diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs
index 9ff6f2416..720578e30 100644
--- a/core/mvcc/database/tests.rs
+++ b/core/mvcc/database/tests.rs
@@ -1,6 +1,8 @@
 use super::*;
 use crate::io::PlatformIO;
 use crate::mvcc::clock::LocalClock;
+use crate::storage::sqlite3_ondisk::DatabaseHeader;
+use std::cell::RefCell;
 
 pub(crate) struct MvccTestDbNoConn {
     pub(crate) db: Option<Arc<Database>>,
 }
@@ -1077,6 +1079,7 @@ fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction {
         begin_ts,
         write_set: SkipSet::new(),
         read_set: SkipSet::new(),
+        header: RefCell::new(DatabaseHeader::default()),
     }
 }
 
@@ -1488,6 +1491,7 @@ fn transaction_display() {
         begin_ts,
         write_set,
         read_set,
+        header: RefCell::new(DatabaseHeader::default()),
     };
 
     let expected = "{ state: Preparing, id: 42, begin_ts: 20250914, write_set: [RowID { table_id: 1, row_id: 11 }, RowID { table_id: 1, row_id: 13 }], read_set: [RowID { table_id: 2, row_id: 17 }, RowID { table_id: 2, row_id: 19 }] }";
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index d7368a5ce..1222c27ed 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -835,8 +835,10 @@ impl Wal for WalFile {
         // WAL and fetch pages directly from the DB file. We do this
         // by taking read‑lock 0, and capturing the latest state.
         if shared_max == nbackfills {
+            tracing::debug!("begin_read_tx: WAL is already fully back‑filled into the main DB image, shared_max={}, nbackfills={}", shared_max, nbackfills);
             let lock_0_idx = 0;
             if !self.get_shared().read_locks[lock_0_idx].read() {
+                tracing::debug!("begin_read_tx: failed to acquire read lock 0, returning Busy");
                 return Err(LimboError::Busy);
             }
             // we need to keep self.max_frame set to the appropriate
diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs
index 21fe827a9..4c46e2fed 100644
--- a/core/vdbe/execute.rs
+++ b/core/vdbe/execute.rs
@@ -8,7 +8,7 @@ use crate::storage::btree::{
 use crate::storage::database::DatabaseFile;
 use crate::storage::page_cache::PageCache;
 use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
-use crate::storage::sqlite3_ondisk::read_varint;
+use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader};
 use crate::translate::collate::CollationSeq;
 use crate::types::{
     compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord,
@@ -2154,7 +2154,6 @@ pub fn op_transaction(
     // for both.
     if program.connection.mv_tx.get().is_none() {
         // We allocate the first page lazily in the first transaction.
-        return_if_io!(pager.maybe_allocate_page1());
         // TODO: when we fix MVCC enable schema cookie detection for reprepare statements
         // let header_schema_cookie = pager
         //     .io
@@ -2242,9 +2241,11 @@ pub fn op_transaction(
             // Can only read header if page 1 has been allocated already;
             // begin_write_tx ensures that happens, but begin_read_tx does not
             // TODO: this is a hack to make the pager run the IO loop
-            let res = pager
-                .io
-                .block(|| pager.with_header(|header| header.schema_cookie.get()));
+            let res = pager.io.block(|| {
+                with_header(&pager, mv_store, program, |header| {
+                    header.schema_cookie.get()
+                })
+            });
             match res {
                 Ok(header_schema_cookie) => {
                     if header_schema_cookie != *schema_cookie {
@@ -6643,7 +6644,9 @@ pub fn op_page_count(
         // TODO: implement temp databases
         todo!("temp databases not implemented yet");
     }
-    let count = match pager.with_header(|header| header.database_size.get()) {
+    let count = match with_header(pager, mv_store, program, |header| {
+        header.database_size.get()
+    }) {
         Err(_) => 0.into(),
         Ok(IOResult::Done(v)) => v.into(),
         Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
@@ -6802,7 +6805,7 @@ pub fn op_read_cookie(
         todo!("temp databases not implemented yet");
     }
 
-    let cookie_value = match pager.with_header(|header| match cookie {
+    let cookie_value = match with_header(pager, mv_store, program, |header| match cookie {
         Cookie::ApplicationId => header.application_id.get().into(),
         Cookie::UserVersion => header.user_version.get().into(),
         Cookie::SchemaVersion => header.schema_cookie.get().into(),
@@ -6839,16 +6842,14 @@ pub fn op_set_cookie(
         todo!("temp databases not implemented yet");
     }
 
-    return_if_io!(pager.with_header_mut(|header| {
+    return_if_io!(with_header_mut(pager, mv_store, program, |header| {
         match cookie {
             Cookie::ApplicationId => header.application_id = (*value).into(),
             Cookie::UserVersion => header.user_version = (*value).into(),
             Cookie::LargestRootPageNumber => {
                 header.vacuum_mode_largest_root_page = (*value as u32).into();
             }
-            Cookie::IncrementalVacuum => {
-                header.incremental_vacuum_enabled = (*value as u32).into()
-            }
+            Cookie::IncrementalVacuum => header.incremental_vacuum_enabled = (*value as u32).into(),
             Cookie::SchemaVersion => {
                 // we update transaction state to indicate that the schema has changed
                 match program.connection.transaction_state.get() {
@@ -7110,7 +7111,7 @@ pub fn op_open_ephemeral(
 
     let page_size = pager
         .io
-        .block(|| pager.with_header(|header| header.page_size))?
+        .block(|| with_header(pager, mv_store, program, |header| header.page_size))?
         .get();
 
     let buffer_pool = program.connection._db.buffer_pool.clone();
@@ -7128,7 +7129,7 @@ pub fn op_open_ephemeral(
 
     let page_size = pager
         .io
-        .block(|| pager.with_header(|header| header.page_size))
+        .block(|| with_header(&pager, mv_store, program, |header| header.page_size))
         .unwrap_or_default();
 
     pager.page_size.set(Some(page_size));
@@ -7486,14 +7487,18 @@ pub fn op_integrity_check(
     match &mut state.op_integrity_check_state {
         OpIntegrityCheckState::Start => {
             let freelist_trunk_page =
-                return_if_io!(pager.with_header(|header| header.freelist_trunk_page.get()));
+                return_if_io!(with_header(pager, mv_store, program, |header| header
+                    .freelist_trunk_page
+                    .get()));
             let mut errors = Vec::new();
             let mut integrity_check_state = IntegrityCheckState::new();
             let mut current_root_idx = 0;
             // check freelist pages first, if there are any for database
             if freelist_trunk_page > 0 {
                 let expected_freelist_count =
-                    return_if_io!(pager.with_header(|header| header.freelist_pages.get()));
+                    return_if_io!(with_header(pager, mv_store, program, |header| header
+                        .freelist_pages
+                        .get()));
                 integrity_check_state.set_expected_freelist_count(expected_freelist_count as usize);
                 integrity_check_state.start(
                     freelist_trunk_page as usize,
@@ -9249,6 +9254,42 @@ pub fn op_journal_mode(
     Ok(InsnFunctionStepResult::Step)
 }
 
+fn with_header<T, F>(
+    pager: &Rc<Pager>,
+    mv_store: Option<&Arc<MvStore>>,
+    program: &Program,
+    f: F,
+) -> Result<IOResult<T>>
+where
+    F: Fn(&DatabaseHeader) -> T,
+{
+    if let Some(mv_store) = mv_store {
+        let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id);
+        mv_store.with_header(f, tx_id.as_ref()).map(IOResult::Done)
+    } else {
+        pager.with_header(&f)
+    }
+}
+
+fn with_header_mut<T, F>(
+    pager: &Rc<Pager>,
+    mv_store: Option<&Arc<MvStore>>,
+    program: &Program,
+    f: F,
+) -> Result<IOResult<T>>
+where
+    F: Fn(&mut DatabaseHeader) -> T,
+{
+    if let Some(mv_store) = mv_store {
+        let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id);
+        mv_store
+            .with_header_mut(f, tx_id.as_ref())
+            .map(IOResult::Done)
+    } else {
+        pager.with_header_mut(&f)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
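
Reviewer note: below is a minimal, self-contained sketch of the header-tracking
scheme this patch introduces. All names in it (Header, Txn, Store, begin_tx,
with_header_mut, commit_tx) are illustrative stand-ins, not the actual core
types: each transaction snapshots the global header at begin, mutates only its
private copy, and on commit validates and publishes that copy as the new global
header, mirroring the schema-cookie check in update_pager_header().

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone, Copy, Debug, Default)]
struct Header {
    database_size: u32,
    schema_cookie: u32,
}

struct Txn {
    // Private copy of the header, mutated freely during the transaction.
    header: RefCell<Header>,
}

struct Store {
    // Last committed header; the source of truth between transactions.
    global_header: RwLock<Option<Header>>,
    txs: RwLock<HashMap<u64, Txn>>,
}

impl Store {
    // begin: snapshot the global header into the transaction.
    fn begin_tx(&self, tx_id: u64) {
        let snapshot = self.global_header.read().unwrap().unwrap_or_default();
        self.txs
            .write()
            .unwrap()
            .insert(tx_id, Txn { header: RefCell::new(snapshot) });
    }

    // writes during the txn: touch only the transaction's private copy.
    fn with_header_mut<T>(&self, tx_id: u64, f: impl Fn(&mut Header) -> T) -> T {
        let txs = self.txs.read().unwrap();
        let mut header = txs[&tx_id].header.borrow_mut();
        f(&mut header)
    }

    // commit: validate and publish the private copy as the new global header.
    fn commit_tx(&self, tx_id: u64) -> Result<(), &'static str> {
        let txn = self.txs.write().unwrap().remove(&tx_id).ok_or("no such txn")?;
        let new = *txn.header.borrow();
        let mut global = self.global_header.write().unwrap();
        if let Some(old) = *global {
            // Same invariant the patch enforces: the schema cookie must never
            // move backwards relative to the last committed header.
            if new.schema_cookie < old.schema_cookie {
                return Err("schema cookie went back in time");
            }
        }
        *global = Some(new);
        Ok(())
    }
}

fn main() {
    let store = Store {
        global_header: RwLock::new(None),
        txs: RwLock::new(HashMap::new()),
    };
    store.begin_tx(1);
    store.with_header_mut(1, |h| {
        h.database_size += 1;
        h.schema_cookie += 1;
    });
    store.commit_tx(1).unwrap();
    assert_eq!(store.global_header.read().unwrap().unwrap().schema_cookie, 1);
}

The key design point is that reads and writes inside a transaction never touch
the pager's page-1 header directly; only commit reconciles the transaction's
copy with the global (last committed) header, which is what lets the pager's
database_size stay correct at commit time.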