implement IdxDelete

clippy

revert op_idx_ge changes

fmt

fmt again

rever op_idx_gt changes
This commit is contained in:
Pere Diaz Bou
2025-04-22 11:50:51 +02:00
parent 2495d15b96
commit b7970a286d
7 changed files with 239 additions and 8 deletions

View File

@@ -50,6 +50,11 @@ pub fn prepare_delete_plan(
crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table");
};
let name = tbl_name.name.0.as_str().to_string();
let indexes = schema
.get_indices(table.get_name())
.iter()
.cloned()
.collect();
let mut table_references = vec![TableReference {
table,
identifier: name,
@@ -82,6 +87,7 @@ pub fn prepare_delete_plan(
limit: resolved_limit,
offset: resolved_offset,
contains_constant_false_condition: false,
indexes,
};
Ok(Plan::Delete(plan))

View File

@@ -2,13 +2,16 @@
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
use std::rc::Rc;
use std::sync::Arc;
use limbo_sqlite3_parser::ast::{self};
use crate::function::Func;
use crate::schema::Index;
use crate::translate::plan::{DeletePlan, Plan, Search};
use crate::util::exprs_are_equivalent;
use crate::vdbe::builder::ProgramBuilder;
use crate::vdbe::insn::RegisterOrLiteral;
use crate::vdbe::{insn::Insn, BranchOffset};
use crate::{Result, SymbolTable};
@@ -375,7 +378,13 @@ fn emit_program_for_delete(
&plan.table_references,
&plan.where_clause,
)?;
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?;
emit_delete_insns(
program,
&mut t_ctx,
&plan.table_references,
&plan.indexes,
&plan.limit,
)?;
// Clean up and close the main execution loop
close_loop(program, &mut t_ctx, &plan.table_references)?;
@@ -393,6 +402,7 @@ fn emit_delete_insns(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
table_references: &[TableReference],
index_references: &[Arc<Index>],
limit: &Option<isize>,
) -> Result<()> {
let table_reference = table_references.first().unwrap();
@@ -408,11 +418,12 @@ fn emit_delete_insns(
},
_ => return Ok(()),
};
let main_table_cursor_id = program.resolve_cursor_id(table_reference.table.get_name());
// Emit the instructions to delete the row
let key_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
cursor_id: main_table_cursor_id,
dest: key_reg,
});
@@ -433,7 +444,43 @@ fn emit_delete_insns(
conflict_action,
});
} else {
program.emit_insn(Insn::Delete { cursor_id });
for index in index_references {
let index_cursor_id = program.alloc_cursor_id(
Some(index.name.clone()),
crate::vdbe::builder::CursorType::BTreeIndex(index.clone()),
);
program.emit_insn(Insn::OpenWrite {
cursor_id: index_cursor_id,
root_page: RegisterOrLiteral::Literal(index.root_page),
});
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
// Emit columns that are part of the index
index
.columns
.iter()
.enumerate()
.for_each(|(reg_offset, column_index)| {
program.emit_insn(Insn::Column {
cursor_id: main_table_cursor_id,
column: column_index.pos_in_table,
dest: start_reg + reg_offset,
});
});
program.emit_insn(Insn::RowId {
cursor_id: main_table_cursor_id,
dest: start_reg + num_regs - 1,
});
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: index_cursor_id,
});
}
program.emit_insn(Insn::Delete {
cursor_id: main_table_cursor_id,
});
}
if let Some(limit) = limit {
let limit_reg = program.alloc_register();

View File

@@ -297,6 +297,8 @@ pub struct DeletePlan {
pub offset: Option<isize>,
/// query contains a constant condition that is always false
pub contains_constant_false_condition: bool,
/// Indexes that must be updated by the delete operation.
pub indexes: Vec<Arc<Index>>,
}
#[derive(Debug, Clone)]

View File

@@ -1810,11 +1810,17 @@ pub fn op_row_id(
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
index_cursor.rowid()?
let record = index_cursor.record();
let record = record.as_ref().unwrap();
let rowid = record.get_values().last().unwrap();
match rowid {
RefValue::Integer(rowid) => *rowid as u64,
_ => todo!(),
}
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
@@ -2069,7 +2075,6 @@ pub fn op_idx_ge(
let idx_values = idx_record.get_values();
let idx_values = &idx_values[..record_from_regs.len()];
let record_values = record_from_regs.get_values();
let record_values = &record_values[..idx_values.len()];
let ord = compare_immutable(&idx_values, &record_values, cursor.index_key_sort_order);
if ord.is_ge() {
target_pc.to_offset_int()
@@ -3759,6 +3764,34 @@ pub fn op_delete(
Ok(InsnFunctionStepResult::Step)
}
pub fn op_idx_delete(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Rc<Pager>,
mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
let Insn::IdxDelete {
cursor_id,
start_reg,
num_regs,
} = insn
else {
unreachable!("unexpected Insn {:?}", insn)
};
let record = make_record(&state.registers, start_reg, num_regs);
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ));
return_if_io!(cursor.delete());
}
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_idx_insert(
program: &Program,
state: &mut ProgramState,
@@ -3766,7 +3799,6 @@ pub fn op_idx_insert(
pager: &Rc<Pager>,
mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
dbg!("op_idx_insert_");
if let Insn::IdxInsert {
cursor_id,
record_reg,
@@ -3807,7 +3839,6 @@ pub fn op_idx_insert(
}
};
dbg!(moved_before);
// Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages,
// therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode
// because it could trigger a movement to child page after a balance root which will leave the current page as the root page.

View File

@@ -1037,6 +1037,19 @@ pub fn insn_to_str(
0,
"".to_string(),
),
Insn::IdxDelete {
cursor_id,
start_reg,
num_regs,
} => (
"IdxDelete",
*cursor_id as i32,
*start_reg as i32,
*num_regs as i32,
OwnedValue::build_text(""),
0,
"".to_string(),
),
Insn::NewRowid {
cursor,
rowid_reg,

View File

@@ -650,6 +650,12 @@ pub enum Insn {
cursor_id: CursorID,
},
IdxDelete {
start_reg: usize,
num_regs: usize,
cursor_id: CursorID,
},
NewRowid {
cursor: CursorID, // P1
rowid_reg: usize, // P2 Destination register to store the new rowid
@@ -948,6 +954,7 @@ impl Insn {
Insn::Once { .. } => execute::op_once,
Insn::NotFound { .. } => execute::op_not_found,
Insn::Affinity { .. } => execute::op_affinity,
Insn::IdxDelete { .. } => execute::op_idx_delete,
}
}
}

View File

@@ -461,3 +461,128 @@ fn test_insert_after_big_blob() -> anyhow::Result<()> {
Ok(())
}
#[test_log::test]
#[ignore = "this takes too long :)"]
fn test_write_delete_with_index() -> anyhow::Result<()> {
let _ = env_logger::try_init();
maybe_setup_tracing();
let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x PRIMARY KEY);");
let conn = tmp_db.connect_limbo();
let list_query = "SELECT * FROM test";
let max_iterations = 1000;
for i in 0..max_iterations {
println!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
}
for i in 0..max_iterations {
println!("deleting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let delete_query = format!("delete from test where x={}", i);
match conn.query(delete_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
println!("listing after deleting {} ", i);
let mut current_read_index = i + 1;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
for i in i + 1..max_iterations {
// now test with seek
match conn.query(format!("select * from test where x = {}", i)) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
break;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
}
}
Ok(())
}