core/mvcc: support schema_did change on commit_txn

This not only changes schema_did_change on commit_txn for mvcc, but also
extracts the connection transaction state from non mvcc transactions to
mvcc too.
This commit is contained in:
Pere Diaz Bou
2025-08-18 13:10:21 +02:00
parent f8c110e664
commit 94cd504d4c
4 changed files with 125 additions and 84 deletions

View File

@@ -506,8 +506,14 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
)
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
if let crate::types::IOResult::Done(_) = result {
break;
match result {
crate::types::IOResult::Done(_) => {
break;
}
crate::types::IOResult::IO(io) => {
io.wait(self.pager.io.as_ref())?;
continue;
}
}
}
self.state = CommitState::Commit { end_ts };
@@ -981,16 +987,12 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tx_id: TxID,
pager: Rc<Pager>,
connection: &Arc<Connection>,
schema_did_change: bool,
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
let state_machine: StateMachine<CommitStateMachine<Clock>> =
StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
CommitState::Initial,
pager,
tx_id,
connection.clone(),
schema_did_change,
));
let state_machine: StateMachine<CommitStateMachine<Clock>> = StateMachine::<
CommitStateMachine<Clock>,
>::new(
CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()),
);
Ok(state_machine)
}

View File

@@ -654,10 +654,10 @@ fn test_future_row() {
use crate::mvcc::cursor::MvccLazyCursor;
use crate::mvcc::database::{MvStore, Row, RowID};
use crate::types::Text;
use crate::Database;
use crate::MemoryIO;
use crate::RefValue;
use crate::Value;
use crate::{Database, StepResult};
// Simple atomic clock implementation for testing
@@ -1124,6 +1124,38 @@ fn test_restart() {
}
}
#[test]
fn test_connection_sees_other_connection_changes() {
let db = MvccTestDbNoConn::new_with_random_db();
let conn0 = db.connect();
conn0
.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)")
.unwrap();
let conn1 = db.connect();
conn1
.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, text TEXT)")
.unwrap();
conn0
.execute("INSERT INTO test_table (id, text) VALUES (965, 'text_877')")
.unwrap();
let mut stmt = conn1.query("SELECT * FROM test_table").unwrap().unwrap();
loop {
let res = stmt.step().unwrap();
match res {
StepResult::Row => {
let row = stmt.row().unwrap();
let text = row.get_value(1).to_text().unwrap();
assert_eq!(text, "text_877");
}
StepResult::Done => break,
StepResult::IO => {
stmt.run_once().unwrap();
}
_ => panic!("Expected Row"),
}
}
}
fn get_record_value(row: &Row) -> ImmutableRecord {
let mut record = ImmutableRecord::new(1024);
record.start_serialization(&row.data);

View File

@@ -971,6 +971,7 @@ impl Pager {
TransactionState::Write { schema_did_change } => (true, schema_did_change),
_ => (false, false),
};
tracing::trace!("end_tx(schema_did_change={})", schema_did_change);
if rollback {
if is_write {
wal.borrow().end_write_tx();

View File

@@ -1981,7 +1981,50 @@ pub fn op_transaction(
let pager = program.get_pager_from_database_index(db);
// 1. We try to upgrade current version
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
// 2. Start transaction if needed
if let Some(mv_store) = &mv_store {
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if state.mv_tx_id.is_none() {
// We allocate the first page lazily in the first transaction.
return_if_io!(pager.maybe_allocate_page1());
@@ -1997,43 +2040,6 @@ pub fn op_transaction(
state.mv_tx_id = Some(tx_id);
}
} else {
let current_state = conn.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
// pending state means that we tried beginning a tx and the method returned IO.
// instead of ending the read tx, just update the state to pending.
(TransactionState::PendingUpgrade, write) => {
turso_assert!(
*write,
"pending upgrade should only be set for write transactions"
);
(
TransactionState::Write {
schema_did_change: false,
},
true,
)
}
(TransactionState::Write { schema_did_change }, true) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Write { schema_did_change }, false) => {
(TransactionState::Write { schema_did_change }, false)
}
(TransactionState::Read, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (
TransactionState::Write {
schema_did_change: false,
},
true,
),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(InsnFunctionStepResult::Busy);
@@ -2061,36 +2067,38 @@ pub fn op_transaction(
}
}
}
}
// Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
// 3. Transaction state should be updated before checking for Schema cookie so that the tx is ended properly on error
if updated {
conn.transaction_state.replace(new_transaction_state);
}
// Check whether schema has changed if we are actually going to access the database.
if !matches!(new_transaction_state, TransactionState::None) {
let res = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie.get()));
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
// This means we are starting a read transaction and page 1 is not allocated yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
// 4. Check whether schema has changed if we are actually going to access the database.
// Can only read header if page 1 has been allocated already
// begin_write_tx that happens, but not begin_read_tx
// TODO: this is a hack to make the pager run the IO loop
let res = pager
.io
.block(|| pager.with_header(|header| header.schema_cookie.get()));
match res {
Ok(header_schema_cookie) => {
if header_schema_cookie != *schema_cookie {
tracing::info!(
"schema changed, force reprepare: {} != {}",
header_schema_cookie,
*schema_cookie
);
return Err(LimboError::SchemaUpdated);
}
}
// This means we are starting a read_tx and we do not have a page 1 yet, so we just continue execution
Err(LimboError::Page1NotAlloc) => {}
Err(err) => {
return Err(err);
}
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@@ -6476,16 +6484,14 @@ pub fn op_set_cookie(
header.incremental_vacuum_enabled = (*value as u32).into()
}
Cookie::SchemaVersion => {
if mv_store.is_none() {
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
// we update transaction state to indicate that the schema has changed
match program.connection.transaction_state.get() {
TransactionState::Write { schema_did_change } => {
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
},
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
TransactionState::PendingUpgrade => unreachable!("invalid transaction state for SetCookie: TransactionState::PendingUpgrade, should be write"),
}
program
.connection