Merge 'Support UPDATE for virtual tables' from Preston Thorpe

closes #1144

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1169
This commit is contained in:
Pekka Enberg
2025-04-09 18:21:10 +03:00
6 changed files with 142 additions and 72 deletions

View File

@@ -530,29 +530,67 @@ fn emit_update_insns(
) -> crate::Result<()> {
let table_ref = &plan.table_references.first().unwrap();
let loop_labels = t_ctx.labels_main_loop.first().unwrap();
let (cursor_id, index) = match &table_ref.op {
Operation::Scan { .. } => (program.resolve_cursor_id(&table_ref.identifier), None),
let (cursor_id, index, is_virtual) = match &table_ref.op {
Operation::Scan { .. } => (
program.resolve_cursor_id(&table_ref.identifier),
None,
table_ref.virtual_table().is_some(),
),
Operation::Search(search) => match search {
&Search::RowidEq { .. } | Search::RowidSearch { .. } => {
(program.resolve_cursor_id(&table_ref.identifier), None)
}
&Search::RowidEq { .. } | Search::RowidSearch { .. } => (
program.resolve_cursor_id(&table_ref.identifier),
None,
false,
),
Search::IndexSearch { index, .. } => (
program.resolve_cursor_id(&table_ref.identifier),
Some((index.clone(), program.resolve_cursor_id(&index.name))),
false,
),
},
_ => return Ok(()),
};
let rowid_reg = program.alloc_register();
for cond in plan.where_clause.iter().filter(|c| c.is_constant()) {
let jump_target = program.allocate_label();
let meta = ConditionMetadata {
jump_if_condition_is_true: false,
jump_target_when_true: jump_target,
jump_target_when_false: t_ctx.label_main_loop_end.unwrap(),
};
translate_condition_expr(
program,
&plan.table_references,
&cond.expr,
meta,
&t_ctx.resolver,
)?;
program.resolve_label(jump_target, program.offset());
}
let beg = program.alloc_registers(
table_ref.table.columns().len()
+ if is_virtual {
2 // two args before the relevant columns for VUpdate
} else {
1 // rowid reg
},
);
program.emit_insn(Insn::RowId {
cursor_id,
dest: rowid_reg,
dest: beg,
});
// if no rowid, we're done
program.emit_insn(Insn::IsNull {
reg: rowid_reg,
reg: beg,
target_pc: t_ctx.label_main_loop_end.unwrap(),
});
if is_virtual {
program.emit_insn(Insn::Copy {
src_reg: beg,
dst_reg: beg + 1,
amount: 0,
})
}
if let Some(offset) = t_ctx.reg_offset {
program.emit_insn(Insn::IfPos {
@@ -576,12 +614,13 @@ fn emit_update_insns(
&t_ctx.resolver,
)?;
}
let first_col_reg = program.alloc_registers(table_ref.table.columns().len());
// we scan a column at a time, loading either the column's values, or the new value
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
let start = if is_virtual { beg + 2 } else { beg + 1 };
for idx in 0..table_ref.columns().len() {
if let Some((idx, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) {
let target_reg = first_col_reg + idx;
let target_reg = start + idx;
if let Some((_, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) {
translate_expr(
program,
Some(&plan.table_references),
@@ -596,9 +635,17 @@ fn emit_update_insns(
.iter()
.position(|c| Some(&c.name) == table_column.name.as_ref())
});
let dest = first_col_reg + idx;
if table_column.primary_key {
program.emit_null(dest, None);
// don't emit null for pkey of virtual tables. they require first two args
// before the 'record' to be explicitly non-null
if table_column.primary_key && !is_virtual {
program.emit_null(target_reg, None);
} else if is_virtual {
program.emit_insn(Insn::VColumn {
cursor_id,
column: idx,
dest: target_reg,
});
} else {
program.emit_insn(Insn::Column {
cursor_id: *index
@@ -612,7 +659,7 @@ fn emit_update_insns(
})
.unwrap_or(&cursor_id),
column: column_idx_in_index.unwrap_or(idx),
dest,
dest: target_reg,
});
}
}
@@ -620,26 +667,35 @@ fn emit_update_insns(
if let Some(btree_table) = table_ref.btree() {
if btree_table.is_strict {
program.emit_insn(Insn::TypeCheck {
start_reg: first_col_reg,
start_reg: start,
count: table_ref.columns().len(),
check_generated: true,
table_reference: Rc::clone(&btree_table),
});
}
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: start,
count: table_ref.columns().len(),
dest_reg: record_reg,
});
program.emit_insn(Insn::InsertAsync {
cursor: cursor_id,
key_reg: beg,
record_reg,
flag: 0,
});
program.emit_insn(Insn::InsertAwait { cursor_id });
} else if let Some(vtab) = table_ref.virtual_table() {
let arg_count = table_ref.columns().len() + 2;
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count,
start_reg: beg,
vtab_ptr: vtab.implementation.as_ref().ctx as usize,
conflict_action: 0u16,
});
}
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: first_col_reg,
count: table_ref.columns().len(),
dest_reg: record_reg,
});
program.emit_insn(Insn::InsertAsync {
cursor: cursor_id,
key_reg: rowid_reg,
record_reg,
flag: 0,
});
program.emit_insn(Insn::InsertAwait { cursor_id });
if let Some(limit_reg) = t_ctx.reg_limit {
program.emit_insn(Insn::DecrJumpZero {

View File

@@ -131,11 +131,7 @@ pub fn init_loop(
}
program.emit_insn(Insn::OpenWriteAwait {});
}
(OperationMode::SELECT, Table::Virtual(_)) => {
program.emit_insn(Insn::VOpenAsync { cursor_id });
program.emit_insn(Insn::VOpenAwait {});
}
(OperationMode::DELETE, Table::Virtual(_)) => {
(_, Table::Virtual(_)) => {
program.emit_insn(Insn::VOpenAsync { cursor_id });
program.emit_insn(Insn::VOpenAwait {});
}
@@ -158,14 +154,7 @@ pub fn init_loop(
});
program.emit_insn(Insn::OpenReadAwait {});
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: table_cursor_id,
root_page: table.table.get_root_page().into(),
});
program.emit_insn(Insn::OpenWriteAwait {});
}
OperationMode::UPDATE => {
OperationMode::DELETE | OperationMode::UPDATE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: table_cursor_id,
root_page: table.table.get_root_page().into(),
@@ -191,14 +180,7 @@ pub fn init_loop(
});
program.emit_insn(Insn::OpenReadAwait);
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: index_cursor_id,
root_page: index.root_page.into(),
});
program.emit_insn(Insn::OpenWriteAwait {});
}
OperationMode::UPDATE => {
OperationMode::UPDATE | OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: index_cursor_id,
root_page: index.root_page.into(),

View File

@@ -65,14 +65,17 @@ pub fn translate_update(
}
pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<Plan> {
if body.with.is_some() {
bail_parse_error!("WITH clause is not supported");
}
if body.or_conflict.is_some() {
bail_parse_error!("ON CONFLICT clause is not supported");
}
let table_name = &body.tbl_name.name;
let table = match schema.get_table(table_name.0.as_str()) {
Some(table) => table,
None => bail_parse_error!("Parse error: no such table: {}", table_name),
};
let Some(btree_table) = table.btree() else {
bail_parse_error!("Error: {} is not a btree table", table_name);
};
let iter_dir = body
.order_by
.as_ref()
@@ -86,7 +89,11 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<
})
.unwrap_or(IterationDirection::Forwards);
let table_references = vec![TableReference {
table: Table::BTree(btree_table.clone()),
table: match table.as_ref() {
Table::Virtual(vtab) => Table::Virtual(vtab.clone()),
Table::BTree(btree_table) => Table::BTree(btree_table.clone()),
_ => unreachable!(),
},
identifier: table_name.0.clone(),
op: Operation::Scan {
iter_dir,
@@ -99,8 +106,8 @@ pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<
.iter_mut()
.map(|set| {
let ident = normalize_ident(set.col_names[0].0.as_str());
let col_index = btree_table
.columns
let col_index = table
.columns()
.iter()
.enumerate()
.find_map(|(i, col)| {

View File

@@ -1582,7 +1582,7 @@ pub fn op_halt(
)));
}
}
match program.halt(pager.clone(), state, mv_store.clone())? {
match program.halt(pager.clone(), state, mv_store)? {
StepResult::Done => Ok(InsnFunctionStepResult::Done),
StepResult::IO => Ok(InsnFunctionStepResult::IO),
StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1661,7 +1661,7 @@ pub fn op_auto_commit(
};
let conn = program.connection.upgrade().unwrap();
if matches!(state.halt_state, Some(HaltState::Checkpointing)) {
return match program.halt(pager.clone(), state, mv_store.clone())? {
return match program.halt(pager.clone(), state, mv_store)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1689,7 +1689,7 @@ pub fn op_auto_commit(
"cannot commit - no transaction is active".to_string(),
));
}
return match program.halt(pager.clone(), state, mv_store.clone())? {
return match program.halt(pager.clone(), state, mv_store)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1848,17 +1848,14 @@ pub fn op_row_id(
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let rowid = index_cursor.rowid()?;
rowid
index_cursor.rowid()?
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
let deferred_seek =
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
};
deferred_seek
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
};
if let Some(deferred_seek) = deferred_seek {
state.deferred_seek = Some(deferred_seek);

View File

@@ -112,11 +112,14 @@ impl VTabModule for KVStoreVTab {
if cursor.index.is_some_and(|c| c >= cursor.rows.len()) {
return Err("cursor out of range".into());
}
let (_, ref key, ref val) = cursor.rows[cursor.index.unwrap_or(0)];
match idx {
0 => Ok(Value::from_text(key.clone())), // key
1 => Ok(Value::from_text(val.clone())), // value
_ => Err("Invalid column".into()),
if let Some((_, ref key, ref val)) = cursor.rows.get(cursor.index.unwrap_or(0)) {
match idx {
0 => Ok(Value::from_text(key.clone())), // key
1 => Ok(Value::from_text(val.clone())), // value
_ => Err("Invalid column".into()),
}
} else {
Err("cursor out of range".into())
}
}
}

View File

@@ -398,10 +398,35 @@ def test_kv():
limbo.run_test_fn(
"select count(*) from t;", lambda res: "100" == res, "can insert 100 rows"
)
limbo.run_test_fn("update t set value = 'updated' where key = 'key33';", null)
limbo.run_test_fn(
"select * from t where key = 'key33';",
lambda res: res == "key33|updated",
"can update single row",
)
limbo.run_test_fn(
"select COUNT(*) from t where value = 'updated';",
lambda res: res == "1",
"only updated a single row",
)
limbo.run_test_fn("update t set value = 'updated2';", null)
limbo.run_test_fn(
"select COUNT(*) from t where value = 'updated2';",
lambda res: res == "100",
"can update all rows",
)
limbo.run_test_fn("delete from t limit 96;", null, "can delete 96 rows")
limbo.run_test_fn(
"select count(*) from t;", lambda res: "4" == res, "four rows remain"
)
limbo.run_test_fn(
"update t set key = '100' where 1;", null, "where clause evaluates properly"
)
limbo.run_test_fn(
"select * from t where key = '100';",
lambda res: res == "100|updated2",
"there is only 1 key remaining after setting all keys to same value",
)
limbo.quit()