mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-31 13:54:27 +01:00
support deep nestedness
This commit is contained in:
20
core/lib.rs
20
core/lib.rs
@@ -580,7 +580,7 @@ impl Database {
|
||||
mv_tx: RwLock::new(None),
|
||||
view_transaction_states: AllViewsTxState::new(),
|
||||
metrics: RwLock::new(ConnectionMetrics::new()),
|
||||
is_nested_stmt: AtomicBool::new(false),
|
||||
nestedness: AtomicI32::new(0),
|
||||
encryption_key: RwLock::new(None),
|
||||
encryption_cipher_mode: AtomicCipherMode::new(CipherMode::None),
|
||||
sync_mode: AtomicSyncMode::new(SyncMode::Full),
|
||||
@@ -1096,7 +1096,7 @@ pub struct Connection {
|
||||
pub metrics: RwLock<ConnectionMetrics>,
|
||||
/// Whether the connection is executing a statement initiated by another statement.
|
||||
/// Generally this is only true for ParseSchema.
|
||||
is_nested_stmt: AtomicBool,
|
||||
nestedness: AtomicI32,
|
||||
encryption_key: RwLock<Option<EncryptionKey>>,
|
||||
encryption_cipher_mode: AtomicCipherMode,
|
||||
sync_mode: AtomicSyncMode,
|
||||
@@ -1128,6 +1128,15 @@ impl Drop for Connection {
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn is_nested_stmt(&self) -> bool {
|
||||
self.nestedness.load(Ordering::SeqCst) > 0
|
||||
}
|
||||
pub fn start_nested(&self) {
|
||||
self.nestedness.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
pub fn end_nested(&self) {
|
||||
self.nestedness.fetch_add(-1, Ordering::SeqCst);
|
||||
}
|
||||
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
|
||||
if self.is_mvcc_bootstrap_connection() {
|
||||
// Never use MV store for bootstrapping - we read state directly from sqlite_schema in the DB file.
|
||||
@@ -2681,12 +2690,7 @@ impl Statement {
|
||||
|
||||
pub fn run_once(&self) -> Result<()> {
|
||||
let res = self.pager.io.step();
|
||||
if self
|
||||
.program
|
||||
.connection
|
||||
.is_nested_stmt
|
||||
.load(Ordering::SeqCst)
|
||||
{
|
||||
if self.program.connection.is_nested_stmt() {
|
||||
return res;
|
||||
}
|
||||
if res.is_err() {
|
||||
|
||||
@@ -1443,8 +1443,8 @@ impl Pager {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn commit_tx(&self, connection: &Connection) -> Result<IOResult<PagerCommitResult>> {
|
||||
if connection.is_nested_stmt.load(Ordering::SeqCst) {
|
||||
// Parent statement will handle the transaction rollback.
|
||||
if connection.is_nested_stmt() {
|
||||
// Parent statement will handle the transaction commit.
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
}
|
||||
let Some(wal) = self.wal.as_ref() else {
|
||||
@@ -1473,7 +1473,7 @@ impl Pager {
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn rollback_tx(&self, connection: &Connection) {
|
||||
if connection.is_nested_stmt.load(Ordering::SeqCst) {
|
||||
if connection.is_nested_stmt() {
|
||||
// Parent statement will handle the transaction rollback.
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2274,8 +2274,7 @@ pub fn op_transaction_inner(
|
||||
|
||||
// 1. We try to upgrade current version
|
||||
let current_state = conn.get_tx_state();
|
||||
let (new_transaction_state, updated) = if conn.is_nested_stmt.load(Ordering::SeqCst)
|
||||
{
|
||||
let (new_transaction_state, updated) = if conn.is_nested_stmt() {
|
||||
(current_state, false)
|
||||
} else {
|
||||
match (current_state, write) {
|
||||
@@ -2365,7 +2364,7 @@ pub fn op_transaction_inner(
|
||||
}
|
||||
if updated && matches!(current_state, TransactionState::None) {
|
||||
turso_assert!(
|
||||
!conn.is_nested_stmt.load(Ordering::SeqCst),
|
||||
!conn.is_nested_stmt(),
|
||||
"nested stmt should not begin a new read transaction"
|
||||
);
|
||||
pager.begin_read_tx()?;
|
||||
@@ -2374,7 +2373,7 @@ pub fn op_transaction_inner(
|
||||
|
||||
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
|
||||
turso_assert!(
|
||||
!conn.is_nested_stmt.load(Ordering::SeqCst),
|
||||
!conn.is_nested_stmt(),
|
||||
"nested stmt should not begin a new write transaction"
|
||||
);
|
||||
let begin_w_tx_res = pager.begin_write_tx();
|
||||
@@ -7303,7 +7302,7 @@ pub fn op_parse_schema(
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.incremental_views.clone();
|
||||
conn.is_nested_stmt.store(true, Ordering::SeqCst);
|
||||
conn.start_nested();
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
schema,
|
||||
@@ -7318,7 +7317,7 @@ pub fn op_parse_schema(
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.incremental_views.clone();
|
||||
conn.is_nested_stmt.store(true, Ordering::SeqCst);
|
||||
conn.start_nested();
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
schema,
|
||||
@@ -7328,7 +7327,7 @@ pub fn op_parse_schema(
|
||||
)
|
||||
})
|
||||
};
|
||||
conn.is_nested_stmt.store(false, Ordering::SeqCst);
|
||||
conn.end_nested();
|
||||
conn.auto_commit
|
||||
.store(previous_auto_commit, Ordering::SeqCst);
|
||||
maybe_nested_stmt_err?;
|
||||
|
||||
@@ -917,11 +917,11 @@ impl Program {
|
||||
// hence the mv_store.is_none() check.
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
if self.connection.is_nested_stmt() {
|
||||
// We don't want to commit on nested statements. Let parent handle it.
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
if let Some(mv_store) = mv_store {
|
||||
if self.connection.is_nested_stmt.load(Ordering::SeqCst) {
|
||||
// We don't want to commit on nested statements. Let parent handle it.
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
let conn = self.connection.clone();
|
||||
let auto_commit = conn.auto_commit.load(Ordering::SeqCst);
|
||||
if auto_commit {
|
||||
@@ -1050,7 +1050,7 @@ impl Program {
|
||||
cleanup: &mut TxnCleanup,
|
||||
) {
|
||||
// Errors from nested statements are handled by the parent statement.
|
||||
if !self.connection.is_nested_stmt.load(Ordering::SeqCst) {
|
||||
if !self.connection.is_nested_stmt() {
|
||||
match err {
|
||||
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
|
||||
Some(LimboError::TxError(_)) => {}
|
||||
|
||||
Reference in New Issue
Block a user