From 64a12ed88788c1f524f1e16bc94ad82387ef1de8 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 1 May 2025 10:54:11 +0300 Subject: [PATCH 1/4] update index on indexed columns Previously columns that were indexed were updated only in the BtreeTable, but not on Index table. This commit basically enables updates on indexes too if they are needed. --- core/translate/emitter.rs | 62 +++++++++++++++++++++++++++++++++++++-- core/translate/plan.rs | 1 + core/translate/update.rs | 15 ++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 5ea2f9cfa..09a9bbb9f 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -10,8 +10,8 @@ use crate::function::Func; use crate::schema::Index; use crate::translate::plan::{DeletePlan, Plan, Search}; use crate::util::exprs_are_equivalent; -use crate::vdbe::builder::ProgramBuilder; -use crate::vdbe::insn::RegisterOrLiteral; +use crate::vdbe::builder::{CursorType, ProgramBuilder}; +use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral}; use crate::vdbe::{insn::Insn, BranchOffset}; use crate::{Result, SymbolTable}; @@ -546,19 +546,34 @@ fn emit_program_for_update( target_pc: after_main_loop_label, }); } + init_loop( program, &mut t_ctx, &plan.table_references, OperationMode::UPDATE, )?; + // Open indexes for update. + let mut index_cursors = vec![]; + // TODO: do not reopen if there is table reference using it. 
+ for index in &plan.indexes_to_update { + let index_cursor = program.alloc_cursor_id( + Some(index.table_name.clone()), + CursorType::BTreeIndex(index.clone()), + ); + program.emit_insn(Insn::OpenWrite { + cursor_id: index_cursor, + root_page: RegisterOrLiteral::Literal(index.root_page), + }); + index_cursors.push(index_cursor); + } open_loop( program, &mut t_ctx, &plan.table_references, &plan.where_clause, )?; - emit_update_insns(&plan, &t_ctx, program)?; + emit_update_insns(&plan, &t_ctx, program, index_cursors)?; close_loop(program, &mut t_ctx, &plan.table_references)?; program.preassign_label_to_next_insn(after_main_loop_label); @@ -573,6 +588,7 @@ fn emit_update_insns( plan: &UpdatePlan, t_ctx: &TranslateCtx, program: &mut ProgramBuilder, + index_cursors: Vec, ) -> crate::Result<()> { let table_ref = &plan.table_references.first().unwrap(); let loop_labels = t_ctx.labels_main_loop.first().unwrap(); @@ -663,6 +679,46 @@ fn emit_update_insns( )?; } + // Update indexes first. Columns that are updated will be translated from an expression and those that aren't modified will be + // read from the table. A multi-column index key may be updated only partially. 
+ for (index, index_cursor) in plan.indexes_to_update.iter().zip(index_cursors) { + let index_record_reg_count = index.columns.len() + 1; + let index_record_reg_start = program.alloc_registers(index_record_reg_count); + for (idx, column) in index.columns.iter().enumerate() { + if let Some((_, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) { + translate_expr( + program, + Some(&plan.table_references), + expr, + index_record_reg_start + idx, + &t_ctx.resolver, + )?; + } else { + program.emit_insn(Insn::Column { + cursor_id: cursor_id, + column: column.pos_in_table, + dest: index_record_reg_start + idx, + }); + } + } + program.emit_insn(Insn::RowId { + cursor_id: cursor_id, + dest: index_record_reg_start + index.columns.len(), + }); + let index_record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: index_record_reg_start, + count: index_record_reg_count, + dest_reg: index_record_reg, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: index_cursor, + record_reg: index_record_reg, + unpacked_start: Some(index_record_reg_start), + unpacked_count: Some(index_record_reg_count as u16), + flags: IdxInsertFlags::new(), + }); + } // we scan a column at a time, loading either the column's values, or the new value // from the Set expression, into registers so we can emit a MakeRecord and update the row. 
let start = if is_virtual { beg + 2 } else { beg + 1 }; diff --git a/core/translate/plan.rs b/core/translate/plan.rs index ad6514247..2baf59d32 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -315,6 +315,7 @@ pub struct UpdatePlan { pub returning: Option>, // whether the WHERE clause is always false pub contains_constant_false_condition: bool, + pub indexes_to_update: Vec>, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/core/translate/update.rs b/core/translate/update.rs index a0e32e640..70b275396 100644 --- a/core/translate/update.rs +++ b/core/translate/update.rs @@ -183,6 +183,20 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result< .map(|l| parse_limit(l)) .unwrap_or(Ok((None, None)))?; + let mut indexes_to_update = vec![]; + let indexes = schema.get_indices(&table_name.0); + for (set_column_index, _) in &set_clauses { + if let Some(index) = indexes.iter().find(|index| { + index + .columns + .iter() + .find(|column| column.pos_in_table == *set_column_index) + .is_some() + }) { + indexes_to_update.push(index.clone()); + } + } + Ok(Plan::Update(UpdatePlan { table_references, set_clauses, @@ -192,5 +206,6 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result< limit, offset, contains_constant_false_condition: false, + indexes_to_update, })) } From e503bb46414e968c0db24f55fac0ad80d07cdb4d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 1 May 2025 11:16:25 +0300 Subject: [PATCH 2/4] run_query helper for test_write_path --- .../query_processing/test_write_path.rs | 359 +++++------------- 1 file changed, 104 insertions(+), 255 deletions(-) diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 9c6107d58..bab91e3ff 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -1,6 +1,6 @@ use crate::common::{self, 
maybe_setup_tracing}; use crate::common::{compare_string, do_flush, TempDatabase}; -use limbo_core::{Connection, OwnedValue, StepResult}; +use limbo_core::{Connection, OwnedValue, Row, StepResult}; use log::debug; use std::rc::Rc; @@ -153,52 +153,19 @@ fn test_sequential_write() -> anyhow::Result<()> { println!("progress {:.1}%", progress); } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; let mut current_read_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, &list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; common::do_flush(&conn, &tmp_db)?; } Ok(()) @@ -215,55 +182,22 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> { let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)"; 
let list_query = "SELECT * FROM test"; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, insert_query)?; common::do_flush(&conn, &tmp_db)?; let mut current_read_index = 1; let expected_ids = vec![-3, -2, -1]; let mut actual_ids = Vec::new(); - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - OwnedValue::Float(f) => *f as i32, - _ => panic!("expected float"), - }; - actual_ids.push(id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + OwnedValue::Float(f) => *f as i32, + _ => panic!("expected float"), + }; + actual_ids.push(id); + current_read_index += 1; + })?; assert_eq!(current_read_index, 4); // Verify we read all rows // sort ids @@ -331,49 +265,18 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { let insert_query = format!("INSERT INTO test VALUES ({})", i); do_flush(&conn, &tmp_db)?; conn.checkpoint()?; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? 
{ - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } do_flush(&conn, &tmp_db)?; conn.clear_page_cache()?; let list_query = "SELECT * FROM test LIMIT 1"; let mut current_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let id = row.get::(0).unwrap(); - assert_eq!(current_index, id as usize); - current_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let id = row.get::(0).unwrap(); + assert_eq!(current_index, id as usize); + current_index += 1; + })?; do_flush(&conn, &tmp_db)?; Ok(()) } @@ -387,21 +290,7 @@ fn test_wal_restart() -> anyhow::Result<()> { fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { debug!("inserting {}", i); let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(tmp_db, conn, &insert_query)?; debug!("inserted {}", i); tmp_db.io.run_once()?; Ok(()) @@ -410,26 +299,13 @@ fn test_wal_restart() -> anyhow::Result<()> { fn count(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result { debug!("counting"); let list_query = "SELECT count(x) FROM test"; - loop { - if let Some(ref mut rows) = conn.query(list_query)? { - loop { - match rows.step()? 
{ - StepResult::Row => { - let row = rows.row().unwrap(); - let count = row.get::(0).unwrap(); - debug!("counted {}", count); - return Ok(count as usize); - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => panic!("Database is busy"), - } - } - } - } + let mut count = None; + run_query_on_row(tmp_db, conn, list_query, |row: &Row| { + assert!(count.is_none()); + count = Some(row.get::(0).unwrap() as usize); + debug!("counted {:?}", count); + })?; + Ok(count.unwrap()) } { @@ -476,113 +352,86 @@ fn test_write_delete_with_index() -> anyhow::Result<()> { let max_iterations = 1000; for i in 0..max_iterations { println!("inserting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } for i in 0..max_iterations { println!("deleting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let delete_query = format!("delete from test where x={}", i); - match conn.query(delete_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &delete_query)?; println!("listing after deleting {} ", i); let mut current_read_index = i + 1; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? 
{ - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; for i in i + 1..max_iterations { // now test with seek - match conn.query(format!("select * from test where x = {}", i)) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? 
{ - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(i, id); - break; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } + run_query_on_row( + &tmp_db, + &conn, + &format!("select * from test where x = {}", i), + |row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(i, id); }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + )?; } } Ok(()) } + +fn run_query(tmp_db: &TempDatabase, conn: &Rc, query: &str) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, None::) +} + +fn run_query_on_row( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + on_row: impl FnMut(&Row), +) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, Some(on_row)) +} + +fn run_query_core( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + mut on_row: Option, +) -> anyhow::Result<()> { + match conn.query(query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? 
{ + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + StepResult::Row => { + if let Some(on_row) = on_row.as_mut() { + let row = rows.row().unwrap(); + on_row(row) + } + } + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + Ok(()) +} From c808863256b1973df354b4bb70f83334b9f11678 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 1 May 2025 11:43:42 +0300 Subject: [PATCH 3/4] test update with index --- core/vdbe/mod.rs | 9 +++++ .../query_processing/test_write_path.rs | 36 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 9652dafef..2adf438b6 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -623,6 +623,15 @@ impl<'a> FromValueRow<'a> for i64 { } } +impl<'a> FromValueRow<'a> for f64 { + fn from_value(value: &'a OwnedValue) -> Result { + match value { + OwnedValue::Float(f) => Ok(*f), + _ => Err(LimboError::ConversionError("Expected integer value".into())), + } + } +} + impl<'a> FromValueRow<'a> for String { fn from_value(value: &'a OwnedValue) -> Result { match value { diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index bab91e3ff..407d1e366 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -393,6 +393,42 @@ fn test_write_delete_with_index() -> anyhow::Result<()> { Ok(()) } +#[test] +fn test_update_with_index() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + + maybe_setup_tracing(); + + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x REAL PRIMARY KEY, y TEXT);"); + let conn = tmp_db.connect_limbo(); + + run_query(&tmp_db, &conn, "INSERT INTO test VALUES (1.0, 'foo')")?; + run_query(&tmp_db, &conn, "INSERT INTO test VALUES (2.0, 'bar')")?; + + run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=10.0", |row| { + 
assert_eq!(row.get::(0).unwrap(), 1.0); + })?; + run_query(&tmp_db, &conn, "UPDATE test SET x=10.0 WHERE x=1.0")?; + run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=10.0", |row| { + assert_eq!(row.get::(0).unwrap(), 10.0); + })?; + + let mut count_1 = 0; + let mut count_10 = 0; + run_query_on_row(&tmp_db, &conn, "SELECT * from test", |row| { + let v = row.get::(0).unwrap(); + if v == 1.0 { + count_1 += 1; + } else if v == 10.0 { + count_10 += 1; + } + })?; + assert_eq!(count_1, 0, "1.0 shouldn't be inside table"); + assert_eq!(count_10, 1, "10.0 should have existed"); + + Ok(()) +} + fn run_query(tmp_db: &TempDatabase, conn: &Rc, query: &str) -> anyhow::Result<()> { run_query_core(tmp_db, conn, query, None::) } From f15a17699b37471a3a32e1aa1bbd962f9438a517 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 1 May 2025 12:10:44 +0300 Subject: [PATCH 4/4] check indexes are not added twice in update plan --- core/translate/emitter.rs | 2 +- core/translate/update.rs | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 09a9bbb9f..e2d3f78c4 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -554,7 +554,7 @@ fn emit_program_for_update( OperationMode::UPDATE, )?; // Open indexes for update. - let mut index_cursors = vec![]; + let mut index_cursors = Vec::with_capacity(plan.indexes_to_update.len()); // TODO: do not reopen if there is table reference using it. 
for index in &plan.indexes_to_update { let index_cursor = program.alloc_cursor_id( diff --git a/core/translate/update.rs b/core/translate/update.rs index 70b275396..3e36583dc 100644 --- a/core/translate/update.rs +++ b/core/translate/update.rs @@ -183,19 +183,20 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result< .map(|l| parse_limit(l)) .unwrap_or(Ok((None, None)))?; - let mut indexes_to_update = vec![]; + // Check what indexes will need to be updated by checking set_clauses and see + // if a column is contained in an index. let indexes = schema.get_indices(&table_name.0); - for (set_column_index, _) in &set_clauses { - if let Some(index) = indexes.iter().find(|index| { - index - .columns - .iter() - .find(|column| column.pos_in_table == *set_column_index) - .is_some() - }) { - indexes_to_update.push(index.clone()); - } - } + let indexes_to_update = indexes + .iter() + .filter(|index| { + index.columns.iter().any(|index_column| { + set_clauses + .iter() + .any(|(set_index_column, _)| index_column.pos_in_table == *set_index_column) + }) + }) + .cloned() + .collect(); Ok(Plan::Update(UpdatePlan { table_references,