mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-06 09:44:21 +01:00
Merge 'Fix MVCC rollback' from Jussi Saurio
Closes #3119 Closes #3121 executing ROLLBACK did not rollback the mv-store transaction Closes #3123
This commit is contained in:
@@ -36,7 +36,9 @@ fn bench(c: &mut Criterion) {
|
||||
b.to_async(FuturesExecutor).iter(|| async {
|
||||
let conn = db.conn.clone();
|
||||
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone());
|
||||
db.mvcc_store.rollback_tx(tx_id, conn.get_pager().clone())
|
||||
db.mvcc_store
|
||||
.rollback_tx(tx_id, conn.get_pager().clone(), &conn)
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
|
||||
|
||||
@@ -1388,17 +1388,28 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `tx_id` - The ID of the transaction to abort.
|
||||
pub fn rollback_tx(&self, tx_id: TxID, pager: Rc<Pager>) {
|
||||
pub fn rollback_tx(
|
||||
&self,
|
||||
tx_id: TxID,
|
||||
pager: Rc<Pager>,
|
||||
connection: &Connection,
|
||||
) -> Result<()> {
|
||||
let tx_unlocked = self.txs.get(&tx_id).unwrap();
|
||||
let tx = tx_unlocked.value();
|
||||
connection.mv_tx.set(None);
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
tx.state.store(TransactionState::Aborted);
|
||||
tracing::trace!("abort(tx_id={})", tx_id);
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
|
||||
if self.is_exclusive_tx(&tx_id) {
|
||||
let pager_rollback_done = if self.is_exclusive_tx(&tx_id) {
|
||||
self.commit_coordinator.pager_commit_lock.unlock();
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
}
|
||||
pager.io.block(|| pager.end_tx(true, connection))?;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
for ref id in write_set {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
@@ -1420,10 +1431,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
let tx = tx_unlocked.value();
|
||||
tx.state.store(TransactionState::Terminated);
|
||||
tracing::trace!("terminate(tx_id={})", tx_id);
|
||||
pager.end_read_tx().unwrap();
|
||||
if !pager_rollback_done {
|
||||
pager.end_read_tx()?;
|
||||
}
|
||||
// FIXME: verify that we can already remove the transaction here!
|
||||
// Maybe it's fine for snapshot isolation, but too early for serializable?
|
||||
self.txs.remove(&tx_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the given transaction is the exclusive transaction.
|
||||
|
||||
@@ -296,7 +296,8 @@ fn test_rollback() {
|
||||
.unwrap();
|
||||
assert_eq!(row3, row4);
|
||||
db.mvcc_store
|
||||
.rollback_tx(tx1, db.conn.pager.borrow().clone());
|
||||
.rollback_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
|
||||
.unwrap();
|
||||
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
|
||||
let row5 = db
|
||||
.mvcc_store
|
||||
@@ -521,7 +522,9 @@ fn test_lost_update() {
|
||||
Err(LimboError::WriteWriteConflict)
|
||||
));
|
||||
// hack: in the actual tursodb database we rollback the mvcc tx ourselves, so manually roll it back here
|
||||
db.mvcc_store.rollback_tx(tx3, conn3.pager.borrow().clone());
|
||||
db.mvcc_store
|
||||
.rollback_tx(tx3, conn3.pager.borrow().clone(), &conn3)
|
||||
.unwrap();
|
||||
|
||||
commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap();
|
||||
assert!(matches!(
|
||||
|
||||
@@ -2300,7 +2300,13 @@ pub fn op_auto_commit(
|
||||
if *auto_commit != conn.auto_commit.get() {
|
||||
if *rollback {
|
||||
// TODO(pere): add rollback I/O logic once we implement rollback journal
|
||||
return_if_io!(pager.end_tx(true, &conn));
|
||||
if let Some(mv_store) = mv_store {
|
||||
if let Some((tx_id, _)) = conn.mv_tx.get() {
|
||||
mv_store.rollback_tx(tx_id, pager.clone(), &conn)?;
|
||||
}
|
||||
} else {
|
||||
return_if_io!(pager.end_tx(true, &conn));
|
||||
}
|
||||
conn.transaction_state.replace(TransactionState::None);
|
||||
conn.auto_commit.replace(true);
|
||||
} else {
|
||||
|
||||
@@ -1080,10 +1080,9 @@ pub fn handle_program_error(
|
||||
_ => {
|
||||
if let Some(mv_store) = mv_store {
|
||||
if let Some((tx_id, _)) = connection.mv_tx.get() {
|
||||
connection.mv_tx.set(None);
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
connection.auto_commit.replace(true);
|
||||
mv_store.rollback_tx(tx_id, pager.clone());
|
||||
mv_store.rollback_tx(tx_id, pager.clone(), connection)?;
|
||||
}
|
||||
} else {
|
||||
pager
|
||||
|
||||
Reference in New Issue
Block a user