fix/vdbe: fix state handling for incremental views

- When the rowid is changed in UPDATE, it is handled as a combination of DELETE + INSERT,
so we don't need to delete the old values in that case
- We should only update the views after the operation on the btree is done
- A proper state machine is needed to handle IO yielding points
This commit is contained in:
Jussi Saurio
2025-08-11 16:17:41 +03:00
parent ec7bded092
commit f38333b373
4 changed files with 178 additions and 103 deletions

View File

@@ -1189,9 +1189,9 @@ fn emit_update_insns(
flag: if has_user_provided_rowid { flag: if has_user_provided_rowid {
// The previous Insn::NotExists and Insn::Delete seek to the old rowid, // The previous Insn::NotExists and Insn::Delete seek to the old rowid,
// so to insert a new user-provided rowid, we need to seek to the correct place. // so to insert a new user-provided rowid, we need to seek to the correct place.
InsertFlags::new().require_seek().update() InsertFlags::new().require_seek().update_rowid_change()
} else { } else {
InsertFlags::new().update() InsertFlags::new()
}, },
table_name: table_ref.identifier.clone(), table_name: table_ref.identifier.clone(),
}); });

View File

@@ -5099,12 +5099,22 @@ pub fn op_yield(
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }
pub struct OpInsertState {
pub sub_state: OpInsertSubState,
pub old_record: Option<(i64, Vec<Value>)>,
}
#[derive(Debug, PartialEq, Copy, Clone)] #[derive(Debug, PartialEq, Copy, Clone)]
pub enum OpInsertState { pub enum OpInsertSubState {
/// If this insert overwrites a record, capture the old record for incremental view maintenance.
MaybeCaptureRecord,
/// Insert the row into the table.
Insert, Insert,
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't /// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
/// start inserting the same row multiple times. /// start inserting the same row multiple times.
UpdateLastRowid, UpdateLastRowid,
/// If there are dependent incremental views, apply the change.
ApplyViewChange,
} }
pub fn op_insert( pub fn op_insert(
@@ -5125,54 +5135,153 @@ pub fn op_insert(
insn insn
); );
if state.op_insert_state == OpInsertState::UpdateLastRowid { loop {
let maybe_rowid = { match &state.op_insert_state.sub_state {
let mut cursor = state.get_cursor(*cursor_id); OpInsertSubState::MaybeCaptureRecord => {
let cursor = cursor.as_btree_mut(); let schema = program.connection.schema.borrow();
return_if_io!(cursor.rowid()) let dependent_views = schema.get_dependent_views(table_name);
}; if dependent_views.is_empty() || !flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
if let Some(rowid) = maybe_rowid { state.op_insert_state.sub_state = OpInsertSubState::Insert;
program.connection.update_last_rowid(rowid); continue;
}
let prev_changes = program.n_change.get(); let old_record = {
program.n_change.set(prev_changes + 1); let mut cursor = state.get_cursor(*cursor_id);
} let cursor = cursor.as_btree_mut();
state.op_insert_state = OpInsertState::Insert; // Get the current key
state.pc += 1; let maybe_key = return_if_io!(cursor.rowid());
return Ok(InsnFunctionStepResult::Step); let key = maybe_key.ok_or_else(|| {
} LimboError::InternalError("Cannot delete: no current row".to_string())
})?;
// Get the current record before deletion and extract values
let maybe_record = return_if_io!(cursor.record());
if let Some(record) = maybe_record {
let mut values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
{ // Fix rowid alias columns: replace Null with actual rowid value
let mut cursor_ref = state.get_cursor(*cursor_id); if let Some(table) = schema.get_table(table_name) {
let cursor = cursor_ref.as_btree_mut(); for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() {
values[i] = Value::Integer(key);
}
}
}
Some((key, values))
} else {
None
}
};
let key = match &state.registers[*key_reg].get_owned_value() { state.op_insert_state.old_record = old_record;
Value::Integer(i) => *i, state.op_insert_state.sub_state = OpInsertSubState::Insert;
_ => unreachable!("expected integer key"), continue;
};
let record = match &state.registers[*record_reg] {
Register::Record(r) => std::borrow::Cow::Borrowed(r),
Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
} }
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."), OpInsertSubState::Insert => {
}; let key = match &state.registers[*key_reg].get_owned_value() {
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
// Update dependent views for incremental computation let record = match &state.registers[*record_reg] {
let schema = program.connection.schema.borrow(); Register::Record(r) => std::borrow::Cow::Borrowed(r),
let dependent_views = schema.get_dependent_views(table_name); Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
}
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
};
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
if !dependent_views.is_empty() { // In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
// If this is an UPDATE operation, first capture and delete the old row data // This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
if flag.has(InsertFlags::UPDATE) { // 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
// Get the old record before it's overwritten // 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) { let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
let mut values = old_record return_if_io!(cursor.insert(
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
moved_before
));
}
// Only update last_insert_rowid for regular table inserts, not schema modifications
let root_page = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
cursor.root_page()
};
if root_page != 1 {
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
} else {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
} else {
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
break;
}
}
}
OpInsertSubState::UpdateLastRowid => {
let maybe_rowid = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.rowid())
};
if let Some(rowid) = maybe_rowid {
program.connection.update_last_rowid(rowid);
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
continue;
}
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
break;
}
OpInsertSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
assert!(!dependent_views.is_empty());
let (key, values) = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let key = match &state.registers[*key_reg].get_owned_value() {
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
let record = match &state.registers[*record_reg] {
Register::Record(r) => std::borrow::Cow::Borrowed(r),
Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
}
Register::Aggregate(..) => {
unreachable!("Cannot insert an aggregate value.")
}
};
// Add insertion of new row to view deltas
let mut new_values = record
.get_values() .get_values()
.into_iter() .into_iter()
.map(|v| v.to_owned()) .map(|v| v.to_owned())
@@ -5182,75 +5291,37 @@ pub fn op_insert(
let schema = program.connection.schema.borrow(); let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) { if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() { for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() { if col.is_rowid_alias && i < new_values.len() {
values[i] = Value::Integer(key); new_values[i] = Value::Integer(key);
} }
} }
} }
drop(schema);
Some(values) (key, new_values)
} else {
None
}; };
// Add deletion of old row to view deltas let mut tx_states = program.connection.view_transaction_states.borrow_mut();
if let Some(old_values) = old_record_values { if let Some((key, values)) = state.op_insert_state.old_record.take() {
let mut tx_states = program.connection.view_transaction_states.borrow_mut(); for view_name in dependent_views.iter() {
for view_name in &dependent_views {
let tx_state = tx_states.entry(view_name.clone()).or_default(); let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.delete(key, old_values.clone()); tx_state.delta.delete(key, values.clone());
} }
} }
} for view_name in dependent_views.iter() {
let tx_state = tx_states.entry(view_name.clone()).or_default();
// Add insertion of new row to view deltas tx_state.delta.insert(key, values.clone());
let mut new_values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < new_values.len() {
new_values[i] = Value::Integer(key);
}
} }
}
drop(schema);
let mut tx_states = program.connection.view_transaction_states.borrow_mut(); state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
for view_name in dependent_views { break;
let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.insert(key, new_values.clone());
} }
} }
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
return_if_io!(cursor.insert(
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
moved_before
));
} }
// Only update last_insert_rowid for regular table inserts, not schema modifications let prev_changes = program.n_change.get();
let root_page = { program.n_change.set(prev_changes + 1);
let mut cursor = state.get_cursor(*cursor_id); state.pc += 1;
let cursor = cursor.as_btree_mut();
cursor.root_page()
};
if root_page != 1 {
state.op_insert_state = OpInsertState::UpdateLastRowid;
} else {
state.pc += 1;
}
Ok(InsnFunctionStepResult::Step) Ok(InsnFunctionStepResult::Step)
} }

View File

@@ -107,7 +107,7 @@ impl IdxInsertFlags {
pub struct InsertFlags(pub u8); pub struct InsertFlags(pub u8);
impl InsertFlags { impl InsertFlags {
pub const UPDATE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement pub const UPDATE_ROWID_CHANGE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement where the row's rowid is changed
pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row
pub fn new() -> Self { pub fn new() -> Self {
@@ -123,8 +123,8 @@ impl InsertFlags {
self self
} }
pub fn update(mut self) -> Self { pub fn update_rowid_change(mut self) -> Self {
self.0 |= InsertFlags::UPDATE; self.0 |= InsertFlags::UPDATE_ROWID_CHANGE;
self self
} }
} }

View File

@@ -33,7 +33,8 @@ use crate::{
translate::plan::TableReferences, translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef}, types::{IOResult, RawSlice, TextRef},
vdbe::execute::{ vdbe::execute::{
OpIdxInsertState, OpInsertState, OpNewRowidState, OpNoConflictState, OpSeekState, OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
OpSeekState,
}, },
RefValue, RefValue,
}; };
@@ -291,7 +292,10 @@ impl ProgramState {
op_open_ephemeral_state: OpOpenEphemeralState::Start, op_open_ephemeral_state: OpOpenEphemeralState::Start,
op_new_rowid_state: OpNewRowidState::Start, op_new_rowid_state: OpNewRowidState::Start,
op_idx_insert_state: OpIdxInsertState::SeekIfUnique, op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
op_insert_state: OpInsertState::Insert, op_insert_state: OpInsertState {
sub_state: OpInsertSubState::MaybeCaptureRecord,
old_record: None,
},
op_no_conflict_state: OpNoConflictState::Start, op_no_conflict_state: OpNoConflictState::Start,
seek_state: OpSeekState::Start, seek_state: OpSeekState::Start,
current_collation: None, current_collation: None,