mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-01 15:34:19 +01:00
Merge 'VDBE: fix op_idx_insert re-entrancy' from Jussi Saurio
Separates cursor.key_exists_in_index() into a state machine. The problem with the main branch implementation is this: `return_if_io!(seek)` `return_if_io!(cursor.record())` The latter may yield on IO and cause the seek to start over, causing an infinite loop. With an explicit state machine we can control and prevent this. Reviewed-by: Pere Diaz Bou <pere-altea@homail.com> Closes #2011
This commit is contained in:
@@ -500,7 +500,7 @@ pub struct BTreeCursor {
|
||||
/// Colations for Index Btree constraint checks
|
||||
/// Contains the Collation Seq for the whole Index
|
||||
/// This Vec should be empty for Table Btree
|
||||
collations: Vec<CollationSeq>,
|
||||
pub collations: Vec<CollationSeq>,
|
||||
seek_state: CursorSeekState,
|
||||
/// Separate state to read a record with overflow pages. This separation from `state` is necessary as
|
||||
/// we can be in a function that relies on `state`, but also needs to process overflow pages
|
||||
@@ -4629,44 +4629,6 @@ impl BTreeCursor {
|
||||
self.null_flag
|
||||
}
|
||||
|
||||
/// Search for a key in an Index Btree. Looking up indexes that need to be unique, we cannot compare the rowid
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
|
||||
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true }));
|
||||
|
||||
let record_opt = return_if_io!(self.record());
|
||||
match record_opt.as_ref() {
|
||||
Some(record) => {
|
||||
// Existing record found; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row;
|
||||
// UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2).
|
||||
let existing_key = if self.has_rowid() {
|
||||
&record.get_values()[..record.count().saturating_sub(1)]
|
||||
} else {
|
||||
record.get_values()
|
||||
};
|
||||
let inserted_key_vals = &key.get_values();
|
||||
// Need this check because .all returns True on an empty iterator,
|
||||
// So when record_opt is invalidated, it would always indicate show up as a duplicate key
|
||||
if existing_key.len() != inserted_key_vals.len() {
|
||||
return Ok(CursorResult::Ok(false));
|
||||
}
|
||||
|
||||
Ok(CursorResult::Ok(
|
||||
compare_immutable(
|
||||
existing_key,
|
||||
inserted_key_vals,
|
||||
self.key_sort_order(),
|
||||
&self.collations,
|
||||
) == std::cmp::Ordering::Equal,
|
||||
))
|
||||
}
|
||||
None => {
|
||||
// Cursor not pointing at a record — table is empty or past last
|
||||
Ok(CursorResult::Ok(false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn exists(&mut self, key: &Value) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
|
||||
@@ -4443,6 +4443,17 @@ pub fn op_idx_delete(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum OpIdxInsertState {
|
||||
/// Optional seek step done before an unique constraint check.
|
||||
SeekIfUnique,
|
||||
/// Optional unique constraint check done before an insert.
|
||||
UniqueConstraintCheck,
|
||||
/// Main insert step. This is always performed. Usually the state machine just
|
||||
/// skips to this step unless the insertion is made into a unique index.
|
||||
Insert { moved_before: bool },
|
||||
}
|
||||
|
||||
pub fn op_idx_insert(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -4450,72 +4461,118 @@ pub fn op_idx_insert(
|
||||
pager: &Rc<Pager>,
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
if let Insn::IdxInsert {
|
||||
let Insn::IdxInsert {
|
||||
cursor_id,
|
||||
record_reg,
|
||||
flags,
|
||||
..
|
||||
} = *insn
|
||||
{
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let CursorType::BTreeIndex(index_meta) = cursor_type else {
|
||||
panic!("IdxInsert: not a BTree index cursor");
|
||||
};
|
||||
'block: {
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record = match &state.registers[record_reg] {
|
||||
Register::Record(ref r) => r,
|
||||
o => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"expected record, got {:?}",
|
||||
o
|
||||
)));
|
||||
}
|
||||
};
|
||||
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
|
||||
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
|
||||
let moved_before = if cursor.is_write_in_progress() {
|
||||
true
|
||||
} else if index_meta.unique {
|
||||
// check for uniqueness violation
|
||||
match cursor.key_exists_in_index(record)? {
|
||||
CursorResult::Ok(true) => {
|
||||
if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) {
|
||||
break 'block;
|
||||
}
|
||||
return Err(LimboError::Constraint(
|
||||
"UNIQUE constraint failed: duplicate key".into(),
|
||||
));
|
||||
}
|
||||
CursorResult::IO => return Ok(InsnFunctionStepResult::IO),
|
||||
CursorResult::Ok(false) => {}
|
||||
};
|
||||
// uniqueness check already moved us to the correct place in the index.
|
||||
// the uniqueness check uses SeekOp::GE, which means a non-matching entry
|
||||
// will now be positioned at the insertion point where there currently is
|
||||
// a) nothing, or
|
||||
// b) the first entry greater than the key we are inserting.
|
||||
// In both cases, we can insert the new entry without moving again.
|
||||
//
|
||||
// This is re-entrant, because once we call cursor.insert() with moved_before=true,
|
||||
// we will immediately set BTreeCursor::state to CursorState::Write(WriteInfo::new()),
|
||||
// in BTreeCursor::insert_into_page; thus, if this function is called again,
|
||||
// moved_before will again be true due to cursor.is_write_in_progress() returning true.
|
||||
true
|
||||
} else {
|
||||
flags.has(IdxInsertFlags::USE_SEEK)
|
||||
};
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
|
||||
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
|
||||
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
|
||||
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_index_key(record), moved_before));
|
||||
let record_to_insert = match &state.registers[record_reg] {
|
||||
Register::Record(ref r) => r,
|
||||
o => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"expected record, got {:?}",
|
||||
o
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
match state.op_idx_insert_state {
|
||||
OpIdxInsertState::SeekIfUnique => {
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let CursorType::BTreeIndex(index_meta) = cursor_type else {
|
||||
panic!("IdxInsert: not a BTreeIndex cursor");
|
||||
};
|
||||
if !index_meta.unique {
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert {
|
||||
moved_before: false,
|
||||
};
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
{
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
|
||||
return_if_io!(cursor.seek(
|
||||
SeekKey::IndexKey(record_to_insert),
|
||||
SeekOp::GE { eq_only: true }
|
||||
));
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::UniqueConstraintCheck;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
OpIdxInsertState::UniqueConstraintCheck => {
|
||||
let ignore_conflict = 'i: {
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_opt = return_if_io!(cursor.record());
|
||||
let Some(record) = record_opt.as_ref() else {
|
||||
// Cursor not pointing at a record — table is empty or past last
|
||||
break 'i false;
|
||||
};
|
||||
// Cursor is pointing at a record; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row;
|
||||
// UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2).
|
||||
let existing_key = if cursor.has_rowid() {
|
||||
&record.get_values()[..record.count().saturating_sub(1)]
|
||||
} else {
|
||||
record.get_values()
|
||||
};
|
||||
let inserted_key_vals = &record_to_insert.get_values();
|
||||
if existing_key.len() != inserted_key_vals.len() {
|
||||
break 'i false;
|
||||
}
|
||||
|
||||
let conflict = compare_immutable(
|
||||
existing_key,
|
||||
inserted_key_vals,
|
||||
cursor.key_sort_order(),
|
||||
&cursor.collations,
|
||||
) == std::cmp::Ordering::Equal;
|
||||
if conflict {
|
||||
if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) {
|
||||
break 'i true;
|
||||
}
|
||||
return Err(LimboError::Constraint(
|
||||
"UNIQUE constraint failed: duplicate key".into(),
|
||||
));
|
||||
}
|
||||
|
||||
false
|
||||
};
|
||||
state.op_idx_insert_state = if ignore_conflict {
|
||||
state.pc += 1;
|
||||
OpIdxInsertState::SeekIfUnique
|
||||
} else {
|
||||
OpIdxInsertState::Insert { moved_before: true }
|
||||
};
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
OpIdxInsertState::Insert { moved_before } => {
|
||||
{
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
|
||||
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
|
||||
let moved_before = moved_before
|
||||
|| cursor.is_write_in_progress()
|
||||
|| flags.has(IdxInsertFlags::USE_SEEK);
|
||||
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
|
||||
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
|
||||
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.
|
||||
return_if_io!(
|
||||
cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before)
|
||||
);
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique;
|
||||
state.pc += 1;
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_new_rowid(
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::{
|
||||
function::{AggFunc, FuncCtx},
|
||||
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
|
||||
translate::plan::TableReferences,
|
||||
vdbe::execute::OpIdxInsertState,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -250,6 +251,7 @@ pub struct ProgramState {
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
op_idx_insert_state: OpIdxInsertState,
|
||||
}
|
||||
|
||||
impl ProgramState {
|
||||
@@ -276,6 +278,7 @@ impl ProgramState {
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user