Merge 'core: Wrap Connection::autocommit in AtomicBool' from Pekka Enberg

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #3268
This commit is contained in:
Pekka Enberg
2025-09-23 13:52:10 +03:00
committed by GitHub
5 changed files with 29 additions and 23 deletions

View File

@@ -23,7 +23,7 @@ use crate::Pager;
use crate::{return_and_restore_if_io, return_if_io, LimboError, Result};
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
// The state table has 5 columns: operator_id, zset_id, element_id, value, weight
const OPERATOR_COLUMNS: usize = 5;
@@ -1507,7 +1507,7 @@ impl DbspCompiler {
let db = Database::open_file(io, ":memory:", false, false)?;
let internal_conn = db.connect()?;
internal_conn.query_only.set(true);
internal_conn.auto_commit.set(false);
internal_conn.auto_commit.store(false, Ordering::SeqCst);
// Create temporary symbol table
let temp_syms = SymbolTable::new();

View File

@@ -8,7 +8,7 @@ use crate::incremental::operator::{
};
use crate::types::IOResult;
use crate::{Connection, Database, Result, Value};
use std::sync::{Arc, Mutex};
use std::sync::{atomic::Ordering, Arc, Mutex};
#[derive(Debug, Clone)]
pub struct ProjectColumn {
@@ -63,7 +63,7 @@ impl ProjectOperator {
let internal_conn = db.connect()?;
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
internal_conn.query_only.set(true);
internal_conn.auto_commit.set(false);
internal_conn.auto_commit.store(false, Ordering::SeqCst);
// Create ProjectColumn structs from compiled expressions
let columns: Vec<ProjectColumn> = compiled_exprs

View File

@@ -70,7 +70,7 @@ use std::{
ops::Deref,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, LazyLock, Mutex, Weak,
},
time::Duration,
@@ -500,7 +500,7 @@ impl Database {
.clone(),
),
database_schemas: RwLock::new(std::collections::HashMap::new()),
auto_commit: Cell::new(true),
auto_commit: AtomicBool::new(true),
transaction_state: Cell::new(TransactionState::None),
last_insert_rowid: Cell::new(0),
last_change: Cell::new(0),
@@ -985,7 +985,7 @@ pub struct Connection {
/// Loaded lazily to avoid copying all schemas on connection open
database_schemas: RwLock<std::collections::HashMap<usize, Arc<Schema>>>,
/// Whether to automatically commit transaction
auto_commit: Cell<bool>,
auto_commit: AtomicBool,
transaction_state: Cell<TransactionState>,
last_insert_rowid: Cell<i64>,
last_change: Cell<i64>,
@@ -1543,7 +1543,7 @@ impl Connection {
self.transaction_state.replace(TransactionState::Write {
schema_did_change: false,
});
self.auto_commit.replace(false);
self.auto_commit.store(false, Ordering::SeqCst);
Ok(())
}
@@ -1576,7 +1576,7 @@ impl Connection {
None
};
self.auto_commit.replace(true);
self.auto_commit.store(true, Ordering::SeqCst);
self.transaction_state.replace(TransactionState::None);
{
let wal = wal.borrow_mut();
@@ -1779,7 +1779,7 @@ impl Connection {
}
pub fn get_auto_commit(&self) -> bool {
self.auto_commit.get()
self.auto_commit.load(Ordering::SeqCst)
}
pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {

View File

@@ -38,7 +38,7 @@ use std::env::temp_dir;
use std::ops::DerefMut;
use std::{
borrow::BorrowMut,
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
};
use turso_macros::match_ignore_ascii_case;
@@ -369,7 +369,7 @@ pub fn op_checkpoint_inner(
},
insn
);
if !program.connection.auto_commit.get() {
if !program.connection.auto_commit.load(Ordering::SeqCst) {
// TODO: sqlite returns "Runtime error: database table is locked (6)" when a table is in use
// when a checkpoint is attempted. We don't have table locks, so return TableLocked for any
// attempt to checkpoint in an interactive transaction. This does not end the transaction,
@@ -2107,7 +2107,7 @@ pub fn halt(
}
}
let auto_commit = program.connection.auto_commit.get();
let auto_commit = program.connection.auto_commit.load(Ordering::SeqCst);
tracing::trace!("halt(auto_commit={})", auto_commit);
if auto_commit {
program
@@ -2391,7 +2391,7 @@ pub fn op_auto_commit(
.map(Into::into);
}
if *auto_commit != conn.auto_commit.get() {
if *auto_commit != conn.auto_commit.load(Ordering::SeqCst) {
if *rollback {
// TODO(pere): add rollback I/O logic once we implement rollback journal
if let Some(mv_store) = mv_store {
@@ -2402,9 +2402,9 @@ pub fn op_auto_commit(
return_if_io!(pager.end_tx(true, &conn));
}
conn.transaction_state.replace(TransactionState::None);
conn.auto_commit.replace(true);
conn.auto_commit.store(true, Ordering::SeqCst);
} else {
conn.auto_commit.replace(*auto_commit);
conn.auto_commit.store(*auto_commit, Ordering::SeqCst);
}
} else {
let mvcc_tx_active = program.connection.mv_tx.get().is_some();
@@ -6784,8 +6784,8 @@ pub fn op_parse_schema(
let conn = program.connection.clone();
// set auto commit to false in order for parse schema to not commit changes as transaction state is stored in connection,
// and we use the same connection for nested query.
let previous_auto_commit = conn.auto_commit.get();
conn.auto_commit.set(false);
let previous_auto_commit = conn.auto_commit.load(Ordering::SeqCst);
conn.auto_commit.store(false, Ordering::SeqCst);
let maybe_nested_stmt_err = if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
@@ -6821,7 +6821,8 @@ pub fn op_parse_schema(
})
};
conn.is_nested_stmt.set(false);
conn.auto_commit.set(previous_auto_commit);
conn.auto_commit
.store(previous_auto_commit, Ordering::SeqCst);
maybe_nested_stmt_err?;
state.pc += 1;
Ok(InsnFunctionStepResult::Step)

View File

@@ -62,7 +62,12 @@ use execute::{
use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS};
use regex::Regex;
use std::{cell::Cell, collections::HashMap, num::NonZero, sync::Arc};
use std::{
cell::Cell,
collections::HashMap,
num::NonZero,
sync::{atomic::Ordering, Arc},
};
use tracing::{instrument, Level};
/// State machine for committing view deltas with I/O handling
@@ -825,7 +830,7 @@ impl Program {
return Ok(IOResult::Done(()));
}
let conn = self.connection.clone();
let auto_commit = conn.auto_commit.get();
let auto_commit = conn.auto_commit.load(Ordering::SeqCst);
if auto_commit {
// FIXME: we don't want to commit stuff from other programs.
if matches!(program_state.commit_state, CommitState::Ready) {
@@ -855,7 +860,7 @@ impl Program {
Ok(IOResult::Done(()))
} else {
let connection = self.connection.clone();
let auto_commit = connection.auto_commit.get();
let auto_commit = connection.auto_commit.load(Ordering::SeqCst);
tracing::trace!(
"Halt auto_commit {}, state={:?}",
auto_commit,
@@ -1079,7 +1084,7 @@ pub fn handle_program_error(
if let Some(mv_store) = mv_store {
if let Some((tx_id, _)) = connection.mv_tx.get() {
connection.transaction_state.replace(TransactionState::None);
connection.auto_commit.replace(true);
connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), connection)?;
}
} else {