diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index a1734a29a..f375d2f95 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -23,7 +23,7 @@ use crate::Pager; use crate::{return_and_restore_if_io, return_if_io, LimboError, Result}; use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; -use std::sync::Arc; +use std::sync::{atomic::Ordering, Arc}; // The state table has 5 columns: operator_id, zset_id, element_id, value, weight const OPERATOR_COLUMNS: usize = 5; @@ -1507,7 +1507,7 @@ impl DbspCompiler { let db = Database::open_file(io, ":memory:", false, false)?; let internal_conn = db.connect()?; internal_conn.query_only.set(true); - internal_conn.auto_commit.set(false); + internal_conn.auto_commit.store(false, Ordering::SeqCst); // Create temporary symbol table let temp_syms = SymbolTable::new(); diff --git a/core/incremental/project_operator.rs b/core/incremental/project_operator.rs index ffa131323..435f6d90a 100644 --- a/core/incremental/project_operator.rs +++ b/core/incremental/project_operator.rs @@ -8,7 +8,7 @@ use crate::incremental::operator::{ }; use crate::types::IOResult; use crate::{Connection, Database, Result, Value}; -use std::sync::{Arc, Mutex}; +use std::sync::{atomic::Ordering, Arc, Mutex}; #[derive(Debug, Clone)] pub struct ProjectColumn { @@ -63,7 +63,7 @@ impl ProjectOperator { let internal_conn = db.connect()?; // Set to read-only mode and disable auto-commit since we're only evaluating expressions internal_conn.query_only.set(true); - internal_conn.auto_commit.set(false); + internal_conn.auto_commit.store(false, Ordering::SeqCst); // Create ProjectColumn structs from compiled expressions let columns: Vec = compiled_exprs diff --git a/core/lib.rs b/core/lib.rs index 6cd215bc5..320099e92 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -70,7 +70,7 @@ use std::{ ops::Deref, rc::Rc, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, LazyLock, Mutex, Weak, }, time::Duration, @@ -500,7 +500,7 @@ impl Database { .clone(), ), database_schemas: RwLock::new(std::collections::HashMap::new()), - auto_commit: Cell::new(true), + auto_commit: AtomicBool::new(true), transaction_state: Cell::new(TransactionState::None), last_insert_rowid: Cell::new(0), last_change: Cell::new(0), @@ -985,7 +985,7 @@ pub struct Connection { /// Loaded lazily to avoid copying all schemas on connection open database_schemas: RwLock>>, /// Whether to automatically commit transaction - auto_commit: Cell, + auto_commit: AtomicBool, transaction_state: Cell, last_insert_rowid: Cell, last_change: Cell, @@ -1543,7 +1543,7 @@ impl Connection { self.transaction_state.replace(TransactionState::Write { schema_did_change: false, }); - self.auto_commit.replace(false); + self.auto_commit.store(false, Ordering::SeqCst); Ok(()) } @@ -1576,7 +1576,7 @@ impl Connection { None }; - self.auto_commit.replace(true); + self.auto_commit.store(true, Ordering::SeqCst); self.transaction_state.replace(TransactionState::None); { let wal = wal.borrow_mut(); @@ -1779,7 +1779,7 @@ impl Connection { } pub fn get_auto_commit(&self) -> bool { - self.auto_commit.get() + self.auto_commit.load(Ordering::SeqCst) } pub fn parse_schema_rows(self: &Arc) -> Result<()> { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 4ebb9d966..289d6a1d3 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -38,7 +38,7 @@ use std::env::temp_dir; use std::ops::DerefMut; use std::{ borrow::BorrowMut, - sync::{Arc, Mutex}, + sync::{atomic::Ordering, Arc, Mutex}, }; use turso_macros::match_ignore_ascii_case; @@ -369,7 +369,7 @@ pub fn op_checkpoint_inner( }, insn ); - if !program.connection.auto_commit.get() { + if !program.connection.auto_commit.load(Ordering::SeqCst) { // TODO: sqlite returns "Runtime error: database table is locked (6)" when a table is in use // when a checkpoint is attempted. We don't have table locks, so return TableLocked for any // attempt to checkpoint in an interactive transaction. This does not end the transaction, @@ -2107,7 +2107,7 @@ pub fn halt( } } - let auto_commit = program.connection.auto_commit.get(); + let auto_commit = program.connection.auto_commit.load(Ordering::SeqCst); tracing::trace!("halt(auto_commit={})", auto_commit); if auto_commit { program @@ -2391,7 +2391,7 @@ pub fn op_auto_commit( .map(Into::into); } - if *auto_commit != conn.auto_commit.get() { + if *auto_commit != conn.auto_commit.load(Ordering::SeqCst) { if *rollback { // TODO(pere): add rollback I/O logic once we implement rollback journal if let Some(mv_store) = mv_store { @@ -2402,9 +2402,9 @@ pub fn op_auto_commit( return_if_io!(pager.end_tx(true, &conn)); } conn.transaction_state.replace(TransactionState::None); - conn.auto_commit.replace(true); + conn.auto_commit.store(true, Ordering::SeqCst); } else { - conn.auto_commit.replace(*auto_commit); + conn.auto_commit.store(*auto_commit, Ordering::SeqCst); } } else { let mvcc_tx_active = program.connection.mv_tx.get().is_some(); @@ -6784,8 +6784,8 @@ pub fn op_parse_schema( let conn = program.connection.clone(); // set auto commit to false in order for parse schema to not commit changes as transaction state is stored in connection, // and we use the same connection for nested query. - let previous_auto_commit = conn.auto_commit.get(); - conn.auto_commit.set(false); + let previous_auto_commit = conn.auto_commit.load(Ordering::SeqCst); + conn.auto_commit.store(false, Ordering::SeqCst); let maybe_nested_stmt_err = if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; @@ -6821,7 +6821,8 @@ pub fn op_parse_schema( }) }; conn.is_nested_stmt.set(false); - conn.auto_commit.set(previous_auto_commit); + conn.auto_commit + .store(previous_auto_commit, Ordering::SeqCst); maybe_nested_stmt_err?; state.pc += 1; Ok(InsnFunctionStepResult::Step) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 6b4f2223d..eecc4bb4a 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -62,7 +62,12 @@ use execute::{ use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS}; use regex::Regex; -use std::{cell::Cell, collections::HashMap, num::NonZero, sync::Arc}; +use std::{ + cell::Cell, + collections::HashMap, + num::NonZero, + sync::{atomic::Ordering, Arc}, +}; use tracing::{instrument, Level}; /// State machine for committing view deltas with I/O handling @@ -825,7 +830,7 @@ impl Program { return Ok(IOResult::Done(())); } let conn = self.connection.clone(); - let auto_commit = conn.auto_commit.get(); + let auto_commit = conn.auto_commit.load(Ordering::SeqCst); if auto_commit { // FIXME: we don't want to commit stuff from other programs. if matches!(program_state.commit_state, CommitState::Ready) { @@ -855,7 +860,7 @@ impl Program { Ok(IOResult::Done(())) } else { let connection = self.connection.clone(); - let auto_commit = connection.auto_commit.get(); + let auto_commit = connection.auto_commit.load(Ordering::SeqCst); tracing::trace!( "Halt auto_commit {}, state={:?}", auto_commit, @@ -1079,7 +1084,7 @@ pub fn handle_program_error( if let Some(mv_store) = mv_store { if let Some((tx_id, _)) = connection.mv_tx.get() { connection.transaction_state.replace(TransactionState::None); - connection.auto_commit.replace(true); + connection.auto_commit.store(true, Ordering::SeqCst); mv_store.rollback_tx(tx_id, pager.clone(), connection)?; } } else {