From 2495d15b96495562213ae14e8d09cfe0b7b77443 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Wed, 16 Apr 2025 13:11:49 +0300 Subject: [PATCH 1/3] Index insert fuzz --- core/storage/btree.rs | 86 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index a8167fd17..0b8226017 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5159,8 +5159,10 @@ mod tests { fast_lock::SpinLock, io::{Buffer, Completion, MemoryIO, OpenFlags, IO}, storage::{ - database::DatabaseFile, page_cache::DumbLruPageCache, sqlite3_ondisk, - sqlite3_ondisk::DatabaseHeader, + database::DatabaseFile, + page_cache::DumbLruPageCache, + pager::CreateBTreeFlags, + sqlite3_ondisk::{self, DatabaseHeader}, }, types::Text, vdbe::Register, @@ -5678,6 +5680,81 @@ mod tests { } } } + + fn btree_index_insert_fuzz_run(attempts: usize, inserts: usize) { + let (mut rng, seed) = if std::env::var("SEED").is_ok() { + let seed = std::env::var("SEED").unwrap(); + let seed = seed.parse::().unwrap(); + let rng = ChaCha8Rng::seed_from_u64(seed); + (rng, seed) + } else { + rng_from_time() + }; + let mut seen = HashSet::new(); + tracing::info!("super seed: {}", seed); + for _ in 0..attempts { + let (pager, _) = empty_btree(); + let index_root_page = pager.btree_create(&CreateBTreeFlags::new_index()); + let index_root_page = index_root_page as usize; + let mut cursor = BTreeCursor::new(None, pager.clone(), index_root_page); + let mut keys = Vec::new(); + tracing::info!("seed: {}", seed); + for _ in 0..inserts { + let key = { + let result; + loop { + let cols = (0..10) + .map(|_| (rng.next_u64() % (1 << 30)) as i64) + .collect::>(); + if seen.contains(&cols) { + continue; + } else { + seen.insert(cols.clone()); + } + result = cols; + break; + } + result + }; + keys.push(key.clone()); + let value = ImmutableRecord::from_registers( + &key.iter() + .map(|col| Register::OwnedValue(OwnedValue::Integer(*col))) + .collect::>(), + ); + run_until_done( + || { + cursor.insert( + &BTreeKey::new_index_key(&value), + cursor.is_write_in_progress(), + ) + }, + pager.deref(), + ) + .unwrap(); + keys.sort(); + cursor.move_to_root(); + } + keys.sort(); + cursor.move_to_root(); + for key in keys.iter() { + tracing::trace!("seeking key: {:?}", key); + run_until_done(|| cursor.next(), pager.deref()).unwrap(); + let record = cursor.record(); + let record = record.as_ref().unwrap(); + let cursor_key = record.get_values(); + assert_eq!( + cursor_key, + &key.iter() + .map(|col| RefValue::Integer(*col)) + .collect::>(), + "key {:?} is not found", + key + ); + } + } + } + #[test] pub fn test_drop_odd() { let db = get_database(); @@ -5731,6 +5808,11 @@ mod tests { } } + #[test] + pub fn btree_index_insert_fuzz_run_equal_size() { + btree_index_insert_fuzz_run(2, 1024 * 32); + } + #[test] pub fn btree_insert_fuzz_run_random() { btree_insert_fuzz_run(128, 16, |rng| (rng.next_u32() % 4096) as usize); From b7970a286d77808bd01ce1c4e40ab4d103b82990 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 22 Apr 2025 11:50:51 +0200 Subject: [PATCH 2/3] implement IdxDelete clippy revert op_idx_ge changes fmt fmt again rever op_idx_gt changes --- core/translate/delete.rs | 6 + core/translate/emitter.rs | 53 +++++++- core/translate/plan.rs | 2 + core/vdbe/execute.rs | 41 +++++- core/vdbe/explain.rs | 13 ++ core/vdbe/insn.rs | 7 + .../query_processing/test_write_path.rs | 125 ++++++++++++++++++ 7 files changed, 239 insertions(+), 8 deletions(-) diff --git a/core/translate/delete.rs b/core/translate/delete.rs index fb580b8e8..5cb38cf42 100644 --- a/core/translate/delete.rs +++ b/core/translate/delete.rs @@ -50,6 +50,11 @@ pub fn prepare_delete_plan( crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table"); }; let name = tbl_name.name.0.as_str().to_string(); + let indexes = schema + .get_indices(table.get_name()) + .iter() + .cloned() + .collect(); let mut table_references = vec![TableReference { table, identifier: name, @@ -82,6 +87,7 @@ pub fn prepare_delete_plan( limit: resolved_limit, offset: resolved_offset, contains_constant_false_condition: false, + indexes, }; Ok(Plan::Delete(plan)) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 5b12e4375..d1e2fdce9 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -2,13 +2,16 @@ // It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine. use std::rc::Rc; +use std::sync::Arc; use limbo_sqlite3_parser::ast::{self}; use crate::function::Func; +use crate::schema::Index; use crate::translate::plan::{DeletePlan, Plan, Search}; use crate::util::exprs_are_equivalent; use crate::vdbe::builder::ProgramBuilder; +use crate::vdbe::insn::RegisterOrLiteral; use crate::vdbe::{insn::Insn, BranchOffset}; use crate::{Result, SymbolTable}; @@ -375,7 +378,13 @@ fn emit_program_for_delete( &plan.table_references, &plan.where_clause, )?; - emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?; + emit_delete_insns( + program, + &mut t_ctx, + &plan.table_references, + &plan.indexes, + &plan.limit, + )?; // Clean up and close the main execution loop close_loop(program, &mut t_ctx, &plan.table_references)?; @@ -393,6 +402,7 @@ fn emit_delete_insns( program: &mut ProgramBuilder, t_ctx: &mut TranslateCtx, table_references: &[TableReference], + index_references: &[Arc], limit: &Option, ) -> Result<()> { let table_reference = table_references.first().unwrap(); @@ -408,11 +418,12 @@ fn emit_delete_insns( }, _ => return Ok(()), }; + let main_table_cursor_id = program.resolve_cursor_id(table_reference.table.get_name()); // Emit the instructions to delete the row let key_reg = program.alloc_register(); program.emit_insn(Insn::RowId { - cursor_id, + cursor_id: main_table_cursor_id, dest: key_reg, }); @@ -433,7 +444,43 @@ fn emit_delete_insns( conflict_action, }); } else { - program.emit_insn(Insn::Delete { cursor_id }); + for index in index_references { + let index_cursor_id = program.alloc_cursor_id( + Some(index.name.clone()), + crate::vdbe::builder::CursorType::BTreeIndex(index.clone()), + ); + + program.emit_insn(Insn::OpenWrite { + cursor_id: index_cursor_id, + root_page: RegisterOrLiteral::Literal(index.root_page), + }); + let num_regs = index.columns.len() + 1; + let start_reg = program.alloc_registers(num_regs); + // Emit columns that are part of the index + index + .columns + .iter() + .enumerate() + .for_each(|(reg_offset, column_index)| { + program.emit_insn(Insn::Column { + cursor_id: main_table_cursor_id, + column: column_index.pos_in_table, + dest: start_reg + reg_offset, + }); + }); + program.emit_insn(Insn::RowId { + cursor_id: main_table_cursor_id, + dest: start_reg + num_regs - 1, + }); + program.emit_insn(Insn::IdxDelete { + start_reg, + num_regs, + cursor_id: index_cursor_id, + }); + } + program.emit_insn(Insn::Delete { + cursor_id: main_table_cursor_id, + }); } if let Some(limit) = limit { let limit_reg = program.alloc_register(); diff --git a/core/translate/plan.rs b/core/translate/plan.rs index 48ce4c854..d25e1837c 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -297,6 +297,8 @@ pub struct DeletePlan { pub offset: Option, /// query contains a constant condition that is always false pub contains_constant_false_condition: bool, + /// Indexes that must be updated by the delete operation. + pub indexes: Vec>, } #[derive(Debug, Clone)] diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index de871f54c..1177af3b7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1810,11 +1810,17 @@ pub fn op_row_id( let rowid = { let mut index_cursor = state.get_cursor(index_cursor_id); let index_cursor = index_cursor.as_btree_mut(); - index_cursor.rowid()? + let record = index_cursor.record(); + let record = record.as_ref().unwrap(); + let rowid = record.get_values().last().unwrap(); + match rowid { + RefValue::Integer(rowid) => *rowid as u64, + _ => todo!(), + } }; let mut table_cursor = state.get_cursor(table_cursor_id); let table_cursor = table_cursor.as_btree_mut(); - match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? { + match table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? { CursorResult::Ok(_) => None, CursorResult::IO => Some((index_cursor_id, table_cursor_id)), } @@ -2069,7 +2075,6 @@ pub fn op_idx_ge( let idx_values = idx_record.get_values(); let idx_values = &idx_values[..record_from_regs.len()]; let record_values = record_from_regs.get_values(); - let record_values = &record_values[..idx_values.len()]; let ord = compare_immutable(&idx_values, &record_values, cursor.index_key_sort_order); if ord.is_ge() { target_pc.to_offset_int() @@ -3759,6 +3764,34 @@ pub fn op_delete( Ok(InsnFunctionStepResult::Step) } +pub fn op_idx_delete( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Rc>, +) -> Result { + let Insn::IdxDelete { + cursor_id, + start_reg, + num_regs, + } = insn + else { + unreachable!("unexpected Insn {:?}", insn) + }; + let record = make_record(&state.registers, start_reg, num_regs); + { + let mut cursor = state.get_cursor(*cursor_id); + let cursor = cursor.as_btree_mut(); + return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ)); + return_if_io!(cursor.delete()); + } + let prev_changes = program.n_change.get(); + program.n_change.set(prev_changes + 1); + state.pc += 1; + Ok(InsnFunctionStepResult::Step) +} + pub fn op_idx_insert( program: &Program, state: &mut ProgramState, @@ -3766,7 +3799,6 @@ pub fn op_idx_insert( pager: &Rc, mv_store: Option<&Rc>, ) -> Result { - dbg!("op_idx_insert_"); if let Insn::IdxInsert { cursor_id, record_reg, @@ -3807,7 +3839,6 @@ pub fn op_idx_insert( } }; - dbg!(moved_before); // Start insertion of row. This might trigger a balance procedure which will take care of moving to different pages, // therefore, we don't want to seek again if that happens, meaning we don't want to return on io without moving to the following opcode // because it could trigger a movement to child page after a balance root which will leave the current page as the root page. diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 96afc5d17..a6db7be05 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1037,6 +1037,19 @@ pub fn insn_to_str( 0, "".to_string(), ), + Insn::IdxDelete { + cursor_id, + start_reg, + num_regs, + } => ( + "IdxDelete", + *cursor_id as i32, + *start_reg as i32, + *num_regs as i32, + OwnedValue::build_text(""), + 0, + "".to_string(), + ), Insn::NewRowid { cursor, rowid_reg, diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 56f44bd2b..295b41a2d 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -650,6 +650,12 @@ pub enum Insn { cursor_id: CursorID, }, + IdxDelete { + start_reg: usize, + num_regs: usize, + cursor_id: CursorID, + }, + NewRowid { cursor: CursorID, // P1 rowid_reg: usize, // P2 Destination register to store the new rowid @@ -948,6 +954,7 @@ impl Insn { Insn::Once { .. } => execute::op_once, Insn::NotFound { .. } => execute::op_not_found, Insn::Affinity { .. } => execute::op_affinity, + Insn::IdxDelete { .. } => execute::op_idx_delete, } } } diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index e948ed5d1..9c6107d58 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -461,3 +461,128 @@ fn test_insert_after_big_blob() -> anyhow::Result<()> { Ok(()) } + +#[test_log::test] +#[ignore = "this takes too long :)"] +fn test_write_delete_with_index() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + + maybe_setup_tracing(); + + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x PRIMARY KEY);"); + let conn = tmp_db.connect_limbo(); + + let list_query = "SELECT * FROM test"; + let max_iterations = 1000; + for i in 0..max_iterations { + println!("inserting {} ", i); + if (i % 100) == 0 { + let progress = (i as f64 / max_iterations as f64) * 100.0; + println!("progress {:.1}%", progress); + } + let insert_query = format!("INSERT INTO test VALUES ({})", i); + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + } + for i in 0..max_iterations { + println!("deleting {} ", i); + if (i % 100) == 0 { + let progress = (i as f64 / max_iterations as f64) * 100.0; + println!("progress {:.1}%", progress); + } + let delete_query = format!("delete from test where x={}", i); + match conn.query(delete_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + println!("listing after deleting {} ", i); + let mut current_read_index = i + 1; + match conn.query(list_query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::Row => { + let row = rows.row().unwrap(); + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + } + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Interrupt => break, + StepResult::Done => break, + StepResult::Busy => { + panic!("Database is busy"); + } + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + for i in i + 1..max_iterations { + // now test with seek + match conn.query(format!("select * from test where x = {}", i)) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::Row => { + let row = rows.row().unwrap(); + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(i, id); + break; + } + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Interrupt => break, + StepResult::Done => break, + StepResult::Busy => { + panic!("Database is busy"); + } + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + } + } + + Ok(()) +} From 3ba5c2349fb1f48a047090f05fa57273d2656368 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 24 Apr 2025 16:23:20 +0200 Subject: [PATCH 3/3] add corrupt error if no matching record found for idxdelete a --- core/vdbe/execute.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 1177af3b7..bc91c9841 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1815,7 +1815,7 @@ pub fn op_row_id( let rowid = record.get_values().last().unwrap(); match rowid { RefValue::Integer(rowid) => *rowid as u64, - _ => todo!(), + _ => unreachable!(), } }; let mut table_cursor = state.get_cursor(table_cursor_id); @@ -3784,10 +3784,22 @@ pub fn op_idx_delete( let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_btree_mut(); return_if_io!(cursor.seek(SeekKey::IndexKey(&record), SeekOp::EQ)); + + if cursor.rowid()?.is_none() { + // If P5 is not zero, then raise an SQLITE_CORRUPT_INDEX error if no matching + // index entry is found. This happens when running an UPDATE or DELETE statement and the + // index entry to be updated or deleted is not found. For some uses of IdxDelete + // (example: the EXCEPT operator) it does not matter that no matching entry is found. + // For those cases, P5 is zero. Also, do not raise this (self-correcting and non-critical) error if in writable_schema mode. + return Err(LimboError::Corrupt(format!( + "IdxDelete: no matching index entry found for record {:?}", + record + ))); + } return_if_io!(cursor.delete()); } - let prev_changes = program.n_change.get(); - program.n_change.set(prev_changes + 1); + let n_change = program.n_change.get(); + program.n_change.set(n_change + 1); state.pc += 1; Ok(InsnFunctionStepResult::Step) }