From c752058a97449762767f36ec9b05cb67b7feb6d2 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Wed, 9 Jul 2025 09:36:47 +0300 Subject: [PATCH] VDBE: introduce state machine for op_idx_insert for more granular IO control Separates cursor.key_exists_in_index() into a state machine. The problem with the main branch implementation is this: `return_if_io!(seek)` `return_if_io!(cursor.record())` The latter may yield on IO and cause the seek to start over, causing an infinite loop. With an explicit state machine we can control and prevent this. --- core/storage/btree.rs | 40 +--------- core/vdbe/execute.rs | 173 ++++++++++++++++++++++++++++-------------- core/vdbe/mod.rs | 3 + 3 files changed, 119 insertions(+), 97 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index f9426bd34..0631a784c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -500,7 +500,7 @@ pub struct BTreeCursor { /// Colations for Index Btree constraint checks /// Contains the Collation Seq for the whole Index /// This Vec should be empty for Table Btree - collations: Vec, + pub collations: Vec, seek_state: CursorSeekState, /// Separate state to read a record with overflow pages. This separation from `state` is necessary as /// we can be in a function that relies on `state`, but also needs to process overflow pages @@ -4629,44 +4629,6 @@ impl BTreeCursor { self.null_flag } - /// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid - #[instrument(skip_all, level = Level::INFO)] - pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result> { - return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true })); - - let record_opt = return_if_io!(self.record()); - match record_opt.as_ref() { - Some(record) => { - // Existing record found; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row; - // UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2). - let existing_key = if self.has_rowid() { - &record.get_values()[..record.count().saturating_sub(1)] - } else { - record.get_values() - }; - let inserted_key_vals = &key.get_values(); - // Need this check because .all returns True on an empty iterator, - // So when record_opt is invalidated, it would always indicate show up as a duplicate key - if existing_key.len() != inserted_key_vals.len() { - return Ok(CursorResult::Ok(false)); - } - - Ok(CursorResult::Ok( - compare_immutable( - existing_key, - inserted_key_vals, - self.key_sort_order(), - &self.collations, - ) == std::cmp::Ordering::Equal, - )) - } - None => { - // Cursor not pointing at a record — table is empty or past last - Ok(CursorResult::Ok(false)) - } - } - } - #[instrument(skip_all, level = Level::INFO)] pub fn exists(&mut self, key: &Value) -> Result> { assert!(self.mv_cursor.is_none()); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 8cbbf486d..a23692559 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4443,6 +4443,17 @@ pub fn op_idx_delete( } } +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum OpIdxInsertState { + /// Optional seek step done before an unique constraint check. + SeekIfUnique, + /// Optional unique constraint check done before an insert. + UniqueConstraintCheck, + /// Main insert step. This is always performed. Usually the state machine just + /// skips to this step unless the insertion is made into a unique index. + Insert { moved_before: bool }, +} + pub fn op_idx_insert( program: &Program, state: &mut ProgramState, @@ -4450,72 +4461,118 @@ pub fn op_idx_insert( pager: &Rc, mv_store: Option<&Rc>, ) -> Result { - if let Insn::IdxInsert { + let Insn::IdxInsert { cursor_id, record_reg, flags, .. } = *insn - { - let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); - let CursorType::BTreeIndex(index_meta) = cursor_type else { - panic!("IdxInsert: not a BTree index cursor"); - }; - 'block: { - let mut cursor = state.get_cursor(cursor_id); - let cursor = cursor.as_btree_mut(); - let record = match &state.registers[record_reg] { - Register::Record(ref r) => r, - o => { - return Err(LimboError::InternalError(format!( - "expected record, got {:?}", - o - ))); - } - }; - // To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started - // a write/balancing operation. If it did, it means we already moved to the place we wanted. - let moved_before = if cursor.is_write_in_progress() { - true - } else if index_meta.unique { - // check for uniqueness violation - match cursor.key_exists_in_index(record)? { - CursorResult::Ok(true) => { - if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) { - break 'block; - } - return Err(LimboError::Constraint( - "UNIQUE constraint failed: duplicate key".into(), - )); - } - CursorResult::IO => return Ok(InsnFunctionStepResult::IO), - CursorResult::Ok(false) => {} - }; - // uniqueness check already moved us to the correct place in the index. - // the uniqueness check uses SeekOp::GE, which means a non-matching entry - // will now be positioned at the insertion point where there currently is - // a) nothing, or - // b) the first entry greater than the key we are inserting. - // In both cases, we can insert the new entry without moving again. - // - // This is re-entrant, because once we call cursor.insert() with moved_before=true, - // we will immediately set BTreeCursor::state to CursorState::Write(WriteInfo::new()), - // in BTreeCursor::insert_into_page; thus, if this function is called again, - // moved_before will again be true due to cursor.is_write_in_progress() returning true. - true - } else { - flags.has(IdxInsertFlags::USE_SEEK) - }; + else { + unreachable!("unexpected Insn {:?}", insn) + }; - // Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages, - // therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode - // because it could trigger a movement to child page after a balance root which will leave the current page as the root page. - return_if_io!(cursor.insert(&BTreeKey::new_index_key(record), moved_before)); + let record_to_insert = match &state.registers[record_reg] { + Register::Record(ref r) => r, + o => { + return Err(LimboError::InternalError(format!( + "expected record, got {:?}", + o + ))); + } + }; + + match state.op_idx_insert_state { + OpIdxInsertState::SeekIfUnique => { + let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap(); + let CursorType::BTreeIndex(index_meta) = cursor_type else { + panic!("IdxInsert: not a BTreeIndex cursor"); + }; + if !index_meta.unique { + state.op_idx_insert_state = OpIdxInsertState::Insert { + moved_before: false, + }; + return Ok(InsnFunctionStepResult::Step); + } + { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + + return_if_io!(cursor.seek( + SeekKey::IndexKey(record_to_insert), + SeekOp::GE { eq_only: true } + )); + } + state.op_idx_insert_state = OpIdxInsertState::UniqueConstraintCheck; + Ok(InsnFunctionStepResult::Step) + } + OpIdxInsertState::UniqueConstraintCheck => { + let ignore_conflict = 'i: { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + let record_opt = return_if_io!(cursor.record()); + let Some(record) = record_opt.as_ref() else { + // Cursor not pointing at a record — table is empty or past last + break 'i false; + }; + // Cursor is pointing at a record; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row; + // UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2). + let existing_key = if cursor.has_rowid() { + &record.get_values()[..record.count().saturating_sub(1)] + } else { + record.get_values() + }; + let inserted_key_vals = &record_to_insert.get_values(); + if existing_key.len() != inserted_key_vals.len() { + break 'i false; + } + + let conflict = compare_immutable( + existing_key, + inserted_key_vals, + cursor.key_sort_order(), + &cursor.collations, + ) == std::cmp::Ordering::Equal; + if conflict { + if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) { + break 'i true; + } + return Err(LimboError::Constraint( + "UNIQUE constraint failed: duplicate key".into(), + )); + } + + false + }; + state.op_idx_insert_state = if ignore_conflict { + state.pc += 1; + OpIdxInsertState::SeekIfUnique + } else { + OpIdxInsertState::Insert { moved_before: true } + }; + Ok(InsnFunctionStepResult::Step) + } + OpIdxInsertState::Insert { moved_before } => { + { + let mut cursor = state.get_cursor(cursor_id); + let cursor = cursor.as_btree_mut(); + // To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started + // a write/balancing operation. If it did, it means we already moved to the place we wanted. + let moved_before = moved_before + || cursor.is_write_in_progress() + || flags.has(IdxInsertFlags::USE_SEEK); + // Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages, + // therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode + // because it could trigger a movement to child page after a balance root which will leave the current page as the root page. + return_if_io!( + cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before) + ); + } + state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique; + state.pc += 1; + // TODO: flag optimizations, update n_change if OPFLAG_NCHANGE + Ok(InsnFunctionStepResult::Step) } - // TODO: flag optimizations, update n_change if OPFLAG_NCHANGE - state.pc += 1; } - Ok(InsnFunctionStepResult::Step) } pub fn op_new_rowid( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 7a45238cb..52c9f32ce 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -29,6 +29,7 @@ use crate::{ function::{AggFunc, FuncCtx}, storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec}, translate::plan::TableReferences, + vdbe::execute::OpIdxInsertState, }; use crate::{ @@ -250,6 +251,7 @@ pub struct ProgramState { op_idx_delete_state: Option, op_integrity_check_state: OpIntegrityCheckState, op_open_ephemeral_state: OpOpenEphemeralState, + op_idx_insert_state: OpIdxInsertState, } impl ProgramState { @@ -276,6 +278,7 @@ impl ProgramState { op_idx_delete_state: None, op_integrity_check_state: OpIntegrityCheckState::Start, op_open_ephemeral_state: OpOpenEphemeralState::Start, + op_idx_insert_state: OpIdxInsertState::SeekIfUnique, } }