mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-24 18:34:34 +01:00
Merge 'BEGIN IMMEDIATE + COMMIT support' from Pekka Enberg
Closes #1000
This commit is contained in:
@@ -46,8 +46,8 @@ The current status of Limbo is:
|
||||
| ALTER TABLE | No | |
|
||||
| ANALYZE | No | |
|
||||
| ATTACH DATABASE | No | |
|
||||
| BEGIN TRANSACTION | No | |
|
||||
| COMMIT TRANSACTION | No | |
|
||||
| BEGIN TRANSACTION | Partial | `BEGIN IMMEDIATE` is only supported mode, transaction names are not supported. |
|
||||
| COMMIT TRANSACTION | Partial | Transaction names are not supported. |
|
||||
| CREATE INDEX | No | |
|
||||
| CREATE TABLE | Partial | |
|
||||
| CREATE TRIGGER | No | |
|
||||
@@ -59,7 +59,7 @@ The current status of Limbo is:
|
||||
| DROP TABLE | No | |
|
||||
| DROP TRIGGER | No | |
|
||||
| DROP VIEW | No | |
|
||||
| END TRANSACTION | No | |
|
||||
| END TRANSACTION | Partial | Alias for `COMMIT TRANSACTION` |
|
||||
| EXPLAIN | Yes | |
|
||||
| INDEXED BY | No | |
|
||||
| INSERT | Partial | |
|
||||
@@ -412,7 +412,7 @@ Modifiers:
|
||||
| AggStep | Yes | |
|
||||
| AggStep | Yes | |
|
||||
| And | Yes | |
|
||||
| AutoCommit | No | |
|
||||
| AutoCommit | Yes | |
|
||||
| BitAnd | Yes | |
|
||||
| BitNot | Yes | |
|
||||
| BitOr | Yes | |
|
||||
|
||||
@@ -19,6 +19,8 @@ pub enum LimboError {
|
||||
ConversionError(String),
|
||||
#[error("Env variable error: {0}")]
|
||||
EnvVarError(#[from] std::env::VarError),
|
||||
#[error("Transaction error: {0}")]
|
||||
TxError(String),
|
||||
#[error("I/O error: {0}")]
|
||||
IOError(#[from] std::io::Error),
|
||||
#[cfg(all(target_os = "linux", feature = "io_uring"))]
|
||||
|
||||
@@ -161,6 +161,7 @@ impl Database {
|
||||
pager,
|
||||
schema: schema.clone(),
|
||||
header,
|
||||
auto_commit: RefCell::new(true),
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
last_change: Cell::new(0),
|
||||
@@ -179,6 +180,7 @@ impl Database {
|
||||
schema: self.schema.clone(),
|
||||
header: self.header.clone(),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
auto_commit: RefCell::new(true),
|
||||
transaction_state: RefCell::new(TransactionState::None),
|
||||
last_change: Cell::new(0),
|
||||
total_changes: Cell::new(0),
|
||||
@@ -263,6 +265,7 @@ pub struct Connection {
|
||||
pager: Rc<Pager>,
|
||||
schema: Rc<RefCell<Schema>>,
|
||||
header: Rc<RefCell<DatabaseHeader>>,
|
||||
auto_commit: RefCell<bool>,
|
||||
transaction_state: RefCell<TransactionState>,
|
||||
last_insert_rowid: Cell<u64>,
|
||||
last_change: Cell<i64>,
|
||||
|
||||
@@ -22,6 +22,7 @@ pub(crate) mod pragma;
|
||||
pub(crate) mod result_row;
|
||||
pub(crate) mod select;
|
||||
pub(crate) mod subquery;
|
||||
pub(crate) mod transaction;
|
||||
|
||||
use crate::schema::Schema;
|
||||
use crate::storage::pager::Pager;
|
||||
@@ -38,6 +39,7 @@ use sqlite3_parser::ast::{Delete, Insert};
|
||||
use std::cell::RefCell;
|
||||
use std::fmt::Display;
|
||||
use std::rc::{Rc, Weak};
|
||||
use transaction::{translate_tx_begin, translate_tx_commit};
|
||||
|
||||
/// Translate SQL statement into bytecode program.
|
||||
pub fn translate(
|
||||
@@ -55,8 +57,8 @@ pub fn translate(
|
||||
ast::Stmt::AlterTable(_) => bail_parse_error!("ALTER TABLE not supported yet"),
|
||||
ast::Stmt::Analyze(_) => bail_parse_error!("ANALYZE not supported yet"),
|
||||
ast::Stmt::Attach { .. } => bail_parse_error!("ATTACH not supported yet"),
|
||||
ast::Stmt::Begin(_, _) => bail_parse_error!("BEGIN not supported yet"),
|
||||
ast::Stmt::Commit(_) => bail_parse_error!("COMMIT not supported yet"),
|
||||
ast::Stmt::Begin(tx_type, tx_name) => translate_tx_begin(tx_type, tx_name)?,
|
||||
ast::Stmt::Commit(tx_name) => translate_tx_commit(tx_name)?,
|
||||
ast::Stmt::CreateIndex { .. } => bail_parse_error!("CREATE INDEX not supported yet"),
|
||||
ast::Stmt::CreateTable {
|
||||
temporary,
|
||||
|
||||
58
core/translate/transaction.rs
Normal file
58
core/translate/transaction.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use crate::translate::{ProgramBuilder, ProgramBuilderOpts};
|
||||
use crate::vdbe::insn::Insn;
|
||||
use crate::{bail_parse_error, QueryMode, Result};
|
||||
use sqlite3_parser::ast::{Name, TransactionType};
|
||||
|
||||
pub fn translate_tx_begin(
|
||||
tx_type: Option<TransactionType>,
|
||||
_tx_name: Option<Name>,
|
||||
) -> Result<ProgramBuilder> {
|
||||
let mut program = ProgramBuilder::new(ProgramBuilderOpts {
|
||||
query_mode: QueryMode::Normal,
|
||||
num_cursors: 0,
|
||||
approx_num_insns: 0,
|
||||
approx_num_labels: 0,
|
||||
});
|
||||
let init_label = program.emit_init();
|
||||
let start_offset = program.offset();
|
||||
let tx_type = tx_type.unwrap_or(TransactionType::Deferred);
|
||||
match tx_type {
|
||||
TransactionType::Deferred => {
|
||||
bail_parse_error!("BEGIN DEFERRED not supported yet");
|
||||
}
|
||||
TransactionType::Exclusive => {
|
||||
bail_parse_error!("BEGIN EXCLUSIVE not supported yet");
|
||||
}
|
||||
TransactionType::Immediate => {
|
||||
program.emit_insn(Insn::Transaction { write: true });
|
||||
// TODO: Emit transaction instruction on temporary tables when we support them.
|
||||
program.emit_insn(Insn::AutoCommit {
|
||||
auto_commit: false,
|
||||
rollback: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
program.emit_halt();
|
||||
program.resolve_label(init_label, program.offset());
|
||||
program.emit_goto(start_offset);
|
||||
Ok(program)
|
||||
}
|
||||
|
||||
pub fn translate_tx_commit(_tx_name: Option<Name>) -> Result<ProgramBuilder> {
|
||||
let mut program = ProgramBuilder::new(ProgramBuilderOpts {
|
||||
query_mode: QueryMode::Normal,
|
||||
num_cursors: 0,
|
||||
approx_num_insns: 0,
|
||||
approx_num_labels: 0,
|
||||
});
|
||||
let init_label = program.emit_init();
|
||||
let start_offset = program.offset();
|
||||
program.emit_insn(Insn::AutoCommit {
|
||||
auto_commit: true,
|
||||
rollback: false,
|
||||
});
|
||||
program.emit_halt();
|
||||
program.resolve_label(init_label, program.offset());
|
||||
program.emit_goto(start_offset);
|
||||
Ok(program)
|
||||
}
|
||||
@@ -448,7 +448,6 @@ impl ProgramBuilder {
|
||||
database_header,
|
||||
comments: self.comments,
|
||||
connection,
|
||||
auto_commit: true,
|
||||
parameters: self.parameters,
|
||||
n_change: Cell::new(0),
|
||||
change_cnt_on,
|
||||
|
||||
@@ -1238,6 +1238,18 @@ pub fn insn_to_str(
|
||||
0,
|
||||
"".to_string(),
|
||||
),
|
||||
Insn::AutoCommit {
|
||||
auto_commit,
|
||||
rollback,
|
||||
} => (
|
||||
"AutoCommit",
|
||||
*auto_commit as i32,
|
||||
*rollback as i32,
|
||||
0,
|
||||
OwnedValue::build_text(""),
|
||||
0,
|
||||
format!("auto_commit={}, rollback={}", auto_commit, rollback),
|
||||
),
|
||||
};
|
||||
format!(
|
||||
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",
|
||||
|
||||
@@ -320,6 +320,12 @@ pub enum Insn {
|
||||
write: bool,
|
||||
},
|
||||
|
||||
// Set database auto-commit mode and potentially rollback.
|
||||
AutoCommit {
|
||||
auto_commit: bool,
|
||||
rollback: bool,
|
||||
},
|
||||
|
||||
// Branch to the given PC.
|
||||
Goto {
|
||||
target_pc: BranchOffset,
|
||||
|
||||
@@ -411,7 +411,6 @@ pub struct Program {
|
||||
pub comments: Option<HashMap<InsnReference, &'static str>>,
|
||||
pub parameters: crate::parameters::Parameters,
|
||||
pub connection: Weak<Connection>,
|
||||
pub auto_commit: bool,
|
||||
pub n_change: Cell<i64>,
|
||||
pub change_cnt_on: bool,
|
||||
pub result_columns: Vec<ResultSetColumn>,
|
||||
@@ -1131,37 +1130,7 @@ impl Program {
|
||||
)));
|
||||
}
|
||||
}
|
||||
tracing::trace!("Halt auto_commit {}", self.auto_commit);
|
||||
let connection = self
|
||||
.connection
|
||||
.upgrade()
|
||||
.expect("only weak ref to connection?");
|
||||
let current_state = connection.transaction_state.borrow().clone();
|
||||
if current_state == TransactionState::Read {
|
||||
pager.end_read_tx()?;
|
||||
return Ok(StepResult::Done);
|
||||
}
|
||||
return if self.auto_commit {
|
||||
match pager.end_tx() {
|
||||
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),
|
||||
Ok(crate::storage::wal::CheckpointStatus::Done(_)) => {
|
||||
if self.change_cnt_on {
|
||||
if let Some(conn) = self.connection.upgrade() {
|
||||
conn.set_changes(self.n_change.get());
|
||||
}
|
||||
}
|
||||
Ok(StepResult::Done)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
} else {
|
||||
if self.change_cnt_on {
|
||||
if let Some(conn) = self.connection.upgrade() {
|
||||
conn.set_changes(self.n_change.get());
|
||||
}
|
||||
}
|
||||
return Ok(StepResult::Done);
|
||||
};
|
||||
return self.halt(pager);
|
||||
}
|
||||
Insn::Transaction { write } => {
|
||||
let connection = self.connection.upgrade().unwrap();
|
||||
@@ -1195,6 +1164,34 @@ impl Program {
|
||||
}
|
||||
state.pc += 1;
|
||||
}
|
||||
Insn::AutoCommit {
|
||||
auto_commit,
|
||||
rollback,
|
||||
} => {
|
||||
let conn = self.connection.upgrade().unwrap();
|
||||
if *auto_commit != *conn.auto_commit.borrow() {
|
||||
if *rollback {
|
||||
todo!("Rollback is not implemented");
|
||||
} else {
|
||||
conn.auto_commit.replace(*auto_commit);
|
||||
}
|
||||
} else {
|
||||
if !*auto_commit {
|
||||
return Err(LimboError::TxError(
|
||||
"cannot start a transaction within a transaction".to_string(),
|
||||
));
|
||||
} else if *rollback {
|
||||
return Err(LimboError::TxError(
|
||||
"cannot rollback - no transaction is active".to_string(),
|
||||
));
|
||||
} else {
|
||||
return Err(LimboError::TxError(
|
||||
"cannot commit - no transaction is active".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
return self.halt(pager);
|
||||
}
|
||||
Insn::Goto { target_pc } => {
|
||||
assert!(target_pc.is_offset());
|
||||
state.pc = target_pc.to_offset_int();
|
||||
@@ -2745,6 +2742,41 @@ impl Program {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn halt(&self, pager: Rc<Pager>) -> Result<StepResult> {
|
||||
let connection = self
|
||||
.connection
|
||||
.upgrade()
|
||||
.expect("only weak ref to connection?");
|
||||
let auto_commit = *connection.auto_commit.borrow();
|
||||
tracing::trace!("Halt auto_commit {}", auto_commit);
|
||||
let current_state = connection.transaction_state.borrow().clone();
|
||||
if current_state == TransactionState::Read {
|
||||
pager.end_read_tx()?;
|
||||
return Ok(StepResult::Done);
|
||||
}
|
||||
return if auto_commit {
|
||||
match pager.end_tx() {
|
||||
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),
|
||||
Ok(crate::storage::wal::CheckpointStatus::Done(_)) => {
|
||||
if self.change_cnt_on {
|
||||
if let Some(conn) = self.connection.upgrade() {
|
||||
conn.set_changes(self.n_change.get());
|
||||
}
|
||||
}
|
||||
Ok(StepResult::Done)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
} else {
|
||||
if self.change_cnt_on {
|
||||
if let Some(conn) = self.connection.upgrade() {
|
||||
conn.set_changes(self.n_change.get());
|
||||
}
|
||||
}
|
||||
return Ok(StepResult::Done);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn get_new_rowid<R: Rng>(cursor: &mut BTreeCursor, mut rng: R) -> Result<CursorResult<i64>> {
|
||||
|
||||
@@ -24,4 +24,5 @@ source $testdir/compare.test
|
||||
source $testdir/changes.test
|
||||
source $testdir/total-changes.test
|
||||
source $testdir/offset.test
|
||||
source $testdir/scalar-functions-printf.test
|
||||
source $testdir/scalar-functions-printf.test
|
||||
source $testdir/transactions.test
|
||||
|
||||
8
testing/transactions.test
Executable file
8
testing/transactions.test
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env tclsh
|
||||
|
||||
set testdir [file dirname $argv0]
|
||||
source $testdir/tester.tcl
|
||||
|
||||
do_execsql_test basic-tx-1 {
|
||||
BEGIN IMMEDIATE; END
|
||||
} {}
|
||||
Reference in New Issue
Block a user