From ef32a8294199a985f325f9764e6ff38932a30c19 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 6 Mar 2025 09:24:18 +0200 Subject: [PATCH] core/vdbe: Integrate MVCC transactions --- core/lib.rs | 23 +++++-- core/vdbe/mod.rs | 153 +++++++++++++++++++++++++++++------------------ 2 files changed, 112 insertions(+), 64 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index f10f893f7..cfd94dd51 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -191,6 +191,7 @@ impl Database { header: self.header.clone(), last_insert_rowid: Cell::new(0), auto_commit: RefCell::new(true), + mv_transactions: RefCell::new(Vec::new()), transaction_state: RefCell::new(TransactionState::None), last_change: Cell::new(0), syms: RefCell::new(SymbolTable::new()), @@ -257,6 +258,7 @@ pub struct Connection { schema: Arc>, header: Arc>, auto_commit: RefCell, + mv_transactions: RefCell>, transaction_state: RefCell, last_insert_rowid: Cell, last_change: Cell, @@ -287,7 +289,11 @@ impl Connection { &syms, QueryMode::Normal, )?); - Ok(Statement::new(program, self.pager.clone())) + Ok(Statement::new( + program, + self._db.mv_store.clone(), + self.pager.clone(), + )) } Cmd::Explain(_stmt) => todo!(), Cmd::ExplainQueryPlan(_stmt) => todo!(), @@ -325,7 +331,7 @@ impl Connection { &syms, QueryMode::Normal, )?); - let stmt = Statement::new(program, self.pager.clone()); + let stmt = Statement::new(program, self._db.mv_store.clone(), self.pager.clone()); Ok(Some(stmt)) } Cmd::Explain(stmt) => { @@ -420,7 +426,7 @@ impl Connection { let mut state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); - program.step(&mut state, self.pager.clone())?; + program.step(&mut state, self._db.mv_store.clone(), self.pager.clone())?; } } } @@ -502,15 +508,21 @@ impl Connection { pub struct Statement { program: Rc, state: vdbe::ProgramState, + mv_store: Option>, pager: Rc, } impl Statement { - pub fn new(program: Rc, pager: Rc) -> Self { + pub fn new( + program: Rc, + mv_store: Option>, + pager: Rc, + ) -> Self { let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len()); Self { program, state, + mv_store, pager, } } @@ -520,7 +532,8 @@ impl Statement { } pub fn step(&mut self) -> Result { - self.program.step(&mut self.state, self.pager.clone()) + self.program + .step(&mut self.state, self.mv_store.clone(), self.pager.clone()) } pub fn num_columns(&self) -> usize { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 3f16d525c..cb88cc06b 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -55,7 +55,7 @@ use crate::{ json::json_quote, json::json_remove, json::json_set, json::json_type, }; use crate::{info, CheckpointStatus}; -use crate::{resolve_ext_path, Connection, Result, TransactionState, DATABASE_VERSION}; +use crate::{resolve_ext_path, Connection, MvStore, Result, TransactionState, DATABASE_VERSION}; use insn::{ exec_add, exec_and, exec_bit_and, exec_bit_not, exec_bit_or, exec_boolean_not, exec_concat, exec_divide, exec_multiply, exec_or, exec_remainder, exec_shift_left, exec_shift_right, @@ -235,6 +235,7 @@ pub struct ProgramState { deferred_seek: Option<(CursorID, CursorID)>, ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO) regex_cache: RegexCache, + mv_tx_id: Option, interrupted: bool, parameters: HashMap, OwnedValue>, halt_state: Option, @@ -254,6 +255,7 @@ impl ProgramState { deferred_seek: None, ended_coroutine: Bitfield::new(), regex_cache: RegexCache::new(), + mv_tx_id: None, interrupted: false, parameters: HashMap::new(), halt_state: None, @@ -358,7 +360,12 @@ impl Program { } #[instrument(skip_all)] - pub fn step(&self, state: &mut ProgramState, pager: Rc) -> Result { + pub fn step( + &self, + state: &mut ProgramState, + mv_store: Option>, + pager: Rc, + ) -> Result { loop { if state.is_interrupted() { return Ok(StepResult::Interrupt); @@ -1204,36 +1211,49 @@ impl Program { ))); } } - return self.halt(pager, state); + return self.halt(pager, state, mv_store); } Insn::Transaction { write } => { - let connection = self.connection.upgrade().unwrap(); - let current_state = connection.transaction_state.borrow().clone(); - let (new_transaction_state, updated) = match (¤t_state, write) { - (TransactionState::Write, true) => (TransactionState::Write, false), - (TransactionState::Write, false) => (TransactionState::Write, false), - (TransactionState::Read, true) => (TransactionState::Write, true), - (TransactionState::Read, false) => (TransactionState::Read, false), - (TransactionState::None, true) => (TransactionState::Write, true), - (TransactionState::None, false) => (TransactionState::Read, true), - }; - - if updated && matches!(current_state, TransactionState::None) { - if let LimboResult::Busy = pager.begin_read_tx()? { - return Ok(StepResult::Busy); + if let Some(mv_store) = &mv_store { + if state.mv_tx_id.is_none() { + let tx_id = mv_store.begin_tx(); + self.connection + .upgrade() + .unwrap() + .mv_transactions + .borrow_mut() + .push(tx_id); + state.mv_tx_id = Some(tx_id); } - } + } else { + let connection = self.connection.upgrade().unwrap(); + let current_state = connection.transaction_state.borrow().clone(); + let (new_transaction_state, updated) = match (¤t_state, write) { + (TransactionState::Write, true) => (TransactionState::Write, false), + (TransactionState::Write, false) => (TransactionState::Write, false), + (TransactionState::Read, true) => (TransactionState::Write, true), + (TransactionState::Read, false) => (TransactionState::Read, false), + (TransactionState::None, true) => (TransactionState::Write, true), + (TransactionState::None, false) => (TransactionState::Read, true), + }; - if updated && matches!(new_transaction_state, TransactionState::Write) { - if let LimboResult::Busy = pager.begin_write_tx()? { - tracing::trace!("begin_write_tx busy"); - return Ok(StepResult::Busy); + if updated && matches!(current_state, TransactionState::None) { + if let LimboResult::Busy = pager.begin_read_tx()? { + return Ok(StepResult::Busy); + } + } + + if updated && matches!(new_transaction_state, TransactionState::Write) { + if let LimboResult::Busy = pager.begin_write_tx()? { + tracing::trace!("begin_write_tx busy"); + return Ok(StepResult::Busy); + } + } + if updated { + connection + .transaction_state + .replace(new_transaction_state.clone()); } - } - if updated { - connection - .transaction_state - .replace(new_transaction_state.clone()); } state.pc += 1; } @@ -1261,7 +1281,7 @@ impl Program { "cannot commit - no transaction is active".to_string(), )); } - return self.halt(pager, state); + return self.halt(pager, state, mv_store); } Insn::Goto { target_pc } => { assert!(target_pc.is_offset()); @@ -3084,41 +3104,56 @@ impl Program { } } - fn halt(&self, pager: Rc, program_state: &mut ProgramState) -> Result { - let connection = self - .connection - .upgrade() - .expect("only weak ref to connection?"); - let auto_commit = *connection.auto_commit.borrow(); - tracing::trace!("Halt auto_commit {}", auto_commit); - assert!( - program_state.halt_state.is_none() - || (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing)) - ); - if program_state.halt_state.is_some() { - self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref()) + fn halt( + &self, + pager: Rc, + program_state: &mut ProgramState, + mv_store: Option>, + ) -> Result { + if let Some(mv_store) = mv_store { + let conn = self.connection.upgrade().unwrap(); + let mut mv_transactions = conn.mv_transactions.borrow_mut(); + for tx_id in mv_transactions.iter() { + mv_store.commit_tx(*tx_id).unwrap(); + } + mv_transactions.clear(); + return Ok(StepResult::Done); } else { - if auto_commit { - let current_state = connection.transaction_state.borrow().clone(); - match current_state { - TransactionState::Write => self.step_end_write_txn( - &pager, - &mut program_state.halt_state, - connection.deref(), - ), - TransactionState::Read => { - pager.end_read_tx()?; - Ok(StepResult::Done) - } - TransactionState::None => Ok(StepResult::Done), - } + let connection = self + .connection + .upgrade() + .expect("only weak ref to connection?"); + let auto_commit = *connection.auto_commit.borrow(); + tracing::trace!("Halt auto_commit {}", auto_commit); + assert!( + program_state.halt_state.is_none() + || (matches!(program_state.halt_state.unwrap(), HaltState::Checkpointing)) + ); + if program_state.halt_state.is_some() { + self.step_end_write_txn(&pager, &mut program_state.halt_state, connection.deref()) } else { - if self.change_cnt_on { - if let Some(conn) = self.connection.upgrade() { - conn.set_changes(self.n_change.get()); + if auto_commit { + let current_state = connection.transaction_state.borrow().clone(); + match current_state { + TransactionState::Write => self.step_end_write_txn( + &pager, + &mut program_state.halt_state, + connection.deref(), + ), + TransactionState::Read => { + pager.end_read_tx()?; + Ok(StepResult::Done) + } + TransactionState::None => Ok(StepResult::Done), } + } else { + if self.change_cnt_on { + if let Some(conn) = self.connection.upgrade() { + conn.set_changes(self.n_change.get()); + } + } + Ok(StepResult::Done) } - Ok(StepResult::Done) } } }