mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-01 14:24:23 +01:00
fix/vdbe: fix state handling for incremental views
- When the rowid is changed in UPDATE, it is handled as a combination of DELETE + INSERT, so we dont need to delete the old values in that case - We should only update the views after the operation on the btree is done - A proper state machine is needed to handle IO yielding points
This commit is contained in:
@@ -1189,9 +1189,9 @@ fn emit_update_insns(
|
||||
flag: if has_user_provided_rowid {
|
||||
// The previous Insn::NotExists and Insn::Delete seek to the old rowid,
|
||||
// so to insert a new user-provided rowid, we need to seek to the correct place.
|
||||
InsertFlags::new().require_seek().update()
|
||||
InsertFlags::new().require_seek().update_rowid_change()
|
||||
} else {
|
||||
InsertFlags::new().update()
|
||||
InsertFlags::new()
|
||||
},
|
||||
table_name: table_ref.identifier.clone(),
|
||||
});
|
||||
|
||||
@@ -5099,12 +5099,22 @@ pub fn op_yield(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub struct OpInsertState {
|
||||
pub sub_state: OpInsertSubState,
|
||||
pub old_record: Option<(i64, Vec<Value>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum OpInsertState {
|
||||
pub enum OpInsertSubState {
|
||||
/// If this insert overwrites a record, capture the old record for incremental view maintenance.
|
||||
MaybeCaptureRecord,
|
||||
/// Insert the row into the table.
|
||||
Insert,
|
||||
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
|
||||
/// start inserting the same row multiple times.
|
||||
UpdateLastRowid,
|
||||
/// If there are dependent incremental views, apply the change.
|
||||
ApplyViewChange,
|
||||
}
|
||||
|
||||
pub fn op_insert(
|
||||
@@ -5125,54 +5135,153 @@ pub fn op_insert(
|
||||
insn
|
||||
);
|
||||
|
||||
if state.op_insert_state == OpInsertState::UpdateLastRowid {
|
||||
let maybe_rowid = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.rowid())
|
||||
};
|
||||
if let Some(rowid) = maybe_rowid {
|
||||
program.connection.update_last_rowid(rowid);
|
||||
loop {
|
||||
match &state.op_insert_state.sub_state {
|
||||
OpInsertSubState::MaybeCaptureRecord => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
if dependent_views.is_empty() || !flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::Insert;
|
||||
continue;
|
||||
}
|
||||
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
state.op_insert_state = OpInsertState::Insert;
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
let old_record = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
// Get the current key
|
||||
let maybe_key = return_if_io!(cursor.rowid());
|
||||
let key = maybe_key.ok_or_else(|| {
|
||||
LimboError::InternalError("Cannot delete: no current row".to_string())
|
||||
})?;
|
||||
// Get the current record before deletion and extract values
|
||||
let maybe_record = return_if_io!(cursor.record());
|
||||
if let Some(record) = maybe_record {
|
||||
let mut values = record
|
||||
.get_values()
|
||||
.into_iter()
|
||||
.map(|v| v.to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
{
|
||||
let mut cursor_ref = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor_ref.as_btree_mut();
|
||||
// Fix rowid alias columns: replace Null with actual rowid value
|
||||
if let Some(table) = schema.get_table(table_name) {
|
||||
for (i, col) in table.columns().iter().enumerate() {
|
||||
if col.is_rowid_alias && i < values.len() {
|
||||
values[i] = Value::Integer(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some((key, values))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||
Value::Integer(i) => *i,
|
||||
_ => unreachable!("expected integer key"),
|
||||
};
|
||||
|
||||
let record = match &state.registers[*record_reg] {
|
||||
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
||||
Register::Value(value) => {
|
||||
let x = 1;
|
||||
let regs = &state.registers[*record_reg..*record_reg + 1];
|
||||
let new_regs = [&state.registers[*record_reg]];
|
||||
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
||||
std::borrow::Cow::Owned(record)
|
||||
state.op_insert_state.old_record = old_record;
|
||||
state.op_insert_state.sub_state = OpInsertSubState::Insert;
|
||||
continue;
|
||||
}
|
||||
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
|
||||
};
|
||||
OpInsertSubState::Insert => {
|
||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||
Value::Integer(i) => *i,
|
||||
_ => unreachable!("expected integer key"),
|
||||
};
|
||||
|
||||
// Update dependent views for incremental computation
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let record = match &state.registers[*record_reg] {
|
||||
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
||||
Register::Value(value) => {
|
||||
let x = 1;
|
||||
let regs = &state.registers[*record_reg..*record_reg + 1];
|
||||
let new_regs = [&state.registers[*record_reg]];
|
||||
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
||||
std::borrow::Cow::Owned(record)
|
||||
}
|
||||
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
|
||||
};
|
||||
{
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
|
||||
if !dependent_views.is_empty() {
|
||||
// If this is an UPDATE operation, first capture and delete the old row data
|
||||
if flag.has(InsertFlags::UPDATE) {
|
||||
// Get the old record before it's overwritten
|
||||
let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) {
|
||||
let mut values = old_record
|
||||
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
|
||||
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
|
||||
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
|
||||
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
|
||||
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
|
||||
return_if_io!(cursor.insert(
|
||||
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
|
||||
moved_before
|
||||
));
|
||||
}
|
||||
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
let root_page = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
cursor.root_page()
|
||||
};
|
||||
if root_page != 1 {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
|
||||
} else {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
} else {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
OpInsertSubState::UpdateLastRowid => {
|
||||
let maybe_rowid = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.rowid())
|
||||
};
|
||||
if let Some(rowid) = maybe_rowid {
|
||||
program.connection.update_last_rowid(rowid);
|
||||
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
continue;
|
||||
}
|
||||
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||
break;
|
||||
}
|
||||
OpInsertSubState::ApplyViewChange => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
assert!(!dependent_views.is_empty());
|
||||
|
||||
let (key, values) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
|
||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||
Value::Integer(i) => *i,
|
||||
_ => unreachable!("expected integer key"),
|
||||
};
|
||||
|
||||
let record = match &state.registers[*record_reg] {
|
||||
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
||||
Register::Value(value) => {
|
||||
let x = 1;
|
||||
let regs = &state.registers[*record_reg..*record_reg + 1];
|
||||
let new_regs = [&state.registers[*record_reg]];
|
||||
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
||||
std::borrow::Cow::Owned(record)
|
||||
}
|
||||
Register::Aggregate(..) => {
|
||||
unreachable!("Cannot insert an aggregate value.")
|
||||
}
|
||||
};
|
||||
|
||||
// Add insertion of new row to view deltas
|
||||
let mut new_values = record
|
||||
.get_values()
|
||||
.into_iter()
|
||||
.map(|v| v.to_owned())
|
||||
@@ -5182,75 +5291,37 @@ pub fn op_insert(
|
||||
let schema = program.connection.schema.borrow();
|
||||
if let Some(table) = schema.get_table(table_name) {
|
||||
for (i, col) in table.columns().iter().enumerate() {
|
||||
if col.is_rowid_alias && i < values.len() {
|
||||
values[i] = Value::Integer(key);
|
||||
if col.is_rowid_alias && i < new_values.len() {
|
||||
new_values[i] = Value::Integer(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(schema);
|
||||
|
||||
Some(values)
|
||||
} else {
|
||||
None
|
||||
(key, new_values)
|
||||
};
|
||||
|
||||
// Add deletion of old row to view deltas
|
||||
if let Some(old_values) = old_record_values {
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
for view_name in &dependent_views {
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
if let Some((key, values)) = state.op_insert_state.old_record.take() {
|
||||
for view_name in dependent_views.iter() {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
tx_state.delta.delete(key, old_values.clone());
|
||||
tx_state.delta.delete(key, values.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
for view_name in dependent_views.iter() {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
|
||||
// Add insertion of new row to view deltas
|
||||
let mut new_values = record
|
||||
.get_values()
|
||||
.into_iter()
|
||||
.map(|v| v.to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Fix rowid alias columns: replace Null with actual rowid value
|
||||
let schema = program.connection.schema.borrow();
|
||||
if let Some(table) = schema.get_table(table_name) {
|
||||
for (i, col) in table.columns().iter().enumerate() {
|
||||
if col.is_rowid_alias && i < new_values.len() {
|
||||
new_values[i] = Value::Integer(key);
|
||||
}
|
||||
tx_state.delta.insert(key, values.clone());
|
||||
}
|
||||
}
|
||||
drop(schema);
|
||||
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
for view_name in dependent_views {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
tx_state.delta.insert(key, new_values.clone());
|
||||
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
|
||||
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
|
||||
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
|
||||
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
|
||||
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
|
||||
return_if_io!(cursor.insert(
|
||||
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
|
||||
moved_before
|
||||
));
|
||||
}
|
||||
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
let root_page = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
cursor.root_page()
|
||||
};
|
||||
if root_page != 1 {
|
||||
state.op_insert_state = OpInsertState::UpdateLastRowid;
|
||||
} else {
|
||||
state.pc += 1;
|
||||
}
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ impl IdxInsertFlags {
|
||||
pub struct InsertFlags(pub u8);
|
||||
|
||||
impl InsertFlags {
|
||||
pub const UPDATE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement
|
||||
pub const UPDATE_ROWID_CHANGE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement where the row's rowid is changed
|
||||
pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row
|
||||
|
||||
pub fn new() -> Self {
|
||||
@@ -123,8 +123,8 @@ impl InsertFlags {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn update(mut self) -> Self {
|
||||
self.0 |= InsertFlags::UPDATE;
|
||||
pub fn update_rowid_change(mut self) -> Self {
|
||||
self.0 |= InsertFlags::UPDATE_ROWID_CHANGE;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,8 @@ use crate::{
|
||||
translate::plan::TableReferences,
|
||||
types::{IOResult, RawSlice, TextRef},
|
||||
vdbe::execute::{
|
||||
OpIdxInsertState, OpInsertState, OpNewRowidState, OpNoConflictState, OpSeekState,
|
||||
OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
|
||||
OpSeekState,
|
||||
},
|
||||
RefValue,
|
||||
};
|
||||
@@ -291,7 +292,10 @@ impl ProgramState {
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_new_rowid_state: OpNewRowidState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
op_insert_state: OpInsertState::Insert,
|
||||
op_insert_state: OpInsertState {
|
||||
sub_state: OpInsertSubState::MaybeCaptureRecord,
|
||||
old_record: None,
|
||||
},
|
||||
op_no_conflict_state: OpNoConflictState::Start,
|
||||
seek_state: OpSeekState::Start,
|
||||
current_collation: None,
|
||||
|
||||
Reference in New Issue
Block a user