mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 00:15:20 +01:00
VDBE: fix op_insert re-entrancy
when updating last_insert_rowid we call return_if_io!(cursor.rowid()) which yields IO on large records. this causes op_insert to insert and overwrite the same row many times. we need a state machine to ensure that the insertion only happens once and the reading of rowid can independently yield IO without causing a re-insert.
This commit is contained in:
@@ -4249,6 +4249,14 @@ pub fn op_yield(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum OpInsertState {
|
||||
Insert,
|
||||
/// Updating last_insert_rowid may return IO, so we need a separate state for it so that we don't
|
||||
/// start inserting the same row multiple times.
|
||||
UpdateLastRowid,
|
||||
}
|
||||
|
||||
pub fn op_insert(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -4257,7 +4265,7 @@ pub fn op_insert(
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
let Insn::Insert {
|
||||
cursor,
|
||||
cursor: cursor_id,
|
||||
key_reg,
|
||||
record_reg,
|
||||
flag,
|
||||
@@ -4266,9 +4274,27 @@ pub fn op_insert(
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
|
||||
if state.op_insert_state == OpInsertState::UpdateLastRowid {
|
||||
let maybe_rowid = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.rowid())
|
||||
};
|
||||
if let Some(rowid) = maybe_rowid {
|
||||
program.connection.update_last_rowid(rowid);
|
||||
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
state.op_insert_state = OpInsertState::Insert;
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cursor = state.get_cursor(*cursor);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let mut cursor_ref = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor_ref.as_btree_mut();
|
||||
|
||||
let key = match &state.registers[*key_reg].get_owned_value() {
|
||||
Value::Integer(i) => *i,
|
||||
@@ -4288,19 +4314,21 @@ pub fn op_insert(
|
||||
};
|
||||
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key, Some(record.as_ref())), true));
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
if cursor.root_page() != 1 {
|
||||
if let Some(rowid) = return_if_io!(cursor.rowid()) {
|
||||
program.connection.update_last_rowid(rowid);
|
||||
|
||||
let prev_changes = program.n_change.get();
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
let root_page = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
cursor.root_page()
|
||||
};
|
||||
if root_page != 1 {
|
||||
state.op_insert_state = OpInsertState::UpdateLastRowid;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
} else {
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn op_int_64(
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::{
|
||||
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
|
||||
translate::plan::TableReferences,
|
||||
vdbe::execute::OpIdxInsertState,
|
||||
vdbe::execute::OpInsertState,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -252,6 +253,7 @@ pub struct ProgramState {
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
op_idx_insert_state: OpIdxInsertState,
|
||||
op_insert_state: OpInsertState,
|
||||
}
|
||||
|
||||
impl ProgramState {
|
||||
@@ -279,6 +281,7 @@ impl ProgramState {
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
op_insert_state: OpInsertState::Insert,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user