From f8c110e66436f3ae637c9a967e9bd1f2b33e9818 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 18 Aug 2025 13:08:36 +0200 Subject: [PATCH 1/2] core/mvcc: find latest valid version for row for every operation Previously we only checked first version of row... --- core/mvcc/cursor.rs | 37 +++++++++++++--------- core/mvcc/database/mod.rs | 66 +++++++++++++++++++++++++++++++-------- 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index 0f54065fc..a428a18b7 100644 --- a/core/mvcc/cursor.rs +++ b/core/mvcc/cursor.rs @@ -57,7 +57,9 @@ impl MvccLazyCursor { CursorPosition::Loaded(id) => Some(id), CursorPosition::BeforeFirst => { // If we are before first, we need to try and find the first row. - let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN); + let maybe_rowid = + self.db + .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id); if let Some(id) = maybe_rowid { self.current_pos = CursorPosition::Loaded(id); Some(id) @@ -75,7 +77,9 @@ impl MvccLazyCursor { CursorPosition::Loaded(id) => self.db.read(self.tx_id, id), CursorPosition::BeforeFirst => { // If we are before first, we need to try and find the first row. - let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN); + let maybe_rowid = + self.db + .get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id); if let Some(id) = maybe_rowid { self.current_pos = CursorPosition::Loaded(id); self.db.read(self.tx_id, id) @@ -103,18 +107,22 @@ impl MvccLazyCursor { return false; } }; - self.current_pos = match self.db.get_next_row_id_for_table(self.table_id, min_id) { - Some(id) => CursorPosition::Loaded(id), - None => { - if before_first { - // if it wasn't loaded and we didn't find anything, it means the table is empty. - CursorPosition::BeforeFirst - } else { - // if we had something loaded, and we didn't find next key then it means we are at the end. - CursorPosition::End + self.current_pos = + match self + .db + .get_next_row_id_for_table(self.table_id, min_id, self.tx_id) + { + Some(id) => CursorPosition::Loaded(id), + None => { + if before_first { + // if it wasn't loaded and we didn't find anything, it means the table is empty. + CursorPosition::BeforeFirst + } else { + // if we had something loaded, and we didn't find next key then it means we are at the end. + CursorPosition::End + } } - } - }; + }; matches!(self.current_pos, CursorPosition::Loaded(_)) } @@ -175,7 +183,7 @@ impl MvccLazyCursor { SeekOp::LT => (Bound::Excluded(&rowid), false), SeekOp::LE { eq_only: _ } => (Bound::Included(&rowid), false), }; - let rowid = self.db.seek_rowid(bound, lower_bound); + let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id); if let Some(rowid) = rowid { self.current_pos = CursorPosition::Loaded(rowid); if op.eq_only() { @@ -211,6 +219,7 @@ impl MvccLazyCursor { row_id: *int_key, }), true, + self.tx_id, ) .is_some(); if exists { diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 24e55256e..911d97e9f 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -881,7 +881,12 @@ impl MvStore { Ok(()) } - pub fn get_next_row_id_for_table(&self, table_id: u64, start: i64) -> Option { + pub fn get_next_row_id_for_table( + &self, + table_id: u64, + start: i64, + tx_id: TxID, + ) -> Option { tracing::trace!( "getting_next_id_for_table(table_id={}, range_start={})", table_id, @@ -897,19 +902,50 @@ impl MvStore { row_id: i64::MAX, }; - self.rows - .range(min_bound..max_bound) - .next() - .map(|entry| *entry.key()) + let tx = self.txs.get(&tx_id).unwrap(); + let tx = tx.value().read(); + let mut rows = self.rows.range(min_bound..max_bound); + rows.next().and_then(|row| { + // Find last valid version based on transaction. + self.find_last_visible_version(&tx, row) + }) } - pub fn seek_rowid(&self, bound: Bound<&RowID>, lower_bound: bool) -> Option { + fn find_last_visible_version( + &self, + tx: &parking_lot::lock_api::RwLockReadGuard<'_, parking_lot::RawRwLock, Transaction>, + row: crossbeam_skiplist::map::Entry< + '_, + RowID, + parking_lot::lock_api::RwLock>, + >, + ) -> Option { + row.value() + .read() + .iter() + .rev() + .find(|version| version.is_visible_to(tx, &self.txs)) + .map(|_| *row.key()) + } + + pub fn seek_rowid( + &self, + bound: Bound<&RowID>, + lower_bound: bool, + tx_id: TxID, + ) -> Option { tracing::trace!("seek_rowid(bound={:?}, lower_bound={})", bound, lower_bound,); + let tx = self.txs.get(&tx_id).unwrap(); + let tx = tx.value().read(); if lower_bound { - self.rows.lower_bound(bound).map(|entry| *entry.key()) + self.rows + .lower_bound(bound) + .and_then(|entry| self.find_last_visible_version(&tx, entry)) } else { - self.rows.upper_bound(bound).map(|entry| *entry.key()) + self.rows + .upper_bound(bound) + .and_then(|entry| self.find_last_visible_version(&tx, entry)) } } @@ -945,12 +981,16 @@ impl MvStore { tx_id: TxID, pager: Rc, connection: &Arc, + schema_did_change: bool, ) -> Result>> { - let state_machine: StateMachine> = StateMachine::< - CommitStateMachine, - >::new( - CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), - ); + let state_machine: StateMachine> = + StateMachine::>::new(CommitStateMachine::new( + CommitState::Initial, + pager, + tx_id, + connection.clone(), + schema_did_change, + )); Ok(state_machine) } From 94cd504d4cbe2e72c58b7ba8878af395aebd058a Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Mon, 18 Aug 2025 13:10:21 +0200 Subject: [PATCH 2/2] core/mvcc: support schema_did change on commit_txn This not only changes schema_did_change on commit_txn for mvcc, but also extracts the connection transaction state from non mvcc transactions to mvcc too. --- core/mvcc/database/mod.rs | 24 +++--- core/mvcc/database/tests.rs | 34 +++++++- core/storage/pager.rs | 1 + core/vdbe/execute.rs | 150 +++++++++++++++++++----------------- 4 files changed, 125 insertions(+), 84 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 911d97e9f..881601210 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -506,8 +506,14 @@ impl StateTransition for CommitStateMachine { ) .map_err(|e| LimboError::InternalError(e.to_string())) .unwrap(); - if let crate::types::IOResult::Done(_) = result { - break; + match result { + crate::types::IOResult::Done(_) => { + break; + } + crate::types::IOResult::IO(io) => { + io.wait(self.pager.io.as_ref())?; + continue; + } } } self.state = CommitState::Commit { end_ts }; @@ -981,16 +987,12 @@ impl MvStore { tx_id: TxID, pager: Rc, connection: &Arc, - schema_did_change: bool, ) -> Result>> { - let state_machine: StateMachine> = - StateMachine::>::new(CommitStateMachine::new( - CommitState::Initial, - pager, - tx_id, - connection.clone(), - schema_did_change, - )); + let state_machine: StateMachine> = StateMachine::< + CommitStateMachine, + >::new( + CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), + ); Ok(state_machine) } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 2ae781891..138cb08f2 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -654,10 +654,10 @@ fn test_future_row() { use crate::mvcc::cursor::MvccLazyCursor; use crate::mvcc::database::{MvStore, Row, RowID}; use crate::types::Text; -use crate::Database; use crate::MemoryIO; use crate::RefValue; use crate::Value; +use crate::{Database, StepResult}; // Simple atomic clock implementation for testing @@ -1124,6 +1124,38 @@ fn test_restart() { } } +#[test] +fn test_connection_sees_other_connection_changes() { + let db = MvccTestDbNoConn::new_with_random_db(); + let conn0 = db.connect(); + conn0 + .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)") + .unwrap(); + let conn1 = db.connect(); + conn1 + .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)") + .unwrap(); + conn0 + .execute("INSERT INTO test_table (id, text) VALUES (965, 'text_877')") + .unwrap(); + let mut stmt = conn1.query("SELECT * FROM test_table").unwrap().unwrap(); + loop { + let res = stmt.step().unwrap(); + match res { + StepResult::Row => { + let row = stmt.row().unwrap(); + let text = row.get_value(1).to_text().unwrap(); + assert_eq!(text, "text_877"); + } + StepResult::Done => break, + StepResult::IO => { + stmt.run_once().unwrap(); + } + _ => panic!("Expected Row"), + } + } +} + fn get_record_value(row: &Row) -> ImmutableRecord { let mut record = ImmutableRecord::new(1024); record.start_serialization(&row.data); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index e256d41de..2010f626a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -971,6 +971,7 @@ impl Pager { TransactionState::Write { schema_did_change } => (true, schema_did_change), _ => (false, false), }; + tracing::trace!("end_tx(schema_did_change={})", schema_did_change); if rollback { if is_write { wal.borrow().end_write_tx(); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 89ac0a2d6..7d932ef04 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1981,7 +1981,50 @@ pub fn op_transaction( let pager = program.get_pager_from_database_index(db); + // 1. We try to upgrade current version + let current_state = conn.transaction_state.get(); + let (new_transaction_state, updated) = match (current_state, write) { + // pending state means that we tried beginning a tx and the method returned IO. + // instead of ending the read tx, just update the state to pending. + (TransactionState::PendingUpgrade, write) => { + turso_assert!( + *write, + "pending upgrade should only be set for write transactions" + ); + ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ) + } + (TransactionState::Write { schema_did_change }, true) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Write { schema_did_change }, false) => { + (TransactionState::Write { schema_did_change }, false) + } + (TransactionState::Read, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => ( + TransactionState::Write { + schema_did_change: false, + }, + true, + ), + (TransactionState::None, false) => (TransactionState::Read, true), + }; + + // 2. Start transaction if needed if let Some(mv_store) = &mv_store { + // In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed. + // Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough + // for both. if state.mv_tx_id.is_none() { // We allocate the first page lazily in the first transaction. return_if_io!(pager.maybe_allocate_page1()); @@ -1997,43 +2040,6 @@ pub fn op_transaction( state.mv_tx_id = Some(tx_id); } } else { - let current_state = conn.transaction_state.get(); - let (new_transaction_state, updated) = match (current_state, write) { - // pending state means that we tried beginning a tx and the method returned IO. - // instead of ending the read tx, just update the state to pending. - (TransactionState::PendingUpgrade, write) => { - turso_assert!( - *write, - "pending upgrade should only be set for write transactions" - ); - ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ) - } - (TransactionState::Write { schema_did_change }, true) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Write { schema_did_change }, false) => { - (TransactionState::Write { schema_did_change }, false) - } - (TransactionState::Read, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => ( - TransactionState::Write { - schema_did_change: false, - }, - true, - ), - (TransactionState::None, false) => (TransactionState::Read, true), - }; if updated && matches!(current_state, TransactionState::None) { if let LimboResult::Busy = pager.begin_read_tx()? { return Ok(InsnFunctionStepResult::Busy); @@ -2061,36 +2067,38 @@ pub fn op_transaction( } } } + } - // Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error - if updated { - conn.transaction_state.replace(new_transaction_state); - } + // 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error + if updated { + conn.transaction_state.replace(new_transaction_state); + } - // Check whether schema has changed if we are actually going to access the database. - if !matches!(new_transaction_state, TransactionState::None) { - let res = pager - .io - .block(|| pager.with_header(|header| header.schema_cookie.get())); - match res { - Ok(header_schema_cookie) => { - if header_schema_cookie != *schema_cookie { - tracing::info!( - "schema changed, force reprepare: {} != {}", - header_schema_cookie, - *schema_cookie - ); - return Err(LimboError::SchemaUpdated); - } - } - // This means we are starting a read transaction and page 1 is not allocated yet, so we just continue execution - Err(LimboError::Page1NotAlloc) => {} - Err(err) => { - return Err(err); - } + // 4. Check whether schema has changed if we are actually going to access the database. + // Can only read header if page 1 has been allocated already + // begin_write_tx that happens, but not begin_read_tx + // TODO: this is a hack to make the pager run the IO loop + let res = pager + .io + .block(|| pager.with_header(|header| header.schema_cookie.get())); + match res { + Ok(header_schema_cookie) => { + if header_schema_cookie != *schema_cookie { + tracing::info!( + "schema changed, force reprepare: {} != {}", + header_schema_cookie, + *schema_cookie + ); + return Err(LimboError::SchemaUpdated); } } + // This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution + Err(LimboError::Page1NotAlloc) => {} + Err(err) => { + return Err(err); + } } + state.pc += 1; Ok(InsnFunctionStepResult::Step) } @@ -6476,16 +6484,14 @@ pub fn op_set_cookie( header.incremental_vacuum_enabled = (*value as u32).into() } Cookie::SchemaVersion => { - if mv_store.is_none() { - // we update transaction state to indicate that the schema has changed - match program.connection.transaction_state.get() { - TransactionState::Write { schema_did_change } => { - program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true }); - }, - TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"), - TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), - TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"), - } + // we update transaction state to indicate that the schema has changed + match program.connection.transaction_state.get() { + TransactionState::Write { schema_did_change } => { + program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true }); + }, + TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"), + TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), + TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"), } program .connection