core/vdbe: Integrate MVCC transactions

This commit is contained in:
Pekka Enberg
2025-03-06 09:24:18 +02:00
parent 461be0dc87
commit ef32a82941
2 changed files with 112 additions and 64 deletions

View File

@@ -191,6 +191,7 @@ impl Database {
header: self.header.clone(),
last_insert_rowid: Cell::new(0),
auto_commit: RefCell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: RefCell::new(TransactionState::None),
last_change: Cell::new(0),
syms: RefCell::new(SymbolTable::new()),
@@ -257,6 +258,7 @@ pub struct Connection {
schema: Arc<RwLock<Schema>>,
header: Arc<Mutex<DatabaseHeader>>,
auto_commit: RefCell<bool>,
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
transaction_state: RefCell<TransactionState>,
last_insert_rowid: Cell<u64>,
last_change: Cell<i64>,
@@ -287,7 +289,11 @@ impl Connection {
&syms,
QueryMode::Normal,
)?);
Ok(Statement::new(program, self.pager.clone()))
Ok(Statement::new(
program,
self._db.mv_store.clone(),
self.pager.clone(),
))
}
Cmd::Explain(_stmt) => todo!(),
Cmd::ExplainQueryPlan(_stmt) => todo!(),
@@ -325,7 +331,7 @@ impl Connection {
&syms,
QueryMode::Normal,
)?);
let stmt = Statement::new(program, self.pager.clone());
let stmt = Statement::new(program, self._db.mv_store.clone(), self.pager.clone());
Ok(Some(stmt))
}
Cmd::Explain(stmt) => {
@@ -420,7 +426,7 @@ impl Connection {
let mut state =
vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
program.step(&mut state, self.pager.clone())?;
program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?;
}
}
}
@@ -502,15 +508,21 @@ impl Connection {
pub struct Statement {
program: Rc<vdbe::Program>,
state: vdbe::ProgramState,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
}
impl Statement {
pub fn new(program: Rc<vdbe::Program>, pager: Rc<Pager>) -> Self {
pub fn new(
program: Rc<vdbe::Program>,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
) -> Self {
let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
Self {
program,
state,
mv_store,
pager,
}
}
@@ -520,7 +532,8 @@ impl Statement {
}
pub fn step(&mut self) -> Result<StepResult> {
self.program.step(&mut self.state, self.pager.clone())
self.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone())
}
pub fn num_columns(&self) -> usize {

View File

@@ -55,7 +55,7 @@ use crate::{
json::json_quote, json::json_remove, json::json_set, json::json_type,
};
use crate::{info, CheckpointStatus};
use crate::{resolve_ext_path, Connection, Result, TransactionState, DATABASE_VERSION};
use crate::{resolve_ext_path, Connection, MvStore, Result, TransactionState, DATABASE_VERSION};
use insn::{
exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat,
exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right,
@@ -235,6 +235,7 @@ pub struct ProgramState {
deferred_seek: Option<(CursorID, CursorID)>,
ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO)
regex_cache: RegexCache,
mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
parameters: HashMap<NonZero<usize>, OwnedValue>,
halt_state: Option<HaltState>,
@@ -254,6 +255,7 @@ impl ProgramState {
deferred_seek: None,
ended_coroutine: Bitfield::new(),
regex_cache: RegexCache::new(),
mv_tx_id: None,
interrupted: false,
parameters: HashMap::new(),
halt_state: None,
@@ -358,7 +360,12 @@ impl Program {
}
#[instrument(skip_all)]
pub fn step(&self, state: &mut ProgramState, pager: Rc<Pager>) -> Result<StepResult> {
pub fn step(
&self,
state: &mut ProgramState,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
) -> Result<StepResult> {
loop {
if state.is_interrupted() {
return Ok(StepResult::Interrupt);
@@ -1204,36 +1211,49 @@ impl Program {
)));
}
}
return self.halt(pager, state);
return self.halt(pager, state, mv_store);
}
Insn::Transaction { write } => {
let connection = self.connection.upgrade().unwrap();
let current_state = connection.transaction_state.borrow().clone();
let (new_transaction_state, updated) = match (&current_state, write) {
(TransactionState::Write, true) => (TransactionState::Write, false),
(TransactionState::Write, false) => (TransactionState::Write, false),
(TransactionState::Read, true) => (TransactionState::Write, true),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (TransactionState::Write, true),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(StepResult::Busy);
if let Some(mv_store) = &mv_store {
if state.mv_tx_id.is_none() {
let tx_id = mv_store.begin_tx();
self.connection
.upgrade()
.unwrap()
.mv_transactions
.borrow_mut()
.push(tx_id);
state.mv_tx_id = Some(tx_id);
}
}
} else {
let connection = self.connection.upgrade().unwrap();
let current_state = connection.transaction_state.borrow().clone();
let (new_transaction_state, updated) = match (&current_state, write) {
(TransactionState::Write, true) => (TransactionState::Write, false),
(TransactionState::Write, false) => (TransactionState::Write, false),
(TransactionState::Read, true) => (TransactionState::Write, true),
(TransactionState::Read, false) => (TransactionState::Read, false),
(TransactionState::None, true) => (TransactionState::Write, true),
(TransactionState::None, false) => (TransactionState::Read, true),
};
if updated && matches!(new_transaction_state, TransactionState::Write) {
if let LimboResult::Busy = pager.begin_write_tx()? {
tracing::trace!("begin_write_tx busy");
return Ok(StepResult::Busy);
if updated && matches!(current_state, TransactionState::None) {
if let LimboResult::Busy = pager.begin_read_tx()? {
return Ok(StepResult::Busy);
}
}
if updated && matches!(new_transaction_state, TransactionState::Write) {
if let LimboResult::Busy = pager.begin_write_tx()? {
tracing::trace!("begin_write_tx busy");
return Ok(StepResult::Busy);
}
}
if updated {
connection
.transaction_state
.replace(new_transaction_state.clone());
}
}
if updated {
connection
.transaction_state
.replace(new_transaction_state.clone());
}
state.pc += 1;
}
@@ -1261,7 +1281,7 @@ impl Program {
"cannot commit - no transaction is active".to_string(),
));
}
return self.halt(pager, state);
return self.halt(pager, state, mv_store);
}
Insn::Goto { target_pc } => {
assert!(target_pc.is_offset());
@@ -3084,41 +3104,56 @@ impl Program {
}
}
fn halt(&self, pager: Rc<Pager>, program_state: &mut ProgramState) -> Result<StepResult> {
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let auto_commit = *connection.auto_commit.borrow();
tracing::trace!("Halt auto_commit {}", auto_commit);
assert!(
program_state.halt_state.is_none()
|| (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing))
);
if program_state.halt_state.is_some() {
self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref())
fn halt(
&self,
pager: Rc<Pager>,
program_state: &mut ProgramState,
mv_store: Option<Rc<MvStore>>,
) -> Result<StepResult> {
if let Some(mv_store) = mv_store {
let conn = self.connection.upgrade().unwrap();
let mut mv_transactions = conn.mv_transactions.borrow_mut();
for tx_id in mv_transactions.iter() {
mv_store.commit_tx(*tx_id).unwrap();
}
mv_transactions.clear();
return Ok(StepResult::Done);
} else {
if auto_commit {
let current_state = connection.transaction_state.borrow().clone();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.halt_state,
connection.deref(),
),
TransactionState::Read => {
pager.end_read_tx()?;
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
}
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let auto_commit = *connection.auto_commit.borrow();
tracing::trace!("Halt auto_commit {}", auto_commit);
assert!(
program_state.halt_state.is_none()
|| (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing))
);
if program_state.halt_state.is_some() {
self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref())
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
if auto_commit {
let current_state = connection.transaction_state.borrow().clone();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.halt_state,
connection.deref(),
),
TransactionState::Read => {
pager.end_read_tx()?;
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
}
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
Ok(StepResult::Done)
}
Ok(StepResult::Done)
}
}
}