fix/vdbe: fix state handling for incremental views

- When the rowid is changed in UPDATE, it is handled as a combination of DELETE + INSERT,
so we dont need to delete the old values in that case
- We should only update the views after the operation on the btree is done
- A proper state machine is needed to handle IO yielding points
This commit is contained in:
Jussi Saurio
2025-08-11 16:17:41 +03:00
parent ec7bded092
commit f38333b373
4 changed files with 178 additions and 103 deletions

View File

@@ -1189,9 +1189,9 @@ fn emit_update_insns(
flag: if has_user_provided_rowid {
// The previous Insn::NotExists and Insn::Delete seek to the old rowid,
// so to insert a new user-provided rowid, we need to seek to the correct place.
InsertFlags::new().require_seek().update()
InsertFlags::new().require_seek().update_rowid_change()
} else {
InsertFlags::new().update()
InsertFlags::new()
},
table_name: table_ref.identifier.clone(),
});

View File

@@ -5099,12 +5099,22 @@ pub fn op_yield(
Ok(InsnFunctionStepResult::Step)
}
pub struct OpInsertState {
pub sub_state: OpInsertSubState,
pub old_record: Option<(i64, Vec<Value>)>,
}
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum OpInsertState {
pub enum OpInsertSubState {
/// If this insert overwrites a record, capture the old record for incremental view maintenance.
MaybeCaptureRecord,
/// Insert the row into the table.
Insert,
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
/// start inserting the same row multiple times.
UpdateLastRowid,
/// If there are dependent incremental views, apply the change.
ApplyViewChange,
}
pub fn op_insert(
@@ -5125,54 +5135,153 @@ pub fn op_insert(
insn
);
if state.op_insert_state == OpInsertState::UpdateLastRowid {
let maybe_rowid = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.rowid())
};
if let Some(rowid) = maybe_rowid {
program.connection.update_last_rowid(rowid);
loop {
match &state.op_insert_state.sub_state {
OpInsertSubState::MaybeCaptureRecord => {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if dependent_views.is_empty() || !flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
state.op_insert_state.sub_state = OpInsertSubState::Insert;
continue;
}
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
state.op_insert_state = OpInsertState::Insert;
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
let old_record = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
// Get the current key
let maybe_key = return_if_io!(cursor.rowid());
let key = maybe_key.ok_or_else(|| {
LimboError::InternalError("Cannot delete: no current row".to_string())
})?;
// Get the current record before deletion and extract values
let maybe_record = return_if_io!(cursor.record());
if let Some(record) = maybe_record {
let mut values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
{
let mut cursor_ref = state.get_cursor(*cursor_id);
let cursor = cursor_ref.as_btree_mut();
// Fix rowid alias columns: replace Null with actual rowid value
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() {
values[i] = Value::Integer(key);
}
}
}
Some((key, values))
} else {
None
}
};
let key = match &state.registers[*key_reg].get_owned_value() {
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
let record = match &state.registers[*record_reg] {
Register::Record(r) => std::borrow::Cow::Borrowed(r),
Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
state.op_insert_state.old_record = old_record;
state.op_insert_state.sub_state = OpInsertSubState::Insert;
continue;
}
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
};
OpInsertSubState::Insert => {
let key = match &state.registers[*key_reg].get_owned_value() {
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
// Update dependent views for incremental computation
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
let record = match &state.registers[*record_reg] {
Register::Record(r) => std::borrow::Cow::Borrowed(r),
Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
}
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
};
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
if !dependent_views.is_empty() {
// If this is an UPDATE operation, first capture and delete the old row data
if flag.has(InsertFlags::UPDATE) {
// Get the old record before it's overwritten
let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) {
let mut values = old_record
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
return_if_io!(cursor.insert(
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
moved_before
));
}
// Only update last_insert_rowid for regular table inserts, not schema modifications
let root_page = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
cursor.root_page()
};
if root_page != 1 {
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
} else {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
} else {
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
break;
}
}
}
OpInsertSubState::UpdateLastRowid => {
let maybe_rowid = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.rowid())
};
if let Some(rowid) = maybe_rowid {
program.connection.update_last_rowid(rowid);
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
continue;
}
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
break;
}
OpInsertSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let dependent_views = schema.get_dependent_views(table_name);
assert!(!dependent_views.is_empty());
let (key, values) = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let key = match &state.registers[*key_reg].get_owned_value() {
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
let record = match &state.registers[*record_reg] {
Register::Record(r) => std::borrow::Cow::Borrowed(r),
Register::Value(value) => {
let x = 1;
let regs = &state.registers[*record_reg..*record_reg + 1];
let new_regs = [&state.registers[*record_reg]];
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
std::borrow::Cow::Owned(record)
}
Register::Aggregate(..) => {
unreachable!("Cannot insert an aggregate value.")
}
};
// Add insertion of new row to view deltas
let mut new_values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
@@ -5182,75 +5291,37 @@ pub fn op_insert(
let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < values.len() {
values[i] = Value::Integer(key);
if col.is_rowid_alias && i < new_values.len() {
new_values[i] = Value::Integer(key);
}
}
}
drop(schema);
Some(values)
} else {
None
(key, new_values)
};
// Add deletion of old row to view deltas
if let Some(old_values) = old_record_values {
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
for view_name in &dependent_views {
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
if let Some((key, values)) = state.op_insert_state.old_record.take() {
for view_name in dependent_views.iter() {
let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.delete(key, old_values.clone());
tx_state.delta.delete(key, values.clone());
}
}
}
for view_name in dependent_views.iter() {
let tx_state = tx_states.entry(view_name.clone()).or_default();
// Add insertion of new row to view deltas
let mut new_values = record
.get_values()
.into_iter()
.map(|v| v.to_owned())
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < new_values.len() {
new_values[i] = Value::Integer(key);
}
tx_state.delta.insert(key, values.clone());
}
}
drop(schema);
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
for view_name in dependent_views {
let tx_state = tx_states.entry(view_name.clone()).or_default();
tx_state.delta.insert(key, new_values.clone());
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
break;
}
}
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
return_if_io!(cursor.insert(
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
moved_before
));
}
// Only update last_insert_rowid for regular table inserts, not schema modifications
let root_page = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
cursor.root_page()
};
if root_page != 1 {
state.op_insert_state = OpInsertState::UpdateLastRowid;
} else {
state.pc += 1;
}
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}

View File

@@ -107,7 +107,7 @@ impl IdxInsertFlags {
pub struct InsertFlags(pub u8);
impl InsertFlags {
pub const UPDATE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement
pub const UPDATE_ROWID_CHANGE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement where the row's rowid is changed
pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row
pub fn new() -> Self {
@@ -123,8 +123,8 @@ impl InsertFlags {
self
}
pub fn update(mut self) -> Self {
self.0 |= InsertFlags::UPDATE;
pub fn update_rowid_change(mut self) -> Self {
self.0 |= InsertFlags::UPDATE_ROWID_CHANGE;
self
}
}

View File

@@ -33,7 +33,8 @@ use crate::{
translate::plan::TableReferences,
types::{IOResult, RawSlice, TextRef},
vdbe::execute::{
OpIdxInsertState, OpInsertState, OpNewRowidState, OpNoConflictState, OpSeekState,
OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
OpSeekState,
},
RefValue,
};
@@ -291,7 +292,10 @@ impl ProgramState {
op_open_ephemeral_state: OpOpenEphemeralState::Start,
op_new_rowid_state: OpNewRowidState::Start,
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
op_insert_state: OpInsertState::Insert,
op_insert_state: OpInsertState {
sub_state: OpInsertSubState::MaybeCaptureRecord,
old_record: None,
},
op_no_conflict_state: OpNoConflictState::Start,
seek_state: OpSeekState::Start,
current_collation: None,