diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 5ea2f9cfa..e2d3f78c4 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -10,8 +10,8 @@ use crate::function::Func; use crate::schema::Index; use crate::translate::plan::{DeletePlan, Plan, Search}; use crate::util::exprs_are_equivalent; -use crate::vdbe::builder::ProgramBuilder; -use crate::vdbe::insn::RegisterOrLiteral; +use crate::vdbe::builder::{CursorType, ProgramBuilder}; +use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral}; use crate::vdbe::{insn::Insn, BranchOffset}; use crate::{Result, SymbolTable}; @@ -546,19 +546,34 @@ fn emit_program_for_update( target_pc: after_main_loop_label, }); } + init_loop( program, &mut t_ctx, &plan.table_references, OperationMode::UPDATE, )?; + // Open indexes for update. + let mut index_cursors = Vec::with_capacity(plan.indexes_to_update.len()); + // TODO: do not reopen if there is table reference using it. + for index in &plan.indexes_to_update { + let index_cursor = program.alloc_cursor_id( + Some(index.table_name.clone()), + CursorType::BTreeIndex(index.clone()), + ); + program.emit_insn(Insn::OpenWrite { + cursor_id: index_cursor, + root_page: RegisterOrLiteral::Literal(index.root_page), + }); + index_cursors.push(index_cursor); + } open_loop( program, &mut t_ctx, &plan.table_references, &plan.where_clause, )?; - emit_update_insns(&plan, &t_ctx, program)?; + emit_update_insns(&plan, &t_ctx, program, index_cursors)?; close_loop(program, &mut t_ctx, &plan.table_references)?; program.preassign_label_to_next_insn(after_main_loop_label); @@ -573,6 +588,7 @@ fn emit_update_insns( plan: &UpdatePlan, t_ctx: &TranslateCtx, program: &mut ProgramBuilder, + index_cursors: Vec, ) -> crate::Result<()> { let table_ref = &plan.table_references.first().unwrap(); let loop_labels = t_ctx.labels_main_loop.first().unwrap(); @@ -663,6 +679,46 @@ fn emit_update_insns( )?; } + // Update indexes first. Columns that are updated will be translated from an expression and those who aren't modified will be + // read from table. Mutiple value index key could be updated partially. + for (index, index_cursor) in plan.indexes_to_update.iter().zip(index_cursors) { + let index_record_reg_count = index.columns.len() + 1; + let index_record_reg_start = program.alloc_registers(index_record_reg_count); + for (idx, column) in index.columns.iter().enumerate() { + if let Some((_, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) { + translate_expr( + program, + Some(&plan.table_references), + expr, + index_record_reg_start + idx, + &t_ctx.resolver, + )?; + } else { + program.emit_insn(Insn::Column { + cursor_id: cursor_id, + column: column.pos_in_table, + dest: index_record_reg_start + idx, + }); + } + } + program.emit_insn(Insn::RowId { + cursor_id: cursor_id, + dest: index_record_reg_start + index.columns.len(), + }); + let index_record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: index_record_reg_start, + count: index_record_reg_count, + dest_reg: index_record_reg, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: index_cursor, + record_reg: index_record_reg, + unpacked_start: Some(index_record_reg_start), + unpacked_count: Some(index_record_reg_count as u16), + flags: IdxInsertFlags::new(), + }); + } // we scan a column at a time, loading either the column's values, or the new value // from the Set expression, into registers so we can emit a MakeRecord and update the row. let start = if is_virtual { beg + 2 } else { beg + 1 }; diff --git a/core/translate/plan.rs b/core/translate/plan.rs index ad6514247..2baf59d32 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -315,6 +315,7 @@ pub struct UpdatePlan { pub returning: Option>, // whether the WHERE clause is always false pub contains_constant_false_condition: bool, + pub indexes_to_update: Vec>, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/core/translate/update.rs b/core/translate/update.rs index a0e32e640..3e36583dc 100644 --- a/core/translate/update.rs +++ b/core/translate/update.rs @@ -183,6 +183,21 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result< .map(|l| parse_limit(l)) .unwrap_or(Ok((None, None)))?; + // Check what indexes will need to be updated by checking set_clauses and see + // if a column is contained in an index. + let indexes = schema.get_indices(&table_name.0); + let indexes_to_update = indexes + .iter() + .filter(|index| { + index.columns.iter().any(|index_column| { + set_clauses + .iter() + .any(|(set_index_column, _)| index_column.pos_in_table == *set_index_column) + }) + }) + .cloned() + .collect(); + Ok(Plan::Update(UpdatePlan { table_references, set_clauses, @@ -192,5 +207,6 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result< limit, offset, contains_constant_false_condition: false, + indexes_to_update, })) } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 9652dafef..2adf438b6 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -623,6 +623,15 @@ impl<'a> FromValueRow<'a> for i64 { } } +impl<'a> FromValueRow<'a> for f64 { + fn from_value(value: &'a OwnedValue) -> Result { + match value { + OwnedValue::Float(f) => Ok(*f), + _ => Err(LimboError::ConversionError("Expected integer value".into())), + } + } +} + impl<'a> FromValueRow<'a> for String { fn from_value(value: &'a OwnedValue) -> Result { match value { diff --git a/tests/integration/query_processing/test_write_path.rs b/tests/integration/query_processing/test_write_path.rs index 9c6107d58..407d1e366 100644 --- a/tests/integration/query_processing/test_write_path.rs +++ b/tests/integration/query_processing/test_write_path.rs @@ -1,6 +1,6 @@ use crate::common::{self, maybe_setup_tracing}; use crate::common::{compare_string, do_flush, TempDatabase}; -use limbo_core::{Connection, OwnedValue, StepResult}; +use limbo_core::{Connection, OwnedValue, Row, StepResult}; use log::debug; use std::rc::Rc; @@ -153,52 +153,19 @@ fn test_sequential_write() -> anyhow::Result<()> { println!("progress {:.1}%", progress); } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; let mut current_read_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, &list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; common::do_flush(&conn, &tmp_db)?; } Ok(()) @@ -215,55 +182,22 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> { let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)"; let list_query = "SELECT * FROM test"; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, insert_query)?; common::do_flush(&conn, &tmp_db)?; let mut current_read_index = 1; let expected_ids = vec![-3, -2, -1]; let mut actual_ids = Vec::new(); - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - OwnedValue::Float(f) => *f as i32, - _ => panic!("expected float"), - }; - actual_ids.push(id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + OwnedValue::Float(f) => *f as i32, + _ => panic!("expected float"), + }; + actual_ids.push(id); + current_read_index += 1; + })?; assert_eq!(current_read_index, 4); // Verify we read all rows // sort ids @@ -331,49 +265,18 @@ fn test_wal_checkpoint() -> anyhow::Result<()> { let insert_query = format!("INSERT INTO test VALUES ({})", i); do_flush(&conn, &tmp_db)?; conn.checkpoint()?; - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } do_flush(&conn, &tmp_db)?; conn.clear_page_cache()?; let list_query = "SELECT * FROM test LIMIT 1"; let mut current_index = 0; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let id = row.get::(0).unwrap(); - assert_eq!(current_index, id as usize); - current_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let id = row.get::(0).unwrap(); + assert_eq!(current_index, id as usize); + current_index += 1; + })?; do_flush(&conn, &tmp_db)?; Ok(()) } @@ -387,21 +290,7 @@ fn test_wal_restart() -> anyhow::Result<()> { fn insert(i: usize, conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result<()> { debug!("inserting {}", i); let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(tmp_db, conn, &insert_query)?; debug!("inserted {}", i); tmp_db.io.run_once()?; Ok(()) @@ -410,26 +299,13 @@ fn test_wal_restart() -> anyhow::Result<()> { fn count(conn: &Rc, tmp_db: &TempDatabase) -> anyhow::Result { debug!("counting"); let list_query = "SELECT count(x) FROM test"; - loop { - if let Some(ref mut rows) = conn.query(list_query)? { - loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let count = row.get::(0).unwrap(); - debug!("counted {}", count); - return Ok(count as usize); - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => panic!("Database is busy"), - } - } - } - } + let mut count = None; + run_query_on_row(tmp_db, conn, list_query, |row: &Row| { + assert!(count.is_none()); + count = Some(row.get::(0).unwrap() as usize); + debug!("counted {:?}", count); + })?; + Ok(count.unwrap()) } { @@ -476,113 +352,122 @@ fn test_write_delete_with_index() -> anyhow::Result<()> { let max_iterations = 1000; for i in 0..max_iterations { println!("inserting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let insert_query = format!("INSERT INTO test VALUES ({})", i); - match conn.query(insert_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &insert_query)?; } for i in 0..max_iterations { println!("deleting {} ", i); - if (i % 100) == 0 { - let progress = (i as f64 / max_iterations as f64) * 100.0; - println!("progress {:.1}%", progress); - } let delete_query = format!("delete from test where x={}", i); - match conn.query(delete_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Done => break, - _ => unreachable!(), - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - }; + run_query(&tmp_db, &conn, &delete_query)?; println!("listing after deleting {} ", i); let mut current_read_index = i + 1; - match conn.query(list_query) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(current_read_index, id); - current_read_index += 1; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } - }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(current_read_index, id); + current_read_index += 1; + })?; for i in i + 1..max_iterations { // now test with seek - match conn.query(format!("select * from test where x = {}", i)) { - Ok(Some(ref mut rows)) => loop { - match rows.step()? { - StepResult::Row => { - let row = rows.row().unwrap(); - let first_value = row.get::<&OwnedValue>(0).expect("missing id"); - let id = match first_value { - limbo_core::OwnedValue::Integer(i) => *i as i32, - limbo_core::OwnedValue::Float(f) => *f as i32, - _ => unreachable!(), - }; - assert_eq!(i, id); - break; - } - StepResult::IO => { - tmp_db.io.run_once()?; - } - StepResult::Interrupt => break, - StepResult::Done => break, - StepResult::Busy => { - panic!("Database is busy"); - } - } + run_query_on_row( + &tmp_db, + &conn, + &format!("select * from test where x = {}", i), + |row| { + let first_value = row.get::<&OwnedValue>(0).expect("missing id"); + let id = match first_value { + limbo_core::OwnedValue::Integer(i) => *i as i32, + limbo_core::OwnedValue::Float(f) => *f as i32, + _ => unreachable!(), + }; + assert_eq!(i, id); }, - Ok(None) => {} - Err(err) => { - eprintln!("{}", err); - } - } + )?; } } Ok(()) } + +#[test] +fn test_update_with_index() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + + maybe_setup_tracing(); + + let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x REAL PRIMARY KEY, y TEXT);"); + let conn = tmp_db.connect_limbo(); + + run_query(&tmp_db, &conn, "INSERT INTO test VALUES (1.0, 'foo')")?; + run_query(&tmp_db, &conn, "INSERT INTO test VALUES (2.0, 'bar')")?; + + run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=10.0", |row| { + assert_eq!(row.get::(0).unwrap(), 1.0); + })?; + run_query(&tmp_db, &conn, "UPDATE test SET x=10.0 WHERE x=1.0")?; + run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=10.0", |row| { + assert_eq!(row.get::(0).unwrap(), 10.0); + })?; + + let mut count_1 = 0; + let mut count_10 = 0; + run_query_on_row(&tmp_db, &conn, "SELECT * from test", |row| { + let v = row.get::(0).unwrap(); + if v == 1.0 { + count_1 += 1; + } else if v == 10.0 { + count_10 += 1; + } + })?; + assert_eq!(count_1, 0, "1.0 shouldn't be inside table"); + assert_eq!(count_10, 1, "10.0 should have existed"); + + Ok(()) +} + +fn run_query(tmp_db: &TempDatabase, conn: &Rc, query: &str) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, None::) +} + +fn run_query_on_row( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + on_row: impl FnMut(&Row), +) -> anyhow::Result<()> { + run_query_core(tmp_db, conn, query, Some(on_row)) +} + +fn run_query_core( + tmp_db: &TempDatabase, + conn: &Rc, + query: &str, + mut on_row: Option, +) -> anyhow::Result<()> { + match conn.query(query) { + Ok(Some(ref mut rows)) => loop { + match rows.step()? { + StepResult::IO => { + tmp_db.io.run_once()?; + } + StepResult::Done => break, + StepResult::Row => { + if let Some(on_row) = on_row.as_mut() { + let row = rows.row().unwrap(); + on_row(row) + } + } + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + Ok(()) +}