Merge 'core/mvcc: store txid in conn and reset transaction state on commit ' from Pere Diaz Bou

We were storing `txid` in `ProgramState`, this meant it was impossible
to track interactive transactions. This was extracted to `Connection`
instead.
Moreover, transaction state for mvcc now is reset on commit.

Closes #2689
This commit is contained in:
Pekka Enberg
2025-08-20 16:51:41 +03:00
committed by GitHub
5 changed files with 59 additions and 10 deletions

View File

@@ -423,6 +423,7 @@ impl Database {
closed: Cell::new(false),
attached_databases: RefCell::new(DatabaseCatalog::new()),
query_only: Cell::new(false),
mv_tx_id: Cell::new(None),
view_transaction_states: RefCell::new(HashMap::new()),
metrics: RefCell::new(ConnectionMetrics::new()),
is_nested_stmt: Cell::new(false),
@@ -826,6 +827,7 @@ pub struct Connection {
database_schemas: RefCell<std::collections::HashMap<usize, Arc<Schema>>>,
/// Whether to automatically commit transaction
auto_commit: Cell<bool>,
/// Transactions that are in progress.
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
transaction_state: Cell<TransactionState>,
last_insert_rowid: Cell<i64>,
@@ -845,6 +847,7 @@ pub struct Connection {
/// Attached databases
attached_databases: RefCell<DatabaseCatalog>,
query_only: Cell<bool>,
pub(crate) mv_tx_id: Cell<Option<crate::mvcc::database::TxID>>,
/// Per-connection view transaction states for uncommitted changes. This represents
/// one entry per view that was touched in the transaction.
@@ -1966,7 +1969,7 @@ impl Statement {
}
pub fn set_mv_tx_id(&mut self, mv_tx_id: Option<u64>) {
self.state.mv_tx_id = mv_tx_id;
self.program.connection.mv_tx_id.set(mv_tx_id);
}
pub fn interrupt(&mut self) {

View File

@@ -1127,6 +1127,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pager: Rc<Pager>,
connection: &Arc<Connection>,
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
tracing::trace!("commit_tx(tx_id={})", tx_id);
let state_machine: StateMachine<CommitStateMachine<Clock>> = StateMachine::<
CommitStateMachine<Clock>,
>::new(

View File

@@ -1198,3 +1198,48 @@ fn get_record_value(row: &Row) -> ImmutableRecord {
record.start_serialization(&row.data);
record
}
#[test]
fn test_interactive_transaction() {
let db = MvccTestDbNoConn::new_with_random_db();
let conn = db.connect();
// do some transaction
conn.execute("BEGIN").unwrap();
conn.execute("CREATE TABLE test (x)").unwrap();
conn.execute("INSERT INTO test (x) VALUES (1)").unwrap();
conn.execute("INSERT INTO test (x) VALUES (2)").unwrap();
conn.execute("COMMIT").unwrap();
// expect other transaction to see the changes
let rows = get_rows(&conn, "SELECT * FROM test");
assert_eq!(rows, vec![vec![Value::Integer(1)], vec![Value::Integer(2)]]);
}
#[test]
fn test_commit_without_tx() {
let db = MvccTestDbNoConn::new_with_random_db();
let conn = db.connect();
// do not start interactive transaction
conn.execute("CREATE TABLE test (x)").unwrap();
conn.execute("INSERT INTO test (x) VALUES (1)").unwrap();
// expect error on trying to commit a non-existent interactive transaction
let err = conn.execute("COMMIT").unwrap_err();
if let LimboError::TxError(e) = err {
assert_eq!(e.to_string(), "cannot commit - no transaction is active");
} else {
panic!("Expected TxError");
}
}
fn get_rows(conn: &Arc<Connection>, query: &str) -> Vec<Vec<Value>> {
let mut stmt = conn.prepare(query).unwrap();
let mut rows = Vec::new();
while let StepResult::Row = stmt.step().unwrap() {
let row = stmt.row().unwrap();
let values = row.get_values().cloned().collect::<Vec<_>>();
rows.push(values);
}
rows
}

View File

@@ -937,7 +937,7 @@ pub fn op_open_read(
let pager = program.get_pager_from_database_index(db);
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
let mv_cursor = match state.mv_tx_id {
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let table_id = *root_page as u64;
let mv_store = mv_store.unwrap().clone();
@@ -2061,7 +2061,7 @@ pub fn op_transaction(
// In MVCC we don't have write exclusivity, therefore we just need to start a transaction if needed.
// Programs can run Transaction twice, first with read flag and then with write flag. So a single txid is enough
// for both.
if state.mv_tx_id.is_none() {
if program.connection.mv_tx_id.get().is_none() {
// We allocate the first page lazily in the first transaction.
return_if_io!(pager.maybe_allocate_page1());
// TODO: when we fix MVCC enable schema cookie detection for reprepare statements
@@ -2073,7 +2073,7 @@ pub fn op_transaction(
// }
let tx_id = mv_store.begin_tx(pager.clone());
conn.mv_transactions.borrow_mut().push(tx_id);
state.mv_tx_id = Some(tx_id);
program.connection.mv_tx_id.set(Some(tx_id));
}
} else {
if updated && matches!(current_state, TransactionState::None) {
@@ -6195,7 +6195,7 @@ pub fn op_open_write(
CursorType::BTreeIndex(index) => Some(index),
_ => None,
};
let mv_cursor = match state.mv_tx_id {
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let table_id = root_page;
let mv_store = mv_store.unwrap().clone();
@@ -6466,7 +6466,7 @@ pub fn op_parse_schema(
stmt,
schema,
&conn.syms.borrow(),
state.mv_tx_id,
program.connection.mv_tx_id.get(),
existing_views,
)
})
@@ -6481,7 +6481,7 @@ pub fn op_parse_schema(
stmt,
schema,
&conn.syms.borrow(),
state.mv_tx_id,
program.connection.mv_tx_id.get(),
existing_views,
)
})
@@ -6860,7 +6860,7 @@ pub fn op_open_ephemeral(
let root_page = return_if_io!(pager.btree_create(flag));
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let mv_cursor = match state.mv_tx_id {
let mv_cursor = match program.connection.mv_tx_id.get() {
Some(tx_id) => {
let table_id = root_page as u64;
let mv_store = mv_store.unwrap().clone();

View File

@@ -250,7 +250,6 @@ pub struct ProgramState {
/// Indicate whether an [Insn::Once] instruction at a given program counter position has already been executed, well, once.
once: SmallVec<u32, 4>,
regex_cache: RegexCache,
pub(crate) mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
pub parameters: HashMap<NonZero<usize>, Value>,
commit_state: CommitState,
@@ -289,7 +288,6 @@ impl ProgramState {
ended_coroutine: Bitfield::new(),
once: SmallVec::<u32, 4>::new(),
regex_cache: RegexCache::new(),
mv_tx_id: None,
interrupted: false,
parameters: HashMap::new(),
commit_state: CommitState::Ready,
@@ -546,6 +544,8 @@ impl Program {
}
assert!(state_machine.is_finalized());
}
conn.mv_tx_id.set(None);
conn.transaction_state.replace(TransactionState::None);
mv_transactions.clear();
}
Ok(IOResult::Done(()))