Implement VUpdate (insert/delete for virtual tables

This commit is contained in:
PThorpe92
2025-02-16 20:21:09 -05:00
parent f2e3a61204
commit 8b5772fe1c
10 changed files with 236 additions and 112 deletions

View File

@@ -111,7 +111,6 @@ impl Database {
.borrow_mut()
.vtab_modules
.insert(name.to_string(), vmodule.into());
println!("Registered module: {}", name);
ResultCode::OK
}

View File

@@ -516,18 +516,23 @@ pub type StepResult = vdbe::StepResult;
#[derive(Clone, Debug)]
pub struct VirtualTable {
name: String,
args: Option<Vec<String>>,
args: Option<Vec<ast::Expr>>,
pub implementation: Rc<VTabModuleImpl>,
columns: Vec<Column>,
}
impl VirtualTable {
pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 {
unsafe { (self.implementation.rowid)(cursor.as_ptr()) }
}
/// takes ownership of the provided Args
pub(crate) fn from_args(
tbl_name: Option<&str>,
module_name: &str,
args: &[String],
args: Vec<limbo_ext::Value>,
syms: &SymbolTable,
kind: VTabKind,
exprs: &Option<Vec<ast::Expr>>,
) -> Result<Rc<Self>> {
let module = syms
.vtab_modules
@@ -544,19 +549,23 @@ impl VirtualTable {
)));
}
};
let schema = module.implementation.as_ref().init_schema(args)?;
let schema = module.implementation.as_ref().init_schema(&args)?;
for arg in args {
unsafe {
arg.free();
}
}
let mut parser = Parser::new(schema.as_bytes());
parser.reset(schema.as_bytes());
println!("Schema: {}", schema);
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
LimboError::ParseError("Failed to parse schema from virtual table module".to_string()),
)? {
let columns = columns_from_create_table_body(&body)?;
let vtab = Rc::new(VirtualTable {
name: tbl_name.unwrap_or(module_name).to_owned(),
args: Some(args.to_vec()),
implementation: module.implementation.clone(),
columns,
args: exprs.clone(),
});
return Ok(vtab);
}
@@ -565,24 +574,8 @@ impl VirtualTable {
))
}
pub fn open(&self) -> VTabOpaqueCursor {
let args = if let Some(args) = &self.args {
args.iter()
.map(|e| std::ffi::CString::new(e.to_string()).unwrap().into_raw())
.collect()
} else {
Vec::new()
};
let cursor =
unsafe { (self.implementation.open)(args.as_slice().as_ptr(), args.len() as i32) };
// free the CString pointers
for arg in args {
unsafe {
if !arg.is_null() {
let _ = std::ffi::CString::from_raw(arg);
}
}
}
pub fn open(&self) -> crate::Result<VTabOpaqueCursor> {
let cursor = unsafe { (self.implementation.open)(self.implementation.ctx) };
VTabOpaqueCursor::new(cursor)
}
@@ -620,7 +613,11 @@ impl VirtualTable {
pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> Result<OwnedValue> {
let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) };
OwnedValue::from_ffi(&val)
let res = OwnedValue::from_ffi(&val)?;
unsafe {
val.free();
}
Ok(res)
}
pub fn next(&self, cursor: &VTabOpaqueCursor) -> Result<bool> {
@@ -632,7 +629,7 @@ impl VirtualTable {
}
}
pub fn update(&self, args: &[OwnedValue], rowid: Option<i64>) -> Result<Option<i64>> {
pub fn update(&self, args: &[OwnedValue]) -> Result<Option<i64>> {
let arg_count = args.len();
let mut ext_args = Vec::with_capacity(arg_count);
for i in 0..arg_count {
@@ -650,7 +647,6 @@ impl VirtualTable {
}?;
ext_args.push(extvalue_arg);
}
let rowid = rowid.unwrap_or(-1);
let newrowid = 0i64;
let implementation = self.implementation.as_ref();
let rc = unsafe {
@@ -658,10 +654,14 @@ impl VirtualTable {
implementation as *const VTabModuleImpl as *mut std::ffi::c_void,
arg_count as i32,
ext_args.as_ptr(),
rowid,
&newrowid as *const _ as *mut i64,
)
};
for arg in ext_args {
unsafe {
arg.free();
}
}
match rc {
ResultCode::OK => Ok(None),
ResultCode::RowID => Ok(Some(newrowid)),

View File

@@ -25,7 +25,7 @@ pub fn translate_delete(
let mut program = ProgramBuilder::new(ProgramBuilderOpts {
query_mode,
num_cursors: 1,
approx_num_insns: estimate_num_instructions(&delete),
approx_num_insns: estimate_num_instructions(delete),
approx_num_labels: 0,
});
emit_program(&mut program, delete_plan, syms)?;
@@ -42,13 +42,17 @@ pub fn prepare_delete_plan(
Some(table) => table,
None => crate::bail_corrupt_error!("Parse error: no such table: {}", tbl_name),
};
//if let Some(table) = table.virtual_table() {
// // TODO: emit VUpdate
//}
let table = table.btree().unwrap();
let table = if let Some(table) = table.virtual_table() {
Table::Virtual(table.clone())
} else if let Some(table) = table.btree() {
Table::BTree(table.clone())
} else {
crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table");
};
let name = tbl_name.name.0.as_str().to_string();
let table_references = vec![TableReference {
table: Table::BTree(table.clone()),
identifier: table.name.clone(),
table,
identifier: name,
op: Operation::Scan { iter_dir: None },
join_info: None,
}];

View File

@@ -274,7 +274,7 @@ pub fn emit_query<'a>(
fn emit_program_for_delete(
program: &mut ProgramBuilder,
mut plan: DeletePlan,
plan: DeletePlan,
syms: &SymbolTable,
) -> Result<()> {
let (mut t_ctx, init_label, start_offset) = prologue(
@@ -286,6 +286,7 @@ fn emit_program_for_delete(
// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
let after_main_loop_label = program.allocate_label();
t_ctx.label_main_loop_end = Some(after_main_loop_label);
if plan.contains_constant_false_condition {
program.emit_insn(Insn::Goto {
target_pc: after_main_loop_label,
@@ -304,11 +305,16 @@ fn emit_program_for_delete(
open_loop(
program,
&mut t_ctx,
&mut plan.table_references,
&plan.table_references,
&plan.where_clause,
)?;
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?;
if let Some(table) = plan.table_references.first() {
if table.virtual_table().is_some() {
emit_delete_vtable_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?;
} else {
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?;
}
}
// Clean up and close the main execution loop
close_loop(program, &mut t_ctx, &plan.table_references)?;
@@ -322,6 +328,77 @@ fn emit_program_for_delete(
Ok(())
}
fn emit_delete_vtable_insns(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
table_references: &[TableReference],
limit: &Option<isize>,
) -> Result<()> {
let table_reference = table_references.first().unwrap();
let cursor_id = match &table_reference.op {
Operation::Scan { .. } => program.resolve_cursor_id(&table_reference.identifier),
Operation::Search(search) => match search {
Search::RowidEq { .. } | Search::RowidSearch { .. } => {
program.resolve_cursor_id(&table_reference.identifier)
}
Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
},
_ => return Ok(()),
};
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
dest: rowid_reg,
});
// if we have a limit, decrement and check zero
if let Some(limit) = limit {
let limit_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
value: *limit as i64,
dest: limit_reg,
});
program.mark_last_insn_constant();
program.emit_insn(Insn::DecrJumpZero {
reg: limit_reg,
target_pc: t_ctx.label_main_loop_end.unwrap(),
});
}
// we want old_rowid= rowid_reg, new_rowid= NULL, so we pass 2 arguments to VUpdate
// we need a second register for the new rowid = NULL
let new_rowid_reg = program.alloc_register();
program.emit_insn(Insn::Null {
dest: new_rowid_reg,
dest_end: None,
});
// we'll do VUpdate with arg_count=2:
// argv[0] => old_rowid = rowid_reg
// argv[1] => new_rowid = new_rowid_reg (NULL)
let Some(virtual_table) = table_reference.virtual_table() else {
return Err(crate::LimboError::ParseError(
"Table is not a virtual table".to_string(),
));
};
let conflict_action = 0u16;
let start_reg = rowid_reg;
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: 2,
start_reg,
vtab_ptr: virtual_table.implementation.as_ref().ctx as usize,
conflict_action,
});
Ok(())
}
fn emit_delete_insns(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,

View File

@@ -62,7 +62,7 @@ pub fn translate_insert(
body,
on_conflict,
&resolver,
);
)?;
return Ok(program);
}
let init_label = program.allocate_label();
@@ -474,11 +474,17 @@ fn translate_virtual_table_insert(
for (i, expr) in values[0].iter().enumerate() {
translate_expr(program, None, expr, value_registers_start + i, resolver)?;
}
/* *
* Inserts for virtual tables are done in a single step. The rowid is not provided by the user, but is generated by the
* vtable implementation.
* argv[0] = current_rowid (NULL for insert)
* argv[1] = insert_rowid (NULL for insert)
* argv[2..] = column values
* */
let start_reg = program.alloc_registers(column_mappings.len() + 3);
let rowid_reg = start_reg; // argv[0] = rowid
let insert_rowid_reg = start_reg + 1; // argv[1] = insert_rowid
let data_start_reg = start_reg + 2; // argv[2..] = column values
let rowid_reg = program.alloc_registers(column_mappings.len() + 3);
let insert_rowid_reg = rowid_reg + 1; // argv[1] = insert_rowid
let data_start_reg = rowid_reg + 2; // argv[2..] = column values
program.emit_insn(Insn::Null {
dest: rowid_reg,
@@ -515,7 +521,7 @@ fn translate_virtual_table_insert(
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: column_mappings.len() + 2,
start_reg,
start_reg: rowid_reg,
vtab_ptr: virtual_table.implementation.as_ref().ctx as usize,
conflict_action,
});

View File

@@ -110,6 +110,10 @@ pub fn init_loop(
program.emit_insn(Insn::VOpenAsync { cursor_id });
program.emit_insn(Insn::VOpenAwait {});
}
(OperationMode::DELETE, Table::Virtual(_)) => {
program.emit_insn(Insn::VOpenAsync { cursor_id });
program.emit_insn(Insn::VOpenAwait {});
}
_ => {
unimplemented!()
}
@@ -286,44 +290,23 @@ pub fn open_loop(
},
),
Table::Virtual(ref table) => {
let args = if let Some(args) = table.args.as_ref() {
args
} else {
&vec![]
};
let start_reg = program.alloc_registers(args.len());
let start_reg = program
.alloc_registers(table.args.as_ref().map(|a| a.len()).unwrap_or(0));
let mut cur_reg = start_reg;
for arg_str in args {
let args = match table.args.as_ref() {
Some(args) => args,
None => &vec![],
};
for arg in args {
let reg = cur_reg;
cur_reg += 1;
if let Ok(i) = arg_str.parse::<i64>() {
program.emit_insn(Insn::Integer {
value: i,
dest: reg,
});
} else if let Ok(f) = arg_str.parse::<f64>() {
program.emit_insn(Insn::Real {
value: f,
dest: reg,
});
} else if arg_str.starts_with('"') && arg_str.ends_with('"') {
program.emit_insn(Insn::String8 {
value: arg_str.trim_matches('"').to_string(),
dest: reg,
});
} else {
program.emit_insn(Insn::String8 {
value: arg_str.clone(),
dest: reg,
});
}
let _ =
translate_expr(program, Some(tables), arg, reg, &t_ctx.resolver)?;
}
program.emit_insn(Insn::VFilter {
cursor_id,
pc_if_empty: loop_end,
arg_count: args.len(),
arg_count: table.args.as_ref().map_or(0, |args| args.len()),
args_reg: start_reg,
});
}
@@ -697,9 +680,9 @@ fn emit_loop_source(
);
let offset_jump_to = t_ctx
.labels_main_loop
.get(0)
.first()
.map(|l| l.next)
.or_else(|| t_ctx.label_main_loop_end);
.or(t_ctx.label_main_loop_end);
emit_select_result(
program,
t_ctx,

View File

@@ -572,8 +572,8 @@ fn create_vtable_body_to_str(vtab: &CreateVirtualTable) -> String {
format!(
"CREATE VIRTUAL TABLE {} {} USING {}{}",
vtab.tbl_name.name.0,
vtab.module_name.0,
if_not_exists,
vtab.module_name.0,
if args.is_empty() {
String::new()
} else {

View File

@@ -9,7 +9,7 @@ use super::{
use crate::{
function::Func,
schema::{Schema, Table},
util::{exprs_are_equivalent, normalize_ident},
util::{exprs_are_equivalent, normalize_ident, vtable_args},
vdbe::BranchOffset,
Result,
};
@@ -303,16 +303,25 @@ fn parse_from_clause_table<'a>(
return Ok(());
};
// Check if our top level schema has this table.
if let Some(table) = schema.get_btree_table(&normalized_qualified_name) {
if let Some(table) = schema.get_table(&normalized_qualified_name) {
let alias = maybe_alias
.map(|a| match a {
ast::As::As(id) => id,
ast::As::Elided(id) => id,
})
.map(|a| a.0);
let tbl_ref = if let Table::Virtual(tbl) = table.as_ref() {
Table::Virtual(tbl.clone())
} else if let Table::BTree(table) = table.as_ref() {
Table::BTree(table.clone())
} else {
return Err(crate::LimboError::InvalidArgument(
"Table type not supported".to_string(),
));
};
scope.tables.push(TableReference {
op: Operation::Scan { iter_dir: None },
table: Table::BTree(table.clone()),
table: tbl_ref,
identifier: alias.unwrap_or(normalized_qualified_name),
join_info: None,
});
@@ -367,17 +376,16 @@ fn parse_from_clause_table<'a>(
.push(TableReference::new_subquery(identifier, subplan, None));
Ok(())
}
ast::SelectTable::TableCall(qualified_name, maybe_args, maybe_alias) => {
ast::SelectTable::TableCall(qualified_name, ref maybe_args, maybe_alias) => {
let normalized_name = &normalize_ident(qualified_name.name.0.as_str());
let args = vtable_args(maybe_args.as_ref().unwrap_or(&vec![]).as_slice());
let vtab = crate::VirtualTable::from_args(
None,
normalized_name,
&maybe_args
.as_ref()
.map(|a| a.iter().map(|s| s.to_string()).collect::<Vec<_>>())
.unwrap_or_default(),
args,
syms,
limbo_ext::VTabKind::TableValuedFunction,
maybe_args,
)?;
let alias = maybe_alias
.as_ref()
@@ -610,7 +618,7 @@ fn parse_join<'a>(
constraint,
} = join;
parse_from_clause_table(schema, table, scope, &syms)?;
parse_from_clause_table(schema, table, scope, syms)?;
let (outer, natural) = match join_operator {
ast::JoinOperator::TypedJoin(Some(join_type)) => {

View File

@@ -388,6 +388,35 @@ pub fn columns_from_create_table_body(body: &ast::CreateTableBody) -> crate::Res
.collect::<Vec<_>>())
}
// for TVF's we need these at planning time so we cannot emit translate_expr
pub fn vtable_args(args: &[ast::Expr]) -> Vec<limbo_ext::Value> {
let mut vtable_args = Vec::new();
for arg in args {
match arg {
Expr::Literal(lit) => match lit {
Literal::Numeric(i) => {
if i.contains('.') {
vtable_args.push(limbo_ext::Value::from_float(i.parse().unwrap()));
} else {
vtable_args.push(limbo_ext::Value::from_integer(i.parse().unwrap()));
}
}
Literal::String(s) => {
vtable_args.push(limbo_ext::Value::from_text(s.clone()));
}
Literal::Blob(b) => {
vtable_args.push(limbo_ext::Value::from_blob(b.as_bytes().into()));
}
_ => {
vtable_args.push(limbo_ext::Value::null());
}
},
_ => vtable_args.push(limbo_ext::Value::null()),
}
}
vtable_args
}
#[cfg(test)]
pub mod tests {
use super::*;

View File

@@ -304,14 +304,19 @@ impl<const N: usize> Bitfield<N> {
}
}
pub struct VTabOpaqueCursor(*mut c_void);
pub struct VTabOpaqueCursor(*const c_void);
impl VTabOpaqueCursor {
pub fn new(cursor: *mut c_void) -> Self {
Self(cursor)
pub fn new(cursor: *const c_void) -> Result<Self> {
if cursor.is_null() {
return Err(LimboError::InternalError(
"VTabOpaqueCursor: cursor is null".into(),
));
}
Ok(Self(cursor))
}
pub fn as_ptr(&self) -> *mut c_void {
pub fn as_ptr(&self) -> *const c_void {
self.0
}
}
@@ -866,7 +871,7 @@ impl Program {
let CursorType::VirtualTable(virtual_table) = cursor_type else {
panic!("VOpenAsync on non-virtual table cursor");
};
let cursor = virtual_table.open();
let cursor = virtual_table.open()?;
state
.cursors
.borrow_mut()
@@ -882,7 +887,7 @@ impl Program {
let table_name = state.registers[*table_name].to_string();
let args = if let Some(args_reg) = args_reg {
if let OwnedValue::Record(rec) = &state.registers[*args_reg] {
rec.get_values().iter().map(|v| v.to_string()).collect()
rec.get_values().iter().map(|v| v.to_ffi()).collect()
} else {
return Err(LimboError::InternalError(
"VCreate: args_reg is not a record".to_string(),
@@ -899,9 +904,10 @@ impl Program {
let table = crate::VirtualTable::from_args(
Some(&table_name),
&module_name,
&args,
args,
&conn.db.syms.borrow(),
limbo_ext::VTabKind::VirtualTable,
&None,
)?;
{
conn.db
@@ -971,7 +977,6 @@ impl Program {
.to_string(),
));
}
let mut argv = Vec::with_capacity(*arg_count);
for i in 0..*arg_count {
if let Some(value) = state.registers.get(*start_reg + i) {
@@ -983,18 +988,10 @@ impl Program {
)));
}
}
let current_rowid = match argv.first() {
Some(OwnedValue::Integer(rowid)) => Some(*rowid),
_ => None,
};
let insert_rowid = match argv.get(1) {
Some(OwnedValue::Integer(rowid)) => Some(*rowid),
_ => None,
};
let result = virtual_table.update(&argv, insert_rowid);
// argv[0] = current_rowid (for DELETE if applicable)
// argv[1] = insert_rowid (for INSERT if applicable)
// argv[2..] = column values
let result = virtual_table.update(&argv);
match result {
Ok(Some(new_rowid)) => {
if *conflict_action == 5 {
@@ -1005,9 +1002,11 @@ impl Program {
state.pc += 1;
}
Ok(None) => {
// no-op or successful update without rowid return
state.pc += 1;
}
Err(e) => {
// virtual table update failed
return Err(LimboError::ExtensionError(format!(
"Virtual table update failed: {}",
e
@@ -1355,11 +1354,30 @@ impl Program {
}
}
let cursor = get_cursor_as_table_mut(&mut cursors, *cursor_id);
if let Some(ref rowid) = cursor.rowid()? {
state.registers[*dest] = OwnedValue::Integer(*rowid as i64);
if let Some(Cursor::Table(btree_cursor)) = cursors.get_mut(*cursor_id).unwrap()
{
if let Some(ref rowid) = btree_cursor.rowid()? {
state.registers[*dest] = OwnedValue::Integer(*rowid as i64);
} else {
state.registers[*dest] = OwnedValue::Null;
}
} else if let Some(Cursor::Virtual(virtual_cursor)) =
cursors.get_mut(*cursor_id).unwrap()
{
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
let CursorType::VirtualTable(virtual_table) = cursor_type else {
panic!("VUpdate on non-virtual table cursor");
};
let rowid = virtual_table.rowid(virtual_cursor);
if rowid != 0 {
state.registers[*dest] = OwnedValue::Integer(rowid);
} else {
state.registers[*dest] = OwnedValue::Null;
}
} else {
state.registers[*dest] = OwnedValue::Null;
return Err(LimboError::InternalError(
"RowId: cursor is not a table or virtual cursor".to_string(),
));
}
state.pc += 1;
}