fixing more rebase issues and cleaning up code. Save cursor context when calling delete for later use when needed

This commit is contained in:
pedrocarlo
2025-05-05 00:42:12 -03:00
parent c69f503eac
commit 814508981c
2 changed files with 62 additions and 220 deletions

View File

@@ -20,6 +20,7 @@ use crate::{
LimboError, Result,
};
#[cfg(debug_assertions)]
use std::collections::HashSet;
use std::{
cell::{Cell, Ref, RefCell},
@@ -338,6 +339,14 @@ enum OverflowState {
Done,
}
enum CursorContext {
TableRowId(u64),
/// If we are in an index tree we can then reuse this field to save
/// our cursor information
IndexKeyRowId(ImmutableRecord),
}
pub struct BTreeCursor {
/// The multi-version cursor that is used to read and write to the database file.
mv_cursor: Option<Rc<RefCell<MvCursor>>>,
@@ -366,7 +375,8 @@ pub struct BTreeCursor {
pub index_key_sort_order: IndexKeySortOrder,
/// Maintain count of the number of records in the btree. Used for the `Count` opcode
count: usize,
rowid_seen: HashSet<u64>,
/// Stores the last record that was seen.
context: Option<CursorContext>,
}
impl BTreeCursor {
@@ -393,7 +403,7 @@ impl BTreeCursor {
empty_record: Cell::new(true),
index_key_sort_order: IndexKeySortOrder::default(),
count: 0,
rowid_seen: HashSet::new(),
context: None,
}
}
@@ -759,8 +769,6 @@ impl BTreeCursor {
let contents = mem_page.contents.as_ref().unwrap();
let cell_count = contents.cell_count();
tracing::trace!(cell_idx, cell_count, "get_next_record");
if cell_count == 0 || cell_idx == cell_count {
// do rightmost
let has_parent = self.stack.has_parent();
@@ -835,11 +843,6 @@ impl BTreeCursor {
)?
};
self.stack.advance();
if self.rowid_seen.contains(_rowid) {
continue;
} else {
self.rowid_seen.insert(*_rowid);
}
return Ok(CursorResult::Ok(Some(*_rowid)));
}
BTreeCell::IndexInteriorCell(IndexInteriorCell {
@@ -905,11 +908,6 @@ impl BTreeCursor {
Some(RefValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
if self.rowid_seen.contains(&rowid) {
continue;
} else {
self.rowid_seen.insert(rowid);
}
return Ok(CursorResult::Ok(Some(rowid)));
} else {
continue;
@@ -970,11 +968,6 @@ impl BTreeCursor {
Some(RefValue::Integer(rowid)) => *rowid as u64,
_ => unreachable!("index cells should have an integer rowid"),
};
if self.rowid_seen.contains(&rowid) {
continue;
} else {
self.rowid_seen.insert(rowid);
}
return Ok(CursorResult::Ok(Some(rowid)));
} else {
continue;
@@ -3440,7 +3433,6 @@ impl BTreeCursor {
}
pub fn rewind(&mut self) -> Result<CursorResult<()>> {
self.rowid_seen.clear();
if self.mv_cursor.is_some() {
let rowid = return_if_io!(self.get_next_record(None));
self.rowid.replace(rowid);
@@ -3464,6 +3456,7 @@ impl BTreeCursor {
}
pub fn next(&mut self) -> Result<CursorResult<()>> {
let _ = self.restore_context()?;
let rowid = return_if_io!(self.get_next_record(None));
self.rowid.replace(rowid);
self.empty_record.replace(rowid.is_none());
@@ -3557,6 +3550,7 @@ impl BTreeCursor {
/// 8. Finish -> Delete operation is done. Return CursorResult(Ok())
pub fn delete(&mut self) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
self.save_context();
if let CursorState::None = &self.state {
self.state = CursorState::Delete(DeleteInfo {
@@ -4359,6 +4353,44 @@ impl BTreeCursor {
}
}
}
/// Save cursor context, to be restored later
pub fn save_context(&mut self) {
if let Some(rowid) = self.rowid.get() {
match self.stack.top().get_contents().page_type() {
PageType::TableInterior | PageType::TableLeaf => {
self.context = Some(CursorContext::TableRowId(rowid));
}
PageType::IndexInterior | PageType::IndexLeaf => {
self.context = Some(CursorContext::IndexKeyRowId(
self.reusable_immutable_record
.borrow()
.as_ref()
.unwrap()
.clone(),
));
}
}
}
}
/// If context is defined, restore it and set it None on success
fn restore_context(&mut self) -> Result<CursorResult<bool>> {
if self.context.is_none() {
return Ok(CursorResult::Ok(false));
}
let ctx = self.context.as_ref().unwrap();
let res = match ctx {
CursorContext::TableRowId(rowid) => self.seek(SeekKey::TableRowId(*rowid), SeekOp::EQ),
CursorContext::IndexKeyRowId(record) => {
// TODO: see how to avoid clone here
let record = record.clone();
self.seek(SeekKey::IndexKey(&record), SeekOp::EQ)
}
}?;
self.context = None;
Ok(res)
}
}
#[cfg(debug_assertions)]
@@ -4461,11 +4493,6 @@ impl PageStack {
/// Cell index of the current page
fn current_cell_index(&self) -> i32 {
let current = self.current();
tracing::event!(
tracing::Level::TRACE,
"current: cell_indices={:?}",
self.cell_indices
);
self.cell_indices.borrow()[current]
}
@@ -4486,11 +4513,6 @@ impl PageStack {
self.cell_indices
);
self.cell_indices.borrow_mut()[current] += 1;
tracing::event!(
tracing::Level::TRACE,
"after: cell_indices={:?}",
self.cell_indices
);
}
fn retreat(&self) {
@@ -4501,11 +4523,6 @@ impl PageStack {
self.cell_indices
);
self.cell_indices.borrow_mut()[current] -= 1;
tracing::event!(
tracing::Level::TRACE,
"after: cell_indices={:?}",
self.cell_indices
);
}
/// Move the cursor to the next cell in the current page according to the iteration direction.
@@ -4522,18 +4539,7 @@ impl PageStack {
fn set_cell_index(&self, idx: i32) {
let current = self.current();
tracing::event!(
tracing::Level::TRACE,
"set_cell_index={} cell_indices={:?}",
idx,
self.cell_indices
);
self.cell_indices.borrow_mut()[current] = idx;
tracing::event!(
tracing::Level::TRACE,
"after: cell_indices={:?}",
self.cell_indices
);
}
fn has_parent(&self) -> bool {

View File

@@ -593,6 +593,7 @@ fn emit_program_for_update(
// Open indexes for update.
let mut index_cursors = Vec::with_capacity(plan.indexes_to_update.len());
// TODO: do not reopen if there is table reference using it.
for index in &plan.indexes_to_update {
let index_cursor = program.alloc_cursor_id(
Some(index.table_name.clone()),
@@ -603,7 +604,8 @@ fn emit_program_for_update(
root_page: RegisterOrLiteral::Literal(index.root_page),
name: index.name.clone(),
});
index_cursors.push(index_cursor);
let record_reg = program.alloc_register();
index_cursors.push((index_cursor, record_reg));
}
open_loop(
program,
@@ -644,7 +646,7 @@ fn emit_update_insns(
plan: &UpdatePlan,
t_ctx: &TranslateCtx,
program: &mut ProgramBuilder,
index_cursors: Vec<usize>,
index_cursors: Vec<(usize, usize)>,
) -> crate::Result<()> {
let table_ref = &plan.table_references.first().unwrap();
let loop_labels = t_ctx.labels_main_loop.first().unwrap();
@@ -671,46 +673,6 @@ fn emit_update_insns(
_ => return Ok(()),
};
// THIS IS SAME CODE AS WE HAVE ON INSERT
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
// (idx name, root_page, idx cursor id)
let idx_cursors = plan
.indexes_to_update
.iter()
.map(|idx| {
let record_reg = program.alloc_register();
(
&idx.name,
idx.root_page,
program.alloc_cursor_id(
Some(idx.table_name.clone()),
CursorType::BTreeIndex(idx.clone()),
),
record_reg,
)
})
.collect::<Vec<(&String, usize, usize, usize)>>();
if !idx_cursors.is_empty() {
let open_indices_label = program.allocate_label();
// Open all cursors Once
program.emit_insn(Insn::Once {
target_pc_when_reentered: open_indices_label,
});
// Open all the index btrees for writing
for idx_cursor in idx_cursors.iter() {
program.emit_insn(Insn::OpenWrite {
cursor_id: idx_cursor.2,
root_page: idx_cursor.1.into(),
name: idx_cursor.0.clone(),
});
}
program.preassign_label_to_next_insn(open_indices_label);
}
for cond in plan
.where_clause
.iter()
@@ -797,86 +759,6 @@ fn emit_update_insns(
});
}
for (pos, index) in plan.indexes_to_update.iter().enumerate() {
if index.unique {
let (_, _, idx_cursor_id, record_reg) =
idx_cursors.get(pos).expect("index cursor should exist");
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
let rowid_reg = beg;
let idx_cols_start_reg = beg + 1;
// copy each index column from the table's column registers into these scratch regs
for (i, col) in index.columns.iter().enumerate() {
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: idx_cols_start_reg + col.pos_in_table,
dst_reg: idx_start_reg + i,
amount: 0,
});
}
// last register is the rowid
program.emit_insn(Insn::Copy {
src_reg: rowid_reg,
dst_reg: idx_start_reg + num_cols,
amount: 0,
});
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: *record_reg,
index_name: Some(index.name.clone()),
});
let constraint_check = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: *idx_cursor_id,
target_pc: constraint_check,
record_reg: idx_start_reg,
num_regs: num_cols,
});
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, col)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(&table_ref.table.get_name());
accum.push('.');
accum.push_str(&col.name);
accum
},
);
let idx_rowid_reg = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: *idx_cursor_id,
dest: idx_rowid_reg,
});
program.emit_insn(Insn::Eq {
lhs: rowid_reg,
rhs: idx_rowid_reg,
target_pc: constraint_check,
flags: CmpInsFlags::default(), // TODO: not sure what type of comparison flag is needed
});
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY, // TODO: distinct between primary key and unique index for error code
description: column_names,
});
program.preassign_label_to_next_insn(constraint_check);
}
}
for cond in plan
.where_clause
.iter()
@@ -896,47 +778,6 @@ fn emit_update_insns(
)?;
}
// Update indexes first. Columns that are updated will be translated from an expression and those who aren't modified will be
// read from table. Mutiple value index key could be updated partially.
for (index, index_cursor) in plan.indexes_to_update.iter().zip(index_cursors) {
let index_record_reg_count = index.columns.len() + 1;
let index_record_reg_start = program.alloc_registers(index_record_reg_count);
for (idx, column) in index.columns.iter().enumerate() {
if let Some((_, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) {
translate_expr(
program,
Some(&plan.table_references),
expr,
index_record_reg_start + idx,
&t_ctx.resolver,
)?;
} else {
program.emit_insn(Insn::Column {
cursor_id: cursor_id,
column: column.pos_in_table,
dest: index_record_reg_start + idx,
});
}
}
program.emit_insn(Insn::RowId {
cursor_id: cursor_id,
dest: index_record_reg_start + index.columns.len(),
});
let index_record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: index_record_reg_start,
count: index_record_reg_count,
dest_reg: index_record_reg,
index_name: Some(index.name.clone()),
});
program.emit_insn(Insn::IdxInsert {
cursor_id: index_cursor,
record_reg: index_record_reg,
unpacked_start: Some(index_record_reg_start),
unpacked_count: Some(index_record_reg_count as u16),
flags: IdxInsertFlags::new(),
});
}
// we scan a column at a time, loading either the column's values, or the new value
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
let start = if is_virtual { beg + 2 } else { beg + 1 };
@@ -1006,11 +847,8 @@ fn emit_update_insns(
}
}
for (pos, index) in plan.indexes_to_update.iter().enumerate() {
for (index, (idx_cursor_id, record_reg)) in plan.indexes_to_update.iter().zip(&index_cursors) {
if index.unique {
let (_, _, idx_cursor_id, record_reg) =
idx_cursors.get(pos).expect("index cursor should exist");
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
@@ -1148,7 +986,8 @@ fn emit_update_insns(
}
// For each index -> insert
for (pos, index) in plan.indexes_to_update.iter().enumerate() {
for (index, (idx_cursor_id, record_reg)) in plan.indexes_to_update.iter().zip(index_cursors)
{
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
@@ -1170,18 +1009,15 @@ fn emit_update_insns(
dest: start_reg + num_regs - 1,
});
let (_, _, idx_cursor_id, record_reg) =
idx_cursors.get(pos).expect("index cursor should exist");
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: *idx_cursor_id,
cursor_id: idx_cursor_id,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: *idx_cursor_id,
record_reg: *record_reg,
cursor_id: idx_cursor_id,
record_reg: record_reg,
unpacked_start: Some(start),
unpacked_count: Some((index.columns.len() + 1) as u16),
flags: IdxInsertFlags::new(),