Merge 'update index on updated indexed columns' from Pere Diaz Bou

Previously, an UPDATE that modified indexed columns was applied only to
the table B-tree and not to the corresponding index B-tree. This commit
also updates the affected indexes whenever an indexed column is modified.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1428
This commit is contained in:
Jussi Saurio
2025-05-03 10:41:13 +03:00
5 changed files with 225 additions and 258 deletions

View File

@@ -10,8 +10,8 @@ use crate::function::Func;
use crate::schema::Index;
use crate::translate::plan::{DeletePlan, Plan, Search};
use crate::util::exprs_are_equivalent;
use crate::vdbe::builder::ProgramBuilder;
use crate::vdbe::insn::RegisterOrLiteral;
use crate::vdbe::builder::{CursorType, ProgramBuilder};
use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral};
use crate::vdbe::{insn::Insn, BranchOffset};
use crate::{Result, SymbolTable};
@@ -546,19 +546,34 @@ fn emit_program_for_update(
target_pc: after_main_loop_label,
});
}
init_loop(
program,
&mut t_ctx,
&plan.table_references,
OperationMode::UPDATE,
)?;
// Open indexes for update.
let mut index_cursors = Vec::with_capacity(plan.indexes_to_update.len());
// TODO: do not reopen if there is table reference using it.
for index in &plan.indexes_to_update {
let index_cursor = program.alloc_cursor_id(
Some(index.table_name.clone()),
CursorType::BTreeIndex(index.clone()),
);
program.emit_insn(Insn::OpenWrite {
cursor_id: index_cursor,
root_page: RegisterOrLiteral::Literal(index.root_page),
});
index_cursors.push(index_cursor);
}
open_loop(
program,
&mut t_ctx,
&plan.table_references,
&plan.where_clause,
)?;
emit_update_insns(&plan, &t_ctx, program)?;
emit_update_insns(&plan, &t_ctx, program, index_cursors)?;
close_loop(program, &mut t_ctx, &plan.table_references)?;
program.preassign_label_to_next_insn(after_main_loop_label);
@@ -573,6 +588,7 @@ fn emit_update_insns(
plan: &UpdatePlan,
t_ctx: &TranslateCtx,
program: &mut ProgramBuilder,
index_cursors: Vec<usize>,
) -> crate::Result<()> {
let table_ref = &plan.table_references.first().unwrap();
let loop_labels = t_ctx.labels_main_loop.first().unwrap();
@@ -663,6 +679,46 @@ fn emit_update_insns(
)?;
}
// Update indexes first. Columns that are updated are translated from their SET expression and those that
// aren't modified are read from the table. A multi-column index key may therefore be updated partially.
for (index, index_cursor) in plan.indexes_to_update.iter().zip(index_cursors) {
    // One register per index column, plus a trailing register that receives the rowid.
    let index_record_reg_count = index.columns.len() + 1;
    let index_record_reg_start = program.alloc_registers(index_record_reg_count);
    for (idx, column) in index.columns.iter().enumerate() {
        // SET clauses are keyed by the column's position in the table (the same key that is used
        // to decide which indexes need updating), so match on `pos_in_table` rather than on the
        // column's ordinal inside the index.
        if let Some((_, expr)) = plan
            .set_clauses
            .iter()
            .find(|(i, _)| *i == column.pos_in_table)
        {
            translate_expr(
                program,
                Some(&plan.table_references),
                expr,
                index_record_reg_start + idx,
                &t_ctx.resolver,
            )?;
        } else {
            program.emit_insn(Insn::Column {
                cursor_id,
                column: column.pos_in_table,
                dest: index_record_reg_start + idx,
            });
        }
    }
    program.emit_insn(Insn::RowId {
        cursor_id,
        dest: index_record_reg_start + index.columns.len(),
    });
    let index_record_reg = program.alloc_register();
    program.emit_insn(Insn::MakeRecord {
        start_reg: index_record_reg_start,
        count: index_record_reg_count,
        dest_reg: index_record_reg,
    });
    program.emit_insn(Insn::IdxInsert {
        cursor_id: index_cursor,
        record_reg: index_record_reg,
        unpacked_start: Some(index_record_reg_start),
        unpacked_count: Some(index_record_reg_count as u16),
        flags: IdxInsertFlags::new(),
    });
}
// we scan a column at a time, loading either the column's values, or the new value
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
let start = if is_virtual { beg + 2 } else { beg + 1 };

View File

@@ -315,6 +315,7 @@ pub struct UpdatePlan {
pub returning: Option<Vec<ResultSetColumn>>,
// whether the WHERE clause is always false
pub contains_constant_false_condition: bool,
pub indexes_to_update: Vec<Arc<Index>>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]

View File

@@ -183,6 +183,21 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<
.map(|l| parse_limit(l))
.unwrap_or(Ok((None, None)))?;
// Check what indexes will need to be updated by checking set_clauses and see
// if a column is contained in an index.
let indexes = schema.get_indices(&table_name.0);
let indexes_to_update = indexes
.iter()
.filter(|index| {
index.columns.iter().any(|index_column| {
set_clauses
.iter()
.any(|(set_index_column, _)| index_column.pos_in_table == *set_index_column)
})
})
.cloned()
.collect();
Ok(Plan::Update(UpdatePlan {
table_references,
set_clauses,
@@ -192,5 +207,6 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<
limit,
offset,
contains_constant_false_condition: false,
indexes_to_update,
}))
}

View File

@@ -623,6 +623,15 @@ impl<'a> FromValueRow<'a> for i64 {
}
}
/// Converts an [`OwnedValue::Float`] into an `f64`.
///
/// Every other variant is rejected with a `LimboError::ConversionError`.
impl<'a> FromValueRow<'a> for f64 {
    fn from_value(value: &'a OwnedValue) -> Result<Self> {
        match value {
            OwnedValue::Float(f) => Ok(*f),
            // The message names the type this impl actually expects.
            _ => Err(LimboError::ConversionError("Expected float value".into())),
        }
    }
}
impl<'a> FromValueRow<'a> for String {
fn from_value(value: &'a OwnedValue) -> Result<Self> {
match value {

View File

@@ -1,6 +1,6 @@
use crate::common::{self, maybe_setup_tracing};
use crate::common::{compare_string, do_flush, TempDatabase};
use limbo_core::{Connection, OwnedValue, StepResult};
use limbo_core::{Connection, OwnedValue, Row, StepResult};
use log::debug;
use std::rc::Rc;
@@ -153,52 +153,19 @@ fn test_sequential_write() -> anyhow::Result<()> {
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
let mut current_read_index = 0;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, &list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
})?;
common::do_flush(&conn, &tmp_db)?;
}
Ok(())
@@ -215,55 +182,22 @@ fn test_regression_multi_row_insert() -> anyhow::Result<()> {
let insert_query = "INSERT INTO test VALUES (-2), (-3), (-1)";
let list_query = "SELECT * FROM test";
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, insert_query)?;
common::do_flush(&conn, &tmp_db)?;
let mut current_read_index = 1;
let expected_ids = vec![-3, -2, -1];
let mut actual_ids = Vec::new();
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
OwnedValue::Float(f) => *f as i32,
_ => panic!("expected float"),
};
actual_ids.push(id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
OwnedValue::Float(f) => *f as i32,
_ => panic!("expected float"),
};
actual_ids.push(id);
current_read_index += 1;
})?;
assert_eq!(current_read_index, 4); // Verify we read all rows
// sort ids
@@ -331,49 +265,18 @@ fn test_wal_checkpoint() -> anyhow::Result<()> {
let insert_query = format!("INSERT INTO test VALUES ({})", i);
do_flush(&conn, &tmp_db)?;
conn.checkpoint()?;
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
}
do_flush(&conn, &tmp_db)?;
conn.clear_page_cache()?;
let list_query = "SELECT * FROM test LIMIT 1";
let mut current_index = 0;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let id = row.get::<i64>(0).unwrap();
assert_eq!(current_index, id as usize);
current_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let id = row.get::<i64>(0).unwrap();
assert_eq!(current_index, id as usize);
current_index += 1;
})?;
do_flush(&conn, &tmp_db)?;
Ok(())
}
@@ -387,21 +290,7 @@ fn test_wal_restart() -> anyhow::Result<()> {
fn insert(i: usize, conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<()> {
debug!("inserting {}", i);
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(tmp_db, conn, &insert_query)?;
debug!("inserted {}", i);
tmp_db.io.run_once()?;
Ok(())
@@ -410,26 +299,13 @@ fn test_wal_restart() -> anyhow::Result<()> {
fn count(conn: &Rc<Connection>, tmp_db: &TempDatabase) -> anyhow::Result<usize> {
debug!("counting");
let list_query = "SELECT count(x) FROM test";
loop {
if let Some(ref mut rows) = conn.query(list_query)? {
loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let count = row.get::<i64>(0).unwrap();
debug!("counted {}", count);
return Ok(count as usize);
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => panic!("Database is busy"),
}
}
}
}
let mut count = None;
run_query_on_row(tmp_db, conn, list_query, |row: &Row| {
assert!(count.is_none());
count = Some(row.get::<i64>(0).unwrap() as usize);
debug!("counted {:?}", count);
})?;
Ok(count.unwrap())
}
{
@@ -476,113 +352,122 @@ fn test_write_delete_with_index() -> anyhow::Result<()> {
let max_iterations = 1000;
for i in 0..max_iterations {
println!("inserting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let insert_query = format!("INSERT INTO test VALUES ({})", i);
match conn.query(insert_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &insert_query)?;
}
for i in 0..max_iterations {
println!("deleting {} ", i);
if (i % 100) == 0 {
let progress = (i as f64 / max_iterations as f64) * 100.0;
println!("progress {:.1}%", progress);
}
let delete_query = format!("delete from test where x={}", i);
match conn.query(delete_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
run_query(&tmp_db, &conn, &delete_query)?;
println!("listing after deleting {} ", i);
let mut current_read_index = i + 1;
match conn.query(list_query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
run_query_on_row(&tmp_db, &conn, list_query, |row: &Row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(current_read_index, id);
current_read_index += 1;
})?;
for i in i + 1..max_iterations {
// now test with seek
match conn.query(format!("select * from test where x = {}", i)) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::Row => {
let row = rows.row().unwrap();
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
break;
}
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Interrupt => break,
StepResult::Done => break,
StepResult::Busy => {
panic!("Database is busy");
}
}
run_query_on_row(
&tmp_db,
&conn,
&format!("select * from test where x = {}", i),
|row| {
let first_value = row.get::<&OwnedValue>(0).expect("missing id");
let id = match first_value {
limbo_core::OwnedValue::Integer(i) => *i as i32,
limbo_core::OwnedValue::Float(f) => *f as i32,
_ => unreachable!(),
};
assert_eq!(i, id);
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
}
)?;
}
}
Ok(())
}
/// Updating an indexed (primary key) column must be visible both when the row
/// is looked up by its new key and when the whole table is scanned.
#[test]
fn test_update_with_index() -> anyhow::Result<()> {
    let _ = env_logger::try_init();
    maybe_setup_tracing();
    let tmp_db = TempDatabase::new_with_rusqlite("CREATE TABLE test (x REAL PRIMARY KEY, y TEXT);");
    let conn = tmp_db.connect_limbo();

    run_query(&tmp_db, &conn, "INSERT INTO test VALUES (1.0, 'foo')")?;
    run_query(&tmp_db, &conn, "INSERT INTO test VALUES (2.0, 'bar')")?;

    // Sanity check before the update: the row is reachable under its original key.
    // The callback only fires for returned rows, so count them to make sure the
    // assertion inside it actually ran.
    let mut rows_before = 0;
    run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=1.0", |row| {
        assert_eq!(row.get::<f64>(0).unwrap(), 1.0);
        rows_before += 1;
    })?;
    assert_eq!(rows_before, 1, "1.0 should be found before the update");

    run_query(&tmp_db, &conn, "UPDATE test SET x=10.0 WHERE x=1.0")?;

    // After the update the row must be reachable under its new key.
    let mut rows_after = 0;
    run_query_on_row(&tmp_db, &conn, "SELECT * from test WHERE x=10.0", |row| {
        assert_eq!(row.get::<f64>(0).unwrap(), 10.0);
        rows_after += 1;
    })?;
    assert_eq!(rows_after, 1, "10.0 should be found after the update");

    // A full scan must show the new value exactly once and the old value not at all.
    let mut count_1 = 0;
    let mut count_10 = 0;
    run_query_on_row(&tmp_db, &conn, "SELECT * from test", |row| {
        let v = row.get::<f64>(0).unwrap();
        if v == 1.0 {
            count_1 += 1;
        } else if v == 10.0 {
            count_10 += 1;
        }
    })?;
    assert_eq!(count_1, 0, "1.0 shouldn't be inside table");
    assert_eq!(count_10, 1, "10.0 should have existed");

    Ok(())
}
/// Runs `query` to completion without inspecting any rows it may produce.
fn run_query(tmp_db: &TempDatabase, conn: &Rc<Connection>, query: &str) -> anyhow::Result<()> {
    // A concrete callback type is spelled out so the generic helper can be
    // instantiated even though no callback is supplied.
    let no_callback: Option<fn(&Row)> = None;
    run_query_core(tmp_db, conn, query, no_callback)
}
/// Runs `query` to completion, handing every produced row to `on_row`.
fn run_query_on_row(
    tmp_db: &TempDatabase,
    conn: &Rc<Connection>,
    query: &str,
    on_row: impl FnMut(&Row),
) -> anyhow::Result<()> {
    let callback = Some(on_row);
    run_query_core(tmp_db, conn, query, callback)
}
fn run_query_core(
tmp_db: &TempDatabase,
conn: &Rc<Connection>,
query: &str,
mut on_row: Option<impl FnMut(&Row)>,
) -> anyhow::Result<()> {
match conn.query(query) {
Ok(Some(ref mut rows)) => loop {
match rows.step()? {
StepResult::IO => {
tmp_db.io.run_once()?;
}
StepResult::Done => break,
StepResult::Row => {
if let Some(on_row) = on_row.as_mut() {
let row = rows.row().unwrap();
on_row(row)
}
}
_ => unreachable!(),
}
},
Ok(None) => {}
Err(err) => {
eprintln!("{}", err);
}
};
Ok(())
}