diff --git a/core/ext/mod.rs b/core/ext/mod.rs index 3ea7d9692..a5cfd5909 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -111,7 +111,6 @@ impl Database { .borrow_mut() .vtab_modules .insert(name.to_string(), vmodule.into()); - println!("Registered module: {}", name); ResultCode::OK } diff --git a/core/lib.rs b/core/lib.rs index d9727fd98..8719f3c0a 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -516,18 +516,23 @@ pub type StepResult = vdbe::StepResult; #[derive(Clone, Debug)] pub struct VirtualTable { name: String, - args: Option>, + args: Option>, pub implementation: Rc, columns: Vec, } impl VirtualTable { + pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 { + unsafe { (self.implementation.rowid)(cursor.as_ptr()) } + } + /// takes ownership of the provided Args pub(crate) fn from_args( tbl_name: Option<&str>, module_name: &str, - args: &[String], + args: Vec, syms: &SymbolTable, kind: VTabKind, + exprs: &Option>, ) -> Result> { let module = syms .vtab_modules @@ -544,19 +549,23 @@ impl VirtualTable { ))); } }; - let schema = module.implementation.as_ref().init_schema(args)?; + let schema = module.implementation.as_ref().init_schema(&args)?; + for arg in args { + unsafe { + arg.free(); + } + } let mut parser = Parser::new(schema.as_bytes()); parser.reset(schema.as_bytes()); - println!("Schema: {}", schema); if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or( LimboError::ParseError("Failed to parse schema from virtual table module".to_string()), )? { let columns = columns_from_create_table_body(&body)?; let vtab = Rc::new(VirtualTable { name: tbl_name.unwrap_or(module_name).to_owned(), - args: Some(args.to_vec()), implementation: module.implementation.clone(), columns, + args: exprs.clone(), }); return Ok(vtab); } @@ -565,24 +574,8 @@ impl VirtualTable { )) } - pub fn open(&self) -> VTabOpaqueCursor { - let args = if let Some(args) = &self.args { - args.iter() - .map(|e| std::ffi::CString::new(e.to_string()).unwrap().into_raw()) - .collect() - } else { - Vec::new() - }; - let cursor = - unsafe { (self.implementation.open)(args.as_slice().as_ptr(), args.len() as i32) }; - // free the CString pointers - for arg in args { - unsafe { - if !arg.is_null() { - let _ = std::ffi::CString::from_raw(arg); - } - } - } + pub fn open(&self) -> crate::Result { + let cursor = unsafe { (self.implementation.open)(self.implementation.ctx) }; VTabOpaqueCursor::new(cursor) } @@ -620,7 +613,11 @@ impl VirtualTable { pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> Result { let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) }; - OwnedValue::from_ffi(&val) + let res = OwnedValue::from_ffi(&val)?; + unsafe { + val.free(); + } + Ok(res) } pub fn next(&self, cursor: &VTabOpaqueCursor) -> Result { @@ -632,7 +629,7 @@ impl VirtualTable { } } - pub fn update(&self, args: &[OwnedValue], rowid: Option) -> Result> { + pub fn update(&self, args: &[OwnedValue]) -> Result> { let arg_count = args.len(); let mut ext_args = Vec::with_capacity(arg_count); for i in 0..arg_count { @@ -650,7 +647,6 @@ impl VirtualTable { }?; ext_args.push(extvalue_arg); } - let rowid = rowid.unwrap_or(-1); let newrowid = 0i64; let implementation = self.implementation.as_ref(); let rc = unsafe { @@ -658,10 +654,14 @@ impl VirtualTable { implementation as *const VTabModuleImpl as *mut std::ffi::c_void, arg_count as i32, ext_args.as_ptr(), - rowid, &newrowid as *const _ as *mut i64, ) }; + for arg in ext_args { + unsafe { + arg.free(); + } + } match rc { ResultCode::OK => Ok(None), ResultCode::RowID => Ok(Some(newrowid)), diff --git a/core/translate/delete.rs b/core/translate/delete.rs index 81f8ba6ef..1e0d64a98 100644 --- a/core/translate/delete.rs +++ b/core/translate/delete.rs @@ -25,7 +25,7 @@ pub fn translate_delete( let mut program = ProgramBuilder::new(ProgramBuilderOpts { query_mode, num_cursors: 1, - approx_num_insns: estimate_num_instructions(&delete), + approx_num_insns: estimate_num_instructions(delete), approx_num_labels: 0, }); emit_program(&mut program, delete_plan, syms)?; @@ -42,13 +42,17 @@ pub fn prepare_delete_plan( Some(table) => table, None => crate::bail_corrupt_error!("Parse error: no such table: {}", tbl_name), }; - //if let Some(table) = table.virtual_table() { - // // TODO: emit VUpdate - //} - let table = table.btree().unwrap(); + let table = if let Some(table) = table.virtual_table() { + Table::Virtual(table.clone()) + } else if let Some(table) = table.btree() { + Table::BTree(table.clone()) + } else { + crate::bail_corrupt_error!("Table is neither a virtual table nor a btree table"); + }; + let name = tbl_name.name.0.as_str().to_string(); let table_references = vec![TableReference { - table: Table::BTree(table.clone()), - identifier: table.name.clone(), + table, + identifier: name, op: Operation::Scan { iter_dir: None }, join_info: None, }]; diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 7786c90a9..8d095eb9b 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -274,7 +274,7 @@ pub fn emit_query<'a>( fn emit_program_for_delete( program: &mut ProgramBuilder, - mut plan: DeletePlan, + plan: DeletePlan, syms: &SymbolTable, ) -> Result<()> { let (mut t_ctx, init_label, start_offset) = prologue( @@ -286,6 +286,7 @@ fn emit_program_for_delete( // No rows will be read from source table loops if there is a constant false condition eg. WHERE 0 let after_main_loop_label = program.allocate_label(); + t_ctx.label_main_loop_end = Some(after_main_loop_label); if plan.contains_constant_false_condition { program.emit_insn(Insn::Goto { target_pc: after_main_loop_label, @@ -304,11 +305,16 @@ fn emit_program_for_delete( open_loop( program, &mut t_ctx, - &mut plan.table_references, + &plan.table_references, &plan.where_clause, )?; - - emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?; + if let Some(table) = plan.table_references.first() { + if table.virtual_table().is_some() { + emit_delete_vtable_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?; + } else { + emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.limit)?; + } + } // Clean up and close the main execution loop close_loop(program, &mut t_ctx, &plan.table_references)?; @@ -322,6 +328,77 @@ fn emit_program_for_delete( Ok(()) } +fn emit_delete_vtable_insns( + program: &mut ProgramBuilder, + t_ctx: &mut TranslateCtx, + table_references: &[TableReference], + limit: &Option, +) -> Result<()> { + let table_reference = table_references.first().unwrap(); + + let cursor_id = match &table_reference.op { + Operation::Scan { .. } => program.resolve_cursor_id(&table_reference.identifier), + Operation::Search(search) => match search { + Search::RowidEq { .. } | Search::RowidSearch { .. } => { + program.resolve_cursor_id(&table_reference.identifier) + } + Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name), + }, + _ => return Ok(()), + }; + + let rowid_reg = program.alloc_register(); + program.emit_insn(Insn::RowId { + cursor_id, + dest: rowid_reg, + }); + // if we have a limit, decrement and check zero + if let Some(limit) = limit { + let limit_reg = program.alloc_register(); + program.emit_insn(Insn::Integer { + value: *limit as i64, + dest: limit_reg, + }); + program.mark_last_insn_constant(); + + program.emit_insn(Insn::DecrJumpZero { + reg: limit_reg, + target_pc: t_ctx.label_main_loop_end.unwrap(), + }); + } + + // we want old_rowid= rowid_reg, new_rowid= NULL, so we pass 2 arguments to VUpdate + // we need a second register for the new rowid = NULL + let new_rowid_reg = program.alloc_register(); + + program.emit_insn(Insn::Null { + dest: new_rowid_reg, + dest_end: None, + }); + + // we'll do VUpdate with arg_count=2: + // argv[0] => old_rowid = rowid_reg + // argv[1] => new_rowid = new_rowid_reg (NULL) + + let Some(virtual_table) = table_reference.virtual_table() else { + return Err(crate::LimboError::ParseError( + "Table is not a virtual table".to_string(), + )); + }; + let conflict_action = 0u16; + let start_reg = rowid_reg; + + program.emit_insn(Insn::VUpdate { + cursor_id, + arg_count: 2, + start_reg, + vtab_ptr: virtual_table.implementation.as_ref().ctx as usize, + conflict_action, + }); + + Ok(()) +} + fn emit_delete_insns( program: &mut ProgramBuilder, t_ctx: &mut TranslateCtx, diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 5f933e93a..f77171621 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -62,7 +62,7 @@ pub fn translate_insert( body, on_conflict, &resolver, - ); + )?; return Ok(program); } let init_label = program.allocate_label(); @@ -474,11 +474,17 @@ fn translate_virtual_table_insert( for (i, expr) in values[0].iter().enumerate() { translate_expr(program, None, expr, value_registers_start + i, resolver)?; } + /* * + * Inserts for virtual tables are done in a single step. The rowid is not provided by the user, but is generated by the + * vtable implementation. + * argv[0] = current_rowid (NULL for insert) + * argv[1] = insert_rowid (NULL for insert) + * argv[2..] = column values + * */ - let start_reg = program.alloc_registers(column_mappings.len() + 3); - let rowid_reg = start_reg; // argv[0] = rowid - let insert_rowid_reg = start_reg + 1; // argv[1] = insert_rowid - let data_start_reg = start_reg + 2; // argv[2..] = column values + let rowid_reg = program.alloc_registers(column_mappings.len() + 3); + let insert_rowid_reg = rowid_reg + 1; // argv[1] = insert_rowid + let data_start_reg = rowid_reg + 2; // argv[2..] = column values program.emit_insn(Insn::Null { dest: rowid_reg, @@ -515,7 +521,7 @@ fn translate_virtual_table_insert( program.emit_insn(Insn::VUpdate { cursor_id, arg_count: column_mappings.len() + 2, - start_reg, + start_reg: rowid_reg, vtab_ptr: virtual_table.implementation.as_ref().ctx as usize, conflict_action, }); diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index a0e4a13c4..953e39242 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -110,6 +110,10 @@ pub fn init_loop( program.emit_insn(Insn::VOpenAsync { cursor_id }); program.emit_insn(Insn::VOpenAwait {}); } + (OperationMode::DELETE, Table::Virtual(_)) => { + program.emit_insn(Insn::VOpenAsync { cursor_id }); + program.emit_insn(Insn::VOpenAwait {}); + } _ => { unimplemented!() } @@ -286,44 +290,23 @@ pub fn open_loop( }, ), Table::Virtual(ref table) => { - let args = if let Some(args) = table.args.as_ref() { - args - } else { - &vec![] - }; - let start_reg = program.alloc_registers(args.len()); + let start_reg = program + .alloc_registers(table.args.as_ref().map(|a| a.len()).unwrap_or(0)); let mut cur_reg = start_reg; - - for arg_str in args { + let args = match table.args.as_ref() { + Some(args) => args, + None => &vec![], + }; + for arg in args { let reg = cur_reg; cur_reg += 1; - - if let Ok(i) = arg_str.parse::() { - program.emit_insn(Insn::Integer { - value: i, - dest: reg, - }); - } else if let Ok(f) = arg_str.parse::() { - program.emit_insn(Insn::Real { - value: f, - dest: reg, - }); - } else if arg_str.starts_with('"') && arg_str.ends_with('"') { - program.emit_insn(Insn::String8 { - value: arg_str.trim_matches('"').to_string(), - dest: reg, - }); - } else { - program.emit_insn(Insn::String8 { - value: arg_str.clone(), - dest: reg, - }); - } + let _ = + translate_expr(program, Some(tables), arg, reg, &t_ctx.resolver)?; } program.emit_insn(Insn::VFilter { cursor_id, pc_if_empty: loop_end, - arg_count: args.len(), + arg_count: table.args.as_ref().map_or(0, |args| args.len()), args_reg: start_reg, }); } @@ -697,9 +680,9 @@ fn emit_loop_source( ); let offset_jump_to = t_ctx .labels_main_loop - .get(0) + .first() .map(|l| l.next) - .or_else(|| t_ctx.label_main_loop_end); + .or(t_ctx.label_main_loop_end); emit_select_result( program, t_ctx, diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 7df6258ec..f485fb7ee 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -572,8 +572,8 @@ fn create_vtable_body_to_str(vtab: &CreateVirtualTable) -> String { format!( "CREATE VIRTUAL TABLE {} {} USING {}{}", vtab.tbl_name.name.0, - vtab.module_name.0, if_not_exists, + vtab.module_name.0, if args.is_empty() { String::new() } else { diff --git a/core/translate/planner.rs b/core/translate/planner.rs index c9c266a13..440744426 100644 --- a/core/translate/planner.rs +++ b/core/translate/planner.rs @@ -9,7 +9,7 @@ use super::{ use crate::{ function::Func, schema::{Schema, Table}, - util::{exprs_are_equivalent, normalize_ident}, + util::{exprs_are_equivalent, normalize_ident, vtable_args}, vdbe::BranchOffset, Result, }; @@ -303,16 +303,25 @@ fn parse_from_clause_table<'a>( return Ok(()); }; // Check if our top level schema has this table. - if let Some(table) = schema.get_btree_table(&normalized_qualified_name) { + if let Some(table) = schema.get_table(&normalized_qualified_name) { let alias = maybe_alias .map(|a| match a { ast::As::As(id) => id, ast::As::Elided(id) => id, }) .map(|a| a.0); + let tbl_ref = if let Table::Virtual(tbl) = table.as_ref() { + Table::Virtual(tbl.clone()) + } else if let Table::BTree(table) = table.as_ref() { + Table::BTree(table.clone()) + } else { + return Err(crate::LimboError::InvalidArgument( + "Table type not supported".to_string(), + )); + }; scope.tables.push(TableReference { op: Operation::Scan { iter_dir: None }, - table: Table::BTree(table.clone()), + table: tbl_ref, identifier: alias.unwrap_or(normalized_qualified_name), join_info: None, }); @@ -367,17 +376,16 @@ fn parse_from_clause_table<'a>( .push(TableReference::new_subquery(identifier, subplan, None)); Ok(()) } - ast::SelectTable::TableCall(qualified_name, maybe_args, maybe_alias) => { + ast::SelectTable::TableCall(qualified_name, ref maybe_args, maybe_alias) => { let normalized_name = &normalize_ident(qualified_name.name.0.as_str()); + let args = vtable_args(maybe_args.as_ref().unwrap_or(&vec![]).as_slice()); let vtab = crate::VirtualTable::from_args( None, normalized_name, - &maybe_args - .as_ref() - .map(|a| a.iter().map(|s| s.to_string()).collect::>()) - .unwrap_or_default(), + args, syms, limbo_ext::VTabKind::TableValuedFunction, + maybe_args, )?; let alias = maybe_alias .as_ref() @@ -610,7 +618,7 @@ fn parse_join<'a>( constraint, } = join; - parse_from_clause_table(schema, table, scope, &syms)?; + parse_from_clause_table(schema, table, scope, syms)?; let (outer, natural) = match join_operator { ast::JoinOperator::TypedJoin(Some(join_type)) => { diff --git a/core/util.rs b/core/util.rs index ddaf931a9..17c12fbb3 100644 --- a/core/util.rs +++ b/core/util.rs @@ -388,6 +388,35 @@ pub fn columns_from_create_table_body(body: &ast::CreateTableBody) -> crate::Res .collect::>()) } +// for TVF's we need these at planning time so we cannot emit translate_expr +pub fn vtable_args(args: &[ast::Expr]) -> Vec { + let mut vtable_args = Vec::new(); + for arg in args { + match arg { + Expr::Literal(lit) => match lit { + Literal::Numeric(i) => { + if i.contains('.') { + vtable_args.push(limbo_ext::Value::from_float(i.parse().unwrap())); + } else { + vtable_args.push(limbo_ext::Value::from_integer(i.parse().unwrap())); + } + } + Literal::String(s) => { + vtable_args.push(limbo_ext::Value::from_text(s.clone())); + } + Literal::Blob(b) => { + vtable_args.push(limbo_ext::Value::from_blob(b.as_bytes().into())); + } + _ => { + vtable_args.push(limbo_ext::Value::null()); + } + }, + _ => vtable_args.push(limbo_ext::Value::null()), + } + } + vtable_args +} + #[cfg(test)] pub mod tests { use super::*; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index a881c0d56..f4a2725c2 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -304,14 +304,19 @@ impl Bitfield { } } -pub struct VTabOpaqueCursor(*mut c_void); +pub struct VTabOpaqueCursor(*const c_void); impl VTabOpaqueCursor { - pub fn new(cursor: *mut c_void) -> Self { - Self(cursor) + pub fn new(cursor: *const c_void) -> Result { + if cursor.is_null() { + return Err(LimboError::InternalError( + "VTabOpaqueCursor: cursor is null".into(), + )); + } + Ok(Self(cursor)) } - pub fn as_ptr(&self) -> *mut c_void { + pub fn as_ptr(&self) -> *const c_void { self.0 } } @@ -866,7 +871,7 @@ impl Program { let CursorType::VirtualTable(virtual_table) = cursor_type else { panic!("VOpenAsync on non-virtual table cursor"); }; - let cursor = virtual_table.open(); + let cursor = virtual_table.open()?; state .cursors .borrow_mut() @@ -882,7 +887,7 @@ impl Program { let table_name = state.registers[*table_name].to_string(); let args = if let Some(args_reg) = args_reg { if let OwnedValue::Record(rec) = &state.registers[*args_reg] { - rec.get_values().iter().map(|v| v.to_string()).collect() + rec.get_values().iter().map(|v| v.to_ffi()).collect() } else { return Err(LimboError::InternalError( "VCreate: args_reg is not a record".to_string(), @@ -899,9 +904,10 @@ impl Program { let table = crate::VirtualTable::from_args( Some(&table_name), &module_name, - &args, + args, &conn.db.syms.borrow(), limbo_ext::VTabKind::VirtualTable, + &None, )?; { conn.db @@ -971,7 +977,6 @@ impl Program { .to_string(), )); } - let mut argv = Vec::with_capacity(*arg_count); for i in 0..*arg_count { if let Some(value) = state.registers.get(*start_reg + i) { @@ -983,18 +988,10 @@ impl Program { ))); } } - - let current_rowid = match argv.first() { - Some(OwnedValue::Integer(rowid)) => Some(*rowid), - _ => None, - }; - let insert_rowid = match argv.get(1) { - Some(OwnedValue::Integer(rowid)) => Some(*rowid), - _ => None, - }; - - let result = virtual_table.update(&argv, insert_rowid); - + // argv[0] = current_rowid (for DELETE if applicable) + // argv[1] = insert_rowid (for INSERT if applicable) + // argv[2..] = column values + let result = virtual_table.update(&argv); match result { Ok(Some(new_rowid)) => { if *conflict_action == 5 { @@ -1005,9 +1002,11 @@ impl Program { state.pc += 1; } Ok(None) => { + // no-op or successful update without rowid return state.pc += 1; } Err(e) => { + // virtual table update failed return Err(LimboError::ExtensionError(format!( "Virtual table update failed: {}", e @@ -1355,11 +1354,30 @@ impl Program { } } - let cursor = get_cursor_as_table_mut(&mut cursors, *cursor_id); - if let Some(ref rowid) = cursor.rowid()? { - state.registers[*dest] = OwnedValue::Integer(*rowid as i64); + if let Some(Cursor::Table(btree_cursor)) = cursors.get_mut(*cursor_id).unwrap() + { + if let Some(ref rowid) = btree_cursor.rowid()? { + state.registers[*dest] = OwnedValue::Integer(*rowid as i64); + } else { + state.registers[*dest] = OwnedValue::Null; + } + } else if let Some(Cursor::Virtual(virtual_cursor)) = + cursors.get_mut(*cursor_id).unwrap() + { + let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap(); + let CursorType::VirtualTable(virtual_table) = cursor_type else { + panic!("VUpdate on non-virtual table cursor"); + }; + let rowid = virtual_table.rowid(virtual_cursor); + if rowid != 0 { + state.registers[*dest] = OwnedValue::Integer(rowid); + } else { + state.registers[*dest] = OwnedValue::Null; + } } else { - state.registers[*dest] = OwnedValue::Null; + return Err(LimboError::InternalError( + "RowId: cursor is not a table or virtual cursor".to_string(), + )); } state.pc += 1; }