mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 16:35:30 +01:00
fix/vdbe: fix state handling for incremental views
- When the rowid is changed in UPDATE, it is handled as a combination of DELETE + INSERT, so we dont need to delete the old values in that case - We should only update the views after the operation on the btree is done - A proper state machine is needed to handle IO yielding points
This commit is contained in:
@@ -1189,9 +1189,9 @@ fn emit_update_insns(
|
|||||||
flag: if has_user_provided_rowid {
|
flag: if has_user_provided_rowid {
|
||||||
// The previous Insn::NotExists and Insn::Delete seek to the old rowid,
|
// The previous Insn::NotExists and Insn::Delete seek to the old rowid,
|
||||||
// so to insert a new user-provided rowid, we need to seek to the correct place.
|
// so to insert a new user-provided rowid, we need to seek to the correct place.
|
||||||
InsertFlags::new().require_seek().update()
|
InsertFlags::new().require_seek().update_rowid_change()
|
||||||
} else {
|
} else {
|
||||||
InsertFlags::new().update()
|
InsertFlags::new()
|
||||||
},
|
},
|
||||||
table_name: table_ref.identifier.clone(),
|
table_name: table_ref.identifier.clone(),
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -5099,12 +5099,22 @@ pub fn op_yield(
|
|||||||
Ok(InsnFunctionStepResult::Step)
|
Ok(InsnFunctionStepResult::Step)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct OpInsertState {
|
||||||
|
pub sub_state: OpInsertSubState,
|
||||||
|
pub old_record: Option<(i64, Vec<Value>)>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||||
pub enum OpInsertState {
|
pub enum OpInsertSubState {
|
||||||
|
/// If this insert overwrites a record, capture the old record for incremental view maintenance.
|
||||||
|
MaybeCaptureRecord,
|
||||||
|
/// Insert the row into the table.
|
||||||
Insert,
|
Insert,
|
||||||
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
|
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
|
||||||
/// start inserting the same row multiple times.
|
/// start inserting the same row multiple times.
|
||||||
UpdateLastRowid,
|
UpdateLastRowid,
|
||||||
|
/// If there are dependent incremental views, apply the change.
|
||||||
|
ApplyViewChange,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn op_insert(
|
pub fn op_insert(
|
||||||
@@ -5125,54 +5135,153 @@ pub fn op_insert(
|
|||||||
insn
|
insn
|
||||||
);
|
);
|
||||||
|
|
||||||
if state.op_insert_state == OpInsertState::UpdateLastRowid {
|
loop {
|
||||||
let maybe_rowid = {
|
match &state.op_insert_state.sub_state {
|
||||||
let mut cursor = state.get_cursor(*cursor_id);
|
OpInsertSubState::MaybeCaptureRecord => {
|
||||||
let cursor = cursor.as_btree_mut();
|
let schema = program.connection.schema.borrow();
|
||||||
return_if_io!(cursor.rowid())
|
let dependent_views = schema.get_dependent_views(table_name);
|
||||||
};
|
if dependent_views.is_empty() || !flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
|
||||||
if let Some(rowid) = maybe_rowid {
|
state.op_insert_state.sub_state = OpInsertSubState::Insert;
|
||||||
program.connection.update_last_rowid(rowid);
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let prev_changes = program.n_change.get();
|
let old_record = {
|
||||||
program.n_change.set(prev_changes + 1);
|
let mut cursor = state.get_cursor(*cursor_id);
|
||||||
}
|
let cursor = cursor.as_btree_mut();
|
||||||
state.op_insert_state = OpInsertState::Insert;
|
// Get the current key
|
||||||
state.pc += 1;
|
let maybe_key = return_if_io!(cursor.rowid());
|
||||||
return Ok(InsnFunctionStepResult::Step);
|
let key = maybe_key.ok_or_else(|| {
|
||||||
}
|
LimboError::InternalError("Cannot delete: no current row".to_string())
|
||||||
|
})?;
|
||||||
|
// Get the current record before deletion and extract values
|
||||||
|
let maybe_record = return_if_io!(cursor.record());
|
||||||
|
if let Some(record) = maybe_record {
|
||||||
|
let mut values = record
|
||||||
|
.get_values()
|
||||||
|
.into_iter()
|
||||||
|
.map(|v| v.to_owned())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
{
|
// Fix rowid alias columns: replace Null with actual rowid value
|
||||||
let mut cursor_ref = state.get_cursor(*cursor_id);
|
if let Some(table) = schema.get_table(table_name) {
|
||||||
let cursor = cursor_ref.as_btree_mut();
|
for (i, col) in table.columns().iter().enumerate() {
|
||||||
|
if col.is_rowid_alias && i < values.len() {
|
||||||
|
values[i] = Value::Integer(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some((key, values))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
state.op_insert_state.old_record = old_record;
|
||||||
Value::Integer(i) => *i,
|
state.op_insert_state.sub_state = OpInsertSubState::Insert;
|
||||||
_ => unreachable!("expected integer key"),
|
continue;
|
||||||
};
|
|
||||||
|
|
||||||
let record = match &state.registers[*record_reg] {
|
|
||||||
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
|
||||||
Register::Value(value) => {
|
|
||||||
let x = 1;
|
|
||||||
let regs = &state.registers[*record_reg..*record_reg + 1];
|
|
||||||
let new_regs = [&state.registers[*record_reg]];
|
|
||||||
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
|
||||||
std::borrow::Cow::Owned(record)
|
|
||||||
}
|
}
|
||||||
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
|
OpInsertSubState::Insert => {
|
||||||
};
|
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||||
|
Value::Integer(i) => *i,
|
||||||
|
_ => unreachable!("expected integer key"),
|
||||||
|
};
|
||||||
|
|
||||||
// Update dependent views for incremental computation
|
let record = match &state.registers[*record_reg] {
|
||||||
let schema = program.connection.schema.borrow();
|
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
||||||
let dependent_views = schema.get_dependent_views(table_name);
|
Register::Value(value) => {
|
||||||
|
let x = 1;
|
||||||
|
let regs = &state.registers[*record_reg..*record_reg + 1];
|
||||||
|
let new_regs = [&state.registers[*record_reg]];
|
||||||
|
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
||||||
|
std::borrow::Cow::Owned(record)
|
||||||
|
}
|
||||||
|
Register::Aggregate(..) => unreachable!("Cannot insert an aggregate value."),
|
||||||
|
};
|
||||||
|
{
|
||||||
|
let mut cursor = state.get_cursor(*cursor_id);
|
||||||
|
let cursor = cursor.as_btree_mut();
|
||||||
|
|
||||||
if !dependent_views.is_empty() {
|
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
|
||||||
// If this is an UPDATE operation, first capture and delete the old row data
|
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
|
||||||
if flag.has(InsertFlags::UPDATE) {
|
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
|
||||||
// Get the old record before it's overwritten
|
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
|
||||||
let old_record_values = if let Some(old_record) = return_if_io!(cursor.record()) {
|
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
|
||||||
let mut values = old_record
|
return_if_io!(cursor.insert(
|
||||||
|
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
|
||||||
|
moved_before
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||||
|
let root_page = {
|
||||||
|
let mut cursor = state.get_cursor(*cursor_id);
|
||||||
|
let cursor = cursor.as_btree_mut();
|
||||||
|
cursor.root_page()
|
||||||
|
};
|
||||||
|
if root_page != 1 {
|
||||||
|
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
|
||||||
|
} else {
|
||||||
|
let schema = program.connection.schema.borrow();
|
||||||
|
let dependent_views = schema.get_dependent_views(table_name);
|
||||||
|
if !dependent_views.is_empty() {
|
||||||
|
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||||
|
} else {
|
||||||
|
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OpInsertSubState::UpdateLastRowid => {
|
||||||
|
let maybe_rowid = {
|
||||||
|
let mut cursor = state.get_cursor(*cursor_id);
|
||||||
|
let cursor = cursor.as_btree_mut();
|
||||||
|
return_if_io!(cursor.rowid())
|
||||||
|
};
|
||||||
|
if let Some(rowid) = maybe_rowid {
|
||||||
|
program.connection.update_last_rowid(rowid);
|
||||||
|
|
||||||
|
let prev_changes = program.n_change.get();
|
||||||
|
program.n_change.set(prev_changes + 1);
|
||||||
|
}
|
||||||
|
let schema = program.connection.schema.borrow();
|
||||||
|
let dependent_views = schema.get_dependent_views(table_name);
|
||||||
|
if !dependent_views.is_empty() {
|
||||||
|
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
OpInsertSubState::ApplyViewChange => {
|
||||||
|
let schema = program.connection.schema.borrow();
|
||||||
|
let dependent_views = schema.get_dependent_views(table_name);
|
||||||
|
assert!(!dependent_views.is_empty());
|
||||||
|
|
||||||
|
let (key, values) = {
|
||||||
|
let mut cursor = state.get_cursor(*cursor_id);
|
||||||
|
let cursor = cursor.as_btree_mut();
|
||||||
|
|
||||||
|
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||||
|
Value::Integer(i) => *i,
|
||||||
|
_ => unreachable!("expected integer key"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let record = match &state.registers[*record_reg] {
|
||||||
|
Register::Record(r) => std::borrow::Cow::Borrowed(r),
|
||||||
|
Register::Value(value) => {
|
||||||
|
let x = 1;
|
||||||
|
let regs = &state.registers[*record_reg..*record_reg + 1];
|
||||||
|
let new_regs = [&state.registers[*record_reg]];
|
||||||
|
let record = ImmutableRecord::from_registers(new_regs, new_regs.len());
|
||||||
|
std::borrow::Cow::Owned(record)
|
||||||
|
}
|
||||||
|
Register::Aggregate(..) => {
|
||||||
|
unreachable!("Cannot insert an aggregate value.")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Add insertion of new row to view deltas
|
||||||
|
let mut new_values = record
|
||||||
.get_values()
|
.get_values()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|v| v.to_owned())
|
.map(|v| v.to_owned())
|
||||||
@@ -5182,75 +5291,37 @@ pub fn op_insert(
|
|||||||
let schema = program.connection.schema.borrow();
|
let schema = program.connection.schema.borrow();
|
||||||
if let Some(table) = schema.get_table(table_name) {
|
if let Some(table) = schema.get_table(table_name) {
|
||||||
for (i, col) in table.columns().iter().enumerate() {
|
for (i, col) in table.columns().iter().enumerate() {
|
||||||
if col.is_rowid_alias && i < values.len() {
|
if col.is_rowid_alias && i < new_values.len() {
|
||||||
values[i] = Value::Integer(key);
|
new_values[i] = Value::Integer(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drop(schema);
|
|
||||||
|
|
||||||
Some(values)
|
(key, new_values)
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Add deletion of old row to view deltas
|
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||||
if let Some(old_values) = old_record_values {
|
if let Some((key, values)) = state.op_insert_state.old_record.take() {
|
||||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
for view_name in dependent_views.iter() {
|
||||||
for view_name in &dependent_views {
|
|
||||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||||
tx_state.delta.delete(key, old_values.clone());
|
tx_state.delta.delete(key, values.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
for view_name in dependent_views.iter() {
|
||||||
|
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||||
|
|
||||||
// Add insertion of new row to view deltas
|
tx_state.delta.insert(key, values.clone());
|
||||||
let mut new_values = record
|
|
||||||
.get_values()
|
|
||||||
.into_iter()
|
|
||||||
.map(|v| v.to_owned())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
// Fix rowid alias columns: replace Null with actual rowid value
|
|
||||||
let schema = program.connection.schema.borrow();
|
|
||||||
if let Some(table) = schema.get_table(table_name) {
|
|
||||||
for (i, col) in table.columns().iter().enumerate() {
|
|
||||||
if col.is_rowid_alias && i < new_values.len() {
|
|
||||||
new_values[i] = Value::Integer(key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
drop(schema);
|
|
||||||
|
|
||||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
state.op_insert_state.sub_state = OpInsertSubState::MaybeCaptureRecord;
|
||||||
for view_name in dependent_views {
|
break;
|
||||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
|
||||||
tx_state.delta.insert(key, new_values.clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// In a table insert, if the caller does not pass InsertFlags::REQUIRE_SEEK, they must ensure that a seek has already happened to the correct location.
|
|
||||||
// This typically happens by invoking either Insn::NewRowid or Insn::NotExists, because:
|
|
||||||
// 1. op_new_rowid() seeks to the end of the table, which is the correct insertion position.
|
|
||||||
// 2. op_not_exists() seeks to the position in the table where the target rowid would be inserted.
|
|
||||||
let moved_before = !flag.has(InsertFlags::REQUIRE_SEEK);
|
|
||||||
return_if_io!(cursor.insert(
|
|
||||||
&BTreeKey::new_table_rowid(key, Some(record.as_ref())),
|
|
||||||
moved_before
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
let prev_changes = program.n_change.get();
|
||||||
let root_page = {
|
program.n_change.set(prev_changes + 1);
|
||||||
let mut cursor = state.get_cursor(*cursor_id);
|
state.pc += 1;
|
||||||
let cursor = cursor.as_btree_mut();
|
|
||||||
cursor.root_page()
|
|
||||||
};
|
|
||||||
if root_page != 1 {
|
|
||||||
state.op_insert_state = OpInsertState::UpdateLastRowid;
|
|
||||||
} else {
|
|
||||||
state.pc += 1;
|
|
||||||
}
|
|
||||||
Ok(InsnFunctionStepResult::Step)
|
Ok(InsnFunctionStepResult::Step)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ impl IdxInsertFlags {
|
|||||||
pub struct InsertFlags(pub u8);
|
pub struct InsertFlags(pub u8);
|
||||||
|
|
||||||
impl InsertFlags {
|
impl InsertFlags {
|
||||||
pub const UPDATE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement
|
pub const UPDATE_ROWID_CHANGE: u8 = 0x01; // Flag indicating this is part of an UPDATE statement where the row's rowid is changed
|
||||||
pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row
|
pub const REQUIRE_SEEK: u8 = 0x02; // Flag indicating that a seek is required to insert the row
|
||||||
|
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@@ -123,8 +123,8 @@ impl InsertFlags {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update(mut self) -> Self {
|
pub fn update_rowid_change(mut self) -> Self {
|
||||||
self.0 |= InsertFlags::UPDATE;
|
self.0 |= InsertFlags::UPDATE_ROWID_CHANGE;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,7 +33,8 @@ use crate::{
|
|||||||
translate::plan::TableReferences,
|
translate::plan::TableReferences,
|
||||||
types::{IOResult, RawSlice, TextRef},
|
types::{IOResult, RawSlice, TextRef},
|
||||||
vdbe::execute::{
|
vdbe::execute::{
|
||||||
OpIdxInsertState, OpInsertState, OpNewRowidState, OpNoConflictState, OpSeekState,
|
OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
|
||||||
|
OpSeekState,
|
||||||
},
|
},
|
||||||
RefValue,
|
RefValue,
|
||||||
};
|
};
|
||||||
@@ -291,7 +292,10 @@ impl ProgramState {
|
|||||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||||
op_new_rowid_state: OpNewRowidState::Start,
|
op_new_rowid_state: OpNewRowidState::Start,
|
||||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||||
op_insert_state: OpInsertState::Insert,
|
op_insert_state: OpInsertState {
|
||||||
|
sub_state: OpInsertSubState::MaybeCaptureRecord,
|
||||||
|
old_record: None,
|
||||||
|
},
|
||||||
op_no_conflict_state: OpNoConflictState::Start,
|
op_no_conflict_state: OpNoConflictState::Start,
|
||||||
seek_state: OpSeekState::Start,
|
seek_state: OpSeekState::Start,
|
||||||
current_collation: None,
|
current_collation: None,
|
||||||
|
|||||||
Reference in New Issue
Block a user