mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 16:35:30 +01:00
Extract VirtualTable to a separate module
This commit is contained in:
204
core/lib.rs
204
core/lib.rs
@@ -21,6 +21,7 @@ pub mod types;
|
||||
mod util;
|
||||
mod vdbe;
|
||||
mod vector;
|
||||
mod vtab;
|
||||
|
||||
#[cfg(feature = "fuzz")]
|
||||
pub mod numeric;
|
||||
@@ -32,6 +33,7 @@ mod numeric;
|
||||
#[global_allocator]
|
||||
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||
|
||||
use crate::vtab::VirtualTable;
|
||||
use crate::{fast_lock::SpinLock, translate::optimizer::optimize_plan};
|
||||
pub use error::LimboError;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
@@ -43,12 +45,9 @@ pub use io::UringIO;
|
||||
pub use io::{
|
||||
Buffer, Completion, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO,
|
||||
};
|
||||
use limbo_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
|
||||
use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
|
||||
use parking_lot::RwLock;
|
||||
use schema::{Column, Schema};
|
||||
use std::ffi::c_void;
|
||||
use std::rc::Weak;
|
||||
use schema::Schema;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
@@ -79,9 +78,10 @@ use storage::{
|
||||
use translate::select::prepare_select_plan;
|
||||
pub use types::RefValue;
|
||||
pub use types::Value;
|
||||
use util::{columns_from_create_table_body, parse_schema_rows};
|
||||
use util::parse_schema_rows;
|
||||
use vdbe::builder::QueryMode;
|
||||
use vdbe::builder::TableRefIdCounter;
|
||||
use vdbe::{builder::QueryMode, VTabOpaqueCursor};
|
||||
|
||||
pub type Result<T, E = LimboError> = std::result::Result<T, E>;
|
||||
pub static DATABASE_VERSION: OnceLock<String> = OnceLock::new();
|
||||
|
||||
@@ -789,198 +789,6 @@ pub type Row = vdbe::Row;
|
||||
|
||||
pub type StepResult = vdbe::StepResult;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VirtualTable {
|
||||
name: String,
|
||||
args: Option<Vec<ast::Expr>>,
|
||||
pub implementation: Rc<VTabModuleImpl>,
|
||||
columns: Vec<Column>,
|
||||
kind: VTabKind,
|
||||
table_ptr: *const c_void,
|
||||
connection_ptr: RefCell<Option<*mut limbo_ext::Conn>>,
|
||||
}
|
||||
|
||||
impl Drop for VirtualTable {
|
||||
fn drop(&mut self) {
|
||||
if let Some(conn) = self.connection_ptr.borrow_mut().take() {
|
||||
if conn.is_null() {
|
||||
return;
|
||||
}
|
||||
// free the memory for the limbo_ext::Conn itself
|
||||
let mut conn = unsafe { Box::from_raw(conn) };
|
||||
// frees the boxed Weak pointer
|
||||
conn.close();
|
||||
}
|
||||
*self.connection_ptr.borrow_mut() = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualTable {
|
||||
pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 {
|
||||
unsafe { (self.implementation.rowid)(cursor.as_ptr()) }
|
||||
}
|
||||
|
||||
pub(crate) fn best_index(
|
||||
&self,
|
||||
constraints: &[ConstraintInfo],
|
||||
order_by: &[OrderByInfo],
|
||||
) -> IndexInfo {
|
||||
unsafe {
|
||||
IndexInfo::from_ffi((self.implementation.best_idx)(
|
||||
constraints.as_ptr(),
|
||||
constraints.len() as i32,
|
||||
order_by.as_ptr(),
|
||||
order_by.len() as i32,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// takes ownership of the provided Args
|
||||
pub(crate) fn from_args(
|
||||
tbl_name: Option<&str>,
|
||||
module_name: &str,
|
||||
args: Vec<limbo_ext::Value>,
|
||||
syms: &SymbolTable,
|
||||
kind: VTabKind,
|
||||
exprs: Option<Vec<ast::Expr>>,
|
||||
) -> Result<Rc<Self>> {
|
||||
let module = syms
|
||||
.vtab_modules
|
||||
.get(module_name)
|
||||
.ok_or(LimboError::ExtensionError(format!(
|
||||
"Virtual table module not found: {}",
|
||||
module_name
|
||||
)))?;
|
||||
if let VTabKind::VirtualTable = kind {
|
||||
if module.module_kind == VTabKind::TableValuedFunction {
|
||||
return Err(LimboError::ExtensionError(format!(
|
||||
"{} is not a virtual table module",
|
||||
module_name
|
||||
)));
|
||||
}
|
||||
};
|
||||
let (schema, table_ptr) = module.implementation.create(args)?;
|
||||
let mut parser = Parser::new(schema.as_bytes());
|
||||
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
|
||||
LimboError::ParseError("Failed to parse schema from virtual table module".to_string()),
|
||||
)? {
|
||||
let columns = columns_from_create_table_body(&body)?;
|
||||
let vtab = Rc::new(VirtualTable {
|
||||
name: tbl_name.unwrap_or(module_name).to_owned(),
|
||||
connection_ptr: RefCell::new(None),
|
||||
implementation: module.implementation.clone(),
|
||||
columns,
|
||||
args: exprs,
|
||||
kind,
|
||||
table_ptr,
|
||||
});
|
||||
return Ok(vtab);
|
||||
}
|
||||
Err(crate::LimboError::ParseError(
|
||||
"Failed to parse schema from virtual table module".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Accepts a Weak pointer to the connection that owns the VTable, that the module
|
||||
/// can optionally use to query the other tables.
|
||||
pub fn open(&self, conn: Weak<Connection>) -> crate::Result<VTabOpaqueCursor> {
|
||||
// we need a Weak<Connection> to upgrade and call from the extension.
|
||||
let weak_box: *mut Weak<Connection> = Box::into_raw(Box::new(conn));
|
||||
let conn = limbo_ext::Conn::new(
|
||||
weak_box.cast(),
|
||||
crate::ext::prepare_stmt,
|
||||
crate::ext::execute,
|
||||
crate::ext::close,
|
||||
);
|
||||
let ext_conn_ptr = Box::into_raw(Box::new(conn));
|
||||
// store the leaked connection pointer on the table so it can be freed on drop
|
||||
*self.connection_ptr.borrow_mut() = Some(ext_conn_ptr);
|
||||
let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) };
|
||||
VTabOpaqueCursor::new(cursor, self.implementation.close)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(cursor))]
|
||||
pub fn filter(
|
||||
&self,
|
||||
cursor: &VTabOpaqueCursor,
|
||||
idx_num: i32,
|
||||
idx_str: Option<String>,
|
||||
arg_count: usize,
|
||||
args: Vec<limbo_ext::Value>,
|
||||
) -> Result<bool> {
|
||||
tracing::trace!("xFilter");
|
||||
let c_idx_str = idx_str
|
||||
.map(|s| std::ffi::CString::new(s).unwrap())
|
||||
.map(|cstr| cstr.into_raw())
|
||||
.unwrap_or(std::ptr::null_mut());
|
||||
let rc = unsafe {
|
||||
(self.implementation.filter)(
|
||||
cursor.as_ptr(),
|
||||
arg_count as i32,
|
||||
args.as_ptr(),
|
||||
c_idx_str,
|
||||
idx_num,
|
||||
)
|
||||
};
|
||||
for arg in args {
|
||||
unsafe {
|
||||
arg.__free_internal_type();
|
||||
}
|
||||
}
|
||||
match rc {
|
||||
ResultCode::OK => Ok(true),
|
||||
ResultCode::EOF => Ok(false),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> Result<Value> {
|
||||
let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) };
|
||||
Value::from_ffi(val)
|
||||
}
|
||||
|
||||
pub fn next(&self, cursor: &VTabOpaqueCursor) -> Result<bool> {
|
||||
let rc = unsafe { (self.implementation.next)(cursor.as_ptr()) };
|
||||
match rc {
|
||||
ResultCode::OK => Ok(true),
|
||||
ResultCode::EOF => Ok(false),
|
||||
_ => Err(LimboError::ExtensionError("Next failed".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&self, args: &[Value]) -> Result<Option<i64>> {
|
||||
let arg_count = args.len();
|
||||
let ext_args = args.iter().map(|arg| arg.to_ffi()).collect::<Vec<_>>();
|
||||
let newrowid = 0i64;
|
||||
let rc = unsafe {
|
||||
(self.implementation.update)(
|
||||
self.table_ptr,
|
||||
arg_count as i32,
|
||||
ext_args.as_ptr(),
|
||||
&newrowid as *const _ as *mut i64,
|
||||
)
|
||||
};
|
||||
for arg in ext_args {
|
||||
unsafe {
|
||||
arg.__free_internal_type();
|
||||
}
|
||||
}
|
||||
match rc {
|
||||
ResultCode::OK => Ok(None),
|
||||
ResultCode::RowID => Ok(Some(newrowid)),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn destroy(&self) -> Result<()> {
|
||||
let rc = unsafe { (self.implementation.destroy)(self.table_ptr) };
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SymbolTable {
|
||||
pub functions: HashMap<String, Rc<function::ExternalFunc>>,
|
||||
pub vtabs: HashMap<String, Rc<VirtualTable>>,
|
||||
|
||||
202
core/vtab.rs
Normal file
202
core/vtab.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
use crate::schema::Column;
|
||||
use crate::util::columns_from_create_table_body;
|
||||
use crate::vdbe::VTabOpaqueCursor;
|
||||
use crate::{Connection, LimboError, SymbolTable, Value};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use limbo_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
|
||||
use limbo_sqlite3_parser::{ast, lexer::sql::Parser};
|
||||
use std::cell::RefCell;
|
||||
use std::ffi::c_void;
|
||||
use std::rc::{Rc, Weak};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VirtualTable {
|
||||
pub name: String,
|
||||
pub args: Option<Vec<ast::Expr>>,
|
||||
pub implementation: Rc<VTabModuleImpl>,
|
||||
pub columns: Vec<Column>,
|
||||
pub kind: VTabKind,
|
||||
table_ptr: *const c_void,
|
||||
connection_ptr: RefCell<Option<*mut limbo_ext::Conn>>,
|
||||
}
|
||||
|
||||
impl Drop for VirtualTable {
|
||||
fn drop(&mut self) {
|
||||
if let Some(conn) = self.connection_ptr.borrow_mut().take() {
|
||||
if conn.is_null() {
|
||||
return;
|
||||
}
|
||||
// free the memory for the limbo_ext::Conn itself
|
||||
let mut conn = unsafe { Box::from_raw(conn) };
|
||||
// frees the boxed Weak pointer
|
||||
conn.close();
|
||||
}
|
||||
*self.connection_ptr.borrow_mut() = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualTable {
|
||||
pub(crate) fn rowid(&self, cursor: &VTabOpaqueCursor) -> i64 {
|
||||
unsafe { (self.implementation.rowid)(cursor.as_ptr()) }
|
||||
}
|
||||
|
||||
pub(crate) fn best_index(
|
||||
&self,
|
||||
constraints: &[ConstraintInfo],
|
||||
order_by: &[OrderByInfo],
|
||||
) -> IndexInfo {
|
||||
unsafe {
|
||||
IndexInfo::from_ffi((self.implementation.best_idx)(
|
||||
constraints.as_ptr(),
|
||||
constraints.len() as i32,
|
||||
order_by.as_ptr(),
|
||||
order_by.len() as i32,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// takes ownership of the provided Args
|
||||
pub(crate) fn from_args(
|
||||
tbl_name: Option<&str>,
|
||||
module_name: &str,
|
||||
args: Vec<limbo_ext::Value>,
|
||||
syms: &SymbolTable,
|
||||
kind: VTabKind,
|
||||
exprs: Option<Vec<ast::Expr>>,
|
||||
) -> crate::Result<Rc<Self>> {
|
||||
let module = syms
|
||||
.vtab_modules
|
||||
.get(module_name)
|
||||
.ok_or(LimboError::ExtensionError(format!(
|
||||
"Virtual table module not found: {}",
|
||||
module_name
|
||||
)))?;
|
||||
if let VTabKind::VirtualTable = kind {
|
||||
if module.module_kind == VTabKind::TableValuedFunction {
|
||||
return Err(LimboError::ExtensionError(format!(
|
||||
"{} is not a virtual table module",
|
||||
module_name
|
||||
)));
|
||||
}
|
||||
};
|
||||
let (schema, table_ptr) = module.implementation.create(args)?;
|
||||
let mut parser = Parser::new(schema.as_bytes());
|
||||
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
|
||||
LimboError::ParseError("Failed to parse schema from virtual table module".to_string()),
|
||||
)? {
|
||||
let columns = columns_from_create_table_body(&body)?;
|
||||
let vtab = Rc::new(VirtualTable {
|
||||
name: tbl_name.unwrap_or(module_name).to_owned(),
|
||||
connection_ptr: RefCell::new(None),
|
||||
implementation: module.implementation.clone(),
|
||||
columns,
|
||||
args: exprs,
|
||||
kind,
|
||||
table_ptr,
|
||||
});
|
||||
return Ok(vtab);
|
||||
}
|
||||
Err(LimboError::ParseError(
|
||||
"Failed to parse schema from virtual table module".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Accepts a Weak pointer to the connection that owns the VTable, that the module
|
||||
/// can optionally use to query the other tables.
|
||||
pub fn open(&self, conn: Weak<Connection>) -> crate::Result<VTabOpaqueCursor> {
|
||||
// we need a Weak<Connection> to upgrade and call from the extension.
|
||||
let weak_box: *mut Weak<Connection> = Box::into_raw(Box::new(conn));
|
||||
let conn = limbo_ext::Conn::new(
|
||||
weak_box.cast(),
|
||||
crate::ext::prepare_stmt,
|
||||
crate::ext::execute,
|
||||
crate::ext::close,
|
||||
);
|
||||
let ext_conn_ptr = Box::into_raw(Box::new(conn));
|
||||
// store the leaked connection pointer on the table so it can be freed on drop
|
||||
*self.connection_ptr.borrow_mut() = Some(ext_conn_ptr);
|
||||
let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) };
|
||||
VTabOpaqueCursor::new(cursor, self.implementation.close)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(cursor))]
|
||||
pub fn filter(
|
||||
&self,
|
||||
cursor: &VTabOpaqueCursor,
|
||||
idx_num: i32,
|
||||
idx_str: Option<String>,
|
||||
arg_count: usize,
|
||||
args: Vec<limbo_ext::Value>,
|
||||
) -> crate::Result<bool> {
|
||||
tracing::trace!("xFilter");
|
||||
let c_idx_str = idx_str
|
||||
.map(|s| std::ffi::CString::new(s).unwrap())
|
||||
.map(|cstr| cstr.into_raw())
|
||||
.unwrap_or(std::ptr::null_mut());
|
||||
let rc = unsafe {
|
||||
(self.implementation.filter)(
|
||||
cursor.as_ptr(),
|
||||
arg_count as i32,
|
||||
args.as_ptr(),
|
||||
c_idx_str,
|
||||
idx_num,
|
||||
)
|
||||
};
|
||||
for arg in args {
|
||||
unsafe {
|
||||
arg.__free_internal_type();
|
||||
}
|
||||
}
|
||||
match rc {
|
||||
ResultCode::OK => Ok(true),
|
||||
ResultCode::EOF => Ok(false),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column(&self, cursor: &VTabOpaqueCursor, column: usize) -> crate::Result<Value> {
|
||||
let val = unsafe { (self.implementation.column)(cursor.as_ptr(), column as u32) };
|
||||
Value::from_ffi(val)
|
||||
}
|
||||
|
||||
pub fn next(&self, cursor: &VTabOpaqueCursor) -> crate::Result<bool> {
|
||||
let rc = unsafe { (self.implementation.next)(cursor.as_ptr()) };
|
||||
match rc {
|
||||
ResultCode::OK => Ok(true),
|
||||
ResultCode::EOF => Ok(false),
|
||||
_ => Err(LimboError::ExtensionError("Next failed".to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&self, args: &[Value]) -> crate::Result<Option<i64>> {
|
||||
let arg_count = args.len();
|
||||
let ext_args = args.iter().map(|arg| arg.to_ffi()).collect::<Vec<_>>();
|
||||
let newrowid = 0i64;
|
||||
let rc = unsafe {
|
||||
(self.implementation.update)(
|
||||
self.table_ptr,
|
||||
arg_count as i32,
|
||||
ext_args.as_ptr(),
|
||||
&newrowid as *const _ as *mut i64,
|
||||
)
|
||||
};
|
||||
for arg in ext_args {
|
||||
unsafe {
|
||||
arg.__free_internal_type();
|
||||
}
|
||||
}
|
||||
match rc {
|
||||
ResultCode::OK => Ok(None),
|
||||
ResultCode::RowID => Ok(Some(newrowid)),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn destroy(&self) -> crate::Result<()> {
|
||||
let rc = unsafe { (self.implementation.destroy)(self.table_ptr) };
|
||||
match rc {
|
||||
ResultCode::OK => Ok(()),
|
||||
_ => Err(LimboError::ExtensionError(rc.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user