mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-11 03:04:22 +01:00
fix/vdbe: fix state handling for incremental views in op_delete
This commit is contained in:
@@ -5346,6 +5346,20 @@ pub fn op_int_64(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub struct OpDeleteState {
|
||||
pub sub_state: OpDeleteSubState,
|
||||
pub deleted_record: Option<(i64, Vec<Value>)>,
|
||||
}
|
||||
|
||||
pub enum OpDeleteSubState {
|
||||
/// Capture the record before deletion, if the are dependent views.
|
||||
MaybeCaptureRecord,
|
||||
/// Delete the record.
|
||||
Delete,
|
||||
/// Apply the change to the dependent views.
|
||||
ApplyViewChange,
|
||||
}
|
||||
|
||||
pub fn op_delete(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -5361,59 +5375,83 @@ pub fn op_delete(
|
||||
insn
|
||||
);
|
||||
|
||||
// Capture row data before deletion for view updates
|
||||
let record_key_and_values = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
loop {
|
||||
match &state.op_delete_state.sub_state {
|
||||
OpDeleteSubState::MaybeCaptureRecord => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
state.op_delete_state.sub_state = OpDeleteSubState::Delete;
|
||||
continue;
|
||||
}
|
||||
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let result = if !dependent_views.is_empty() {
|
||||
// Get the current key
|
||||
let maybe_key = return_if_io!(cursor.rowid());
|
||||
let key = maybe_key.ok_or_else(|| {
|
||||
LimboError::InternalError("Cannot delete: no current row".to_string())
|
||||
})?;
|
||||
// Get the current record before deletion and extract values
|
||||
if let Some(record) = return_if_io!(cursor.record()) {
|
||||
let mut values = record
|
||||
.get_values()
|
||||
.into_iter()
|
||||
.map(|v| v.to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
let deleted_record = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
// Get the current key
|
||||
let maybe_key = return_if_io!(cursor.rowid());
|
||||
let key = maybe_key.ok_or_else(|| {
|
||||
LimboError::InternalError("Cannot delete: no current row".to_string())
|
||||
})?;
|
||||
// Get the current record before deletion and extract values
|
||||
let maybe_record = return_if_io!(cursor.record());
|
||||
if let Some(record) = maybe_record {
|
||||
let mut values = record
|
||||
.get_values()
|
||||
.into_iter()
|
||||
.map(|v| v.to_owned())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Fix rowid alias columns: replace Null with actual rowid value
|
||||
if let Some(table) = schema.get_table(table_name) {
|
||||
for (i, col) in table.columns().iter().enumerate() {
|
||||
if col.is_rowid_alias && i < values.len() {
|
||||
values[i] = Value::Integer(key);
|
||||
// Fix rowid alias columns: replace Null with actual rowid value
|
||||
if let Some(table) = schema.get_table(table_name) {
|
||||
for (i, col) in table.columns().iter().enumerate() {
|
||||
if col.is_rowid_alias && i < values.len() {
|
||||
values[i] = Value::Integer(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some((key, values))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
state.op_delete_state.deleted_record = deleted_record;
|
||||
state.op_delete_state.sub_state = OpDeleteSubState::Delete;
|
||||
continue;
|
||||
}
|
||||
OpDeleteSubState::Delete => {
|
||||
{
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
break;
|
||||
}
|
||||
state.op_delete_state.sub_state = OpDeleteSubState::ApplyViewChange;
|
||||
continue;
|
||||
}
|
||||
OpDeleteSubState::ApplyViewChange => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
assert!(!dependent_views.is_empty());
|
||||
let maybe_deleted_record = state.op_delete_state.deleted_record.take();
|
||||
if let Some((key, values)) = maybe_deleted_record {
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
for view_name in dependent_views {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
tx_state.delta.delete(key, values.clone());
|
||||
}
|
||||
}
|
||||
Some((key, values))
|
||||
} else {
|
||||
None
|
||||
|
||||
state.op_delete_state.sub_state = OpDeleteSubState::MaybeCaptureRecord;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Now perform the deletion
|
||||
return_if_io!(cursor.delete());
|
||||
|
||||
result
|
||||
};
|
||||
|
||||
// Update dependent views for incremental computation
|
||||
if let Some((key, values)) = record_key_and_values {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
for view_name in dependent_views {
|
||||
let tx_state = tx_states.entry(view_name).or_default();
|
||||
tx_state.delta.delete(key, values.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
state.pc += 1;
|
||||
|
||||
@@ -29,12 +29,11 @@ use crate::{
|
||||
function::{AggFunc, FuncCtx},
|
||||
state_machine::StateTransition,
|
||||
storage::sqlite3_ondisk::SmallVec,
|
||||
translate::collate::CollationSeq,
|
||||
translate::plan::TableReferences,
|
||||
translate::{collate::CollationSeq, plan::TableReferences},
|
||||
types::{IOResult, RawSlice, TextRef},
|
||||
vdbe::execute::{
|
||||
OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
|
||||
OpSeekState,
|
||||
OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, OpInsertSubState,
|
||||
OpNewRowidState, OpNoConflictState, OpSeekState,
|
||||
},
|
||||
RefValue,
|
||||
};
|
||||
@@ -254,6 +253,7 @@ pub struct ProgramState {
|
||||
commit_state: CommitState,
|
||||
#[cfg(feature = "json")]
|
||||
json_cache: JsonCacheCell,
|
||||
op_delete_state: OpDeleteState,
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
@@ -287,6 +287,10 @@ impl ProgramState {
|
||||
commit_state: CommitState::Ready,
|
||||
#[cfg(feature = "json")]
|
||||
json_cache: JsonCacheCell::new(),
|
||||
op_delete_state: OpDeleteState {
|
||||
sub_state: OpDeleteSubState::MaybeCaptureRecord,
|
||||
deleted_record: None,
|
||||
},
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
|
||||
Reference in New Issue
Block a user