Merge 'core/mvcc: schema_did_change support and find last valid version' from Pere Diaz Bou

1. Find the latest available version of a row for a specific transaction.
To do that, we need to keep track of which txid is looking up the row.
2. Support for schema_did_change on commit txn
3. Extract the connection transaction state handling from non-MVCC
transactions so that it applies to MVCC transactions too.

Closes #2642
This commit is contained in:
Pekka Enberg
2025-08-18 19:22:24 +03:00
committed by GitHub
5 changed files with 187 additions and 97 deletions

View File

@@ -57,7 +57,9 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
CursorPosition::Loaded(id) => Some(id),
CursorPosition::BeforeFirst => {
// If we are before first, we need to try and find the first row.
let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN);
let maybe_rowid =
self.db
.get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id);
if let Some(id) = maybe_rowid {
self.current_pos = CursorPosition::Loaded(id);
Some(id)
@@ -75,7 +77,9 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
CursorPosition::Loaded(id) => self.db.read(self.tx_id, id),
CursorPosition::BeforeFirst => {
// If we are before first, we need to try and find the first row.
let maybe_rowid = self.db.get_next_row_id_for_table(self.table_id, i64::MIN);
let maybe_rowid =
self.db
.get_next_row_id_for_table(self.table_id, i64::MIN, self.tx_id);
if let Some(id) = maybe_rowid {
self.current_pos = CursorPosition::Loaded(id);
self.db.read(self.tx_id, id)
@@ -103,18 +107,22 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
return false;
}
};
self.current_pos = match self.db.get_next_row_id_for_table(self.table_id, min_id) {
Some(id) => CursorPosition::Loaded(id),
None => {
if before_first {
// if it wasn't loaded and we didn't find anything, it means the table is empty.
CursorPosition::BeforeFirst
} else {
// if we had something loaded, and we didn't find next key then it means we are at the end.
CursorPosition::End
self.current_pos =
match self
.db
.get_next_row_id_for_table(self.table_id, min_id, self.tx_id)
{
Some(id) => CursorPosition::Loaded(id),
None => {
if before_first {
// if it wasn't loaded and we didn't find anything, it means the table is empty.
CursorPosition::BeforeFirst
} else {
// if we had something loaded, and we didn't find next key then it means we are at the end.
CursorPosition::End
}
}
}
};
};
matches!(self.current_pos, CursorPosition::Loaded(_))
}
@@ -175,7 +183,7 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
SeekOp::LT => (Bound::Excluded(&rowid), false),
SeekOp::LE { eq_only: _ } => (Bound::Included(&rowid), false),
};
let rowid = self.db.seek_rowid(bound, lower_bound);
let rowid = self.db.seek_rowid(bound, lower_bound, self.tx_id);
if let Some(rowid) = rowid {
self.current_pos = CursorPosition::Loaded(rowid);
if op.eq_only() {
@@ -211,6 +219,7 @@ impl<Clock: LogicalClock> MvccLazyCursor<Clock> {
row_id: *int_key,
}),
true,
self.tx_id,
)
.is_some();
if exists {

View File

@@ -506,8 +506,14 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
)
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
if let crate::types::IOResult::Done(_) = result {
break;
match result {
crate::types::IOResult::Done(_) => {
break;
}
crate::types::IOResult::IO(io) => {
io.wait(self.pager.io.as_ref())?;
continue;
}
}
}
self.state = CommitState::Commit { end_ts };
@@ -881,7 +887,12 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(())
}
pub fn get_next_row_id_for_table(&self, table_id: u64, start: i64) -> Option<RowID> {
pub fn get_next_row_id_for_table(
&self,
table_id: u64,
start: i64,
tx_id: TxID,
) -> Option<RowID> {
tracing::trace!(
"getting_next_id_for_table(table_id={}, range_start={})",
table_id,
@@ -897,19 +908,50 @@ impl<Clock: LogicalClock> MvStore<Clock> {
row_id: i64::MAX,
};
self.rows
.range(min_bound..max_bound)
.next()
.map(|entry| *entry.key())
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
let mut rows = self.rows.range(min_bound..max_bound);
rows.next().and_then(|row| {
// Find last valid version based on transaction.
self.find_last_visible_version(&tx, row)
})
}
pub fn seek_rowid(&self, bound: Bound<&RowID>, lower_bound: bool) -> Option<RowID> {
/// Returns the key of `row` when at least one of its stored versions is
/// visible to `tx`, and `None` otherwise.
///
/// The version list is probed from its back towards its front, and probing
/// stops at the first version for which `is_visible_to` reports `true`.
/// Only the row key is returned; the matching version itself is not.
fn find_last_visible_version(
    &self,
    tx: &parking_lot::lock_api::RwLockReadGuard<'_, parking_lot::RawRwLock, Transaction>,
    row: crossbeam_skiplist::map::Entry<
        '_,
        RowID,
        parking_lot::lock_api::RwLock<parking_lot::RawRwLock, Vec<RowVersion>>,
    >,
) -> Option<RowID> {
    // Newest-to-oldest scan: we only need to know whether any version qualifies.
    let any_visible = row
        .value()
        .read()
        .iter()
        .rev()
        .any(|candidate| candidate.is_visible_to(tx, &self.txs));
    any_visible.then(|| *row.key())
}
pub fn seek_rowid(
&self,
bound: Bound<&RowID>,
lower_bound: bool,
tx_id: TxID,
) -> Option<RowID> {
tracing::trace!("seek_rowid(bound={:?}, lower_bound={})", bound, lower_bound,);
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
if lower_bound {
self.rows.lower_bound(bound).map(|entry| *entry.key())
self.rows
.lower_bound(bound)
.and_then(|entry| self.find_last_visible_version(&tx, entry))
} else {
self.rows.upper_bound(bound).map(|entry| *entry.key())
self.rows
.upper_bound(bound)
.and_then(|entry| self.find_last_visible_version(&tx, entry))
}
}

View File

@@ -654,10 +654,10 @@ fn test_future_row() {
use crate::mvcc::cursor::MvccLazyCursor;
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::types::Text;
use crate::Database;
use crate::MemoryIO;
use crate::RefValue;
use crate::Value;
use crate::{Database, StepResult};
// Simple atomic clock implementation for testing
@@ -1124,6 +1124,38 @@ fn test_restart() {
}
}
/// Checks that a row inserted through one connection is returned by a query
/// issued afterwards on a second connection to the same database.
///
/// Both connections first run the same `CREATE TABLE IF NOT EXISTS`, then
/// `conn0` inserts a single row and `conn1` selects from the table.
#[test]
fn test_connection_sees_other_connection_changes() {
    let db = MvccTestDbNoConn::new_with_random_db();
    let conn0 = db.connect();
    conn0
        .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)")
        .unwrap();
    let conn1 = db.connect();
    conn1
        .execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)")
        .unwrap();
    conn0
        .execute("INSERT INTO test_table (id, text) VALUES (965, 'text_877')")
        .unwrap();
    let mut stmt = conn1.query("SELECT * FROM test_table").unwrap().unwrap();
    // Count the rows so the test cannot pass vacuously when the second
    // connection sees nothing at all.
    let mut rows_seen = 0usize;
    loop {
        match stmt.step().unwrap() {
            StepResult::Row => {
                let row = stmt.row().unwrap();
                let text = row.get_value(1).to_text().unwrap();
                assert_eq!(text, "text_877");
                rows_seen += 1;
            }
            StepResult::Done => break,
            StepResult::IO => {
                // Drive pending I/O forward before stepping again.
                stmt.run_once().unwrap();
            }
            _ => panic!("Expected Row"),
        }
    }
    assert_eq!(
        rows_seen, 1,
        "conn1 should see exactly the one row inserted by conn0"
    );
}
fn get_record_value(row: &Row) -> ImmutableRecord {
let mut record = ImmutableRecord::new(1024);
record.start_serialization(&row.data);

View File

@@ -971,6 +971,7 @@ impl Pager {
TransactionState::Write { schema_did_change } => (true, schema_did_change),
_ => (false, false),
};
tracing::trace!("end_tx(schema_did_change={})", schema_did_change);
if rollback {
if is_write {
wal.borrow().end_write_tx();

View File

@@ -1981,7 +1981,50 @@ pub fn op_transaction(
let pager = program.get_pager_from_database_index(db);
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
// 2. Start transaction if needed
if let Some(mv_store) = &mv_store {
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if state.mv_tx_id.is_none() {
// We allocate the first page lazily in the first transaction.
return_if_io!(pager.maybe_allocate_page1());
@@ -1997,43 +2040,6 @@ pub fn op_transaction(
state.mv_tx_id = Some(tx_id);
}
} else {
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
@@ -2061,36 +2067,38 @@ pub fn op_transaction(
}
}
}
}
// Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
// 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
// Check whether schema has changed if we are actually going to access the database.
if !matches!(new_transaction_state, TransactionState::None) {
let res = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie.get()));
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
// This means we are starting a read transaction and page 1 is not allocated yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
// 4. Check whether schema has changed if we are actually going to access the database.
// Can only read header if page 1 has been allocated already
// begin_write_tx that happens, but not begin_read_tx
// TODO: this is a hack to make the pager run the IO loop
let res = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie.get()));
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
// This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@@ -6476,16 +6484,14 @@ pub fn op_set_cookie(
header.incremental_vacuum_enabled = (*value as u32).into()
}
Cookie::SchemaVersion => {
if mv_store.is_none() {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
program
.connection