diff --git a/core/translate/delete.rs b/core/translate/delete.rs index fb580b8e8..5cb38cf42 100644 --- a/core/translate/delete.rs +++ b/core/translate/delete.rs @@ -50,6 +50,11 @@ pub fn prepare_delete_plan( crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table"); }; let name = tbl_name.name.0.as_str().to_string(); + let indexes = schema + .get_indices(table.get_name()) + .iter() + .cloned() + .collect(); let mut table_references = vec![TableReference { table, identifier: name, @@ -82,6 +87,7 @@ pub fn prepare_delete_plan( limit: resolved_limit, offset: resolved_offset, contains_constant_false_condition: false, + indexes, }; Ok(Plan::Delete(plan)) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 5b12e4375..d1e2fdce9 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -2,13 +2,16 @@ // It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine. use std::rc::Rc; +use std::sync::Arc; use limbo_sqlite3_parser::ast::{self}; use crate::function::Func; +use crate::schema::Index; use crate::translate::plan::{DeletePlan, Plan, Search}; use crate::util::exprs_are_equivalent; use crate::vdbe::builder::ProgramBuilder; +use crate::vdbe::insn::RegisterOrLiteral; use crate::vdbe::{insn::Insn, BranchOffset}; use crate::{Result, SymbolTable}; @@ -375,7 +378,13 @@ fn emit_program_for_delete( &plan.table_references, &plan.where_clause, )?; - emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?; + emit_delete_insns( + program, + &mut t_ctx, + &plan.table_references, + &plan.indexes, + &plan.limit, + )?; // Clean up and close the main execution loop close_loop(program, &mut t_ctx, &plan.table_references)?; @@ -393,6 +402,7 @@ fn emit_delete_insns( program: &mut ProgramBuilder, t_ctx: &mut TranslateCtx, table_references: &[TableReference], + index_references: &[Arc], limit: &Option, ) -> Result<()> { let table_reference = table_references.first().unwrap(); @@ -408,11 +418,12 @@ fn emit_delete_insns( }, _ => return Ok(()), }; + let main_table_cursor_id = program.resolve_cursor_id(table_reference.table.get_name()); // Emit the instructions to delete the row let key_reg = program.alloc_register(); program.emit_insn(Insn::RowId { - cursor_id, + cursor_id: main_table_cursor_id, dest: key_reg, }); @@ -433,7 +444,43 @@ fn emit_delete_insns( conflict_action, }); } else { - program.emit_insn(Insn::Delete { cursor_id }); + for index in index_references { + let index_cursor_id = program.alloc_cursor_id( + Some(index.name.clone()), + crate::vdbe::builder::CursorType::BTreeIndex(index.clone()), + ); + + program.emit_insn(Insn::OpenWrite { + cursor_id: index_cursor_id, + root_page: RegisterOrLiteral::Literal(index.root_page), + }); + let num_regs = index.columns.len() + 1; + let start_reg = program.alloc_registers(num_regs); + // Emit columns that are part of the index + index + .columns + .iter() + .enumerate() + .for_each(|(reg_offset, column_index)| { + program.emit_insn(Insn::Column { + cursor_id: main_table_cursor_id, + column: column_index.pos_in_table, + dest: start_reg + reg_offset, + }); + }); + program.emit_insn(Insn::RowId { + cursor_id: main_table_cursor_id, + dest: start_reg + num_regs - 1, + }); + program.emit_insn(Insn::IdxDelete { + start_reg, + num_regs, + cursor_id: index_cursor_id, + }); + } + program.emit_insn(Insn::Delete { + cursor_id: main_table_cursor_id, + }); } if let Some(limit) = limit { let limit_reg = program.alloc_register(); diff --git a/core/translate/plan.rs b/core/translate/plan.rs index 48ce4c854..d25e1837c 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -297,6 +297,8 @@ pub struct DeletePlan { pub offset: Option, /// query contains a constant condition that is always false pub contains_constant_false_condition: bool, + /// Indexes that must be updated by the delete operation. + pub indexes: Vec>, } #[derive(Debug, Clone)] diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index de871f54c..1177af3b7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1810,11 +1810,17 @@ pub fn op_row_id( let rowid = { let mut index_cursor = state.get_cursor(index_cursor_id); let index_cursor = index_cursor.as_btree_mut(); - index_cursor.rowid()? + let record = index_cursor.record(); + let record = record.as_ref().unwrap(); + let rowid = record.get_values().last().unwrap(); + match rowid { + RefValue::Integer(rowid) => *rowid as u64, + _ => todo!(), + } }; let mut table_cursor = state.get_cursor(table_cursor_id); let table_cursor = table_cursor.as_btree_mut(); - match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? { + match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? { CursorResult::Ok(_) => None, CursorResult::IO => Some((index_cursor_id, table_cursor_id)), } @@ -2069,7 +2075,6 @@ pub fn op_idx_ge( let idx_values = idx_record.get_values(); let idx_values = &idx_values[..record_from_regs.len()]; let record_values = record_from_regs.get_values(); - let record_values = &record_values[..idx_values.len()]; let ord = compare_immutable(&idx_values, &record_values, cursor.index_key_sort_order); if ord.is_ge() { target_pc.to_offset_int() @@ -3759,6 +3764,34 @@ pub fn op_delete( Ok(InsnFunctionStepResult::Step) } +pub fn op_idx_delete( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + let Insn::IdxDelete { + cursor_id, + start_reg, + num_regs, + } = insn + else { + unreachable!("unexpected Insn {:?}", insn) + }; + let record = make_record(&state.registers, start_reg, num_regs); + { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ)); + return_if_io!(cursor.delete()); + } + let prev_changes = program.n_change.get(); + program.n_change.set(prev_changes + 1); + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + pub fn op_idx_insert( program: &Program, state: &mut ProgramState, @@ -3766,7 +3799,6 @@ pub fn op_idx_insert( pager: &Rc, mv_store: Option<&Rc>, ) -> Result { - dbg!("op_idx_insert_"); if let Insn::IdxInsert { cursor_id, record_reg, @@ -3807,7 +3839,6 @@ pub fn op_idx_insert( } }; - dbg!(moved_before); // Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages, // therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode // because it could trigger a movement to child page after a balance root which will leave the current page as the root page. diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 96afc5d17..a6db7be05 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1037,6 +1037,19 @@ pub fn insn_to_str( 0, "".to_string(), ), + Insn::IdxDelete { + cursor_id, + start_reg, + num_regs, + } => ( + "IdxDelete", + *cursor_id as i32, + *start_reg as i32, + *num_regs as i32, + OwnedValue::build_text(""), + 0, + "".to_string(), + ), Insn::NewRowid { cursor, rowid_reg, diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 56f44bd2b..295b41a2d 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -650,6 +650,12 @@ pub enum Insn { cursor_id: CursorID, }, + IdxDelete { + start_reg: usize, + num_regs: usize, + cursor_id: CursorID, + }, + NewRowid { cursor: CursorID, // P1 rowid_reg: usize, // P2 Destination register to store the new rowid @@ -948,6 +954,7 @@ impl Insn { Insn::Once { .. } => execute::op_once, Insn::NotFound { .. } => execute::op_not_found, Insn::Affinity { .. } => execute::op_affinity, + Insn::IdxDelete { .. } => execute::op_idx_delete, } } } diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index e948ed5d1..9c6107d58 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -461,3 +461,128 @@ fn test_insert_after_big_blob() -> anyhow::Result<()> { Ok(()) } + +#[test_log::test] +#[ignore = "this takes too long :)"] +fn test_write_delete_with_index() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + + maybe_setup_tracing(); + + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x PRIMARY KEY);"); + let conn = tmp_db.connect_limbo(); + + let list_query = "SELECT * FROM test"; + let max_iterations = 1000; + for i in 0..max_iterations { + println!("inserting {} ", i); + if (i % 100) == 0 { + let progress = (i as f64 / max_iterations as f64) * 100.0; + println!("progress {:.1}%", progress); + } + let insert_query = format!("INSERT INTO test VALUES ({})", i); + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + } + for i in 0..max_iterations { + println!("deleting {} ", i); + if (i % 100) == 0 { + let progress = (i as f64 / max_iterations as f64) * 100.0; + println!("progress {:.1}%", progress); + } + let delete_query = format!("delete from test where x={}", i); + match conn.query(delete_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + println!("listing after deleting {} ", i); + let mut current_read_index = i + 1; + match conn.query(list_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::Row => { + let row = rows.row().unwrap(); + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + } + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Interrupt => break, + StepResult::Done => break, + StepResult::Busy => { + panic!("Database is busy"); + } + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + for i in i + 1..max_iterations { + // now test with seek + match conn.query(format!("select * from test where x = {}", i)) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::Row => { + let row = rows.row().unwrap(); + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(i, id); + break; + } + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Interrupt => break, + StepResult::Done => break, + StepResult::Busy => { + panic!("Database is busy"); + } + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + } + } + + Ok(()) +}