Merge 'BTree traversal refactor and bugfixes' from Pere Diaz Bou

The major change in this PR is the following:
- It refactors deserializing the record from the cell so that it does
not happen in `next()` or `prev()`, but lazily when it is needed. This
will allow a further enhancement where we only read a _partial_ section
of the record payload, e.g. when `Insn::Column` is requested by the
VDBE, we can only deserialize the relevant column.
It also fixes multiple bugs in `BTreeCursor`
Follow-ups:
ASAP: fix performance regression (probably due to too much record
invalidation)
ASAP: do post-balance seek key calculation only when balance will be
needed
ASAP: fix balancing scenario where interior index cell is deleted - even
if leaf doesnt require balance, parent might. both need to be checked
LATER: implement safe way to use indexes for UPDATE -- ephemeral indexes
needed, see FIXME in `optimizer/mod.rs`
LATER: implement on-demand serialization of parts of the record, instead
of entire record

Closes #1664
This commit is contained in:
Pere Diaz Bou
2025-06-10 13:38:20 +02:00
13 changed files with 1242 additions and 1183 deletions

View File

@@ -40,7 +40,7 @@ jobs:
env:
RUST_LOG: ${{ runner.debug && 'limbo_core::storage=trace' || '' }}
run: cargo test --verbose
timeout-minutes: 10
timeout-minutes: 20
clippy:
@@ -67,6 +67,7 @@ jobs:
test-limbo:
runs-on: blacksmith-4vcpu-ubuntu-2404
timeout-minutes: 20
steps:
- name: Install cargo-c
env:
@@ -91,7 +92,7 @@ jobs:
- uses: "./.github/shared/install_sqlite"
- name: Test
run: make test
timeout-minutes: 10
timeout-minutes: 20
test-sqlite:
runs-on: blacksmith-4vcpu-ubuntu-2404

File diff suppressed because it is too large Load Diff

View File

@@ -850,6 +850,7 @@ impl Pager {
// Providing a page is optional, if provided it will be used to avoid reading the page from disk.
// This is implemented in accordance with sqlite freepage2() function.
pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
tracing::info!("free_page(page_id={})", page_id);
const TRUNK_PAGE_HEADER_SIZE: usize = 8;
const LEAF_ENTRY_SIZE: usize = 4;
const RESERVED_SLOTS: usize = 2;

View File

@@ -377,6 +377,15 @@ pub enum PageType {
TableLeaf = 13,
}
impl PageType {
pub fn is_table(&self) -> bool {
match self {
PageType::IndexInterior | PageType::IndexLeaf => false,
PageType::TableInterior | PageType::TableLeaf => true,
}
}
}
impl TryFrom<u8> for PageType {
type Error = LimboError;
@@ -585,7 +594,12 @@ impl PageContent {
// the page header is 12 bytes for interior pages, 8 bytes for leaf pages
// this is because the 4 last bytes in the interior page's header are used for the rightmost pointer.
let cell_pointer_array_start = self.header_size();
assert!(idx < ncells, "cell_get: idx out of bounds");
assert!(
idx < ncells,
"cell_get: idx out of bounds: idx={}, ncells={}",
idx,
ncells
);
let cell_pointer = cell_pointer_array_start + (idx * 2);
let cell_pointer = self.read_u16(cell_pointer) as usize;

View File

@@ -870,9 +870,20 @@ fn emit_program_for_update(
)?;
// Open indexes for update.
let mut index_cursors = Vec::with_capacity(plan.indexes_to_update.len());
// TODO: do not reopen if there is table reference using it.
for index in &plan.indexes_to_update {
if let Some(index_cursor) = program.resolve_cursor_id_safe(&CursorKey::index(
plan.table_references
.joined_tables()
.first()
.unwrap()
.internal_id,
index.clone(),
)) {
// Don't reopen index if it was already opened as the iteration cursor for this update plan.
let record_reg = program.alloc_register();
index_cursors.push((index_cursor, record_reg));
continue;
}
let index_cursor = program.alloc_cursor_id(CursorType::BTreeIndex(index.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: index_cursor,

View File

@@ -1182,12 +1182,13 @@ fn emit_seek(
seek.len
};
match seek.op {
SeekOp::GE => program.emit_insn(Insn::SeekGE {
SeekOp::GE { eq_only } => program.emit_insn(Insn::SeekGE {
is_index,
cursor_id: seek_cursor_id,
start_reg,
num_regs,
target_pc: loop_end,
eq_only,
}),
SeekOp::GT => program.emit_insn(Insn::SeekGT {
is_index,
@@ -1196,12 +1197,13 @@ fn emit_seek(
num_regs,
target_pc: loop_end,
}),
SeekOp::LE => program.emit_insn(Insn::SeekLE {
SeekOp::LE { eq_only } => program.emit_insn(Insn::SeekLE {
is_index,
cursor_id: seek_cursor_id,
start_reg,
num_regs,
target_pc: loop_end,
eq_only,
}),
SeekOp::LT => program.emit_insn(Insn::SeekLT {
is_index,
@@ -1210,7 +1212,6 @@ fn emit_seek(
num_regs,
target_pc: loop_end,
}),
SeekOp::EQ => panic!("An index seek is never EQ"),
};
Ok(())
@@ -1293,7 +1294,7 @@ fn emit_seek_termination(
}
match (is_index, termination.op) {
(true, SeekOp::GE) => program.emit_insn(Insn::IdxGE {
(true, SeekOp::GE { .. }) => program.emit_insn(Insn::IdxGE {
cursor_id: seek_cursor_id,
start_reg,
num_regs,
@@ -1305,7 +1306,7 @@ fn emit_seek_termination(
num_regs,
target_pc: loop_end,
}),
(true, SeekOp::LE) => program.emit_insn(Insn::IdxLE {
(true, SeekOp::LE { .. }) => program.emit_insn(Insn::IdxLE {
cursor_id: seek_cursor_id,
start_reg,
num_regs,
@@ -1317,7 +1318,7 @@ fn emit_seek_termination(
num_regs,
target_pc: loop_end,
}),
(false, SeekOp::GE) => program.emit_insn(Insn::Ge {
(false, SeekOp::GE { .. }) => program.emit_insn(Insn::Ge {
lhs: rowid_reg.unwrap(),
rhs: start_reg,
target_pc: loop_end,
@@ -1331,7 +1332,7 @@ fn emit_seek_termination(
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
}),
(false, SeekOp::LE) => program.emit_insn(Insn::Le {
(false, SeekOp::LE { .. }) => program.emit_insn(Insn::Le {
lhs: rowid_reg.unwrap(),
rhs: start_reg,
target_pc: loop_end,
@@ -1345,9 +1346,6 @@ fn emit_seek_termination(
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
}),
(_, SeekOp::EQ) => {
panic!("An index termination condition is never EQ")
}
};
Ok(())

View File

@@ -98,7 +98,7 @@ fn optimize_delete_plan(plan: &mut DeletePlan, schema: &Schema) -> Result<()> {
Ok(())
}
fn optimize_update_plan(plan: &mut UpdatePlan, schema: &Schema) -> Result<()> {
fn optimize_update_plan(plan: &mut UpdatePlan, _schema: &Schema) -> Result<()> {
rewrite_exprs_update(plan)?;
if let ConstantConditionEliminationResult::ImpossibleCondition =
eliminate_constant_conditions(&mut plan.where_clause)?
@@ -106,13 +106,18 @@ fn optimize_update_plan(plan: &mut UpdatePlan, schema: &Schema) -> Result<()> {
plan.contains_constant_false_condition = true;
return Ok(());
}
let _ = optimize_table_access(
&mut plan.table_references,
&schema.indexes,
&mut plan.where_clause,
&mut plan.order_by,
&mut None,
)?;
// FIXME: don't use indexes for update right now because it's not safe to traverse an index
// while also updating the same table, things go wrong.
// e.g. in 'explain update t set x=x+5 where x > 10;' where x is an indexed column,
// sqlite first creates an ephemeral index to store the current values so the tree traversal
// doesn't get messed up while updating.
// let _ = optimize_table_access(
// &mut plan.table_references,
// &schema.indexes,
// &mut plan.where_clause,
// &mut plan.order_by,
// &mut None,
// )?;
Ok(())
}
@@ -881,7 +886,7 @@ fn build_seek_def(
seek: Some(SeekKey {
len: key_len,
null_pad: false,
op: SeekOp::GE,
op: SeekOp::GE { eq_only: true },
}),
termination: Some(TerminationKey {
len: key_len,
@@ -905,8 +910,8 @@ fn build_seek_def(
(
key_len - 1,
key_len,
SeekOp::LE.reverse(),
SeekOp::LE.reverse(),
SeekOp::LE { eq_only: false }.reverse(),
SeekOp::LE { eq_only: false }.reverse(),
)
};
SeekDef {
@@ -943,12 +948,17 @@ fn build_seek_def(
(IterationDirection::Forwards, ast::Operator::GreaterEquals) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len, key_len - 1, SeekOp::GE, SeekOp::GT)
(
key_len,
key_len - 1,
SeekOp::GE { eq_only: false },
SeekOp::GT,
)
} else {
(
key_len - 1,
key_len,
SeekOp::LE.reverse(),
SeekOp::LE { eq_only: false }.reverse(),
SeekOp::LT.reverse(),
)
};
@@ -986,9 +996,19 @@ fn build_seek_def(
(IterationDirection::Forwards, ast::Operator::Less) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len - 1, key_len, SeekOp::GT, SeekOp::GE)
(
key_len - 1,
key_len,
SeekOp::GT,
SeekOp::GE { eq_only: false },
)
} else {
(key_len, key_len - 1, SeekOp::GT, SeekOp::GE)
(
key_len,
key_len - 1,
SeekOp::GT,
SeekOp::GE { eq_only: false },
)
};
SeekDef {
key,
@@ -1029,8 +1049,8 @@ fn build_seek_def(
(
key_len,
key_len - 1,
SeekOp::LE.reverse(),
SeekOp::LE.reverse(),
SeekOp::LE { eq_only: false }.reverse(),
SeekOp::LE { eq_only: false }.reverse(),
)
};
SeekDef {
@@ -1066,7 +1086,7 @@ fn build_seek_def(
iter_dir,
seek: Some(SeekKey {
len: key_len,
op: SeekOp::LE,
op: SeekOp::LE { eq_only: true },
null_pad: false,
}),
termination: Some(TerminationKey {
@@ -1086,13 +1106,18 @@ fn build_seek_def(
(IterationDirection::Backwards, ast::Operator::Less) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len, key_len - 1, SeekOp::LT, SeekOp::LE)
(
key_len,
key_len - 1,
SeekOp::LT,
SeekOp::LE { eq_only: false },
)
} else {
(
key_len - 1,
key_len,
SeekOp::GT.reverse(),
SeekOp::GE.reverse(),
SeekOp::GE { eq_only: false }.reverse(),
)
};
SeekDef {
@@ -1129,7 +1154,12 @@ fn build_seek_def(
(IterationDirection::Backwards, ast::Operator::LessEquals) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len, key_len - 1, SeekOp::LE, SeekOp::LE)
(
key_len,
key_len - 1,
SeekOp::LE { eq_only: false },
SeekOp::LE { eq_only: false },
)
} else {
(
key_len - 1,
@@ -1172,7 +1202,12 @@ fn build_seek_def(
(IterationDirection::Backwards, ast::Operator::Greater) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len - 1, key_len, SeekOp::LE, SeekOp::LE)
(
key_len - 1,
key_len,
SeekOp::LE { eq_only: false },
SeekOp::LE { eq_only: false },
)
} else {
(
key_len,
@@ -1215,12 +1250,17 @@ fn build_seek_def(
(IterationDirection::Backwards, ast::Operator::GreaterEquals) => {
let (seek_key_len, termination_key_len, seek_op, termination_op) =
if sort_order_of_last_key == SortOrder::Asc {
(key_len - 1, key_len, SeekOp::LE, SeekOp::LT)
(
key_len - 1,
key_len,
SeekOp::LE { eq_only: false },
SeekOp::LT,
)
} else {
(
key_len,
key_len - 1,
SeekOp::GE.reverse(),
SeekOp::GE { eq_only: false }.reverse(),
SeekOp::GT.reverse(),
)
};

View File

@@ -701,6 +701,12 @@ pub struct ImmutableRecord {
recreating: bool,
}
#[derive(PartialEq)]
pub enum ParseRecordState {
Init,
Parsing { payload: Vec<u8> },
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Record {
values: Vec<Value>,
@@ -934,6 +940,10 @@ impl ImmutableRecord {
self.values.clear();
}
pub fn is_invalidated(&self) -> bool {
self.payload.is_empty()
}
pub fn get_payload(&self) -> &[u8] {
&self.payload
}
@@ -1482,10 +1492,17 @@ pub enum CursorResult<T> {
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
/// The match condition of a table/index seek.
pub enum SeekOp {
EQ,
GE,
/// If eq_only is true, this means in practice:
/// We are iterating forwards, but we are really looking for an exact match on the seek key.
GE {
eq_only: bool,
},
GT,
LE,
/// If eq_only is true, this means in practice:
/// We are iterating backwards, but we are really looking for an exact match on the seek key.
LE {
eq_only: bool,
},
LT,
}
@@ -1502,17 +1519,23 @@ impl SeekOp {
#[inline(always)]
pub fn iteration_direction(&self) -> IterationDirection {
match self {
SeekOp::EQ | SeekOp::GE | SeekOp::GT => IterationDirection::Forwards,
SeekOp::LE | SeekOp::LT => IterationDirection::Backwards,
SeekOp::GE { .. } | SeekOp::GT => IterationDirection::Forwards,
SeekOp::LE { .. } | SeekOp::LT => IterationDirection::Backwards,
}
}
pub fn eq_only(&self) -> bool {
match self {
SeekOp::GE { eq_only } | SeekOp::LE { eq_only } => *eq_only,
_ => false,
}
}
pub fn reverse(&self) -> Self {
match self {
SeekOp::EQ => SeekOp::EQ,
SeekOp::GE => SeekOp::LE,
SeekOp::GE { eq_only } => SeekOp::LE { eq_only: *eq_only },
SeekOp::GT => SeekOp::LT,
SeekOp::LE => SeekOp::GE,
SeekOp::LE { eq_only } => SeekOp::GE { eq_only: *eq_only },
SeekOp::LT => SeekOp::GT,
}
}

View File

@@ -1344,15 +1344,23 @@ pub fn op_column(
unreachable!("unexpected Insn {:?}", insn)
};
if let Some((index_cursor_id, table_cursor_id)) = state.deferred_seeks[*cursor_id].take() {
let deferred_seek = {
let deferred_seek = 'd: {
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
index_cursor.rowid()?
match index_cursor.rowid()? {
CursorResult::IO => {
break 'd Some((index_cursor_id, table_cursor_id));
}
CursorResult::Ok(rowid) => rowid,
}
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
match table_cursor.seek(
SeekKey::TableRowId(rowid.unwrap()),
SeekOp::GE { eq_only: true },
)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
@@ -1369,7 +1377,7 @@ pub fn op_column(
let mut cursor =
must_be_btree_cursor!(*cursor_id, program.cursor_ref, state, "Column");
let cursor = cursor.as_btree_mut();
let record = cursor.record();
let record = return_if_io!(cursor.record());
let value = if let Some(record) = record.as_ref() {
if cursor.get_null_flag() {
RefValue::Null
@@ -1900,11 +1908,16 @@ pub fn op_row_id(
unreachable!("unexpected Insn {:?}", insn)
};
if let Some((index_cursor_id, table_cursor_id)) = state.deferred_seeks[*cursor_id].take() {
let deferred_seek = {
let deferred_seek = 'd: {
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let record = index_cursor.record();
let record = match index_cursor.record()? {
CursorResult::IO => {
break 'd Some((index_cursor_id, table_cursor_id));
}
CursorResult::Ok(record) => record,
};
let record = record.as_ref().unwrap();
let rowid = record.get_values().last().unwrap();
match rowid {
@@ -1914,7 +1927,7 @@ pub fn op_row_id(
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
@@ -1926,7 +1939,7 @@ pub fn op_row_id(
}
let mut cursors = state.cursors.borrow_mut();
if let Some(Cursor::BTree(btree_cursor)) = cursors.get_mut(*cursor_id).unwrap() {
if let Some(ref rowid) = btree_cursor.rowid()? {
if let Some(ref rowid) = return_if_io!(btree_cursor.rowid()) {
state.registers[*dest] = Register::Value(Value::Integer(*rowid as i64));
} else {
state.registers[*dest] = Register::Value(Value::Null);
@@ -1960,7 +1973,7 @@ pub fn op_idx_row_id(
let mut cursors = state.cursors.borrow_mut();
let cursor = cursors.get_mut(*cursor_id).unwrap().as_mut().unwrap();
let cursor = cursor.as_btree_mut();
let rowid = cursor.rowid()?;
let rowid = return_if_io!(cursor.rowid());
state.registers[*dest] = match rowid {
Some(rowid) => Register::Value(Value::Integer(rowid as i64)),
None => Register::Value(Value::Null),
@@ -2000,7 +2013,9 @@ pub fn op_seek_rowid(
};
match rowid {
Some(rowid) => {
let found = return_if_io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ));
let found = return_if_io!(
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })
);
if !found {
target_pc.to_offset_int()
} else {
@@ -2046,6 +2061,7 @@ pub fn op_seek(
num_regs,
target_pc,
is_index,
..
}
| Insn::SeekGT {
cursor_id,
@@ -2060,6 +2076,7 @@ pub fn op_seek(
num_regs,
target_pc,
is_index,
..
}
| Insn::SeekLT {
cursor_id,
@@ -2076,19 +2093,22 @@ pub fn op_seek(
"target_pc should be an offset, is: {:?}",
target_pc
);
let eq_only = match insn {
Insn::SeekGE { eq_only, .. } | Insn::SeekLE { eq_only, .. } => *eq_only,
_ => false,
};
let op = match insn {
Insn::SeekGE { .. } => SeekOp::GE,
Insn::SeekGE { eq_only, .. } => SeekOp::GE { eq_only: *eq_only },
Insn::SeekGT { .. } => SeekOp::GT,
Insn::SeekLE { .. } => SeekOp::LE,
Insn::SeekLE { eq_only, .. } => SeekOp::LE { eq_only: *eq_only },
Insn::SeekLT { .. } => SeekOp::LT,
_ => unreachable!("unexpected Insn {:?}", insn),
};
let op_name = match op {
SeekOp::GE => "SeekGE",
SeekOp::GE { .. } => "SeekGE",
SeekOp::GT => "SeekGT",
SeekOp::LE => "SeekLE",
SeekOp::LE { .. } => "SeekLE",
SeekOp::LT => "SeekLT",
_ => unreachable!("unexpected SeekOp {:?}", op),
};
if *is_index {
let found = {
@@ -2160,7 +2180,7 @@ pub fn op_idx_ge(
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
let pc = if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(idx_record) = return_if_io!(cursor.record()) {
// Compare against the same number of values
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
@@ -2224,7 +2244,7 @@ pub fn op_idx_le(
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
let pc = if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = return_if_io!(cursor.record()) {
// Compare against the same number of values
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
@@ -2270,7 +2290,7 @@ pub fn op_idx_gt(
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
let pc = if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = return_if_io!(cursor.record()) {
// Compare against the same number of values
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
@@ -2316,7 +2336,7 @@ pub fn op_idx_lt(
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs = make_record(&state.registers, start_reg, num_regs);
let pc = if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = return_if_io!(cursor.record()) {
// Compare against the same number of values
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
@@ -3836,13 +3856,10 @@ pub fn op_insert(
Value::Integer(i) => *i,
_ => unreachable!("expected integer key"),
};
// NOTE(pere): Sending moved_before == true is okay because we moved before but
// if we were to set to false after starting a balance procedure, it might
// leave undefined state.
return_if_io!(cursor.insert(&BTreeKey::new_table_rowid(key, Some(record)), true));
// Only update last_insert_rowid for regular table inserts, not schema modifications
if cursor.root_page() != 1 {
if let Some(rowid) = cursor.rowid()? {
if let Some(rowid) = return_if_io!(cursor.rowid()) {
if let Some(conn) = program.connection.upgrade() {
conn.update_last_rowid(rowid);
}
@@ -3889,11 +3906,6 @@ pub fn op_delete(
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
tracing::debug!(
"op_delete(record={:?}, rowid={:?})",
cursor.record(),
cursor.rowid()?
);
return_if_io!(cursor.delete());
}
let prev_changes = program.n_change.get();
@@ -3905,6 +3917,7 @@ pub fn op_delete(
#[derive(Debug)]
pub enum OpIdxDeleteState {
Seeking(ImmutableRecord), // First seek row to delete
Verifying,
Deleting,
}
pub fn op_idx_delete(
@@ -3923,29 +3936,48 @@ pub fn op_idx_delete(
unreachable!("unexpected Insn {:?}", insn)
};
loop {
tracing::debug!(
"op_idx_delete(cursor_id={}, start_reg={}, num_regs={}, rootpage={}, state={:?})",
cursor_id,
start_reg,
num_regs,
state.get_cursor(*cursor_id).as_btree_mut().root_page(),
state.op_idx_delete_state
);
match &state.op_idx_delete_state {
Some(OpIdxDeleteState::Seeking(record)) => {
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ));
tracing::debug!(
"op_idx_delete(seek={}, record={} rowid={:?})",
&record,
cursor.record().as_ref().unwrap(),
cursor.rowid()
let found = return_if_io!(
cursor.seek(SeekKey::IndexKey(&record), SeekOp::GE { eq_only: true })
);
if cursor.rowid()?.is_none() {
// If P5 is not zero, then raise an SQLITE_CORRUPT_INDEX error if no matching
// index entry is found. This happens when running an UPDATE or DELETE statement and the
// index entry to be updated or deleted is not found. For some uses of IdxDelete
// (example: the EXCEPT operator) it does not matter that no matching entry is found.
// For those cases, P5 is zero. Also, do not raise this (self-correcting and non-critical) error if in writable_schema mode.
return Err(LimboError::Corrupt(format!(
"IdxDelete: no matching index entry found for record {:?}",
record
)));
}
tracing::debug!(
"op_idx_delete: found={:?}, rootpage={}, key={:?}",
found,
cursor.root_page(),
record
);
}
state.op_idx_delete_state = Some(OpIdxDeleteState::Verifying);
}
Some(OpIdxDeleteState::Verifying) => {
let rowid = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.rowid())
};
if rowid.is_none() {
// If P5 is not zero, then raise an SQLITE_CORRUPT_INDEX error if no matching
// index entry is found. This happens when running an UPDATE or DELETE statement and the
// index entry to be updated or deleted is not found. For some uses of IdxDelete
// (example: the EXCEPT operator) it does not matter that no matching entry is found.
// For those cases, P5 is zero. Also, do not raise this (self-correcting and non-critical) error if in writable_schema mode.
return Err(LimboError::Corrupt(format!(
"IdxDelete: no matching index entry found for record {:?}",
make_record(&state.registers, start_reg, num_regs)
)));
}
state.op_idx_delete_state = Some(OpIdxDeleteState::Deleting);
}
@@ -4151,7 +4183,8 @@ pub fn op_no_conflict(
return Ok(InsnFunctionStepResult::Step);
}
let conflict = return_if_io!(cursor.seek(SeekKey::IndexKey(record), SeekOp::EQ));
let conflict =
return_if_io!(cursor.seek(SeekKey::IndexKey(record), SeekOp::GE { eq_only: true }));
drop(cursor_ref);
if !conflict {
state.pc = target_pc.to_offset_int();
@@ -4925,10 +4958,10 @@ pub fn op_found(
}
};
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ))
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::GE { eq_only: true }))
} else {
let record = make_record(&state.registers, record_reg, num_regs);
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ))
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::GE { eq_only: true }))
}
};

View File

@@ -809,6 +809,7 @@ pub fn insn_to_str(
start_reg,
num_regs,
target_pc,
..
}
| Insn::SeekLE {
is_index: _,
@@ -816,6 +817,7 @@ pub fn insn_to_str(
start_reg,
num_regs,
target_pc,
..
}
| Insn::SeekLT {
is_index: _,

View File

@@ -502,6 +502,7 @@ pub enum Insn {
start_reg: usize,
num_regs: usize,
target_pc: BranchOffset,
eq_only: bool,
},
/// If cursor_id refers to an SQL table (B-Tree that uses integer keys), use the value in start_reg as the key.
@@ -538,6 +539,7 @@ pub enum Insn {
start_reg: usize,
num_regs: usize,
target_pc: BranchOffset,
eq_only: bool,
},
// If cursor_id refers to an SQL table (B-Tree that uses integer keys), use the value in start_reg as the key.

View File

@@ -499,17 +499,17 @@ fn get_new_rowid<R: Rng>(cursor: &mut BTreeCursor, mut rng: R) -> Result<CursorR
CursorResult::Ok(()) => {}
CursorResult::IO => return Ok(CursorResult::IO),
}
let mut rowid = cursor
.rowid()?
.unwrap_or(0) // if BTree is empty - use 0 as initial value for rowid
.checked_add(1) // add 1 but be careful with overflows
.unwrap_or(i64::MAX); // in case of overflow - use i64::MAX
let mut rowid = match cursor.rowid()? {
CursorResult::Ok(Some(rowid)) => rowid.checked_add(1).unwrap_or(i64::MAX), // add 1 but be careful with overflows, in case of overflow - use i64::MAX
CursorResult::Ok(None) => 1,
CursorResult::IO => return Ok(CursorResult::IO),
};
if rowid > i64::MAX.try_into().unwrap() {
let distribution = Uniform::from(1..=i64::MAX);
let max_attempts = 100;
for count in 0..max_attempts {
rowid = distribution.sample(&mut rng).try_into().unwrap();
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })? {
CursorResult::Ok(false) => break, // Found a non-existing rowid
CursorResult::Ok(true) => {
if count == max_attempts - 1 {

View File

@@ -428,6 +428,25 @@ fn test_update_with_index() -> anyhow::Result<()> {
Ok(())
}
#[test]
fn test_delete_with_index() -> anyhow::Result<()> {
let _ = env_logger::try_init();
maybe_setup_tracing();
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE t(x UNIQUE)");
let conn = tmp_db.connect_limbo();
run_query(&tmp_db, &conn, "INSERT INTO t VALUES (1), (2)")?;
run_query(&tmp_db, &conn, "DELETE FROM t WHERE x >= 1")?;
run_query_on_row(&tmp_db, &conn, "SELECT * FROM t", |_| {
panic!("Delete should've deleted every row!");
})?;
Ok(())
}
fn run_query(tmp_db: &TempDatabase, conn: &Rc<Connection>, query: &str) -> anyhow::Result<()> {
run_query_core(tmp_db, conn, query, None::<fn(&Row)>)
}