diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 9285fe150..0df58feb4 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -19,6 +19,7 @@ use crate::Result; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; +use std::cell::RefCell; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; @@ -116,16 +117,19 @@ pub struct Transaction { write_set: SkipSet, /// The transaction read set. read_set: SkipSet, + /// The transaction header. + header: RefCell, } impl Transaction { - fn new(tx_id: u64, begin_ts: u64) -> Transaction { + fn new(tx_id: u64, begin_ts: u64, header: DatabaseHeader) -> Transaction { Transaction { state: TransactionState::Active.into(), tx_id, begin_ts, write_set: SkipSet::new(), read_set: SkipSet::new(), + header: RefCell::new(header), } } @@ -370,6 +374,34 @@ impl CommitStateMachine { _phantom: PhantomData, } } + + /// We need to update pager's header to account for changes made by other transactions. + fn update_pager_header(&self, mvcc_store: &MvStore) -> Result<()> { + let header = self.header.read(); + let last_commited_header = header.as_ref().expect("Header not found"); + self.pager.io.block(|| self.pager.maybe_allocate_page1())?; + let _ = self.pager.io.block(|| { + self.pager.with_header_mut(|header_in_pager| { + let header_in_transaction = mvcc_store.get_transaction_database_header(&self.tx_id); + tracing::debug!("update header here {}", header_in_transaction.schema_cookie); + // database_size should only be updated in each commit so it should be safe to assume correct database_size is in last_commited_header + header_in_pager.database_size = last_commited_header.database_size; + if header_in_transaction.schema_cookie < last_commited_header.schema_cookie { + tracing::error!("txn's schema cookie went back in time, aborting"); + return Err(LimboError::SchemaUpdated); + } + + assert!( + header_in_transaction.schema_cookie >= last_commited_header.schema_cookie, + "txn's schema cookie went back in time" + ); + header_in_pager.schema_cookie = header_in_transaction.schema_cookie; + // TODO: deal with other fields + Ok(()) + }) + })?; + Ok(()) + } } impl WriteRowStateMachine { @@ -518,6 +550,7 @@ impl StateTransition for CommitStateMachine { // If this is the exclusive transaction, we already acquired a write transaction // on the pager in begin_exclusive_tx() and don't need to acquire it. if mvcc_store.is_exclusive_tx(&self.tx_id) { + self.update_pager_header(mvcc_store)?; self.state = CommitState::WriteRow { end_ts, write_set_index: 0, @@ -545,22 +578,15 @@ impl StateTransition for CommitStateMachine { Completion::new_dummy(), ))); } + + self.update_pager_header(mvcc_store)?; + { let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut(); // we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness wal.update_max_frame(); } - // TODO: Force updated header? - { - if let Some(last_commited_header) = self.header.read().as_ref() { - self.pager.io.block(|| { - self.pager.with_header_mut(|header_in_pager| { - header_in_pager.database_size = last_commited_header.database_size; - // TODO: deal with other fields - }) - })?; - } - } + // We started a pager read transaction at the beginning of the MV transaction, because // any reads we do from the database file and WAL must uphold snapshot isolation. // However, now we must end and immediately restart the read transaction before committing. @@ -740,11 +766,9 @@ impl StateTransition for CommitStateMachine { match result { IOResult::Done(_) => { // FIXME: hack for now to keep database header updated for pager commit - self.pager.io.block(|| { - self.pager.with_header(|header| { - self.header.write().replace(*header); - }) - })?; + let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); + let tx_unlocked = tx.value(); + self.header.write().replace(*tx_unlocked.header.borrow()); self.commit_coordinator.pager_commit_lock.unlock(); // TODO: here mark we are ready for a batch self.state = CommitState::Commit { end_ts }; @@ -1011,7 +1035,7 @@ pub struct MvStore { /// exclusive transactions to support single-writer semantics for compatibility with SQLite. exclusive_tx: RwLock>, commit_coordinator: Arc, - header: Arc>>, + global_header: Arc>>, } impl MvStore { @@ -1030,7 +1054,7 @@ impl MvStore { pager_commit_lock: Arc::new(TursoRwLock::new()), commits_waiting: Arc::new(AtomicU64::new(0)), }), - header: Arc::new(RwLock::new(None)), + global_header: Arc::new(RwLock::new(None)), } } @@ -1352,6 +1376,7 @@ impl MvStore { pager.end_read_tx()?; return Err(LimboError::Busy); } + let header = self.get_new_transaction_database_header(&pager); // Try to acquire the pager write lock let begin_w_tx_res = pager.begin_write_tx(); if let Err(LimboError::Busy) = begin_w_tx_res { @@ -1368,7 +1393,7 @@ impl MvStore { return Err(LimboError::Busy); } return_if_io!(begin_w_tx_res); - let tx = Transaction::new(tx_id, begin_ts); + let tx = Transaction::new(tx_id, begin_ts, header); tracing::trace!( "begin_exclusive_tx(tx_id={}) - exclusive write transaction", tx_id @@ -1387,16 +1412,84 @@ impl MvStore { pub fn begin_tx(&self, pager: Arc) -> Result { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); - let tx = Transaction::new(tx_id, begin_ts); - tracing::trace!("begin_tx(tx_id={})", tx_id); - self.txs.insert(tx_id, tx); // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read // pages from WAL/DB read from a consistent state to maintiain snapshot isolation. pager.begin_read_tx()?; + + // Set txn's header to the global header + let header = self.get_new_transaction_database_header(&pager); + let tx = Transaction::new(tx_id, begin_ts, header); + tracing::trace!("begin_tx(tx_id={})", tx_id); + self.txs.insert(tx_id, tx); + Ok(tx_id) } + fn get_new_transaction_database_header(&self, pager: &Rc) -> DatabaseHeader { + if self.global_header.read().is_none() { + pager.io.block(|| pager.maybe_allocate_page1()).unwrap(); + let header = pager + .io + .block(|| pager.with_header(|header| *header)) + .unwrap(); + // TODO: We initialize header here, maybe this needs more careful handling + self.global_header.write().replace(header); + tracing::debug!( + "get_transaction_database_header create: header={:?}", + header + ); + header + } else { + let header = self.global_header.read().unwrap(); + tracing::debug!("get_transaction_database_header read: header={:?}", header); + header + } + } + + pub fn get_transaction_database_header(&self, tx_id: &TxID) -> DatabaseHeader { + let tx = self.txs.get(tx_id).unwrap(); + let header = tx.value(); + let header = header.header.borrow(); + tracing::debug!("get_transaction_database_header read: header={:?}", header); + *header + } + + pub fn with_header(&self, f: F, tx_id: Option<&TxID>) -> Result + where + F: Fn(&DatabaseHeader) -> T, + { + if let Some(tx_id) = tx_id { + let tx = self.txs.get(tx_id).unwrap(); + let header = tx.value(); + let header = header.header.borrow(); + tracing::debug!("with_header read: header={:?}", header); + Ok(f(&header)) + } else { + let header = self.global_header.read(); + tracing::debug!("with_header read: header={:?}", header); + Ok(f(header.as_ref().unwrap())) + } + } + + pub fn with_header_mut(&self, f: F, tx_id: Option<&TxID>) -> Result + where + F: Fn(&mut DatabaseHeader) -> T, + { + if let Some(tx_id) = tx_id { + let tx = self.txs.get(tx_id).unwrap(); + let header = tx.value(); + let mut header = header.header.borrow_mut(); + tracing::debug!("with_header_mut read: header={:?}", header); + Ok(f(&mut header)) + } else { + let mut header = self.global_header.write(); + let header = header.as_mut().unwrap(); + tracing::debug!("with_header_mut write: header={:?}", header); + Ok(f(header)) + } + } + /// Commits a transaction with the specified transaction ID. /// /// This function commits the changes made within the specified transaction and finalizes the @@ -1419,7 +1512,7 @@ impl MvStore { tx_id, connection.clone(), self.commit_coordinator.clone(), - self.header.clone(), + self.global_header.clone(), )); Ok(state_machine) } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 9ff6f2416..720578e30 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1,6 +1,8 @@ use super::*; use crate::io::PlatformIO; use crate::mvcc::clock::LocalClock; +use crate::storage::sqlite3_ondisk::DatabaseHeader; +use std::cell::RefCell; pub(crate) struct MvccTestDbNoConn { pub(crate) db: Option>, @@ -1077,6 +1079,7 @@ fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction { begin_ts, write_set: SkipSet::new(), read_set: SkipSet::new(), + header: RefCell::new(DatabaseHeader::default()), } } @@ -1488,6 +1491,7 @@ fn transaction_display() { begin_ts, write_set, read_set, + header: RefCell::new(DatabaseHeader::default()), }; let expected = "{ state: Preparing, id: 42, begin_ts: 20250914, write_set: [RowID { table_id: 1, row_id: 11 }, RowID { table_id: 1, row_id: 13 }], read_set: [RowID { table_id: 2, row_id: 17 }, RowID { table_id: 2, row_id: 19 }] }"; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d7368a5ce..1222c27ed 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -835,8 +835,10 @@ impl Wal for WalFile { // WAL and fetch pages directly from the DB file. We do this // by taking read‑lock 0, and capturing the latest state. if shared_max == nbackfills { + tracing::debug!("begin_read_tx: WAL is already fully back‑filled into the main DB image, shared_max={}, nbackfills={}", shared_max, nbackfills); let lock_0_idx = 0; if !self.get_shared().read_locks[lock_0_idx].read() { + tracing::debug!("begin_read_tx: read lock 0 is already held, returning Busy"); return Err(LimboError::Busy); } // we need to keep self.max_frame set to the appropriate diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 21fe827a9..4c46e2fed 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -8,7 +8,7 @@ use crate::storage::btree::{ use crate::storage::database::DatabaseFile; use crate::storage::page_cache::PageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; -use crate::storage::sqlite3_ondisk::read_varint; +use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader}; use crate::translate::collate::CollationSeq; use crate::types::{ compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord, @@ -2154,7 +2154,6 @@ pub fn op_transaction( // for both. if program.connection.mv_tx.get().is_none() { // We allocate the first page lazily in the first transaction. - return_if_io!(pager.maybe_allocate_page1()); // TODO: when we fix MVCC enable schema cookie detection for reprepare statements // let header_schema_cookie = pager // .io @@ -2242,9 +2241,11 @@ pub fn op_transaction( // Can only read header if page 1 has been allocated already // begin_write_tx that happens, but not begin_read_tx // TODO: this is a hack to make the pager run the IO loop - let res = pager - .io - .block(|| pager.with_header(|header| header.schema_cookie.get())); + let res = pager.io.block(|| { + with_header(&pager, mv_store, program, |header| { + header.schema_cookie.get() + }) + }); match res { Ok(header_schema_cookie) => { if header_schema_cookie != *schema_cookie { @@ -6643,7 +6644,9 @@ pub fn op_page_count( // TODO: implement temp databases todo!("temp databases not implemented yet"); } - let count = match pager.with_header(|header| header.database_size.get()) { + let count = match with_header(pager, mv_store, program, |header| { + header.database_size.get() + }) { Err(_) => 0.into(), Ok(IOResult::Done(v)) => v.into(), Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), @@ -6802,7 +6805,7 @@ pub fn op_read_cookie( todo!("temp databases not implemented yet"); } - let cookie_value = match pager.with_header(|header| match cookie { + let cookie_value = match with_header(pager, mv_store, program, |header| match cookie { Cookie::ApplicationId => header.application_id.get().into(), Cookie::UserVersion => header.user_version.get().into(), Cookie::SchemaVersion => header.schema_cookie.get().into(), @@ -6839,16 +6842,14 @@ pub fn op_set_cookie( todo!("temp databases not implemented yet"); } - return_if_io!(pager.with_header_mut(|header| { + return_if_io!(with_header_mut(pager, mv_store, program, |header| { match cookie { Cookie::ApplicationId => header.application_id = (*value).into(), Cookie::UserVersion => header.user_version = (*value).into(), Cookie::LargestRootPageNumber => { header.vacuum_mode_largest_root_page = (*value as u32).into(); } - Cookie::IncrementalVacuum => { - header.incremental_vacuum_enabled = (*value as u32).into() - } + Cookie::IncrementalVacuum => header.incremental_vacuum_enabled = (*value as u32).into(), Cookie::SchemaVersion => { // we update transaction state to indicate that the schema has changed match program.connection.transaction_state.get() { @@ -7110,7 +7111,7 @@ pub fn op_open_ephemeral( let page_size = pager .io - .block(|| pager.with_header(|header| header.page_size))? + .block(|| with_header(pager, mv_store, program, |header| header.page_size))? .get(); let buffer_pool = program.connection._db.buffer_pool.clone(); @@ -7128,7 +7129,7 @@ pub fn op_open_ephemeral( let page_size = pager .io - .block(|| pager.with_header(|header| header.page_size)) + .block(|| with_header(&pager, mv_store, program, |header| header.page_size)) .unwrap_or_default(); pager.page_size.set(Some(page_size)); @@ -7486,14 +7487,18 @@ pub fn op_integrity_check( match &mut state.op_integrity_check_state { OpIntegrityCheckState::Start => { let freelist_trunk_page = - return_if_io!(pager.with_header(|header| header.freelist_trunk_page.get())); + return_if_io!(with_header(pager, mv_store, program, |header| header + .freelist_trunk_page + .get())); let mut errors = Vec::new(); let mut integrity_check_state = IntegrityCheckState::new(); let mut current_root_idx = 0; // check freelist pages first, if there are any for database if freelist_trunk_page > 0 { let expected_freelist_count = - return_if_io!(pager.with_header(|header| header.freelist_pages.get())); + return_if_io!(with_header(pager, mv_store, program, |header| header + .freelist_pages + .get())); integrity_check_state.set_expected_freelist_count(expected_freelist_count as usize); integrity_check_state.start( freelist_trunk_page as usize, @@ -9249,6 +9254,42 @@ pub fn op_journal_mode( Ok(InsnFunctionStepResult::Step) } +fn with_header( + pager: &Rc, + mv_store: Option<&Arc>, + program: &Program, + f: F, +) -> Result> +where + F: Fn(&DatabaseHeader) -> T, +{ + if let Some(mv_store) = mv_store { + let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id); + mv_store.with_header(f, tx_id.as_ref()).map(IOResult::Done) + } else { + pager.with_header(&f) + } +} + +fn with_header_mut( + pager: &Rc, + mv_store: Option<&Arc>, + program: &Program, + f: F, +) -> Result> +where + F: Fn(&mut DatabaseHeader) -> T, +{ + if let Some(mv_store) = mv_store { + let tx_id = program.connection.mv_tx.get().map(|(tx_id, _)| tx_id); + mv_store + .with_header_mut(f, tx_id.as_ref()) + .map(IOResult::Done) + } else { + pager.with_header_mut(&f) + } +} + #[cfg(test)] mod tests { use super::*;