mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-11 19:24:21 +01:00
Merge 'Fix segfault on schema update for virtual tables' from Preston Thorpe
Closes #2478 ```console turso>create table t(a,b); turso>insert into t select 1,2 from generate_series(1,20); turso>create index idxa on t(a); turso>create index idxb on t(b); # segfault ``` The issue was the `turso_ext::Conn` pointer was stored on the `VirtualTable`, which lives longer than necessary and the underlying core `Connection` is not guaranteed pinned. Moving the `turso_ext::Conn` on to the cursor fixes this issue because it's independent of the Schema. This also fixes the issue that comes up when that issue is fixed, and some of the relevant code for this hadn't been updated when other surrounding changes had been made. Closes #2479
This commit is contained in:
@@ -19,7 +19,7 @@ use turso_ext::{
|
||||
ExtensionApi, InitAggFunction, ResultCode, ScalarFunction, VTabKind, VTabModuleImpl,
|
||||
};
|
||||
pub use turso_ext::{FinalizeFunction, StepFunction, Value as ExtValue, ValueType as ExtValueType};
|
||||
pub use vtab_xconnect::{close, execute, prepare_stmt};
|
||||
pub use vtab_xconnect::{execute, prepare_stmt};
|
||||
|
||||
/// The context passed to extensions to register with Core
|
||||
/// along with the function pointers
|
||||
|
||||
@@ -4,22 +4,10 @@ use std::{
|
||||
ffi::{c_char, c_void, CStr, CString},
|
||||
num::NonZeroUsize,
|
||||
ptr,
|
||||
sync::Arc,
|
||||
sync::Weak,
|
||||
};
|
||||
use turso_ext::{Conn as ExtConn, ResultCode, Stmt, Value as ExtValue};
|
||||
|
||||
/// Free memory for the internal context of the connection.
|
||||
/// This function does not close the core connection itself,
|
||||
/// it only frees the memory the table is responsible for.
|
||||
pub unsafe extern "C" fn close(ctx: *mut c_void) {
|
||||
if ctx.is_null() {
|
||||
return;
|
||||
}
|
||||
// only free the memory for the boxed connection, we don't upgrade
|
||||
// or actually close the core connection, as we were 'sharing' it.
|
||||
let _ = Box::from_raw(ctx as *mut Arc<Connection>);
|
||||
}
|
||||
|
||||
/// Wrapper around core Connection::execute with optional arguments to bind
|
||||
/// to the statment This function takes ownership of the optional turso_ext::Value array if provided
|
||||
pub unsafe extern "C" fn execute(
|
||||
@@ -41,48 +29,54 @@ pub unsafe extern "C" fn execute(
|
||||
tracing::error!("query: null connection");
|
||||
return ResultCode::Error;
|
||||
};
|
||||
let conn_ptr = extcon._ctx as *const Arc<Connection>;
|
||||
let conn = &*conn_ptr;
|
||||
match conn.query(&sql_str) {
|
||||
Ok(Some(mut stmt)) => {
|
||||
if arg_count > 0 {
|
||||
let args_slice = &mut std::slice::from_raw_parts_mut(args, arg_count as usize);
|
||||
for (i, val) in args_slice.iter_mut().enumerate() {
|
||||
stmt.bind_at(
|
||||
NonZeroUsize::new(i + 1).unwrap(),
|
||||
Value::from_ffi(std::mem::take(val)).unwrap_or(Value::Null),
|
||||
);
|
||||
if extcon._ctx.is_null() {
|
||||
tracing::error!("execute: connection ctx is null");
|
||||
return ResultCode::Error;
|
||||
}
|
||||
let weak_box = extcon._ctx as *const Weak<Connection>;
|
||||
let weak = unsafe { &*weak_box };
|
||||
if let Some(conn) = weak.upgrade() {
|
||||
match conn.query(&sql_str) {
|
||||
Ok(Some(mut stmt)) => {
|
||||
if arg_count > 0 {
|
||||
let args_slice = &mut std::slice::from_raw_parts_mut(args, arg_count as usize);
|
||||
for (i, val) in args_slice.iter_mut().enumerate() {
|
||||
stmt.bind_at(
|
||||
NonZeroUsize::new(i + 1).unwrap(),
|
||||
Value::from_ffi(std::mem::take(val)).unwrap_or(Value::Null),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
loop {
|
||||
match stmt.step() {
|
||||
Ok(StepResult::Row) => {
|
||||
tracing::error!("execute used for query returning a row");
|
||||
return ResultCode::Error;
|
||||
}
|
||||
Ok(StepResult::Done) => {
|
||||
*last_insert_rowid = conn.last_insert_rowid();
|
||||
return ResultCode::OK;
|
||||
}
|
||||
Ok(StepResult::IO) => {
|
||||
let res = stmt.run_once();
|
||||
if res.is_err() {
|
||||
loop {
|
||||
match stmt.step() {
|
||||
Ok(StepResult::Row) => {
|
||||
tracing::error!("execute used for query returning a row");
|
||||
return ResultCode::Error;
|
||||
}
|
||||
Ok(StepResult::Done) => {
|
||||
*last_insert_rowid = conn.last_insert_rowid();
|
||||
return ResultCode::OK;
|
||||
}
|
||||
Ok(StepResult::IO) => {
|
||||
let res = stmt.run_once();
|
||||
if res.is_err() {
|
||||
return ResultCode::Error;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
|
||||
Ok(StepResult::Busy) => return ResultCode::Busy,
|
||||
Err(e) => {
|
||||
tracing::error!("execute: failed to execute query: {:?}", e);
|
||||
return ResultCode::Error;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(StepResult::Interrupt) => return ResultCode::Interrupt,
|
||||
Ok(StepResult::Busy) => return ResultCode::Busy,
|
||||
Err(e) => {
|
||||
tracing::error!("execute: failed to execute query: {:?}", e);
|
||||
return ResultCode::Error;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => tracing::error!("query: no statement returned"),
|
||||
Err(e) => tracing::error!("query: failed to execute query: {:?}", e),
|
||||
};
|
||||
Ok(None) => tracing::error!("query: no statement returned"),
|
||||
Err(e) => tracing::error!("query: failed to execute query: {:?}", e),
|
||||
};
|
||||
}
|
||||
ResultCode::Error
|
||||
}
|
||||
|
||||
@@ -101,26 +95,35 @@ pub unsafe extern "C" fn prepare_stmt(ctx: *mut ExtConn, sql: *const c_char) ->
|
||||
tracing::error!("prepare_stmt: null connection");
|
||||
return ptr::null_mut();
|
||||
};
|
||||
let db_ptr = extcon._ctx as *const Arc<Connection>;
|
||||
let conn = &*db_ptr;
|
||||
match conn.prepare(&sql_str) {
|
||||
Ok(stmt) => {
|
||||
let raw_stmt = Box::into_raw(Box::new(stmt)) as *mut c_void;
|
||||
Box::into_raw(Box::new(Stmt::new(
|
||||
extcon._ctx,
|
||||
raw_stmt,
|
||||
stmt_bind_args_fn,
|
||||
stmt_step,
|
||||
stmt_get_row,
|
||||
stmt_get_column_names,
|
||||
stmt_free_current_row,
|
||||
stmt_close,
|
||||
)))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("prepare_stmt: failed to prepare statement: {:?}", e);
|
||||
ptr::null_mut()
|
||||
if extcon._ctx.is_null() {
|
||||
tracing::error!("prepare_stmt: null connection ctx");
|
||||
return ptr::null_mut();
|
||||
}
|
||||
let weak_box = extcon._ctx as *const Weak<Connection>;
|
||||
let weak = unsafe { &*weak_box };
|
||||
if let Some(conn) = weak.upgrade() {
|
||||
match conn.prepare(&sql_str) {
|
||||
Ok(stmt) => {
|
||||
let raw_stmt = Box::into_raw(Box::new(stmt)) as *mut c_void;
|
||||
Box::into_raw(Box::new(Stmt::new(
|
||||
extcon._ctx,
|
||||
raw_stmt,
|
||||
stmt_bind_args_fn,
|
||||
stmt_step,
|
||||
stmt_get_row,
|
||||
stmt_get_column_names,
|
||||
stmt_free_current_row,
|
||||
stmt_close,
|
||||
)))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("prepare_stmt: failed to prepare statement: {:?}", e);
|
||||
ptr::null_mut()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::error!("failed to upgrade stored connection on vtable module");
|
||||
ptr::null_mut()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
73
core/vtab.rs
73
core/vtab.rs
@@ -3,8 +3,8 @@ use crate::schema::Column;
|
||||
use crate::util::columns_from_create_table_body;
|
||||
use crate::{Connection, LimboError, SymbolTable, Value};
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use std::cell::RefCell;
|
||||
use std::ffi::c_void;
|
||||
use std::ptr::NonNull;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
|
||||
@@ -105,7 +105,7 @@ impl VirtualTable {
|
||||
Ok(VirtualTableCursor::Pragma(Box::new(table.open(conn)?)))
|
||||
}
|
||||
VirtualTableType::External(table) => {
|
||||
Ok(VirtualTableCursor::External(table.open(conn)?))
|
||||
Ok(VirtualTableCursor::External(table.open(conn.clone())?))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -183,22 +183,6 @@ impl VirtualTableCursor {
|
||||
pub(crate) struct ExtVirtualTable {
|
||||
implementation: Rc<VTabModuleImpl>,
|
||||
table_ptr: *const c_void,
|
||||
connection_ptr: RefCell<Option<*mut turso_ext::Conn>>,
|
||||
}
|
||||
|
||||
impl Drop for ExtVirtualTable {
|
||||
fn drop(&mut self) {
|
||||
if let Some(conn) = self.connection_ptr.borrow_mut().take() {
|
||||
if conn.is_null() {
|
||||
return;
|
||||
}
|
||||
// free the memory for the turso_ext::Conn itself
|
||||
let mut conn = unsafe { Box::from_raw(conn) };
|
||||
// frees the boxed Weak pointer
|
||||
conn.close();
|
||||
}
|
||||
*self.connection_ptr.borrow_mut() = None;
|
||||
}
|
||||
}
|
||||
|
||||
impl ExtVirtualTable {
|
||||
@@ -241,7 +225,6 @@ impl ExtVirtualTable {
|
||||
}
|
||||
let (schema, table_ptr) = module.implementation.create(args)?;
|
||||
let vtab = ExtVirtualTable {
|
||||
connection_ptr: RefCell::new(None),
|
||||
implementation: module.implementation.clone(),
|
||||
table_ptr,
|
||||
};
|
||||
@@ -252,18 +235,21 @@ impl ExtVirtualTable {
|
||||
/// can optionally use to query the other tables.
|
||||
fn open(&self, conn: Arc<Connection>) -> crate::Result<ExtVirtualTableCursor> {
|
||||
// we need a Weak<Connection> to upgrade and call from the extension.
|
||||
let weak_box: *mut Arc<Connection> = Box::into_raw(Box::new(conn));
|
||||
let weak = Arc::downgrade(&conn);
|
||||
let weak_box = Box::into_raw(Box::new(weak));
|
||||
let conn = turso_ext::Conn::new(
|
||||
weak_box.cast(),
|
||||
weak_box as *mut c_void,
|
||||
crate::ext::prepare_stmt,
|
||||
crate::ext::execute,
|
||||
crate::ext::close,
|
||||
);
|
||||
let ext_conn_ptr = Box::into_raw(Box::new(conn));
|
||||
let ext_conn_ptr = NonNull::new(Box::into_raw(Box::new(conn))).expect("null pointer");
|
||||
// store the leaked connection pointer on the table so it can be freed on drop
|
||||
*self.connection_ptr.borrow_mut() = Some(ext_conn_ptr);
|
||||
let cursor = unsafe { (self.implementation.open)(self.table_ptr, ext_conn_ptr) };
|
||||
ExtVirtualTableCursor::new(cursor, self.implementation.clone())
|
||||
let Some(cursor) = NonNull::new(unsafe {
|
||||
(self.implementation.open)(self.table_ptr, ext_conn_ptr.as_ptr()) as *mut c_void
|
||||
}) else {
|
||||
return Err(LimboError::ExtensionError("Open returned null".to_string()));
|
||||
};
|
||||
ExtVirtualTableCursor::new(cursor, ext_conn_ptr, self.implementation.clone())
|
||||
}
|
||||
|
||||
fn update(&self, args: &[Value]) -> crate::Result<Option<i64>> {
|
||||
@@ -300,25 +286,28 @@ impl ExtVirtualTable {
|
||||
}
|
||||
|
||||
pub struct ExtVirtualTableCursor {
|
||||
cursor: *const c_void,
|
||||
cursor: NonNull<c_void>,
|
||||
// the core `[Connection]` pointer the vtab module needs to
|
||||
// query other internal tables.
|
||||
conn_ptr: Option<NonNull<turso_ext::Conn>>,
|
||||
implementation: Rc<VTabModuleImpl>,
|
||||
}
|
||||
|
||||
impl ExtVirtualTableCursor {
|
||||
fn new(cursor: *const c_void, implementation: Rc<VTabModuleImpl>) -> crate::Result<Self> {
|
||||
if cursor.is_null() {
|
||||
return Err(LimboError::InternalError(
|
||||
"VirtualTableCursor: cursor is null".into(),
|
||||
));
|
||||
}
|
||||
fn new(
|
||||
cursor: NonNull<c_void>,
|
||||
conn_ptr: NonNull<turso_ext::Conn>,
|
||||
implementation: Rc<VTabModuleImpl>,
|
||||
) -> crate::Result<Self> {
|
||||
Ok(Self {
|
||||
cursor,
|
||||
conn_ptr: Some(conn_ptr),
|
||||
implementation,
|
||||
})
|
||||
}
|
||||
|
||||
fn rowid(&self) -> i64 {
|
||||
unsafe { (self.implementation.rowid)(self.cursor) }
|
||||
unsafe { (self.implementation.rowid)(self.cursor.as_ptr()) }
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
@@ -337,7 +326,7 @@ impl ExtVirtualTableCursor {
|
||||
.unwrap_or(std::ptr::null_mut());
|
||||
let rc = unsafe {
|
||||
(self.implementation.filter)(
|
||||
self.cursor,
|
||||
self.cursor.as_ptr(),
|
||||
arg_count as i32,
|
||||
ext_args.as_ptr(),
|
||||
c_idx_str,
|
||||
@@ -357,12 +346,12 @@ impl ExtVirtualTableCursor {
|
||||
}
|
||||
|
||||
fn column(&self, column: usize) -> crate::Result<Value> {
|
||||
let val = unsafe { (self.implementation.column)(self.cursor, column as u32) };
|
||||
let val = unsafe { (self.implementation.column)(self.cursor.as_ptr(), column as u32) };
|
||||
Value::from_ffi(val)
|
||||
}
|
||||
|
||||
fn next(&self) -> crate::Result<bool> {
|
||||
let rc = unsafe { (self.implementation.next)(self.cursor) };
|
||||
let rc = unsafe { (self.implementation.next)(self.cursor.as_ptr()) };
|
||||
match rc {
|
||||
ResultCode::OK => Ok(true),
|
||||
ResultCode::EOF => Ok(false),
|
||||
@@ -373,7 +362,15 @@ impl ExtVirtualTableCursor {
|
||||
|
||||
impl Drop for ExtVirtualTableCursor {
|
||||
fn drop(&mut self) {
|
||||
let result = unsafe { (self.implementation.close)(self.cursor) };
|
||||
if let Some(ptr) = self.conn_ptr.take() {
|
||||
// first free the boxed turso_ext::Conn pointer itself
|
||||
let conn = unsafe { Box::from_raw(ptr.as_ptr()) };
|
||||
if !conn._ctx.is_null() {
|
||||
// we also leaked the Weak 'ctx' pointer, so free this as well
|
||||
let _ = unsafe { Box::from_raw(conn._ctx as *mut std::sync::Weak<Connection>) };
|
||||
}
|
||||
}
|
||||
let result = unsafe { (self.implementation.close)(self.cursor.as_ptr()) };
|
||||
if !result.is_ok() {
|
||||
tracing::error!("Failed to close virtual table cursor");
|
||||
}
|
||||
|
||||
@@ -73,7 +73,8 @@ impl VTabModuleImpl {
|
||||
|
||||
pub type VtabFnCreate = unsafe extern "C" fn(args: *const Value, argc: i32) -> VTabCreateResult;
|
||||
|
||||
pub type VtabFnOpen = unsafe extern "C" fn(table: *const c_void, conn: *mut Conn) -> *const c_void;
|
||||
pub type VtabFnOpen =
|
||||
unsafe extern "C" fn(table: *const c_void, conn: *const Conn) -> *const c_void;
|
||||
|
||||
pub type VtabFnClose = unsafe extern "C" fn(cursor: *const c_void) -> ResultCode;
|
||||
|
||||
@@ -408,7 +409,6 @@ pub type BindArgsFn = unsafe extern "C" fn(ctx: *mut Stmt, idx: i32, arg: Value)
|
||||
pub type StmtStepFn = unsafe extern "C" fn(ctx: *mut Stmt) -> ResultCode;
|
||||
pub type StmtGetRowValuesFn = unsafe extern "C" fn(ctx: *mut Stmt);
|
||||
pub type FreeCurrentRowFn = unsafe extern "C" fn(ctx: *mut Stmt);
|
||||
pub type CloseConnectionFn = unsafe extern "C" fn(ctx: *mut c_void);
|
||||
pub type CloseStmtFn = unsafe extern "C" fn(ctx: *mut Stmt);
|
||||
|
||||
/// core database connection
|
||||
@@ -416,25 +416,18 @@ pub type CloseStmtFn = unsafe extern "C" fn(ctx: *mut Stmt);
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Conn {
|
||||
// boxed Rc::Weak from core::Connection
|
||||
// std::sync::Weak from core::Connection
|
||||
pub _ctx: *mut c_void,
|
||||
pub _prepare_stmt: PrepareStmtFn,
|
||||
pub _execute: ExecuteFn,
|
||||
pub _close: CloseConnectionFn,
|
||||
}
|
||||
|
||||
impl Conn {
|
||||
pub fn new(
|
||||
ctx: *mut c_void,
|
||||
prepare_stmt: PrepareStmtFn,
|
||||
exec_fn: ExecuteFn,
|
||||
close: CloseConnectionFn,
|
||||
) -> Self {
|
||||
pub fn new(ctx: *mut c_void, prepare_stmt: PrepareStmtFn, exec_fn: ExecuteFn) -> Self {
|
||||
Conn {
|
||||
_ctx: ctx,
|
||||
_prepare_stmt: prepare_stmt,
|
||||
_execute: exec_fn,
|
||||
_close: close,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -447,14 +440,6 @@ impl Conn {
|
||||
Ok(unsafe { &mut *(ptr) })
|
||||
}
|
||||
|
||||
pub fn close(&mut self) {
|
||||
if self._ctx.is_null() {
|
||||
return;
|
||||
}
|
||||
unsafe { (self._close)(self._ctx) };
|
||||
self._ctx = std::ptr::null_mut();
|
||||
}
|
||||
|
||||
/// execute a SQL statement with the given arguments.
|
||||
/// optionally returns the last inserted rowid for the query
|
||||
pub fn execute(&self, sql: &str, args: &[Value]) -> crate::ExtResult<Option<usize>> {
|
||||
@@ -482,7 +467,7 @@ impl Conn {
|
||||
let Ok(sql) = CString::new(sql) else {
|
||||
return std::ptr::null_mut();
|
||||
};
|
||||
unsafe { (self._prepare_stmt)(self as *const _ as *mut Conn, sql.as_ptr()) }
|
||||
unsafe { (self._prepare_stmt)(self as *const Conn as *mut Conn, sql.as_ptr()) }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -506,10 +491,10 @@ impl Drop for Statement {
|
||||
/// the VTable is dropped, so there is no need to manually close the connection.
|
||||
#[derive(Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct Connection(*mut Conn);
|
||||
pub struct Connection(*const Conn);
|
||||
|
||||
impl Connection {
|
||||
pub fn new(ctx: *mut Conn) -> Self {
|
||||
pub fn new(ctx: *const Conn) -> Self {
|
||||
Connection(ctx)
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ pub fn derive_vtab_module(input: TokenStream) -> TokenStream {
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn #open_fn_name(table: *const ::std::ffi::c_void, conn: *mut ::turso_ext::Conn) -> *const ::std::ffi::c_void {
|
||||
unsafe extern "C" fn #open_fn_name(table: *const ::std::ffi::c_void, conn: *const ::turso_ext::Conn) -> *const ::std::ffi::c_void {
|
||||
if table.is_null() {
|
||||
return ::std::ptr::null();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user