mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 18:24:20 +01:00
VDBE: introduce state machine for op_idx_insert for more granular IO control
Separates cursor.key_exists_in_index() into an explicit state machine. The problem with the implementation on the main branch is that it runs `return_if_io!(seek)` followed by `return_if_io!(cursor.record())`: the latter may yield on IO, and when the function is re-entered the seek starts over from the beginning, which can cause an infinite loop. With an explicit state machine we can record which step has already completed and prevent this.
This commit is contained in:
@@ -500,7 +500,7 @@ pub struct BTreeCursor {
|
||||
/// Collations for Index Btree constraint checks
|
||||
/// Contains the Collation Seq for the whole Index
|
||||
/// This Vec should be empty for Table Btree
|
||||
collations: Vec<CollationSeq>,
|
||||
pub collations: Vec<CollationSeq>,
|
||||
seek_state: CursorSeekState,
|
||||
/// Separate state to read a record with overflow pages. This separation from `state` is necessary as
|
||||
/// we can be in a function that relies on `state`, but also needs to process overflow pages
|
||||
@@ -4629,44 +4629,6 @@ impl BTreeCursor {
|
||||
self.null_flag
|
||||
}
|
||||
|
||||
/// Search for a key in an Index Btree and report whether an equal key is already present.
///
/// The lookup seeks with `SeekOp::GE { eq_only: true }` and then reads the record under the
/// cursor. For indexes that need to be unique we cannot compare the rowid, so when
/// `self.has_rowid()` is true the last column of the existing record is dropped before the
/// comparison.
///
/// Returns `CursorResult::Ok(true)` when the record under the cursor compares equal to `key`,
/// and `CursorResult::Ok(false)` when the cursor points at no record, when the column counts
/// differ, or when the comparison is not `Ordering::Equal`.
///
/// NOTE(review): both steps are wrapped in `return_if_io!` (presumably an early return that
/// asks the caller to retry after pending IO — confirm against the macro definition). According
/// to the commit message accompanying this code, if `self.record()` yields on IO, re-entering
/// this function restarts the seek, which can loop forever.
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn key_exists_in_index(&mut self, key: &ImmutableRecord) -> Result<CursorResult<bool>> {
|
||||
// Step 1: position the cursor using a GE seek with `eq_only: true`.
return_if_io!(self.seek(SeekKey::IndexKey(key), SeekOp::GE { eq_only: true }));
|
||||
|
||||
// Step 2: read whatever record the cursor is positioned on (may be none).
let record_opt = return_if_io!(self.record());
|
||||
match record_opt.as_ref() {
|
||||
Some(record) => {
|
||||
// Existing record found; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row;
|
||||
// UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2).
|
||||
let existing_key = if self.has_rowid() {
|
||||
&record.get_values()[..record.count().saturating_sub(1)]
|
||||
} else {
|
||||
record.get_values()
|
||||
};
|
||||
let inserted_key_vals = &key.get_values();
|
||||
// Need this length check because `.all` returns true on an empty iterator,
|
||||
// so when record_opt is invalidated it would otherwise always show up as a duplicate key.
|
||||
if existing_key.len() != inserted_key_vals.len() {
|
||||
return Ok(CursorResult::Ok(false));
|
||||
}
|
||||
|
||||
// The key exists only if every compared column is equal under the index's
// sort order and collations.
Ok(CursorResult::Ok(
|
||||
compare_immutable(
|
||||
existing_key,
|
||||
inserted_key_vals,
|
||||
self.key_sort_order(),
|
||||
&self.collations,
|
||||
) == std::cmp::Ordering::Equal,
|
||||
))
|
||||
}
|
||||
None => {
|
||||
// Cursor not pointing at a record — table is empty or the cursor is past the last entry
|
||||
Ok(CursorResult::Ok(false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn exists(&mut self, key: &Value) -> Result<CursorResult<bool>> {
|
||||
assert!(self.mv_cursor.is_none());
|
||||
|
||||
@@ -4443,6 +4443,17 @@ pub fn op_idx_delete(
|
||||
}
|
||||
}
|
||||
|
||||
/// Resumable state of the `op_idx_insert` instruction.
///
/// `op_idx_insert` matches on this value each time it is entered, so that a step which has
/// already completed is not repeated when a later step has to return early for IO. The state
/// starts at `SeekIfUnique` and is reset to `SeekIfUnique` whenever the instruction finishes
/// and the program counter is advanced.
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
pub enum OpIdxInsertState {
|
||||
/// Optional seek step done before a unique constraint check.
/// For a non-unique index this step is skipped and the state moves straight to
/// `Insert { moved_before: false }`; otherwise the cursor seeks to the record being
/// inserted with `SeekOp::GE { eq_only: true }` and the state moves to `UniqueConstraintCheck`.
|
||||
SeekIfUnique,
|
||||
/// Optional unique constraint check done before an insert.
/// Compares the record under the cursor (minus its trailing rowid, when the cursor has one)
/// with the record being inserted. A match is a `UNIQUE constraint failed` error unless the
/// `NO_OP_DUPLICATE` flag is set, in which case the insert is skipped. With no match the
/// state moves to `Insert { moved_before: true }`.
|
||||
UniqueConstraintCheck,
|
||||
/// Main insert step. This is always performed. Usually the state machine just
|
||||
/// skips to this step unless the insertion is made into a unique index.
/// `moved_before` is forwarded to `cursor.insert`; it is additionally treated as true when
/// a write is already in progress on the cursor or the `USE_SEEK` flag is set.
|
||||
Insert { moved_before: bool },
|
||||
}
|
||||
|
||||
pub fn op_idx_insert(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -4450,72 +4461,118 @@ pub fn op_idx_insert(
|
||||
pager: &Rc<Pager>,
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
if let Insn::IdxInsert {
|
||||
let Insn::IdxInsert {
|
||||
cursor_id,
|
||||
record_reg,
|
||||
flags,
|
||||
..
|
||||
} = *insn
|
||||
{
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let CursorType::BTreeIndex(index_meta) = cursor_type else {
|
||||
panic!("IdxInsert: not a BTree index cursor");
|
||||
};
|
||||
'block: {
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record = match &state.registers[record_reg] {
|
||||
Register::Record(ref r) => r,
|
||||
o => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"expected record, got {:?}",
|
||||
o
|
||||
)));
|
||||
}
|
||||
};
|
||||
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
|
||||
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
|
||||
let moved_before = if cursor.is_write_in_progress() {
|
||||
true
|
||||
} else if index_meta.unique {
|
||||
// check for uniqueness violation
|
||||
match cursor.key_exists_in_index(record)? {
|
||||
CursorResult::Ok(true) => {
|
||||
if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) {
|
||||
break 'block;
|
||||
}
|
||||
return Err(LimboError::Constraint(
|
||||
"UNIQUE constraint failed: duplicate key".into(),
|
||||
));
|
||||
}
|
||||
CursorResult::IO => return Ok(InsnFunctionStepResult::IO),
|
||||
CursorResult::Ok(false) => {}
|
||||
};
|
||||
// uniqueness check already moved us to the correct place in the index.
|
||||
// the uniqueness check uses SeekOp::GE, which means a non-matching entry
|
||||
// will now be positioned at the insertion point where there currently is
|
||||
// a) nothing, or
|
||||
// b) the first entry greater than the key we are inserting.
|
||||
// In both cases, we can insert the new entry without moving again.
|
||||
//
|
||||
// This is re-entrant, because once we call cursor.insert() with moved_before=true,
|
||||
// we will immediately set BTreeCursor::state to CursorState::Write(WriteInfo::new()),
|
||||
// in BTreeCursor::insert_into_page; thus, if this function is called again,
|
||||
// moved_before will again be true due to cursor.is_write_in_progress() returning true.
|
||||
true
|
||||
} else {
|
||||
flags.has(IdxInsertFlags::USE_SEEK)
|
||||
};
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
|
||||
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
|
||||
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
|
||||
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.
|
||||
return_if_io!(cursor.insert(&BTreeKey::new_index_key(record), moved_before));
|
||||
let record_to_insert = match &state.registers[record_reg] {
|
||||
Register::Record(ref r) => r,
|
||||
o => {
|
||||
return Err(LimboError::InternalError(format!(
|
||||
"expected record, got {:?}",
|
||||
o
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
match state.op_idx_insert_state {
|
||||
OpIdxInsertState::SeekIfUnique => {
|
||||
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
|
||||
let CursorType::BTreeIndex(index_meta) = cursor_type else {
|
||||
panic!("IdxInsert: not a BTreeIndex cursor");
|
||||
};
|
||||
if !index_meta.unique {
|
||||
state.op_idx_insert_state = OpIdxInsertState::Insert {
|
||||
moved_before: false,
|
||||
};
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
{
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
|
||||
return_if_io!(cursor.seek(
|
||||
SeekKey::IndexKey(record_to_insert),
|
||||
SeekOp::GE { eq_only: true }
|
||||
));
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::UniqueConstraintCheck;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
OpIdxInsertState::UniqueConstraintCheck => {
|
||||
let ignore_conflict = 'i: {
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let record_opt = return_if_io!(cursor.record());
|
||||
let Some(record) = record_opt.as_ref() else {
|
||||
// Cursor not pointing at a record — table is empty or past last
|
||||
break 'i false;
|
||||
};
|
||||
// Cursor is pointing at a record; if the index has a rowid, exclude it from the comparison since it's a pointer to the table row;
|
||||
// UNIQUE indexes disallow duplicates like (a=1,b=2,rowid=1) and (a=1,b=2,rowid=2).
|
||||
let existing_key = if cursor.has_rowid() {
|
||||
&record.get_values()[..record.count().saturating_sub(1)]
|
||||
} else {
|
||||
record.get_values()
|
||||
};
|
||||
let inserted_key_vals = &record_to_insert.get_values();
|
||||
if existing_key.len() != inserted_key_vals.len() {
|
||||
break 'i false;
|
||||
}
|
||||
|
||||
let conflict = compare_immutable(
|
||||
existing_key,
|
||||
inserted_key_vals,
|
||||
cursor.key_sort_order(),
|
||||
&cursor.collations,
|
||||
) == std::cmp::Ordering::Equal;
|
||||
if conflict {
|
||||
if flags.has(IdxInsertFlags::NO_OP_DUPLICATE) {
|
||||
break 'i true;
|
||||
}
|
||||
return Err(LimboError::Constraint(
|
||||
"UNIQUE constraint failed: duplicate key".into(),
|
||||
));
|
||||
}
|
||||
|
||||
false
|
||||
};
|
||||
state.op_idx_insert_state = if ignore_conflict {
|
||||
state.pc += 1;
|
||||
OpIdxInsertState::SeekIfUnique
|
||||
} else {
|
||||
OpIdxInsertState::Insert { moved_before: true }
|
||||
};
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
OpIdxInsertState::Insert { moved_before } => {
|
||||
{
|
||||
let mut cursor = state.get_cursor(cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
// To make this reentrant in case of `moved_before` = false, we need to check if the previous cursor.insert started
|
||||
// a write/balancing operation. If it did, it means we already moved to the place we wanted.
|
||||
let moved_before = moved_before
|
||||
|| cursor.is_write_in_progress()
|
||||
|| flags.has(IdxInsertFlags::USE_SEEK);
|
||||
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
|
||||
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
|
||||
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.
|
||||
return_if_io!(
|
||||
cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before)
|
||||
);
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique;
|
||||
state.pc += 1;
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_new_rowid(
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::{
|
||||
function::{AggFunc, FuncCtx},
|
||||
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
|
||||
translate::plan::TableReferences,
|
||||
vdbe::execute::OpIdxInsertState,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -250,6 +251,7 @@ pub struct ProgramState {
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
op_idx_insert_state: OpIdxInsertState,
|
||||
}
|
||||
|
||||
impl ProgramState {
|
||||
@@ -276,6 +278,7 @@ impl ProgramState {
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user