Merge 'xConnect for virtual tables to query core db connection' from Preston Thorpe

Re-Opening #1076 because it had bit-rotted to a point of no return.
However it has improved. Now with Weak references and no incrementing Rc
strong counts.
This also includes a better test extension that returns info about the
other tables in the schema.
![image](https://github.com/user-
attachments/assets/4292dc9c-121e-4ba2-8a51-4533bbcf2afd)
(theme doesn't show rows column)

Closes #1366
This commit is contained in:
Jussi Saurio
2025-05-25 14:37:38 +03:00
16 changed files with 1058 additions and 84 deletions

View File

@@ -1,5 +1,6 @@
#[cfg(feature = "fs")]
mod dynamic;
mod vtab_xconnect;
#[cfg(all(target_os = "linux", feature = "io_uring"))]
use crate::UringIO;
use crate::{function::ExternalFunc, Connection, Database, LimboError, IO};
@@ -14,6 +15,7 @@ use std::{
rc::Rc,
sync::Arc,
};
pub use vtab_xconnect::{close, execute, prepare_stmt};
type ExternAggFunc = (InitAggFunction, StepFunction, FinalizeFunction);
#[derive(Clone)]

282
core/ext/vtab_xconnect.rs Normal file
View File

@@ -0,0 +1,282 @@
use crate::{types::Value, Connection, Statement, StepResult};
use limbo_ext::{Conn as ExtConn, ResultCode, Stmt, Value as ExtValue};
use std::{
boxed::Box,
ffi::{c_char, c_void, CStr, CString},
num::NonZeroUsize,
ptr,
rc::Weak,
};
/// Free memory for the internal context of the connection.
/// This function does not close the core connection itself,
/// it only frees the memory the table is responsible for.
pub unsafe extern "C" fn close(ctx: *mut c_void) {
if ctx.is_null() {
return;
}
// only free the memory for the boxed connection, we don't upgrade
// or actually close the core connection, as we were 'sharing' it.
let _ = Box::from_raw(ctx as *mut Weak<Connection>);
}
/// Wrapper around core Connection::execute with optional arguments to bind
/// to the statment This function takes ownership of the optional limbo_ext::Value array if provided
pub unsafe extern "C" fn execute(
ctx: *mut ExtConn,
sql: *const c_char,
args: *mut ExtValue,
arg_count: i32,
last_insert_rowid: *mut i64,
) -> ResultCode {
let c_str = unsafe { CStr::from_ptr(sql as *mut c_char) };
let sql_str = match c_str.to_str() {
Ok(s) => s.to_string(),
Err(_) => {
tracing::error!("query: failed to convert sql to string");
return ResultCode::Error;
}
};
let Ok(extcon) = ExtConn::from_ptr(ctx) else {
tracing::error!("query: null connection");
return ResultCode::Error;
};
let weak_ptr = extcon._ctx as *const Weak<Connection>;
let weak = &*weak_ptr;
let Some(conn) = weak.upgrade() else {
tracing::error!("prepare_stmt: failed to upgrade weak pointer in prepare stmt");
return ResultCode::Error;
};
match conn.query(&sql_str) {
Ok(Some(mut stmt)) => {
if arg_count > 0 {
let args_slice = &mut std::slice::from_raw_parts_mut(args, arg_count as usize);
for (i, val) in args_slice.iter_mut().enumerate() {
stmt.bind_at(
NonZeroUsize::new(i + 1).unwrap(),
Value::from_ffi(std::mem::take(val)).unwrap_or(Value::Null),
);
}
}
loop {
match stmt.step() {
Ok(StepResult::Row) => {
tracing::error!("execute used for query returning a row");
return ResultCode::Error;
}
Ok(StepResult::Done) => {
*last_insert_rowid = conn.last_insert_rowid() as i64;
return ResultCode::OK;
}
Ok(StepResult::IO) => {
let _ = conn.pager.io.run_once();
continue;
}
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
Ok(StepResult::Busy) => return ResultCode::Busy,
Err(e) => {
tracing::error!("execute: failed to execute query: {:?}", e);
return ResultCode::Error;
}
}
}
}
Ok(None) => tracing::error!("query: no statement returned"),
Err(e) => tracing::error!("query: failed to execute query: {:?}", e),
};
ResultCode::Error
}
/// Wraps core Connection::prepare with a custom Stmt object with the necessary function pointers.
/// This object is boxed/leaked and the caller is responsible for freeing the memory.
pub unsafe extern "C" fn prepare_stmt(ctx: *mut ExtConn, sql: *const c_char) -> *mut Stmt {
let c_str = unsafe { CStr::from_ptr(sql as *mut c_char) };
let sql_str = match c_str.to_str() {
Ok(s) => s.to_string(),
Err(_) => {
tracing::error!("prepare_stmt: failed to convert sql to string");
return ptr::null_mut();
}
};
let Ok(extcon) = ExtConn::from_ptr(ctx) else {
tracing::error!("prepare_stmt: null connection");
return ptr::null_mut();
};
let weak_ptr = extcon._ctx as *const Weak<Connection>;
let weak = &*weak_ptr;
let Some(conn) = weak.upgrade() else {
tracing::error!("prepare_stmt: failed to upgrade weak pointer in prepare stmt");
return ptr::null_mut();
};
match conn.prepare(&sql_str) {
Ok(stmt) => {
let raw_stmt = Box::into_raw(Box::new(stmt)) as *mut c_void;
Box::into_raw(Box::new(Stmt::new(
extcon._ctx,
raw_stmt,
stmt_bind_args_fn,
stmt_step,
stmt_get_row,
stmt_get_column_names,
stmt_free_current_row,
stmt_close,
)))
}
Err(e) => {
tracing::error!("prepare_stmt: failed to prepare statement: {:?}", e);
ptr::null_mut()
}
}
}
/// This function expects 1 based indexing. Wraps core statement bind_at functionality
/// this function does not take ownership of the provided arg value
pub unsafe extern "C" fn stmt_bind_args_fn(ctx: *mut Stmt, idx: i32, arg: ExtValue) -> ResultCode {
let Ok(stmt) = Stmt::from_ptr(ctx) else {
tracing::error!("prepare_stmt: null stmt pointer");
return ResultCode::Error;
};
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
// from_ffi takes ownership
let Ok(owned_val) = Value::from_ffi(arg) else {
tracing::error!("stmt_bind_args_fn: failed to convert arg to Value");
return ResultCode::Error;
};
let Some(idx) = NonZeroUsize::new(idx as usize) else {
tracing::error!("stmt_bind_args_fn: invalid index");
return ResultCode::Error;
};
stmt_ctx.bind_at(idx, owned_val);
ResultCode::OK
}
/// Wraps the functionality of the core Statement::step function,
/// preferring to handle the IO step result internally to prevent having to expose
/// run_once. Returns the equivalent ResultCode which then maps to an external StepResult.
pub unsafe extern "C" fn stmt_step(stmt: *mut Stmt) -> ResultCode {
let Ok(stmt) = Stmt::from_ptr(stmt) else {
tracing::error!("stmt_step: failed to convert stmt to Stmt");
return ResultCode::Error;
};
if stmt._conn.is_null() || stmt._ctx.is_null() {
tracing::error!("stmt_step: null connection or context");
return ResultCode::Error;
}
let conn: &Connection = unsafe { &*(stmt._conn as *const Connection) };
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
while let Ok(res) = stmt_ctx.step() {
match res {
StepResult::Row => return ResultCode::Row,
StepResult::Done => return ResultCode::EOF,
StepResult::IO => {
// always handle IO step result internally.
let _ = conn.pager.io.run_once();
continue;
}
StepResult::Interrupt => return ResultCode::Interrupt,
StepResult::Busy => return ResultCode::Busy,
}
}
ResultCode::Error
}
/// Instead of returning a pointer to the row, sets the Stmt's 'cursor'/current_row
/// to the next result row, and then the caller can access the resulting value on the Stmt.
pub unsafe extern "C" fn stmt_get_row(ctx: *mut Stmt) {
let Ok(stmt) = Stmt::from_ptr(ctx) else {
tracing::error!("stmt_get_row: failed to convert stmt to Stmt");
return;
};
if !stmt.current_row.is_null() {
stmt.free_current_row();
}
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
if let Some(row) = stmt_ctx.row() {
let values = row.get_values();
let mut owned_values = Vec::with_capacity(row.len());
for value in values {
owned_values.push(Value::to_ffi(value));
}
stmt.current_row = Box::into_raw(owned_values.into_boxed_slice()) as *mut ExtValue;
stmt.current_row_len = row.len() as i32;
} else {
stmt.current_row_len = 0;
}
}
/// Free the memory of the current row/cursor of the Stmt object.
pub unsafe extern "C" fn stmt_free_current_row(ctx: *mut Stmt) {
let Ok(stmt) = Stmt::from_ptr(ctx) else {
return;
};
if !stmt.current_row.is_null() {
let values: &mut [ExtValue] =
std::slice::from_raw_parts_mut(stmt.current_row, stmt.current_row_len as usize);
for value in values.iter_mut() {
let owned_value = std::mem::take(value);
owned_value.__free_internal_type();
}
let _ = Box::from_raw(stmt.current_row);
}
}
/// Provides an easier API to get all the result column names associated with
/// the prepared Statement. The caller is responsible for freeing the memory
pub unsafe extern "C" fn stmt_get_column_names(
ctx: *mut Stmt,
count: *mut i32,
) -> *mut *mut c_char {
if !count.is_null() {
*count = 0;
}
let Ok(stmt) = Stmt::from_ptr(ctx) else {
tracing::error!("stmt_get_column_names: null Stmt pointer");
return ptr::null_mut();
};
let stmt_ctx: &mut Statement = unsafe { &mut *(stmt._ctx as *mut Statement) };
let num_cols = stmt_ctx.num_columns();
if num_cols == 0 {
tracing::info!("stmt_get_column_names: no columns");
return ptr::null_mut();
}
let mut names: Vec<*mut c_char> = Vec::with_capacity(num_cols);
// collect all the column names and convert them to C strings to send back
for i in 0..num_cols {
let name = stmt_ctx.get_column_name(i);
match CString::new(name.as_bytes()) {
Ok(cstr) => names.push(cstr.into_raw()),
Err(_) => {
// fall-back: free what we allocated so far
for p in names {
let _ = CString::from_raw(p);
}
return std::ptr::null_mut();
}
}
}
if !count.is_null() {
*count = names.len() as i32;
}
Box::into_raw(names.into_boxed_slice()) as *mut *mut c_char
}
/// Ffi/extension wrapper around core Statement::reset and
/// cleans up resources associated with the Statement
pub unsafe extern "C" fn stmt_close(stmt: *mut Stmt) {
if stmt.is_null() {
return;
}
let mut wrapper = Box::from_raw(stmt);
if wrapper._ctx.is_null() {
// already closed
return;
}
// clean up the current row if it exists
if !wrapper.current_row.is_null() {
wrapper.free_current_row();
}
// free the managed internal context
let mut internal = Box::<Statement>::from_raw(wrapper._ctx.cast());
internal.reset();
}

View File

@@ -46,6 +46,7 @@ use limbo_sqlite3_parser::{ast, ast::Cmd, lexer::sql::Parser};
use parking_lot::RwLock;
use schema::{Column, Schema};
use std::ffi::c_void;
use std::rc::Weak;
use std::{
borrow::Cow,
cell::{Cell, RefCell, UnsafeCell},
@@ -775,6 +776,22 @@ pub struct VirtualTable {
columns: Vec<Column>,
kind: VTabKind,
table_ptr: *const c_void,
connection_ptr: RefCell<Option<*mut limbo_ext::Conn>>,
}
impl Drop for VirtualTable {
fn drop(&mut self) {
if let Some(conn) = self.connection_ptr.borrow_mut().take() {
if conn.is_null() {
return;
}
// free the memory for the limbo_ext::Conn itself
let mut conn = unsafe { Box::from_raw(conn) };
// frees the boxed Weak pointer
conn.close();
}
*self.connection_ptr.borrow_mut() = None;
}
}
impl VirtualTable {
@@ -796,6 +813,7 @@ impl VirtualTable {
))
}
}
/// takes ownership of the provided Args
pub(crate) fn from_args(
tbl_name: Option<&str>,
@@ -820,7 +838,7 @@ impl VirtualTable {
)));
}
};
let (schema, table_ptr) = module.implementation.as_ref().create(args)?;
let (schema, table_ptr) = module.implementation.create(args)?;
let mut parser = Parser::new(schema.as_bytes());
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next()?.ok_or(
LimboError::ParseError("Failed to parse schema from virtual table module".to_string()),
@@ -828,6 +846,7 @@ impl VirtualTable {
let columns = columns_from_create_table_body(&body)?;
let vtab = Rc::new(VirtualTable {
name: tbl_name.unwrap_or(module_name).to_owned(),
connection_ptr: RefCell::new(None),
implementation: module.implementation.clone(),
columns,
args: exprs,
@@ -841,8 +860,21 @@ impl VirtualTable {
))
}
pub fn open(&self) -> crate::Result<VTabOpaqueCursor> {
let cursor = unsafe { (self.implementation.open)(self.table_ptr) };
/// Accepts a Weak pointer to the connection that owns the VTable, that the module
/// can optionally use to query the other tables.
pub fn open(&self, conn: Weak<Connection>) -> crate::Result<VTabOpaqueCursor> {
// we need a Weak<Connection> to upgrade and call from the extension.
let weak_box: *mut Weak<Connection> = Box::into_raw(Box::new(conn));
let conn = limbo_ext::Conn::new(
weak_box.cast(),
crate::ext::prepare_stmt,
crate::ext::execute,
crate::ext::close,
);
let ext_conn_ptr = Box::into_raw(Box::new(conn));
// store the leaked connection pointer on the table so it can be freed on drop
*self.connection_ptr.borrow_mut() = Some(ext_conn_ptr);
let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) };
VTabOpaqueCursor::new(cursor, self.implementation.close)
}

View File

@@ -391,6 +391,10 @@ pub fn open_loop(
// translate the opposite side of the referenced vtab column
let expr = if is_rhs { lhs } else { rhs };
// argv_index is 1-based; adjust to get the proper register offset.
if argv_index == 0 {
// invalid since argv_index is 1-based
continue;
}
let target_reg =
start_reg + (argv_index - 1) as usize;
translate_expr(

View File

@@ -993,7 +993,7 @@ pub fn op_vopen(
let CursorType::VirtualTable(virtual_table) = cursor_type else {
panic!("VOpen on non-virtual table cursor");
};
let cursor = virtual_table.open()?;
let cursor = virtual_table.open(program.connection.clone())?;
state
.cursors
.borrow_mut()

View File

@@ -3,9 +3,12 @@
mod keywords;
use std::rc::Rc;
use keywords::KEYWORDS;
use limbo_ext::{
register_extension, ResultCode, VTabCursor, VTabModule, VTabModuleDerive, VTable, Value,
register_extension, Connection, ResultCode, VTabCursor, VTabModule, VTabModuleDerive, VTable,
Value,
};
register_extension! {
@@ -84,7 +87,7 @@ impl VTable for CompletionTable {
type Cursor = CompletionCursor;
type Error = ResultCode;
fn open(&self) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(CompletionCursor::default())
}
}

View File

@@ -200,7 +200,7 @@ impl VTable for CsvTable {
type Error = &'static str;
/// Open to return a new cursor: In this simple example, the CSV file is read completely into memory on connect.
fn open(&self) -> Result<Self::Cursor, Self::Error> {
fn open(&self, conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
// Read CSV file contents from "data.csv"
let csv_content = fs::read_to_string("data.csv").unwrap_or_default();
// For simplicity, we'll ignore the header row.
@@ -213,7 +213,9 @@ impl VTable for CsvTable {
.collect()
})
.collect();
Ok(CsvCursor { rows, index: 0 })
// store the connection for later use. Connection is Option to allow writing tests for your module
// but will be available to use by storing on your Cursor implementation
Ok(CsvCursor { rows, index: 0, connection: conn.unwrap() })
}
/// *Optional* methods for non-readonly tables
@@ -238,6 +240,7 @@ impl VTable for CsvTable {
struct CsvCursor {
rows: Vec<Vec<String>>,
index: usize,
connection: Rc<Connection>,
}
/// Implement the VTabCursor trait for your cursor type
@@ -245,7 +248,7 @@ impl VTabCursor for CsvCursor {
type Error = &'static str;
/// Filter through result columns. (not used in this simple example)
fn filter(&mut self, _args: &[Value], _idx_info: Option<(&str, i32)>) -> ResultCode {
fn filter(&mut self, args: &[Value], _idx_info: Option<(&str, i32)>) -> ResultCode {
ResultCode::OK
}
@@ -280,6 +283,35 @@ impl VTabCursor for CsvCursor {
}
```
### Using the core Connection:
You can use the `Rc<Connection>` to query the same underlying connection that creates the VTable:
```rust
let mut stmt = self.connection.prepare("SELECT col FROM table where name = ?;");
stmt.bind_at(NonZeroUsize::new(1).unwrap(), args[0]);
/// use the connection similarly to the API of the core library
while let StepResult::Row = stmt.step() {
let row = stmt.get_row();
if let Some(val) = row.first() {
// access values
println!("result: {:?}", val);
}
}
stmt.close();
if let Ok(Some(last_insert_rowid)) = conn.execute("INSERT INTO table (col, name) VALUES ('test', 'data')") {
println!("rowid of insert: {:?}", last_insert_rowid);
}
```
### VFS Example
**NOTE**: Requires 'vfs' feature enabled.

View File

@@ -11,13 +11,14 @@ use functions::{RegisterAggFn, RegisterScalarFn};
pub use limbo_macros::VfsDerive;
pub use limbo_macros::{register_extension, scalar, AggregateDerive, VTabModuleDerive};
use std::os::raw::c_void;
pub use types::{ResultCode, Value, ValueType};
pub use types::{ResultCode, StepResult, Value, ValueType};
#[cfg(feature = "vfs")]
pub use vfs_modules::{RegisterVfsFn, VfsExtension, VfsFile, VfsFileImpl, VfsImpl, VfsInterface};
use vtabs::RegisterModuleFn;
pub use vtabs::{
ConstraintInfo, ConstraintOp, ConstraintUsage, ExtIndexInfo, IndexInfo, OrderByInfo,
VTabCreateResult, VTabCursor, VTabKind, VTabModule, VTabModuleImpl, VTable,
Conn, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtIndexInfo, IndexInfo,
OrderByInfo, Statement, Stmt, VTabCreateResult, VTabCursor, VTabKind, VTabModule,
VTabModuleImpl, VTable,
};
pub type ExtResult<T> = std::result::Result<T, ResultCode>;

View File

@@ -23,6 +23,9 @@ pub enum ResultCode {
EOF = 15,
ReadOnly = 16,
RowID = 17,
Row = 18,
Interrupt = 19,
Busy = 20,
}
impl ResultCode {
@@ -60,6 +63,34 @@ impl Display for ResultCode {
ResultCode::EOF => write!(f, "EOF"),
ResultCode::ReadOnly => write!(f, "Read Only"),
ResultCode::RowID => write!(f, "RowID"),
ResultCode::Row => write!(f, "Row"),
ResultCode::Interrupt => write!(f, "Interrupt"),
ResultCode::Busy => write!(f, "Busy"),
}
}
}
#[repr(C)]
#[derive(PartialEq, Debug, Eq, Clone, Copy)]
/// StepResult is used to represent the state of a query as it is exposed
/// to the public API of a connection in a virtual table extension.
/// the IO variant is always handled internally and therefore is not included here.
pub enum StepResult {
Error,
Row,
Done,
Interrupt,
Busy,
}
impl From<ResultCode> for StepResult {
fn from(code: ResultCode) -> Self {
match code {
ResultCode::Error => StepResult::Error,
ResultCode::Row => StepResult::Row,
ResultCode::EOF => StepResult::Done,
ResultCode::Interrupt => StepResult::Interrupt,
ResultCode::Busy => StepResult::Busy,
_ => StepResult::Error,
}
}
}
@@ -80,6 +111,11 @@ pub struct Value {
value_type: ValueType,
value: ValueData,
}
impl Default for Value {
fn default() -> Self {
Self::null()
}
}
#[repr(C)]
union ValueData {

View File

@@ -1,5 +1,9 @@
use crate::{ResultCode, Value};
use std::ffi::{c_char, c_void};
use crate::{types::StepResult, ExtResult, ResultCode, Value};
use std::{
ffi::{c_char, c_void, CStr, CString},
num::NonZeroUsize,
rc::Rc,
};
pub type RegisterModuleFn = unsafe extern "C" fn(
ctx: *mut c_void,
@@ -68,7 +72,7 @@ impl VTabModuleImpl {
pub type VtabFnCreate = unsafe extern "C" fn(args: *const Value, argc: i32) -> VTabCreateResult;
pub type VtabFnOpen = unsafe extern "C" fn(table: *const c_void) -> *const c_void;
pub type VtabFnOpen = unsafe extern "C" fn(table: *const c_void, conn: *mut Conn) -> *const c_void;
pub type VtabFnClose = unsafe extern "C" fn(cursor: *const c_void) -> ResultCode;
@@ -125,7 +129,9 @@ pub trait VTable {
type Cursor: VTabCursor<Error = Self::Error>;
type Error: std::fmt::Display;
fn open(&self) -> Result<Self::Cursor, Self::Error>;
/// 'conn' is an Option to allow for testing. Otherwise a valid connection to the core database
/// that created the virtual table will be available to use in your extension here.
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error>;
fn update(&mut self, _rowid: i64, _args: &[Value]) -> Result<(), Self::Error> {
Ok(())
}
@@ -336,3 +342,334 @@ impl ConstraintInfo {
((self.plan_info >> 1) as usize, (self.plan_info & 1) != 0)
}
}
pub type PrepareStmtFn = unsafe extern "C" fn(api: *mut Conn, sql: *const c_char) -> *mut Stmt;
pub type ExecuteFn = unsafe extern "C" fn(
ctx: *mut Conn,
sql: *const c_char,
args: *mut Value,
arg_count: i32,
last_insert_rowid: *mut i64,
) -> ResultCode;
pub type GetColumnNamesFn =
unsafe extern "C" fn(ctx: *mut Stmt, count: *mut i32) -> *mut *mut c_char;
pub type BindArgsFn = unsafe extern "C" fn(ctx: *mut Stmt, idx: i32, arg: Value) -> ResultCode;
pub type StmtStepFn = unsafe extern "C" fn(ctx: *mut Stmt) -> ResultCode;
pub type StmtGetRowValuesFn = unsafe extern "C" fn(ctx: *mut Stmt);
pub type FreeCurrentRowFn = unsafe extern "C" fn(ctx: *mut Stmt);
pub type CloseConnectionFn = unsafe extern "C" fn(ctx: *mut c_void);
pub type CloseStmtFn = unsafe extern "C" fn(ctx: *mut Stmt);
/// core database connection
/// public fields for core only
#[repr(C)]
#[derive(Debug, Clone)]
pub struct Conn {
// boxed Rc::Weak from core::Connection
pub _ctx: *mut c_void,
pub _prepare_stmt: PrepareStmtFn,
pub _execute: ExecuteFn,
pub _close: CloseConnectionFn,
}
impl Conn {
pub fn new(
ctx: *mut c_void,
prepare_stmt: PrepareStmtFn,
exec_fn: ExecuteFn,
close: CloseConnectionFn,
) -> Self {
Conn {
_ctx: ctx,
_prepare_stmt: prepare_stmt,
_execute: exec_fn,
_close: close,
}
}
/// # Safety
/// Dereferences a null pointer with a null check
pub unsafe fn from_ptr(ptr: *mut Conn) -> crate::ExtResult<&'static mut Self> {
if ptr.is_null() {
return Err(ResultCode::Error);
}
Ok(unsafe { &mut *(ptr) })
}
pub fn close(&mut self) {
if self._ctx.is_null() {
return;
}
unsafe { (self._close)(self._ctx) };
self._ctx = std::ptr::null_mut();
}
/// execute a SQL statement with the given arguments.
/// optionally returns the last inserted rowid for the query
pub fn execute(&self, sql: &str, args: &[Value]) -> crate::ExtResult<Option<usize>> {
let Ok(sql) = CString::new(sql) else {
return Err(ResultCode::Error);
};
let arg_count = args.len() as i32;
let args = args.as_ptr();
let last_insert_rowid = 0;
if let ResultCode::OK = unsafe {
(self._execute)(
self as *const _ as *mut Conn,
sql.as_ptr(),
args as *mut Value,
arg_count,
&last_insert_rowid as *const _ as *mut i64,
)
} {
return Ok(Some(last_insert_rowid as usize));
}
Err(ResultCode::Error)
}
pub fn prepare_stmt(&self, sql: &str) -> *mut Stmt {
let Ok(sql) = CString::new(sql) else {
return std::ptr::null_mut();
};
unsafe { (self._prepare_stmt)(self as *const _ as *mut Conn, sql.as_ptr()) }
}
}
/// Prepared statement for querying a core database connection public API for extensions
/// Statements can be manually closed.
#[derive(Debug)]
#[repr(transparent)]
pub struct Statement(*mut Stmt);
impl Drop for Statement {
fn drop(&mut self) {
if self.0.is_null() {
return;
}
unsafe { (*self.0).close() }
}
}
/// Public API for methods to allow extensions to query other tables for
/// the connection that opened the VTable. This value and its resources are cleaned up when
/// the VTable is dropped, so there is no need to manually close the connection.
#[derive(Debug)]
#[repr(transparent)]
pub struct Connection(*mut Conn);
impl Connection {
pub fn new(ctx: *mut Conn) -> Self {
Connection(ctx)
}
/// From the included SQL string, prepare a statement for execution.
pub fn prepare(self: &Rc<Self>, sql: &str) -> ExtResult<Statement> {
let stmt = unsafe { (*self.0).prepare_stmt(sql) };
if stmt.is_null() {
return Err(ResultCode::Error);
}
Ok(Statement(stmt))
}
/// Execute a SQL statement with the given arguments.
/// Optionally returns the last inserted rowid for the query.
pub fn execute(self: &Rc<Self>, sql: &str, args: &[Value]) -> crate::ExtResult<Option<usize>> {
if self.0.is_null() {
return Err(ResultCode::Error);
}
unsafe { (*self.0).execute(sql, args) }
}
}
impl Statement {
/// Bind a value to a parameter in the prepared statement.
///```ignore
/// let stmt = conn.prepare_stmt("select * from users where name = ?");
/// stmt.bind_at(1, Value::from_text("test".into()));
///```
pub fn bind_at(&self, idx: NonZeroUsize, arg: Value) {
unsafe {
(*self.0).bind_args(idx, arg);
}
}
/// Execute the statement and return the next row
///```ignore
/// while stmt.step() == StepResult::Row {
/// let row = stmt.get_row();
/// println!("row: {:?}", row);
/// }
/// ```
pub fn step(&self) -> StepResult {
unsafe { (*self.0).step() }
}
// Get the current row values
///```ignore
/// while stmt.step() == StepResult::Row {
/// let row = stmt.get_row();
/// println!("row: {:?}", row);
///```
pub fn get_row(&mut self) -> &[Value] {
unsafe { (*self.0).get_row() }
}
/// Get the result column names for the prepared statement
pub fn get_column_names(&self) -> Vec<String> {
unsafe { (*self.0).get_column_names() }
}
/// Close the statement and clean up resources.
pub fn close(self) {
if self.0.is_null() {
return;
}
unsafe { (*self.0).close() }
}
}
/// Internal/core use _only_
/// Extensions should not import or use this type directly
#[repr(C)]
pub struct Stmt {
// Rc::into_raw from core::Connection
pub _conn: *mut c_void,
// Rc::into_raw from core::Statement
pub _ctx: *mut c_void,
pub _bind_args_fn: BindArgsFn,
pub _step: StmtStepFn,
pub _get_row_values: StmtGetRowValuesFn,
pub _get_column_names: GetColumnNamesFn,
pub _free_current_row: FreeCurrentRowFn,
pub _close: CloseStmtFn,
pub current_row: *mut Value,
pub current_row_len: i32,
}
impl Stmt {
#[allow(clippy::too_many_arguments)]
pub fn new(
conn: *mut c_void,
ctx: *mut c_void,
bind: BindArgsFn,
step: StmtStepFn,
rows: StmtGetRowValuesFn,
names: GetColumnNamesFn,
free_row: FreeCurrentRowFn,
close: CloseStmtFn,
) -> Self {
Stmt {
_conn: conn,
_ctx: ctx,
_bind_args_fn: bind,
_step: step,
_get_row_values: rows,
_get_column_names: names,
_free_current_row: free_row,
_close: close,
current_row: std::ptr::null_mut(),
current_row_len: -1,
}
}
/// Close the statement
pub fn close(&mut self) {
// null check to prevent double free
if self._ctx.is_null() {
return;
}
unsafe { (self._close)(self as *const Stmt as *mut Stmt) };
self._ctx = std::ptr::null_mut();
}
/// # Safety
/// Derefs a null ptr, does a null check first
pub unsafe fn from_ptr(ptr: *mut Stmt) -> ExtResult<&'static mut Self> {
if ptr.is_null() {
return Err(ResultCode::Error);
}
Ok(unsafe { &mut *(ptr) })
}
/// Returns the pointer to the statement.
pub fn to_ptr(&self) -> *mut Stmt {
self as *const Stmt as *mut Stmt
}
/// Bind a value to a parameter in the prepared statement
/// Own the value so it can be freed in core
fn bind_args(&self, idx: NonZeroUsize, arg: Value) {
unsafe {
(self._bind_args_fn)(self.to_ptr(), idx.get() as i32, arg);
};
}
/// Execute the statement to attempt to retrieve the next result row.
fn step(&self) -> StepResult {
unsafe { (self._step)(self.to_ptr()) }.into()
}
/// Free the memory for the values obtained from the `get_row` method.
/// This is easier done on core side because __free_internal_type is 'core_only'
/// feature to prevent extensions causing memory issues.
/// # Safety
/// This fn is unsafe because it derefs a raw pointer after null and
/// length checks. This fn should only be called with the pointer returned from get_row.
pub unsafe fn free_current_row(&mut self) {
if self.current_row.is_null() || self.current_row_len <= 0 {
return;
}
// free from the core side so we don't have to expose `__free_internal_type`
(self._free_current_row)(self.to_ptr());
self.current_row = std::ptr::null_mut();
self.current_row_len = -1;
}
/// Returns the values from the current row in the prepared statement, should
/// be called after the step() method returns `StepResult::Row`
pub fn get_row(&self) -> &[Value] {
unsafe { (self._get_row_values)(self.to_ptr()) };
if self.current_row.is_null() || self.current_row_len < 1 {
return &[];
}
let col_count = self.current_row_len;
unsafe { std::slice::from_raw_parts(self.current_row, col_count as usize) }
}
/// Returns the names of the result columns for the prepared statement.
pub fn get_column_names(&self) -> Vec<String> {
let mut count_value: i32 = 0;
let count: *mut i32 = &mut count_value;
let col_names = unsafe { (self._get_column_names)(self.to_ptr(), count) };
if col_names.is_null() || count_value == 0 {
return Vec::new();
}
let mut names = Vec::new();
let slice = unsafe { std::slice::from_raw_parts(col_names, count_value as usize) };
for x in slice {
let name = unsafe { CStr::from_ptr(*x) };
names.push(name.to_str().unwrap().to_string());
}
unsafe { free_column_names(col_names, count_value) };
names
}
}
/// Free the column names returned from get_column_names
/// # Safety
/// This function is unsafe because it derefs a raw pointer, this fn
/// should only be called with the pointer returned from get_column_names
/// only when they will no longer be used.
pub unsafe fn free_column_names(names: *mut *mut c_char, count: i32) {
if names.is_null() || count < 1 {
return;
}
let slice = std::slice::from_raw_parts_mut(names, count as usize);
for name in slice {
if !name.is_null() {
let _ = CString::from_raw(*name);
}
}
let _ = Box::from_raw(names);
}

View File

@@ -21,11 +21,12 @@
//! - `columns` — number of columns
//! - `schema` — optional custom SQL `CREATE TABLE` schema
use limbo_ext::{
register_extension, ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabCursor, VTabKind,
VTabModule, VTabModuleDerive, VTable, Value,
register_extension, Connection, ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabCursor,
VTabKind, VTabModule, VTabModuleDerive, VTable, Value,
};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::rc::Rc;
register_extension! {
vtabs: { CsvVTabModule }
@@ -220,7 +221,7 @@ impl VTabModule for CsvVTabModule {
sql.push_str(", ");
}
}
sql.push_str(")");
sql.push(')');
schema = Some(sql);
}
@@ -259,7 +260,7 @@ impl VTable for CsvTable {
type Cursor = CsvCursor;
type Error = ResultCode;
fn open(&self) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
match self.new_reader() {
Ok(reader) => Ok(CsvCursor::new(reader, self)),
Err(_) => Err(ResultCode::Error),
@@ -455,7 +456,7 @@ mod tests {
&format!("filename={}", file.path().to_string_lossy()),
"header=true",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -469,7 +470,7 @@ mod tests {
#[test]
fn test_data_with_header() {
let table = new_table(vec!["data=id,name\n1,Alice\n2,Bob\n", "header=true"]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -487,7 +488,7 @@ mod tests {
&format!("filename={}", file.path().to_string_lossy()),
"header=false",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -501,7 +502,7 @@ mod tests {
#[test]
fn test_data_without_header() {
let table = new_table(vec!["data=1,Alice\n2,Bob\n", "header=false"]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -519,7 +520,7 @@ mod tests {
&format!("filename={}", file.path().to_string_lossy()),
"header=true",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
}
@@ -527,7 +528,7 @@ mod tests {
#[test]
fn test_empty_data_with_header() {
let table = new_table(vec!["data=id,name\n", "header=true"]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
}
@@ -540,7 +541,7 @@ mod tests {
"header=false",
])
.unwrap();
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
assert_eq!(schema, "CREATE TABLE x(\"c0\" TEXT)");
@@ -549,7 +550,7 @@ mod tests {
#[test]
fn test_empty_data_no_header() {
let (schema, table) = try_new_table(vec!["data=", "header=false"]).unwrap();
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
assert_eq!(schema, "CREATE TABLE x(\"c0\" TEXT)");
@@ -563,7 +564,7 @@ mod tests {
"header=true",
])
.unwrap();
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
assert_eq!(schema, "CREATE TABLE x(\"(NULL)\" TEXT)");
@@ -572,7 +573,7 @@ mod tests {
#[test]
fn test_empty_data_with_header_enabled() {
let (schema, table) = try_new_table(vec!["data=", "header=true"]).unwrap();
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert!(rows.is_empty());
assert_eq!(schema, "CREATE TABLE x(\"(NULL)\" TEXT)");
@@ -585,7 +586,7 @@ mod tests {
&format!("filename={}", file.path().to_string_lossy()),
"header=true",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(rows, vec![vec![cell!("1"), cell!("A,l,i,c,e")],]);
}
@@ -597,7 +598,7 @@ mod tests {
&format!("filename={}", file.path().to_string_lossy()),
"header=false",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 3);
assert_eq!(
rows,
@@ -613,7 +614,7 @@ mod tests {
"header=false",
"schema=CREATE TABLE x(id INT, name TEXT)",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -686,12 +687,7 @@ mod tests {
&format!("header={}", val),
]);
assert!(result.is_ok(), "Expected Ok for header='{}'", val);
assert_eq!(
result.unwrap().1.header,
true,
"Expected true for '{}'",
val
);
assert!(result.unwrap().1.header, "Expected true for '{}'", val);
}
for &val in &false_values {
@@ -700,12 +696,7 @@ mod tests {
&format!("header={}", val),
]);
assert!(result.is_ok(), "Expected Ok for header='{}'", val);
assert_eq!(
result.unwrap().1.header,
false,
"Expected false for '{}'",
val
);
assert!(!result.unwrap().1.header, "Expected false for '{}'", val);
}
}
@@ -728,7 +719,7 @@ mod tests {
" data = id,name\n1,Alice\n2,Bob\n ",
" header = true ",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -763,7 +754,7 @@ mod tests {
"data={}aa{}{}bb{}",
quote, quote, quote, quote
)]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 1);
assert_eq!(rows, vec![vec![cell!(format!("aa{}bb", quote))]]);
}
@@ -779,7 +770,7 @@ mod tests {
"data={}aa{}{}bb{}",
outer, inner, inner, outer
)]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 1);
assert_eq!(rows, vec![vec![cell!(format!("aa{}{}bb", inner, inner))]]);
}
@@ -812,7 +803,7 @@ mod tests {
"header=false",
"columns=4",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 4);
assert_eq!(
rows,
@@ -831,7 +822,7 @@ mod tests {
"header=false",
"columns=1",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 1);
assert_eq!(rows, vec![vec![cell!("1")], vec![cell!("2")]]);
}
@@ -845,7 +836,7 @@ mod tests {
"columns=1",
"schema='CREATE TABLE x(id INT, name TEXT)'",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(rows, vec![vec![cell!("1"), None], vec![cell!("2"), None]]);
}
@@ -859,7 +850,7 @@ mod tests {
"columns=5",
"schema='CREATE TABLE x(id INT, name TEXT)'",
]);
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,
@@ -878,7 +869,7 @@ mod tests {
"header=true",
])
.unwrap();
let cursor = table.open().unwrap();
let cursor = table.open(None).unwrap();
let rows = read_rows(cursor, 2);
assert_eq!(
rows,

View File

@@ -1,6 +1,8 @@
use std::rc::Rc;
use limbo_ext::{
register_extension, ResultCode, VTabCursor, VTabKind, VTabModule, VTabModuleDerive, VTable,
Value,
register_extension, Connection, ResultCode, VTabCursor, VTabKind, VTabModule, VTabModuleDerive,
VTable, Value,
};
register_extension! {
@@ -43,7 +45,7 @@ impl VTable for GenerateSeriesTable {
type Cursor = GenerateSeriesCursor;
type Error = ResultCode;
fn open(&self) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(GenerateSeriesCursor {
start: 0,
stop: 0,
@@ -225,7 +227,7 @@ mod tests {
// Helper function to collect all values from a cursor, returns Result with error code
fn collect_series(series: Series) -> Result<Vec<i64>, ResultCode> {
let tbl = GenerateSeriesTable {};
let mut cursor = tbl.open()?;
let mut cursor = tbl.open(None)?;
// Create args array for filter
let args = vec![
@@ -542,7 +544,7 @@ mod tests {
let stop = series.stop;
let step = series.step;
let tbl = GenerateSeriesTable {};
let mut cursor = tbl.open().unwrap();
let mut cursor = tbl.open(None).unwrap();
let args = vec![
Value::from_integer(start),

View File

@@ -1,18 +1,20 @@
use lazy_static::lazy_static;
use limbo_ext::{
register_extension, scalar, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtResult,
IndexInfo, OrderByInfo, ResultCode, VTabCursor, VTabKind, VTabModule, VTabModuleDerive, VTable,
Value,
register_extension, scalar, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage,
ExtResult, IndexInfo, OrderByInfo, ResultCode, StepResult, VTabCursor, VTabKind, VTabModule,
VTabModuleDerive, VTable, Value,
};
#[cfg(not(target_family = "wasm"))]
use limbo_ext::{VfsDerive, VfsExtension, VfsFile};
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Mutex;
register_extension! {
vtabs: { KVStoreVTabModule },
vtabs: { KVStoreVTabModule, TableStatsVtabModule },
scalars: { test_scalar },
vfs: { TestFS },
}
@@ -137,7 +139,7 @@ impl VTable for KVStoreTable {
type Cursor = KVStoreCursor;
type Error = String;
fn open(&self) -> Result<Self::Cursor, Self::Error> {
fn open(&self, _conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
let _ = env_logger::try_init();
Ok(KVStoreCursor {
rows: Vec::new(),
@@ -294,3 +296,143 @@ impl VfsFile for TestFile {
self.file.metadata().map(|m| m.len() as i64).unwrap_or(-1)
}
}
#[derive(VTabModuleDerive, Default)]
pub struct TableStatsVtabModule;
pub struct StatsCursor {
pos: usize,
rows: Vec<(String, i64)>,
conn: Option<Rc<Connection>>,
}
pub struct StatsTable {}
impl VTabModule for TableStatsVtabModule {
type Table = StatsTable;
const VTAB_KIND: VTabKind = VTabKind::VirtualTable;
const NAME: &'static str = "tablestats";
fn create(_args: &[Value]) -> Result<(String, Self::Table), ResultCode> {
let schema = "CREATE TABLE x(name TEXT, rows INT);".to_string();
Ok((schema, StatsTable {}))
}
}
impl VTable for StatsTable {
type Cursor = StatsCursor;
type Error = String;
fn open(&self, conn: Option<Rc<Connection>>) -> Result<Self::Cursor, Self::Error> {
Ok(StatsCursor {
pos: 0,
rows: Vec::new(),
conn,
})
}
}
impl VTabCursor for StatsCursor {
type Error = String;
fn filter(&mut self, _args: &[Value], _idx_info: Option<(&str, i32)>) -> ResultCode {
self.rows.clear();
self.pos = 0;
let Some(conn) = &self.conn else {
log::error!("no connection present");
return ResultCode::Error;
};
// discover application tables
let mut master = conn
.prepare(
"SELECT name, sql FROM sqlite_schema WHERE type = 'table' AND name NOT LIKE 'sqlite_%';",
)
.map_err(|_| ResultCode::Error)
.unwrap();
let mut tables = Vec::new();
while let StepResult::Row = master.step() {
let row = master.get_row();
let tbl = if let Some(val) = row.first() {
val.to_text().unwrap_or("").to_string()
} else {
"".to_string()
};
if tbl.is_empty() {
continue;
};
if row
.get(1)
.is_some_and(|v| v.to_text().unwrap().contains("CREATE VIRTUAL TABLE"))
{
continue;
}
tables.push(tbl);
}
master.close();
for tbl in tables {
// count rows for each table
if let Ok(mut count_stmt) = conn.prepare(&format!("SELECT COUNT(*) FROM {};", tbl)) {
let count = match count_stmt.step() {
StepResult::Row => count_stmt.get_row()[0].to_integer().unwrap_or(0),
_ => 0,
};
self.rows.push((tbl, count));
count_stmt.close();
}
}
if conn
.execute(
"insert into products (name, price) values(?, ?);",
&[Value::from_text("xConnect".into()), Value::from_integer(42)],
)
.is_err()
{
log::error!("failed to insert into xConnect table");
}
let mut stmt = conn
.prepare("select price from products where name = ? limit 1;")
.map_err(|_| ResultCode::Error)
.unwrap();
stmt.bind_at(
NonZeroUsize::new(1).expect("1 to be not zero"),
Value::from_text("xConnect".into()),
);
while let StepResult::Row = stmt.step() {
let row = stmt.get_row();
if let Some(val) = row.first() {
assert_eq!(val.to_integer(), Some(42));
}
}
stmt.close();
ResultCode::OK
}
fn column(&self, idx: u32) -> Result<Value, Self::Error> {
self.rows
.get(self.pos)
.ok_or("row out of range".to_string())
.and_then(|(name, cnt)| match idx {
0 => Ok(Value::from_text(name.clone())),
1 => Ok(Value::from_integer(*cnt)),
_ => Err("bad column".into()),
})
}
fn next(&mut self) -> ResultCode {
self.pos += 1;
if self.pos >= self.rows.len() {
ResultCode::EOF
} else {
ResultCode::OK
}
}
fn eof(&self) -> bool {
self.pos >= self.rows.len()
}
fn rowid(&self) -> i64 {
self.pos as i64
}
}

View File

@@ -49,13 +49,14 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
}
#[no_mangle]
unsafe extern "C" fn #open_fn_name(table: *const ::std::ffi::c_void) -> *const ::std::ffi::c_void {
unsafe extern "C" fn #open_fn_name(table: *const ::std::ffi::c_void, conn: *mut ::limbo_ext::Conn) -> *const ::std::ffi::c_void {
if table.is_null() {
return ::std::ptr::null();
}
let table = table as *const <#struct_name as ::limbo_ext::VTabModule>::Table;
let table: &<#struct_name as ::limbo_ext::VTabModule>::Table = &*table;
if let Ok(cursor) = <#struct_name as ::limbo_ext::VTabModule>::Table::open(table) {
let conn = if conn.is_null() { None } else { Some(::std::rc::Rc::new(::limbo_ext::Connection::new(conn)))};
if let Ok(cursor) = <#struct_name as ::limbo_ext::VTabModule>::Table::open(table, conn) {
return ::std::boxed::Box::into_raw(::std::boxed::Box::new(cursor)) as *const ::std::ffi::c_void;
} else {
return ::std::ptr::null();

View File

@@ -629,76 +629,72 @@ def test_csv():
limbo.run_test_fn(
"CREATE VIRTUAL TABLE temp.csv USING csv(filename=./testing/test_files/test.csv);",
null,
"Create virtual table from CSV file"
"Create virtual table from CSV file",
)
limbo.run_test_fn(
"SELECT * FROM temp.csv;",
lambda res: res == "1|2.0|String'1\n3|4.0|String2",
"Read all rows from CSV table"
"Read all rows from CSV table",
)
limbo.run_test_fn(
"SELECT * FROM temp.csv WHERE c2 = 'String2';",
lambda res: res == "3|4.0|String2",
"Filter rows with WHERE clause"
"Filter rows with WHERE clause",
)
limbo.run_test_fn(
"INSERT INTO temp.csv VALUES (5, 6.0, 'String3');",
lambda res: "Virtual table update failed" in res,
"INSERT into CSV table should fail"
"INSERT into CSV table should fail",
)
limbo.run_test_fn(
"UPDATE temp.csv SET c0 = 10 WHERE c1 = '2.0';",
lambda res: "Virtual table update failed" in res,
"UPDATE on CSV table should fail"
"UPDATE on CSV table should fail",
)
limbo.run_test_fn(
"DELETE FROM temp.csv WHERE c1 = '2.0';",
lambda res: "Virtual table update failed" in res,
"DELETE on CSV table should fail"
)
limbo.run_test_fn(
"DROP TABLE temp.csv;",
null,
"Drop CSV table"
"DELETE on CSV table should fail",
)
limbo.run_test_fn("DROP TABLE temp.csv;", null, "Drop CSV table")
limbo.run_test_fn(
"SELECT * FROM temp.csv;",
lambda res: "Parse error: Table csv not found" in res,
"Query dropped CSV table should fail"
"Query dropped CSV table should fail",
)
limbo.run_test_fn(
"create virtual table t1 using csv(data='1'\\'2');",
lambda res: "unrecognized token at" in res,
"Create CSV table with malformed escape sequence"
"Create CSV table with malformed escape sequence",
)
limbo.run_test_fn(
"create virtual table t1 using csv(data=\"12');",
lambda res: "non-terminated literal at" in res,
"Create CSV table with unterminated quoted string"
"Create CSV table with unterminated quoted string",
)
limbo.run_debug("create virtual table t1 using csv(data='');")
limbo.run_test_fn(
"SELECT c0 FROM t1;",
lambda res: res == "",
"Empty CSV table without a header should have one column: 'c0'"
"Empty CSV table without a header should have one column: 'c0'",
)
limbo.run_test_fn(
"SELECT c1 FROM t1;",
lambda res: "Parse error: Column c1 not found" in res,
"Empty CSV table without header should not have columns other than 'c0'"
"Empty CSV table without header should not have columns other than 'c0'",
)
limbo.run_debug("create virtual table t2 using csv(data='', header=true);")
limbo.run_test_fn(
"SELECT \"(NULL)\" FROM t2;",
'SELECT "(NULL)" FROM t2;',
lambda res: res == "",
"Empty CSV table with header should have one column named '(NULL)'"
"Empty CSV table with header should have one column named '(NULL)'",
)
limbo.run_test_fn(
"SELECT c0 FROM t2;",
lambda res: "Parse error: Column c0 not found" in res,
"Empty CSV table with header should not have columns other than '(NULL)'"
"Empty CSV table with header should not have columns other than '(NULL)'",
)
limbo.quit()
@@ -711,6 +707,94 @@ def cleanup():
os.remove("testing/vfs.db-wal")
def test_tablestats():
ext_path = "target/debug/liblimbo_ext_tests"
limbo = TestLimboShell(use_testing_db=True)
limbo.execute_dot("CREATE TABLE people(id INTEGER PRIMARY KEY, name TEXT);")
limbo.execute_dot("INSERT INTO people(name) VALUES ('Ada'), ('Grace'), ('Linus');")
limbo.execute_dot("CREATE TABLE logs(ts INT, msg TEXT);")
limbo.execute_dot("INSERT INTO logs VALUES (1,'boot ok');")
# verify counts
limbo.run_test_fn(
"SELECT COUNT(*) FROM people;",
lambda res: res == "3",
"three people rowsverify user count",
)
limbo.run_test_fn(
"SELECT COUNT(*) FROM logs;",
lambda res: res == "1",
"one logs rowverify logs count",
)
# load extension
limbo.execute_dot(f".load {ext_path}")
limbo.execute_dot("CREATE VIRTUAL TABLE stats USING tablestats;")
def _split(res):
return [ln.strip() for ln in res.splitlines() if ln.strip()]
limbo.run_test_fn(
"SELECT * FROM stats ORDER BY name;",
lambda res: sorted(_split(res))
== sorted(["logs|1", "people|3", "products|11", "users|10000"]),
"stats shows correct initial counts (and skips itself)",
)
limbo.execute_dot("INSERT INTO logs VALUES (2,'panic'), (3,'recovery');")
limbo.execute_dot("DELETE FROM people WHERE name='Linus';")
limbo.run_test_fn(
"SELECT * FROM stats WHERE name='logs';",
lambda res: res == "logs|3",
"rowcount grows after INSERT",
)
limbo.run_test_fn(
"SELECT * FROM stats WHERE name='people';",
lambda res: res == "people|2",
"rowcount shrinks after DELETE",
)
# existing tables in the testing database (users, products)
# the test extension is also doing a testing insert on every query
# as part of its own testing, so we cannot assert 'products|11'
# we need to add 3 for the 3 queries we did above.
limbo.run_test_fn(
"SELECT * FROM stats WHERE name='products';",
lambda x: x == "products|14",
"products table reflects changes",
)
# an insert to products with (name,price) ('xConnect', 42)
# has happened on each query (4 so far) in the testing extension.
# so at this point the sum should be 168
limbo.run_test_fn(
"SELECT sum(price) FROM products WHERE name = 'xConnect';",
lambda x: x == "168.0",
"price sum for 'xConnect' inserts happenning in the testing extension",
)
limbo.run_test_fn(
"SELECT * FROM stats WHERE name='users';",
lambda x: x == "users|10000",
"users table unchanged",
)
limbo.execute_dot("CREATE TABLE misc(x);")
limbo.run_test_fn(
"SELECT * FROM stats WHERE name='misc';",
lambda res: res == "misc|0",
"newlycreated table shows up with zero rows",
)
limbo.execute_dot("DROP TABLE logs;")
limbo.run_test_fn(
"SELECT name FROM stats WHERE name='logs';",
lambda res: res == "",
"dropped table disappears from stats",
)
limbo.quit()
def main():
try:
test_regexp()
@@ -725,6 +809,7 @@ def main():
test_drop_virtual_table()
test_create_virtual_table()
test_csv()
test_tablestats()
except Exception as e:
console.error(f"Test FAILED: {e}")
cleanup()

View File

@@ -103,6 +103,7 @@ class TestLimboShell:
init_commands: Optional[str] = None,
init_blobs_table: bool = False,
exec_name: Optional[str] = None,
use_testing_db: bool = False,
flags="",
):
if exec_name is None:
@@ -110,6 +111,9 @@ class TestLimboShell:
if flags == "":
flags = "-q"
self.config = ShellConfig(exe_name=exec_name, flags=flags)
if use_testing_db:
self.init_test_db()
init_commands = ".open testing/testing_clone.db"
if init_commands is None:
# Default initialization
init_commands = """
@@ -129,6 +133,7 @@ INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3)
self.shell = LimboShell(self.config, init_commands)
def quit(self):
self.cleanup_test_db()
self.shell.quit()
def run_test(self, name: str, sql: str, expected: str) -> None:
@@ -159,9 +164,28 @@ INSERT INTO t VALUES (zeroblob(1024 - 1), zeroblob(1024 - 2), zeroblob(1024 - 3)
def execute_dot(self, dot_command: str) -> None:
self.shell._write_to_pipe(dot_command)
def init_test_db(self) -> None:
self.cleanup_test_db()
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
cmd = "sqlite3 testing/testing.db '.clone testing/testing_clone.db'"
subprocess.run(cmd, shell=True, capture_output=True, text=True)
if not os.path.exists("testing/testing_clone.db"):
raise RuntimeError("Failed to clone testing.db to testing/testing_clone.db")
def cleanup_test_db(self) -> None:
path = os.path.join("testing", "testing_clone.db")
if os.path.exists(path):
os.remove(path)
walpath = os.path.join("testing", "testing.db-wal")
if os.path.exists(walpath):
os.remove(walpath)
# Enables the use of `with` syntax
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.cleanup_test_db()
self.quit()