diff --git a/core/lib.rs b/core/lib.rs index a2d5d52e9..b70ad7bcb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -21,6 +21,7 @@ pub mod types; mod util; mod vdbe; mod vector; +mod vtab; #[cfg(feature = "fuzz")] pub mod numeric; @@ -32,6 +33,7 @@ mod numeric; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +use crate::vtab::VirtualTable; use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan}; pub use error::LimboError; use fallible_iterator::FallibleIterator; @@ -43,12 +45,9 @@ pub use io::UringIO; pub use io::{ Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, }; -use limbo_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl}; use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser}; use parking_lot::RwLock; -use schema::{Column, Schema}; -use std::ffi::c_void; -use std::rc::Weak; +use schema::Schema; use std::{ borrow::Cow, cell::{Cell, RefCell, UnsafeCell}, @@ -79,9 +78,10 @@ use storage::{ use translate::select::prepare_select_plan; pub use types::RefValue; pub use types::Value; -use util::{columns_from_create_table_body, parse_schema_rows}; +use util::parse_schema_rows; +use vdbe::builder::QueryMode; use vdbe::builder::TableRefIdCounter; -use vdbe::{builder::QueryMode, VTabOpaqueCursor}; + pub type Result = std::result::Result; pub static DATABASE_VERSION: OnceLock = OnceLock::new(); @@ -789,198 +789,6 @@ pub type Row = vdbe::Row; pub type StepResult = vdbe::StepResult; -#[derive(Clone, Debug)] -pub struct VirtualTable { - name: String, - args: Option>, - pub implementation: Rc, - columns: Vec, - kind: VTabKind, - table_ptr: *const c_void, - connection_ptr: RefCell>, -} - -impl Drop for VirtualTable { - fn drop(&mut self) { - if let Some(conn) = self.connection_ptr.borrow_mut().take() { - if conn.is_null() { - return; - } - // free the memory for the limbo_ext::Conn itself - let mut conn = unsafe { Box::from_raw(conn) }; - // frees the boxed Weak pointer - conn.close(); - } - *self.connection_ptr.borrow_mut() = None; - } -} - -impl VirtualTable { - pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 { - unsafe { (self.implementation.rowid)(cursor.as_ptr()) } - } - - pub(crate) fn best_index( - &self, - constraints: &[ConstraintInfo], - order_by: &[OrderByInfo], - ) -> IndexInfo { - unsafe { - IndexInfo::from_ffi((self.implementation.best_idx)( - constraints.as_ptr(), - constraints.len() as i32, - order_by.as_ptr(), - order_by.len() as i32, - )) - } - } - - /// takes ownership of the provided Args - pub(crate) fn from_args( - tbl_name: Option<&str>, - module_name: &str, - args: Vec, - syms: &SymbolTable, - kind: VTabKind, - exprs: Option>, - ) -> Result> { - let module = syms - .vtab_modules - .get(module_name) - .ok_or(LimboError::ExtensionError(format!( - "Virtual table module not found: {}", - module_name - )))?; - if let VTabKind::VirtualTable = kind { - if module.module_kind == VTabKind::TableValuedFunction { - return Err(LimboError::ExtensionError(format!( - "{} is not a virtual table module", - module_name - ))); - } - }; - let (schema, table_ptr) = module.implementation.create(args)?; - let mut parser = Parser::new(schema.as_bytes()); - if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or( - LimboError::ParseError("Failed to parse schema from virtual table module".to_string()), - )? { - let columns = columns_from_create_table_body(&body)?; - let vtab = Rc::new(VirtualTable { - name: tbl_name.unwrap_or(module_name).to_owned(), - connection_ptr: RefCell::new(None), - implementation: module.implementation.clone(), - columns, - args: exprs, - kind, - table_ptr, - }); - return Ok(vtab); - } - Err(crate::LimboError::ParseError( - "Failed to parse schema from virtual table module".to_string(), - )) - } - - /// Accepts a Weak pointer to the connection that owns the VTable, that the module - /// can optionally use to query the other tables. - pub fn open(&self, conn: Weak) -> crate::Result { - // we need a Weak to upgrade and call from the extension. - let weak_box: *mut Weak = Box::into_raw(Box::new(conn)); - let conn = limbo_ext::Conn::new( - weak_box.cast(), - crate::ext::prepare_stmt, - crate::ext::execute, - crate::ext::close, - ); - let ext_conn_ptr = Box::into_raw(Box::new(conn)); - // store the leaked connection pointer on the table so it can be freed on drop - *self.connection_ptr.borrow_mut() = Some(ext_conn_ptr); - let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) }; - VTabOpaqueCursor::new(cursor, self.implementation.close) - } - - #[tracing::instrument(skip(cursor))] - pub fn filter( - &self, - cursor: &VTabOpaqueCursor, - idx_num: i32, - idx_str: Option, - arg_count: usize, - args: Vec, - ) -> Result { - tracing::trace!("xFilter"); - let c_idx_str = idx_str - .map(|s| std::ffi::CString::new(s).unwrap()) - .map(|cstr| cstr.into_raw()) - .unwrap_or(std::ptr::null_mut()); - let rc = unsafe { - (self.implementation.filter)( - cursor.as_ptr(), - arg_count as i32, - args.as_ptr(), - c_idx_str, - idx_num, - ) - }; - for arg in args { - unsafe { - arg.__free_internal_type(); - } - } - match rc { - ResultCode::OK => Ok(true), - ResultCode::EOF => Ok(false), - _ => Err(LimboError::ExtensionError(rc.to_string())), - } - } - - pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> Result { - let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) }; - Value::from_ffi(val) - } - - pub fn next(&self, cursor: &VTabOpaqueCursor) -> Result { - let rc = unsafe { (self.implementation.next)(cursor.as_ptr()) }; - match rc { - ResultCode::OK => Ok(true), - ResultCode::EOF => Ok(false), - _ => Err(LimboError::ExtensionError("Next failed".to_string())), - } - } - - pub fn update(&self, args: &[Value]) -> Result> { - let arg_count = args.len(); - let ext_args = args.iter().map(|arg| arg.to_ffi()).collect::>(); - let newrowid = 0i64; - let rc = unsafe { - (self.implementation.update)( - self.table_ptr, - arg_count as i32, - ext_args.as_ptr(), - &newrowid as *const _ as *mut i64, - ) - }; - for arg in ext_args { - unsafe { - arg.__free_internal_type(); - } - } - match rc { - ResultCode::OK => Ok(None), - ResultCode::RowID => Ok(Some(newrowid)), - _ => Err(LimboError::ExtensionError(rc.to_string())), - } - } - - pub fn destroy(&self) -> Result<()> { - let rc = unsafe { (self.implementation.destroy)(self.table_ptr) }; - match rc { - ResultCode::OK => Ok(()), - _ => Err(LimboError::ExtensionError(rc.to_string())), - } - } -} - pub struct SymbolTable { pub functions: HashMap>, pub vtabs: HashMap>, diff --git a/core/vtab.rs b/core/vtab.rs new file mode 100644 index 000000000..0fcb40335 --- /dev/null +++ b/core/vtab.rs @@ -0,0 +1,202 @@ +use crate::schema::Column; +use crate::util::columns_from_create_table_body; +use crate::vdbe::VTabOpaqueCursor; +use crate::{Connection, LimboError, SymbolTable, Value}; +use fallible_iterator::FallibleIterator; +use limbo_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl}; +use limbo_sqlite3_parser::{ast, lexer::sql::Parser}; +use std::cell::RefCell; +use std::ffi::c_void; +use std::rc::{Rc, Weak}; + +#[derive(Clone, Debug)] +pub struct VirtualTable { + pub name: String, + pub args: Option>, + pub implementation: Rc, + pub columns: Vec, + pub kind: VTabKind, + table_ptr: *const c_void, + connection_ptr: RefCell>, +} + +impl Drop for VirtualTable { + fn drop(&mut self) { + if let Some(conn) = self.connection_ptr.borrow_mut().take() { + if conn.is_null() { + return; + } + // free the memory for the limbo_ext::Conn itself + let mut conn = unsafe { Box::from_raw(conn) }; + // frees the boxed Weak pointer + conn.close(); + } + *self.connection_ptr.borrow_mut() = None; + } +} + +impl VirtualTable { + pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 { + unsafe { (self.implementation.rowid)(cursor.as_ptr()) } + } + + pub(crate) fn best_index( + &self, + constraints: &[ConstraintInfo], + order_by: &[OrderByInfo], + ) -> IndexInfo { + unsafe { + IndexInfo::from_ffi((self.implementation.best_idx)( + constraints.as_ptr(), + constraints.len() as i32, + order_by.as_ptr(), + order_by.len() as i32, + )) + } + } + + /// takes ownership of the provided Args + pub(crate) fn from_args( + tbl_name: Option<&str>, + module_name: &str, + args: Vec, + syms: &SymbolTable, + kind: VTabKind, + exprs: Option>, + ) -> crate::Result> { + let module = syms + .vtab_modules + .get(module_name) + .ok_or(LimboError::ExtensionError(format!( + "Virtual table module not found: {}", + module_name + )))?; + if let VTabKind::VirtualTable = kind { + if module.module_kind == VTabKind::TableValuedFunction { + return Err(LimboError::ExtensionError(format!( + "{} is not a virtual table module", + module_name + ))); + } + }; + let (schema, table_ptr) = module.implementation.create(args)?; + let mut parser = Parser::new(schema.as_bytes()); + if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or( + LimboError::ParseError("Failed to parse schema from virtual table module".to_string()), + )? { + let columns = columns_from_create_table_body(&body)?; + let vtab = Rc::new(VirtualTable { + name: tbl_name.unwrap_or(module_name).to_owned(), + connection_ptr: RefCell::new(None), + implementation: module.implementation.clone(), + columns, + args: exprs, + kind, + table_ptr, + }); + return Ok(vtab); + } + Err(LimboError::ParseError( + "Failed to parse schema from virtual table module".to_string(), + )) + } + + /// Accepts a Weak pointer to the connection that owns the VTable, that the module + /// can optionally use to query the other tables. + pub fn open(&self, conn: Weak) -> crate::Result { + // we need a Weak to upgrade and call from the extension. + let weak_box: *mut Weak = Box::into_raw(Box::new(conn)); + let conn = limbo_ext::Conn::new( + weak_box.cast(), + crate::ext::prepare_stmt, + crate::ext::execute, + crate::ext::close, + ); + let ext_conn_ptr = Box::into_raw(Box::new(conn)); + // store the leaked connection pointer on the table so it can be freed on drop + *self.connection_ptr.borrow_mut() = Some(ext_conn_ptr); + let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) }; + VTabOpaqueCursor::new(cursor, self.implementation.close) + } + + #[tracing::instrument(skip(cursor))] + pub fn filter( + &self, + cursor: &VTabOpaqueCursor, + idx_num: i32, + idx_str: Option, + arg_count: usize, + args: Vec, + ) -> crate::Result { + tracing::trace!("xFilter"); + let c_idx_str = idx_str + .map(|s| std::ffi::CString::new(s).unwrap()) + .map(|cstr| cstr.into_raw()) + .unwrap_or(std::ptr::null_mut()); + let rc = unsafe { + (self.implementation.filter)( + cursor.as_ptr(), + arg_count as i32, + args.as_ptr(), + c_idx_str, + idx_num, + ) + }; + for arg in args { + unsafe { + arg.__free_internal_type(); + } + } + match rc { + ResultCode::OK => Ok(true), + ResultCode::EOF => Ok(false), + _ => Err(LimboError::ExtensionError(rc.to_string())), + } + } + + pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> crate::Result { + let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) }; + Value::from_ffi(val) + } + + pub fn next(&self, cursor: &VTabOpaqueCursor) -> crate::Result { + let rc = unsafe { (self.implementation.next)(cursor.as_ptr()) }; + match rc { + ResultCode::OK => Ok(true), + ResultCode::EOF => Ok(false), + _ => Err(LimboError::ExtensionError("Next failed".to_string())), + } + } + + pub fn update(&self, args: &[Value]) -> crate::Result> { + let arg_count = args.len(); + let ext_args = args.iter().map(|arg| arg.to_ffi()).collect::>(); + let newrowid = 0i64; + let rc = unsafe { + (self.implementation.update)( + self.table_ptr, + arg_count as i32, + ext_args.as_ptr(), + &newrowid as *const _ as *mut i64, + ) + }; + for arg in ext_args { + unsafe { + arg.__free_internal_type(); + } + } + match rc { + ResultCode::OK => Ok(None), + ResultCode::RowID => Ok(Some(newrowid)), + _ => Err(LimboError::ExtensionError(rc.to_string())), + } + } + + pub fn destroy(&self) -> crate::Result<()> { + let rc = unsafe { (self.implementation.destroy)(self.table_ptr) }; + match rc { + ResultCode::OK => Ok(()), + _ => Err(LimboError::ExtensionError(rc.to_string())), + } + } +}