Merge 'Fix VDBE program abort' from Nikita Sivukhin

This PR add proper program abort in case of unfinished statement reset
and interruption.
Also, this PR makes rollback methods non-failing because otherwise of
their callers usually unclear (if rollback failed - what is the state of
statement/connection/transaction?)

Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #3591
This commit is contained in:
Pekka Enberg
2025-10-07 09:07:07 +03:00
committed by GitHub
15 changed files with 199 additions and 201 deletions

View File

@@ -36,8 +36,7 @@ fn bench(c: &mut Criterion) {
let conn = db.conn.clone();
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap();
db.mvcc_store
.rollback_tx(tx_id, conn.get_pager().clone(), &conn)
.unwrap();
.rollback_tx(tx_id, conn.get_pager().clone(), &conn);
})
});

View File

@@ -49,8 +49,6 @@ pub enum LimboError {
ExtensionError(String),
#[error("Runtime error: integer overflow")]
IntegerOverflow,
#[error("Schema is locked for write")]
SchemaLocked,
#[error("Runtime error: database table is locked")]
TableLocked,
#[error("Error: Resource is read-only")]

View File

@@ -458,11 +458,6 @@ impl CompiledExpression {
"Expression evaluation produced unexpected row".to_string(),
));
}
crate::vdbe::execute::InsnFunctionStepResult::Interrupt => {
return Err(crate::LimboError::InternalError(
"Expression evaluation was interrupted".to_string(),
));
}
crate::vdbe::execute::InsnFunctionStepResult::Step => {
pc = state.pc as usize;
}

View File

@@ -498,10 +498,7 @@ impl Database {
schema.schema_version = header_schema_cookie;
let result = schema
.make_from_btree(None, pager.clone(), &syms)
.or_else(|e| {
pager.end_read_tx()?;
Err(e)
});
.inspect_err(|_| pager.end_read_tx());
if let Err(LimboError::ExtensionError(e)) = result {
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
@@ -559,12 +556,7 @@ impl Database {
let conn = Arc::new(Connection {
db: self.clone(),
pager: RwLock::new(pager),
schema: RwLock::new(
self.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.clone(),
),
schema: RwLock::new(self.schema.lock().unwrap().clone()),
database_schemas: RwLock::new(std::collections::HashMap::new()),
auto_commit: AtomicBool::new(true),
transaction_state: RwLock::new(TransactionState::None),
@@ -836,17 +828,17 @@ impl Database {
#[inline]
pub(crate) fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> Result<T>) -> Result<T> {
let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
let mut schema_ref = self.schema.lock().unwrap();
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
pub(crate) fn clone_schema(&self) -> Result<Arc<Schema>> {
let schema = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
Ok(schema.clone())
pub(crate) fn clone_schema(&self) -> Arc<Schema> {
let schema = self.schema.lock().unwrap();
schema.clone()
}
pub(crate) fn update_schema_if_newer(&self, another: Arc<Schema>) -> Result<()> {
let mut schema = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
pub(crate) fn update_schema_if_newer(&self, another: Arc<Schema>) {
let mut schema = self.schema.lock().unwrap();
if schema.schema_version < another.schema_version {
tracing::debug!(
"DB schema is outdated: {} < {}",
@@ -861,7 +853,6 @@ impl Database {
another.schema_version
);
}
Ok(())
}
pub fn get_mv_store(&self) -> Option<&Arc<MvStore>> {
@@ -1155,7 +1146,7 @@ impl Connection {
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
.unwrap()
.trim();
self.maybe_update_schema()?;
self.maybe_update_schema();
let pager = self.pager.read().clone();
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
@@ -1202,11 +1193,11 @@ impl Connection {
0
}
Err(err) => {
pager.end_read_tx().expect("read txn must be finished");
pager.end_read_tx();
return Err(err);
}
};
pager.end_read_tx().expect("read txn must be finished");
pager.end_read_tx();
let db_schema_version = self.db.schema.lock().unwrap().schema_version;
tracing::debug!(
@@ -1243,13 +1234,14 @@ impl Connection {
// close opened transaction if it was kept open
// (in most cases, it will be automatically closed if stmt was executed properly)
if previous == TransactionState::Read {
pager.end_read_tx().expect("read txn must be finished");
pager.end_read_tx();
}
reparse_result?;
let schema = self.schema.read().clone();
self.db.update_schema_if_newer(schema)
self.db.update_schema_if_newer(schema);
Ok(())
}
fn reparse_schema(self: &Arc<Connection>) -> Result<()> {
@@ -1304,7 +1296,7 @@ impl Connection {
"The supplied SQL string contains no statements".to_string(),
));
}
self.maybe_update_schema()?;
self.maybe_update_schema();
let sql = sql.as_ref();
tracing::trace!("Preparing and executing batch: {}", sql);
let mut parser = Parser::new(sql.as_bytes());
@@ -1338,7 +1330,7 @@ impl Connection {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let sql = sql.as_ref();
self.maybe_update_schema()?;
self.maybe_update_schema();
tracing::trace!("Querying: {}", sql);
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -1390,7 +1382,7 @@ impl Connection {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let sql = sql.as_ref();
self.maybe_update_schema()?;
self.maybe_update_schema();
let mut parser = Parser::new(sql.as_bytes());
while let Some(cmd) = parser.next_cmd()? {
let syms = self.syms.read();
@@ -1539,20 +1531,14 @@ impl Connection {
Ok(db)
}
pub fn maybe_update_schema(&self) -> Result<()> {
pub fn maybe_update_schema(&self) {
let current_schema_version = self.schema.read().schema_version;
let schema = self
.db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?;
let schema = self.db.schema.lock().unwrap();
if matches!(self.get_tx_state(), TransactionState::None)
&& current_schema_version != schema.schema_version
{
*self.schema.write() = schema.clone();
}
Ok(())
}
/// Read schema version at current transaction
@@ -1664,7 +1650,7 @@ impl Connection {
let pager = self.pager.read();
pager.begin_read_tx()?;
pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| {
pager.end_read_tx().expect("read txn must be closed");
pager.end_read_tx();
})?;
// start write transaction and disable auto-commit mode as SQL can be executed within WAL session (at caller own risk)
@@ -1712,13 +1698,11 @@ impl Connection {
wal.end_read_tx();
}
let rollback_err = if !force_commit {
if !force_commit {
// remove all non-commited changes in case if WAL session left some suffix without commit frame
pager.rollback(false, self, true).err()
} else {
None
};
if let Some(err) = commit_err.or(rollback_err) {
pager.rollback(false, self, true);
}
if let Some(err) = commit_err {
return Err(err);
}
}
@@ -1762,12 +1746,7 @@ impl Connection {
_ => {
if !self.mvcc_enabled() {
let pager = self.pager.read();
pager.io.block(|| {
pager.end_tx(
true, // rollback = true for close
self,
)
})?;
pager.rollback_tx(self);
}
self.set_tx_state(TransactionState::None);
}
@@ -2074,12 +2053,7 @@ impl Connection {
)));
}
let use_indexes = self
.db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.indexes_enabled();
let use_indexes = self.db.schema.lock().unwrap().indexes_enabled();
let use_mvcc = self.db.mv_store.is_some();
let use_views = self.db.experimental_views_enabled();
let use_strict = self.db.experimental_strict_enabled();
@@ -2471,6 +2445,12 @@ pub struct Statement {
busy_timeout: Option<BusyTimeout>,
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset();
}
}
impl Statement {
pub fn new(
program: vdbe::Program,
@@ -2620,7 +2600,8 @@ impl Statement {
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();
*conn.schema.write() = conn.db.clone_schema()?;
*conn.schema.write() = conn.db.clone_schema();
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -2648,7 +2629,7 @@ impl Statement {
QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0),
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
self._reset(Some(max_registers), Some(cursor_count));
self.reset_internal(Some(max_registers), Some(cursor_count));
// Load the parameters back into the state
self.state.parameters = parameters;
Ok(())
@@ -2670,12 +2651,8 @@ impl Statement {
}
let state = self.program.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
let end_tx_res = self.pager.end_tx(true, &self.program.connection)?;
self.pager.rollback_tx(&self.program.connection);
self.program.connection.set_tx_state(TransactionState::None);
assert!(
matches!(end_tx_res, IOResult::Done(_)),
"end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}"
);
}
}
res
@@ -2766,10 +2743,17 @@ impl Statement {
}
pub fn reset(&mut self) {
self._reset(None, None);
self.reset_internal(None, None);
}
pub fn _reset(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
fn reset_internal(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
// as abort uses auto_txn_cleanup value - it needs to be called before state.reset
self.program.abort(
self.mv_store.as_ref(),
&self.pager,
None,
&mut self.state.auto_txn_cleanup,
);
self.state.reset(max_registers, max_cursors);
self.busy = false;
self.busy_timeout = None;

View File

@@ -558,7 +558,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
CheckpointState::CommitPagerTxn => {
tracing::debug!("Committing pager transaction");
let result = self.pager.end_tx(false, &self.connection)?;
let result = self.pager.commit_tx(&self.connection)?;
match result {
IOResult::Done(_) => {
self.state = CheckpointState::TruncateLogicalLog;
@@ -652,16 +652,12 @@ impl<Clock: LogicalClock> StateTransition for CheckpointStateMachine<Clock> {
Err(err) => {
tracing::info!("Error in checkpoint state machine: {err}");
if self.lock_states.pager_write_tx {
let rollback = true;
self.pager
.io
.block(|| self.pager.end_tx(rollback, self.connection.as_ref()))
.expect("failed to end pager write tx");
self.pager.rollback_tx(self.connection.as_ref());
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::None;
}
} else if self.lock_states.pager_read_tx {
self.pager.end_read_tx().unwrap();
self.pager.end_read_tx();
if self.update_transaction_state {
*self.connection.transaction_state.write() = TransactionState::None;
}

View File

@@ -654,7 +654,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
let schema_did_change = self.did_commit_schema_change;
if schema_did_change {
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
connection.db.update_schema_if_newer(schema);
}
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
@@ -1354,7 +1354,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
&self,
pager: Arc<Pager>,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
) -> Result<TxID> {
if !self.blocking_checkpoint_lock.read() {
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
return Err(LimboError::Busy);
@@ -1390,7 +1390,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
);
tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
self.txs.insert(tx_id, tx);
Ok(IOResult::Done(tx_id))
Ok(tx_id)
}
/// Begins a new transaction in the database.
@@ -1578,12 +1578,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(
&self,
tx_id: TxID,
_pager: Arc<Pager>,
connection: &Connection,
) -> Result<()> {
pub fn rollback_tx(&self, tx_id: TxID, _pager: Arc<Pager>, connection: &Connection) {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
let tx = tx_unlocked.value();
*connection.mv_tx.write() = None;
@@ -1618,7 +1613,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
> connection.db.schema.lock().unwrap().schema_version
{
// Connection made schema changes during tx and rolled back -> revert connection-local schema.
*connection.schema.write() = connection.db.clone_schema()?;
*connection.schema.write() = connection.db.clone_schema();
}
let tx = tx_unlocked.value();
@@ -1627,8 +1622,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.remove_tx(tx_id);
Ok(())
}
/// Returns true if the given transaction is the exclusive transaction.

View File

@@ -347,8 +347,7 @@ fn test_rollback() {
.unwrap();
assert_eq!(row3, row4);
db.mvcc_store
.rollback_tx(tx1, db.conn.pager.read().clone(), &db.conn)
.unwrap();
.rollback_tx(tx1, db.conn.pager.read().clone(), &db.conn);
let tx2 = db
.mvcc_store
.begin_tx(db.conn.pager.read().clone())
@@ -592,8 +591,7 @@ fn test_lost_update() {
));
// hack: in the actual tursodb database we rollback the mvcc tx ourselves, so manually roll it back here
db.mvcc_store
.rollback_tx(tx3, conn3.pager.read().clone(), &conn3)
.unwrap();
.rollback_tx(tx3, conn3.pager.read().clone(), &conn3);
commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap();
assert!(matches!(

View File

@@ -472,7 +472,7 @@ impl Schema {
pager.io.block(|| cursor.next())?;
}
pager.end_read_tx()?;
pager.end_read_tx();
self.populate_indices(from_sql_indexes, automatic_indices)?;

View File

@@ -8183,7 +8183,7 @@ mod tests {
// force allocate page1 with a transaction
pager.begin_read_tx().unwrap();
run_until_done(|| pager.begin_write_tx(), &pager).unwrap();
run_until_done(|| pager.end_tx(false, &conn), &pager).unwrap();
run_until_done(|| pager.commit_tx(&conn), &pager).unwrap();
let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap();
btree_init_page(&page2, PageType::TableLeaf, 0, pager.usable_space());
@@ -8495,7 +8495,7 @@ mod tests {
pager.deref(),
)
.unwrap();
pager.io.block(|| pager.end_tx(false, &conn)).unwrap();
pager.io.block(|| pager.commit_tx(&conn)).unwrap();
pager.begin_read_tx().unwrap();
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
let _c = cursor.move_to_root().unwrap();
@@ -8524,7 +8524,7 @@ mod tests {
println!("btree after:\n{btree_after}");
panic!("invalid btree");
}
pager.end_read_tx().unwrap();
pager.end_read_tx();
}
pager.begin_read_tx().unwrap();
tracing::info!(
@@ -8546,7 +8546,7 @@ mod tests {
"key {key} is not found, got {cursor_rowid}"
);
}
pager.end_read_tx().unwrap();
pager.end_read_tx();
}
}
@@ -8641,7 +8641,7 @@ mod tests {
if let Some(c) = c {
pager.io.wait_for_completion(c).unwrap();
}
pager.io.block(|| pager.end_tx(false, &conn)).unwrap();
pager.io.block(|| pager.commit_tx(&conn)).unwrap();
}
// Check that all keys can be found by seeking
@@ -8702,7 +8702,7 @@ mod tests {
}
prev = Some(cur);
}
pager.end_read_tx().unwrap();
pager.end_read_tx();
}
}
@@ -8848,7 +8848,7 @@ mod tests {
if let Some(c) = c {
pager.io.wait_for_completion(c).unwrap();
}
pager.io.block(|| pager.end_tx(false, &conn)).unwrap();
pager.io.block(|| pager.commit_tx(&conn)).unwrap();
}
// Final validation
@@ -8856,7 +8856,7 @@ mod tests {
sorted_keys.sort();
validate_expected_keys(&pager, &mut cursor, &sorted_keys, seed);
pager.end_read_tx().unwrap();
pager.end_read_tx();
}
}
@@ -8939,7 +8939,7 @@ mod tests {
"key {key:?} is not found, seed: {seed}"
);
}
pager.end_read_tx().unwrap();
pager.end_read_tx();
}
#[test]

View File

@@ -1161,33 +1161,20 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn end_tx(
&self,
rollback: bool,
connection: &Connection,
) -> Result<IOResult<PagerCommitResult>> {
pub fn commit_tx(&self, connection: &Connection) -> Result<IOResult<PagerCommitResult>> {
if connection.is_nested_stmt.load(Ordering::SeqCst) {
// Parent statement will handle the transaction rollback.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
tracing::trace!("end_tx(rollback={})", rollback);
let Some(wal) = self.wal.as_ref() else {
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
return Ok(IOResult::Done(PagerCommitResult::Rollback));
};
let (is_write, schema_did_change) = match connection.get_tx_state() {
let (_, schema_did_change) = match connection.get_tx_state() {
TransactionState::Write { schema_did_change } => (true, schema_did_change),
_ => (false, false),
};
tracing::trace!("end_tx(schema_did_change={})", schema_did_change);
if rollback {
if is_write {
wal.borrow().end_write_tx();
}
wal.borrow().end_read_tx();
self.rollback(schema_did_change, connection, is_write)?;
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
tracing::trace!("commit_tx(schema_did_change={})", schema_did_change);
let commit_status = return_if_io!(self.commit_dirty_pages(
connection.is_wal_auto_checkpoint_disabled(),
connection.get_sync_mode(),
@@ -1198,18 +1185,39 @@ impl Pager {
if schema_did_change {
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
connection.db.update_schema_if_newer(schema);
}
Ok(IOResult::Done(commit_status))
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn end_read_tx(&self) -> Result<()> {
pub fn rollback_tx(&self, connection: &Connection) {
if connection.is_nested_stmt.load(Ordering::SeqCst) {
// Parent statement will handle the transaction rollback.
return;
}
let Some(wal) = self.wal.as_ref() else {
return Ok(());
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
return;
};
let (is_write, schema_did_change) = match connection.get_tx_state() {
TransactionState::Write { schema_did_change } => (true, schema_did_change),
_ => (false, false),
};
tracing::trace!("rollback_tx(schema_did_change={})", schema_did_change);
if is_write {
wal.borrow().end_write_tx();
}
wal.borrow().end_read_tx();
self.rollback(schema_did_change, connection, is_write);
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn end_read_tx(&self) {
let Some(wal) = self.wal.as_ref() else {
return;
};
wal.borrow().end_read_tx();
Ok(())
}
/// Reads a page from disk (either WAL or DB file) bypassing page-cache
@@ -2393,12 +2401,7 @@ impl Pager {
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn rollback(
&self,
schema_did_change: bool,
connection: &Connection,
is_write: bool,
) -> Result<(), LimboError> {
pub fn rollback(&self, schema_did_change: bool, connection: &Connection, is_write: bool) {
tracing::debug!(schema_did_change);
self.clear_page_cache();
if is_write {
@@ -2411,15 +2414,13 @@ impl Pager {
}
self.reset_internal_states();
if schema_did_change {
*connection.schema.write() = connection.db.clone_schema()?;
*connection.schema.write() = connection.db.clone_schema();
}
if is_write {
if let Some(wal) = self.wal.as_ref() {
wal.borrow_mut().rollback()?;
wal.borrow_mut().rollback();
}
}
Ok(())
}
fn reset_internal_states(&self) {
@@ -2764,7 +2765,7 @@ mod ptrmap_tests {
use super::*;
use crate::io::{MemoryIO, OpenFlags, IO};
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::{DatabaseFile, DatabaseStorage};
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::PageCache;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::PageSize;

View File

@@ -302,7 +302,7 @@ pub trait Wal: Debug {
fn get_checkpoint_seq(&self) -> u32;
fn get_max_frame(&self) -> u64;
fn get_min_frame(&self) -> u64;
fn rollback(&mut self) -> Result<()>;
fn rollback(&mut self);
/// Return unique set of pages changed **after** frame_watermark position and until current WAL session max_frame_no
fn changed_pages_after(&self, frame_watermark: u64) -> Result<Vec<u32>>;
@@ -1351,8 +1351,8 @@ impl Wal for WalFile {
self.min_frame.load(Ordering::Acquire)
}
#[instrument(err, skip_all, level = Level::DEBUG)]
fn rollback(&mut self) -> Result<()> {
#[instrument(skip_all, level = Level::DEBUG)]
fn rollback(&mut self) {
let (max_frame, last_checksum) = {
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::Acquire);
@@ -1369,7 +1369,6 @@ impl Wal for WalFile {
self.last_checksum = last_checksum;
self.max_frame.store(max_frame, Ordering::Release);
self.reset_internal_states();
Ok(())
}
#[instrument(skip_all, level = Level::DEBUG)]
@@ -2825,7 +2824,7 @@ pub mod test {
}
}
drop(w);
conn2.pager.write().end_read_tx().unwrap();
conn2.pager.write().end_read_tx();
conn1
.execute("create table test(id integer primary key, value text)")

View File

@@ -19,7 +19,7 @@ use crate::types::{
};
use crate::util::normalize_ident;
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::registers_to_ref_values;
use crate::vdbe::{registers_to_ref_values, TxnCleanup};
use crate::vector::{vector_concat, vector_slice};
use crate::{
error::{
@@ -157,7 +157,6 @@ pub enum InsnFunctionStepResult {
Done,
IO(IOCompletions),
Row,
Interrupt,
Step,
}
@@ -2328,7 +2327,7 @@ pub fn op_transaction_inner(
| TransactionMode::Read
| TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?,
TransactionMode::Write => {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
mv_store.begin_exclusive_tx(pager.clone(), None)?
}
};
*program.connection.mv_tx.write() = Some((tx_id, *tx_mode));
@@ -2343,7 +2342,7 @@ pub fn op_transaction_inner(
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))?;
}
}
} else {
@@ -2359,6 +2358,7 @@ pub fn op_transaction_inner(
"nested stmt should not begin a new read transaction"
);
pager.begin_read_tx()?;
state.auto_txn_cleanup = TxnCleanup::RollbackTxn;
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
@@ -2372,8 +2372,9 @@ pub fn op_transaction_inner(
// That is, if the transaction had not started, end the read transaction so that next time we
// start a new one.
if matches!(current_state, TransactionState::None) {
pager.end_read_tx()?;
pager.end_read_tx();
conn.set_tx_state(TransactionState::None);
state.auto_txn_cleanup = TxnCleanup::None;
}
assert_eq!(conn.get_tx_state(), current_state);
return Err(LimboError::Busy);
@@ -2456,10 +2457,10 @@ pub fn op_auto_commit(
// TODO(pere): add rollback I/O logic once we implement rollback journal
if let Some(mv_store) = mv_store {
if let Some(tx_id) = conn.get_mv_tx_id() {
mv_store.rollback_tx(tx_id, pager.clone(), &conn)?;
mv_store.rollback_tx(tx_id, pager.clone(), &conn);
}
} else {
return_if_io!(pager.end_tx(true, &conn));
pager.rollback_tx(&conn);
}
conn.set_tx_state(TransactionState::None);
conn.auto_commit.store(true, Ordering::SeqCst);
@@ -8039,7 +8040,7 @@ pub fn op_drop_column(
let schema = conn.schema.read();
for (view_name, view) in schema.views.iter() {
let view_select_sql = format!("SELECT * FROM {view_name}");
conn.prepare(view_select_sql.as_str()).map_err(|e| {
let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| {
LimboError::ParseError(format!(
"cannot drop column \"{}\": referenced in VIEW {view_name}: {}",
column_name, view.sql,
@@ -8170,7 +8171,7 @@ pub fn op_alter_column(
for (view_name, view) in schema.views.iter() {
let view_select_sql = format!("SELECT * FROM {view_name}");
// FIXME: this should rewrite the view to reference the new column name
conn.prepare(view_select_sql.as_str()).map_err(|e| {
let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| {
LimboError::ParseError(format!(
"cannot rename column \"{}\": referenced in VIEW {view_name}: {}",
old_column_name, view.sql,

View File

@@ -30,7 +30,7 @@ use crate::{
function::{AggFunc, FuncCtx},
mvcc::{database::CommitStateMachine, LocalClock},
state_machine::StateMachine,
storage::sqlite3_ondisk::SmallVec,
storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec},
translate::{collate::CollationSeq, plan::TableReferences},
types::{IOCompletions, IOResult, RawSlice, TextRef},
vdbe::{
@@ -41,7 +41,7 @@ use crate::{
},
metrics::StatementMetrics,
},
IOExt, RefValue,
RefValue,
};
use crate::{
@@ -265,6 +265,12 @@ pub struct Row {
count: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TxnCleanup {
None,
RollbackTxn,
}
/// The program state describes the environment in which the program executes.
pub struct ProgramState {
pub io_completions: Option<IOCompletions>,
@@ -302,6 +308,10 @@ pub struct ProgramState {
op_checkpoint_state: OpCheckpointState,
/// State machine for committing view deltas with I/O handling
view_delta_state: ViewDeltaCommitState,
/// Marker which tells about auto transaction cleanup necessary for that connection in case of reset
/// This is used when statement in auto-commit mode reseted after previous uncomplete execution - in which case we may need to rollback transaction started on previous attempt
/// Note, that MVCC transactions are always explicit - so they do not update auto_txn_cleanup marker
pub(crate) auto_txn_cleanup: TxnCleanup,
}
impl ProgramState {
@@ -346,6 +356,7 @@ impl ProgramState {
op_transaction_state: OpTransactionState::Start,
op_checkpoint_state: OpCheckpointState::StartCheckpoint,
view_delta_state: ViewDeltaCommitState::NotStarted,
auto_txn_cleanup: TxnCleanup::None,
}
}
@@ -428,6 +439,7 @@ impl ProgramState {
self.op_column_state = OpColumnState::Start;
self.op_row_id_state = OpRowIdState::Start;
self.view_delta_state = ViewDeltaCommitState::NotStarted;
self.auto_txn_cleanup = TxnCleanup::None;
}
pub fn get_cursor(&mut self, cursor_id: CursorID) -> &mut Cursor {
@@ -533,7 +545,7 @@ impl Program {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
pager.rollback_tx(&self.connection);
}
return Err(LimboError::InternalError("Connection closed".to_string()));
}
@@ -588,7 +600,7 @@ impl Program {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
pager.rollback_tx(&self.connection);
}
return Err(LimboError::InternalError("Connection closed".to_string()));
}
@@ -636,11 +648,12 @@ impl Program {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.get_tx_state();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
pager.rollback_tx(&self.connection);
}
return Err(LimboError::InternalError("Connection closed".to_string()));
}
if state.is_interrupted() {
self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup);
return Ok(StepResult::Interrupt);
}
if let Some(io) = &state.io_completions {
@@ -649,7 +662,7 @@ impl Program {
}
if let Some(err) = io.get_error() {
let err = err.into();
handle_program_error(&pager, &self.connection, &err, mv_store)?;
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
return Err(err);
}
state.io_completions = None;
@@ -672,6 +685,7 @@ impl Program {
Ok(InsnFunctionStepResult::Done) => {
// Instruction completed execution
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
state.auto_txn_cleanup = TxnCleanup::None;
return Ok(StepResult::Done);
}
Ok(InsnFunctionStepResult::IO(io)) => {
@@ -684,16 +698,12 @@ impl Program {
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
return Ok(StepResult::Row);
}
Ok(InsnFunctionStepResult::Interrupt) => {
// Instruction interrupted - may resume at same PC
return Ok(StepResult::Interrupt);
}
Err(LimboError::Busy) => {
// Instruction blocked - will retry at same PC
return Ok(StepResult::Busy);
}
Err(err) => {
handle_program_error(&pager, &self.connection, &err, mv_store)?;
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
return Err(err);
}
}
@@ -888,7 +898,7 @@ impl Program {
),
TransactionState::Read => {
connection.set_tx_state(TransactionState::None);
pager.end_read_tx()?;
pager.end_read_tx();
Ok(IOResult::Done(()))
}
TransactionState::None => Ok(IOResult::Done(())),
@@ -914,7 +924,12 @@ impl Program {
connection: &Connection,
rollback: bool,
) -> Result<IOResult<()>> {
let cacheflush_status = pager.end_tx(rollback, connection)?;
let cacheflush_status = if !rollback {
pager.commit_tx(connection)?
} else {
pager.rollback_tx(connection);
IOResult::Done(PagerCommitResult::Rollback)
};
match cacheflush_status {
IOResult::Done(_) => {
if self.change_cnt_on {
@@ -941,6 +956,42 @@ impl Program {
) -> Result<IOResult<()>> {
commit_state.step(mv_store)
}
/// Aborts the program due to various conditions (explicit error, interrupt or reset of unfinished statement) by rolling back the transaction
/// This method is no-op if program was already finished (either aborted or executed to completion)
pub fn abort(
&self,
mv_store: Option<&Arc<MvStore>>,
pager: &Arc<Pager>,
err: Option<&LimboError>,
cleanup: &mut TxnCleanup,
) {
// Errors from nested statements are handled by the parent statement.
if !self.connection.is_nested_stmt.load(Ordering::SeqCst) {
match err {
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
Some(LimboError::TxError(_)) => {}
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
Some(LimboError::TableLocked) => {}
// Busy errors do not cause a rollback.
Some(LimboError::Busy) => {}
_ => {
if *cleanup != TxnCleanup::None || err.is_some() {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = self.connection.get_mv_tx_id() {
self.connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), &self.connection);
}
} else {
pager.rollback_tx(&self.connection);
}
self.connection.set_tx_state(TransactionState::None);
}
}
}
}
*cleanup = TxnCleanup::None;
}
}
fn make_record(registers: &[Register], start_reg: &usize, count: &usize) -> ImmutableRecord {
@@ -1063,42 +1114,3 @@ impl Row {
self.count
}
}
/// Handle a program error by rolling back the transaction
pub fn handle_program_error(
pager: &Arc<Pager>,
connection: &Connection,
err: &LimboError,
mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
if connection.is_nested_stmt.load(Ordering::SeqCst) {
// Errors from nested statements are handled by the parent statement.
return Ok(());
}
match err {
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
LimboError::TxError(_) => {}
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
LimboError::TableLocked => {}
// Busy errors do not cause a rollback.
LimboError::Busy => {}
_ => {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = connection.get_mv_tx_id() {
connection.set_tx_state(TransactionState::None);
connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), connection)?;
}
} else {
pager
.io
.block(|| pager.end_tx(true, connection))
.inspect_err(|e| {
tracing::error!("end_tx failed: {e}");
})?;
}
connection.set_tx_state(TransactionState::None);
}
}
Ok(())
}