diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 68219b52f..3cf603aa7 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -1189,9 +1189,9 @@ fn emit_update_insns( flag: if has_user_provided_rowid { // The previous Insn::NotExists and Insn::Delete seek to the old rowid, // so to insert a new user-provided rowid, we need to seek to the correct place. - InsertFlags::new().require_seek().update() + InsertFlags::new().require_seek().update_rowid_change() } else { - InsertFlags::new().update() + InsertFlags::new() }, table_name: table_ref.identifier.clone(), }); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 60a2b9a59..e6afb5524 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -5099,12 +5099,22 @@ pub fn op_yield( Ok(InsnFunctionStepResult::Step) } +pub struct OpInsertState { + pub sub_state: OpInsertSubState, + pub old_record: Option<(i64, Vec)>, +} + #[derive(Debug, PartialEq, Copy, Clone)] -pub enum OpInsertState { +pub enum OpInsertSubState { + /// If this insert overwrites a record, capture the old record for incremental view maintenance. + MaybeCaptureRecord, + /// Insert the row into the table. Insert, /// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't /// start inserting the same row multiple times. UpdateLastRowid, + /// If there are dependent incremental views, apply the change. + ApplyViewChange, } pub fn op_insert( @@ -5125,54 +5135,153 @@ pub fn op_insert( insn ); - if state.op_insert_state == OpInsertState::UpdateLastRowid { - let maybe_rowid = { - let mut cursor = state.get_cursor(*cursor_id); - let cursor = cursor.as_btree_mut(); - return_if_io!(cursor.rowid()) - }; - if let Some(rowid) = maybe_rowid { - program.connection.update_last_rowid(rowid); + loop { + match &state.op_insert_state.sub_state { + OpInsertSubState::MaybeCaptureRecord => { + let schema = program.connection.schema.borrow(); + let dependent_views = schema.get_dependent_views(table_name); + if dependent_views.is_empty() || !flag.has(InsertFlags::UPDATE_ROWID_CHANGE) { + state.op_insert_state.sub_state = OpInsertSubState::Insert; + continue; + } - let prev_changes = program.n_change.get(); - program.n_change.set(prev_changes + 1); - } - state.op_insert_state = OpInsertState::Insert; - state.pc += 1; - return Ok(InsnFunctionStepResult::Step); - } + let old_record = { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + // Get the current key + let maybe_key = return_if_io!(cursor.rowid()); + let key = maybe_key.ok_or_else(|| { + LimboError::InternalError("Cannot delete: no current row".to_string()) + })?; + // Get the current record before deletion and extract values + let maybe_record = return_if_io!(cursor.record()); + if let Some(record) = maybe_record { + let mut values = record + .get_values() + .into_iter() + .map(|v| v.to_owned()) + .collect::>(); - { - let mut cursor_ref = state.get_cursor(*cursor_id); - let cursor = cursor_ref.as_btree_mut(); + // Fix rowid alias columns: replace Null with actual rowid value + if let Some(table) = schema.get_table(table_name) { + for (i, col) in table.columns().iter().enumerate() { + if col.is_rowid_alias && i < values.len() { + values[i] = Value::Integer(key); + } + } + } + Some((key, values)) + } else { + None + } + }; - let key = match &state.registers[*key_reg].get_owned_value() { - Value::Integer(i) => *i, - _ => unreachable!("expected integer key"), - }; - - let record = match &state.registers[*record_reg] { - Register::Record(r) => std::borrow::Cow::Borrowed(r), - Register::Value(value) => { - let x = 1; - let regs = &state.registers[*record_reg..*record_reg + 1]; - let new_regs = [&state.registers[*record_reg]]; - let record = ImmutableRecord::from_registers(new_regs, new_regs.len()); - std::borrow::Cow::Owned(record) + state.op_insert_state.old_record = old_record; + state.op_insert_state.sub_state = OpInsertSubState::Insert; + continue; } - Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."), - }; + OpInsertSubState::Insert => { + let key = match &state.registers[*key_reg].get_owned_value() { + Value::Integer(i) => *i, + _ => unreachable!("expected integer key"), + }; - // Update dependent views for incremental computation - let schema = program.connection.schema.borrow(); - let dependent_views = schema.get_dependent_views(table_name); + let record = match &state.registers[*record_reg] { + Register::Record(r) => std::borrow::Cow::Borrowed(r), + Register::Value(value) => { + let x = 1; + let regs = &state.registers[*record_reg..*record_reg + 1]; + let new_regs = [&state.registers[*record_reg]]; + let record = ImmutableRecord::from_registers(new_regs, new_regs.len()); + std::borrow::Cow::Owned(record) + } + Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."), + }; + { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); - if !dependent_views.is_empty() { - // If this is an UPDATE operation, first capture and delete the old row data - if flag.has(InsertFlags::UPDATE) { - // Get the old record before it's overwritten - let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) { - let mut values = old_record + // In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location. + // This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because: + // 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position. + // 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted. + let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK); + return_if_io!(cursor.insert( + &BTreeKey::new_table_rowid(key, Some(record.as_ref())), + moved_before + )); + } + + // Only update last_insert_rowid for regular table inserts, not schema modifications + let root_page = { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + cursor.root_page() + }; + if root_page != 1 { + state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid; + } else { + let schema = program.connection.schema.borrow(); + let dependent_views = schema.get_dependent_views(table_name); + if !dependent_views.is_empty() { + state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange; + } else { + state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord; + break; + } + } + } + OpInsertSubState::UpdateLastRowid => { + let maybe_rowid = { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + return_if_io!(cursor.rowid()) + }; + if let Some(rowid) = maybe_rowid { + program.connection.update_last_rowid(rowid); + + let prev_changes = program.n_change.get(); + program.n_change.set(prev_changes + 1); + } + let schema = program.connection.schema.borrow(); + let dependent_views = schema.get_dependent_views(table_name); + if !dependent_views.is_empty() { + state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange; + continue; + } + state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord; + break; + } + OpInsertSubState::ApplyViewChange => { + let schema = program.connection.schema.borrow(); + let dependent_views = schema.get_dependent_views(table_name); + assert!(!dependent_views.is_empty()); + + let (key, values) = { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + + let key = match &state.registers[*key_reg].get_owned_value() { + Value::Integer(i) => *i, + _ => unreachable!("expected integer key"), + }; + + let record = match &state.registers[*record_reg] { + Register::Record(r) => std::borrow::Cow::Borrowed(r), + Register::Value(value) => { + let x = 1; + let regs = &state.registers[*record_reg..*record_reg + 1]; + let new_regs = [&state.registers[*record_reg]]; + let record = ImmutableRecord::from_registers(new_regs, new_regs.len()); + std::borrow::Cow::Owned(record) + } + Register::Aggregate(..) => { + unreachable!("Cannot insert an aggregate value.") + } + }; + + // Add insertion of new row to view deltas + let mut new_values = record .get_values() .into_iter() .map(|v| v.to_owned()) @@ -5182,75 +5291,37 @@ pub fn op_insert( let schema = program.connection.schema.borrow(); if let Some(table) = schema.get_table(table_name) { for (i, col) in table.columns().iter().enumerate() { - if col.is_rowid_alias && i < values.len() { - values[i] = Value::Integer(key); + if col.is_rowid_alias && i < new_values.len() { + new_values[i] = Value::Integer(key); } } } - drop(schema); - Some(values) - } else { - None + (key, new_values) }; - // Add deletion of old row to view deltas - if let Some(old_values) = old_record_values { - let mut tx_states = program.connection.view_transaction_states.borrow_mut(); - for view_name in &dependent_views { + let mut tx_states = program.connection.view_transaction_states.borrow_mut(); + if let Some((key, values)) = state.op_insert_state.old_record.take() { + for view_name in dependent_views.iter() { let tx_state = tx_states.entry(view_name.clone()).or_default(); - tx_state.delta.delete(key, old_values.clone()); + tx_state.delta.delete(key, values.clone()); } } - } + for view_name in dependent_views.iter() { + let tx_state = tx_states.entry(view_name.clone()).or_default(); - // Add insertion of new row to view deltas - let mut new_values = record - .get_values() - .into_iter() - .map(|v| v.to_owned()) - .collect::>(); - - // Fix rowid alias columns: replace Null with actual rowid value - let schema = program.connection.schema.borrow(); - if let Some(table) = schema.get_table(table_name) { - for (i, col) in table.columns().iter().enumerate() { - if col.is_rowid_alias && i < new_values.len() { - new_values[i] = Value::Integer(key); - } + tx_state.delta.insert(key, values.clone()); } - } - drop(schema); - let mut tx_states = program.connection.view_transaction_states.borrow_mut(); - for view_name in dependent_views { - let tx_state = tx_states.entry(view_name.clone()).or_default(); - tx_state.delta.insert(key, new_values.clone()); + state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord; + break; } } - - // In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location. - // This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because: - // 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position. - // 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted. - let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK); - return_if_io!(cursor.insert( - &BTreeKey::new_table_rowid(key, Some(record.as_ref())), - moved_before - )); } - // Only update last_insert_rowid for regular table inserts, not schema modifications - let root_page = { - let mut cursor = state.get_cursor(*cursor_id); - let cursor = cursor.as_btree_mut(); - cursor.root_page() - }; - if root_page != 1 { - state.op_insert_state = OpInsertState::UpdateLastRowid; - } else { - state.pc += 1; - } + let prev_changes = program.n_change.get(); + program.n_change.set(prev_changes + 1); + state.pc += 1; Ok(InsnFunctionStepResult::Step) } diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 4b835f1d9..f2bdd39f4 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -107,7 +107,7 @@ impl IdxInsertFlags { pub struct InsertFlags(pub u8); impl InsertFlags { - pub const UPDATE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement + pub const UPDATE_ROWID_CHANGE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement where the row's rowid is changed pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row pub fn new() -> Self { @@ -123,8 +123,8 @@ impl InsertFlags { self } - pub fn update(mut self) -> Self { - self.0 |= InsertFlags::UPDATE; + pub fn update_rowid_change(mut self) -> Self { + self.0 |= InsertFlags::UPDATE_ROWID_CHANGE; self } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 259767031..7209f02e1 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -33,7 +33,8 @@ use crate::{ translate::plan::TableReferences, types::{IOResult, RawSlice, TextRef}, vdbe::execute::{ - OpIdxInsertState, OpInsertState, OpNewRowidState, OpNoConflictState, OpSeekState, + OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, + OpSeekState, }, RefValue, }; @@ -291,7 +292,10 @@ impl ProgramState { op_open_ephemeral_state: OpOpenEphemeralState::Start, op_new_rowid_state: OpNewRowidState::Start, op_idx_insert_state: OpIdxInsertState::SeekIfUnique, - op_insert_state: OpInsertState::Insert, + op_insert_state: OpInsertState { + sub_state: OpInsertSubState::MaybeCaptureRecord, + old_record: None, + }, op_no_conflict_state: OpNoConflictState::Start, seek_state: OpSeekState::Start, current_collation: None,