From 5ebd3f7271ccaf20f7fde7426517edb64d2671fb Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 19:08:27 -0400 Subject: [PATCH 1/6] Change api of extension api context to support static extensions held on db --- core/ext/mod.rs | 275 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 188 insertions(+), 87 deletions(-) diff --git a/core/ext/mod.rs b/core/ext/mod.rs index 89c3e1a61..da723e8dd 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -1,12 +1,14 @@ #[cfg(feature = "fs")] mod dynamic; mod vtab_xconnect; -use crate::vtab::VirtualTable; +use crate::schema::{Schema, Table}; #[cfg(all(target_os = "linux", feature = "io_uring"))] use crate::UringIO; use crate::{function::ExternalFunc, Connection, Database, LimboError, IO}; +use crate::{vtab::VirtualTable, SymbolTable}; #[cfg(feature = "fs")] pub use dynamic::{add_builtin_vfs_extensions, add_vfs_module, list_vfs_modules, VfsMod}; +use parking_lot::Mutex; use std::{ ffi::{c_char, c_void, CStr, CString}, rc::Rc, @@ -17,7 +19,113 @@ use turso_ext::{ }; pub use turso_ext::{FinalizeFunction, StepFunction, Value as ExtValue, ValueType as ExtValueType}; pub use vtab_xconnect::{close, execute, prepare_stmt}; -type ExternAggFunc = (InitAggFunction, StepFunction, FinalizeFunction); + +/// The context passed to extensions to register with Core +/// along with the function pointers +#[repr(C)] +pub struct ExtensionCtx { + syms: *mut SymbolTable, + schema_data: *mut c_void, + schema_handler: SchemaHandler, +} + +type SchemaHandler = unsafe extern "C" fn( + schema_data: *mut c_void, + table_name: *const c_char, + table: *mut c_void, +) -> ResultCode; + +/// Handler for our Connection that has direct Arc access +/// to register a table from an extension. +unsafe extern "C" fn handle_schema_insert_connection( + schema_data: *mut c_void, + table_name: *const c_char, + table: *mut c_void, +) -> ResultCode { + let schema = &mut *(schema_data as *mut Schema); + let c_str = CStr::from_ptr(table_name); + let name = match c_str.to_str() { + Ok(s) => s.to_string(), + Err(_) => return ResultCode::InvalidArgs, + }; + let table = Box::from_raw(table as *mut Arc); + schema.tables.insert(name, *table); + ResultCode::OK +} + +/// Handler for Database with Mutex> access to +/// register a table from an extension. +unsafe extern "C" fn handle_schema_insert_database( + schema_data: *mut c_void, + table_name: *const c_char, + table: *mut c_void, +) -> ResultCode { + let mutex = &*(schema_data as *mut Mutex>); + let mut guard = mutex.lock(); + let schema = Arc::make_mut(&mut *guard); + + let c_str = CStr::from_ptr(table_name); + let name = match c_str.to_str() { + Ok(s) => s.to_string(), + Err(_) => return ResultCode::InvalidArgs, + }; + let table = Box::from_raw(table as *mut Arc
); + schema.tables.insert(name, *table); + ResultCode::OK +} + +pub(crate) unsafe extern "C" fn register_vtab_module( + ctx: *mut c_void, + name: *const c_char, + module: VTabModuleImpl, + kind: VTabKind, +) -> ResultCode { + if name.is_null() || ctx.is_null() { + return ResultCode::Error; + } + + let c_str = unsafe { CString::from_raw(name as *mut c_char) }; + let name_str = match c_str.to_str() { + Ok(s) => s.to_string(), + Err(_) => return ResultCode::Error, + }; + + let ext_ctx = unsafe { &mut *(ctx as *mut ExtensionCtx) }; + let module = Rc::new(module); + let vmodule = VTabImpl { + module_kind: kind, + implementation: module, + }; + + unsafe { + let syms = &mut *ext_ctx.syms; + syms.vtab_modules.insert(name_str.clone(), vmodule.into()); + + if kind == VTabKind::TableValuedFunction { + if let Ok(vtab) = VirtualTable::function(&name_str, syms) { + // Use the schema handler to insert the table + let table = Box::into_raw(Box::new(Arc::new(Table::Virtual(vtab)))); + let c_name = match CString::new(name_str) { + Ok(s) => s, + Err(_) => return ResultCode::Error, + }; + + let result = (ext_ctx.schema_handler)( + ext_ctx.schema_data, + c_name.as_ptr(), + table as *mut c_void, + ); + if result != ResultCode::OK { + let _ = Box::from_raw(table); + return result; + } + } else { + return ResultCode::Error; + } + } + } + ResultCode::OK +} #[derive(Clone)] pub struct VTabImpl { @@ -38,8 +146,14 @@ pub(crate) unsafe extern "C" fn register_scalar_function( if ctx.is_null() { return ResultCode::Error; } - let conn = unsafe { &*(ctx as *const Connection) }; - conn.register_scalar_function_impl(&name_str, func) + let ext_ctx = unsafe { &mut *(ctx as *mut ExtensionCtx) }; + unsafe { + (*ext_ctx.syms).functions.insert( + name_str.clone(), + Rc::new(ExternalFunc::new_scalar(name_str, func)), + ); + } + ResultCode::OK } pub(crate) unsafe extern "C" fn register_aggregate_function( @@ -58,30 +172,18 @@ pub(crate) unsafe extern "C" fn register_aggregate_function( if ctx.is_null() { return ResultCode::Error; } - let conn = unsafe { &*(ctx as *const Connection) }; - conn.register_aggregate_function_impl(&name_str, args, (init_func, step_func, finalize_func)) -} - -pub(crate) unsafe extern "C" fn register_vtab_module( - ctx: *mut c_void, - name: *const c_char, - module: VTabModuleImpl, - kind: VTabKind, -) -> ResultCode { - if name.is_null() || ctx.is_null() { - return ResultCode::Error; + let ext_ctx = unsafe { &mut *(ctx as *mut ExtensionCtx) }; + unsafe { + (*ext_ctx.syms).functions.insert( + name_str.clone(), + Rc::new(ExternalFunc::new_aggregate( + name_str, + args, + (init_func, step_func, finalize_func), + )), + ); } - let c_str = unsafe { CString::from_raw(name as *mut _) }; - let name_str = match c_str.to_str() { - Ok(s) => s.to_string(), - Err(_) => return ResultCode::Error, - }; - if ctx.is_null() { - return ResultCode::Error; - } - let conn = unsafe { &mut *(ctx as *mut Connection) }; - - conn.register_vtab_module_impl(&name_str, module, kind) + ResultCode::OK } impl Database { @@ -110,58 +212,64 @@ impl Database { let db = Self::open_file(io.clone(), path, false, false)?; Ok((io, db)) } + + /// Register any built-in extensions that can be stored on the Database so we do not have + /// to register these once-per-connection, and the connection can just extend its symbol table + pub fn register_global_builtin_extensions(&self) -> Result<(), String> { + let syms = self.builtin_syms.as_ptr(); + // Pass the mutex pointer and the appropriate handler + let schema_mutex_ptr = &self.schema as *const Mutex> as *mut Mutex>; + let ctx = Box::into_raw(Box::new(ExtensionCtx { + syms, + schema_data: schema_mutex_ptr as *mut c_void, + schema_handler: handle_schema_insert_database, + })); + let mut ext_api = ExtensionApi { + ctx: ctx as *mut c_void, + register_scalar_function, + register_aggregate_function, + register_vtab_module, + #[cfg(feature = "fs")] + vfs_interface: turso_ext::VfsInterface { + register_vfs: dynamic::register_vfs, + builtin_vfs: std::ptr::null_mut(), + builtin_vfs_count: 0, + }, + }; + + #[cfg(feature = "uuid")] + crate::uuid::register_extension(&mut ext_api); + #[cfg(feature = "series")] + crate::series::register_extension(&mut ext_api); + #[cfg(feature = "fs")] + { + let vfslist = add_builtin_vfs_extensions(Some(ext_api)).map_err(|e| e.to_string())?; + for (name, vfs) in vfslist { + add_vfs_module(name, vfs); + } + } + let _ = unsafe { Box::from_raw(ctx) }; + Ok(()) + } } impl Connection { - fn register_scalar_function_impl(&self, name: &str, func: ScalarFunction) -> ResultCode { - self.syms.borrow_mut().functions.insert( - name.to_string(), - Rc::new(ExternalFunc::new_scalar(name.to_string(), func)), - ); - ResultCode::OK - } - - fn register_aggregate_function_impl( - &self, - name: &str, - args: i32, - func: ExternAggFunc, - ) -> ResultCode { - self.syms.borrow_mut().functions.insert( - name.to_string(), - Rc::new(ExternalFunc::new_aggregate(name.to_string(), args, func)), - ); - ResultCode::OK - } - - fn register_vtab_module_impl( - &mut self, - name: &str, - module: VTabModuleImpl, - kind: VTabKind, - ) -> ResultCode { - let module = Rc::new(module); - let vmodule = VTabImpl { - module_kind: kind, - implementation: module, + /// # Safety: + /// Only to be used when registering a staticly linked extension manually. + /// you probably want to use `Connection::load_extension(path)`. + /// Do not call if you have multiple connection open. + pub fn _build_turso_ext(&self) -> ExtensionApi { + let schema_ptr = self.schema.as_ptr(); + let schema_direct = unsafe { Arc::as_ptr(&*schema_ptr) as *mut Schema }; + let ctx = ExtensionCtx { + syms: self.syms.as_ptr(), + schema_data: schema_direct as *mut c_void, + schema_handler: handle_schema_insert_connection, }; - self.syms - .borrow_mut() - .vtab_modules - .insert(name.to_string(), vmodule.into()); - if kind == VTabKind::TableValuedFunction { - if let Ok(vtab) = VirtualTable::function(name, &self.syms.borrow()) { - self.with_schema_mut(|schema| schema.add_virtual_table(vtab)); - } else { - return ResultCode::Error; - } - } - ResultCode::OK - } - pub fn build_turso_ext(&self) -> ExtensionApi { + let ctx = Box::into_raw(Box::new(ctx)) as *mut c_void; ExtensionApi { - ctx: self as *const _ as *mut c_void, + ctx, register_scalar_function, register_aggregate_function, register_vtab_module, @@ -174,20 +282,13 @@ impl Connection { } } - pub fn register_builtins(&self) -> Result<(), String> { - #[allow(unused_variables)] - let mut ext_api = self.build_turso_ext(); - #[cfg(feature = "uuid")] - crate::uuid::register_extension(&mut ext_api); - #[cfg(feature = "series")] - crate::series::register_extension(&mut ext_api); - #[cfg(feature = "fs")] - { - let vfslist = add_builtin_vfs_extensions(Some(ext_api)).map_err(|e| e.to_string())?; - for (name, vfs) in vfslist { - add_vfs_module(name, vfs); - } + /// # Safety: + /// Only to be used if you have previously called Connection::build_turso_ext + /// before registering an extension manually. + pub unsafe fn _free_extension_ctx(&self, api: ExtensionApi) { + if api.ctx.is_null() { + return; } - Ok(()) + let _ = unsafe { Box::from_raw(api.ctx as *mut ExtensionCtx) }; } } From 411c4f059ae1d5c01d6a9da4804f94a0466aa28a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 19:09:31 -0400 Subject: [PATCH 2/6] Load compile time extensions on the initial db startup instead of once per conn --- core/ext/dynamic.rs | 2 +- core/lib.rs | 45 ++++++++++++++++++++++++------------------- core/storage/pager.rs | 14 +++++++------- core/vdbe/execute.rs | 3 +-- 4 files changed, 34 insertions(+), 30 deletions(-) diff --git a/core/ext/dynamic.rs b/core/ext/dynamic.rs index 60f76547f..d4d6f7cf6 100644 --- a/core/ext/dynamic.rs +++ b/core/ext/dynamic.rs @@ -35,7 +35,7 @@ impl Connection { ) -> crate::Result<()> { use turso_ext::ExtensionApiRef; - let api = Box::new(self.build_turso_ext()); + let api = Box::new(self._build_turso_ext()); let lib = unsafe { Library::new(path).map_err(|e| LimboError::ExtensionError(e.to_string()))? }; let entry: Symbol = unsafe { diff --git a/core/lib.rs b/core/lib.rs index 331e0384a..d47c56047 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -59,10 +59,9 @@ pub use io::{ Buffer, Completion, CompletionType, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, }; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use schema::Schema; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex; use std::{ borrow::Cow, cell::{Cell, RefCell, UnsafeCell}, @@ -121,6 +120,7 @@ pub struct Database { db_state: Arc, init_lock: Arc>, open_flags: OpenFlags, + builtin_syms: RefCell, } unsafe impl Send for Database {} @@ -149,7 +149,7 @@ impl fmt::Debug for Database { }; debug_struct.field("mv_store", &mv_store_status); - let init_lock_status = if self.init_lock.try_lock().is_ok() { + let init_lock_status = if self.init_lock.try_lock().is_some() { "unlocked" } else { "locked" @@ -245,7 +245,7 @@ impl Database { }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); - + let syms = SymbolTable::new(); let db = Arc::new(Database { mv_store, path: path.to_string(), @@ -253,11 +253,14 @@ impl Database { _shared_page_cache: shared_page_cache.clone(), maybe_shared_wal: RwLock::new(maybe_shared_wal), db_file, + builtin_syms: syms.into(), io: io.clone(), open_flags: flags, db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), }); + db.register_global_builtin_extensions() + .expect("unable to register global extensions"); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 if db_state == DB_STATE_INITIALIZED { @@ -294,12 +297,7 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new( - self.schema - .lock() - .map_err(|_| LimboError::SchemaLocked)? - .clone(), - ), + schema: RefCell::new(self.schema.lock().clone()), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -315,10 +313,9 @@ impl Database { capture_data_changes: RefCell::new(CaptureDataChangesMode::Off), closed: Cell::new(false), }); - - if let Err(e) = conn.register_builtins() { - return Err(LimboError::ExtensionError(e)); - } + let builtin_syms = self.builtin_syms.borrow(); + // add built-in extensions symbols to the connection to prevent having to load each time + conn.syms.borrow_mut().extend(&builtin_syms); Ok(conn) } @@ -443,7 +440,7 @@ impl Database { #[inline] pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { - let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; + let mut schema_ref = self.schema.try_lock().ok_or(LimboError::SchemaLocked)?; let schema = Arc::make_mut(&mut *schema_ref); f(schema) } @@ -789,11 +786,7 @@ impl Connection { pub fn maybe_update_schema(&self) -> Result<()> { let current_schema_version = self.schema.borrow().schema_version; - let schema = self - ._db - .schema - .lock() - .map_err(|_| LimboError::SchemaLocked)?; + let schema = self._db.schema.try_lock().ok_or(LimboError::SchemaLocked)?; if matches!(self.transaction_state.get(), TransactionState::None) && current_schema_version < schema.schema_version { @@ -1222,6 +1215,18 @@ impl SymbolTable { ) -> Option> { self.functions.get(name).cloned() } + + pub fn extend(&mut self, other: &SymbolTable) { + for (name, func) in &other.functions { + self.functions.insert(name.clone(), func.clone()); + } + for (name, vtab) in &other.vtabs { + self.vtabs.insert(name.clone(), vtab.clone()); + } + for (name, module) in &other.vtab_modules { + self.vtab_modules.insert(name.clone(), module.clone()); + } + } } pub struct QueryRunner<'a> { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bb3829a54..2b5aa682c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -9,12 +9,12 @@ use crate::types::IOResult; use crate::util::IOExt as _; use crate::{return_if_io, Completion}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tracing::{instrument, trace, Level}; use super::btree::{btree_init_page, BTreePage}; @@ -680,7 +680,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn maybe_allocate_page1(&self) -> Result> { if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { - if let Ok(_lock) = self.init_lock.try_lock() { + if let Some(_lock) = self.init_lock.try_lock() { match ( self.db_state.load(Ordering::SeqCst), self.allocating_page1(), @@ -738,8 +738,8 @@ impl Pager { *connection ._db .schema - .lock() - .map_err(|_| LimboError::SchemaLocked)? = schema; + .try_lock() + .ok_or(LimboError::SchemaLocked)? = schema; } Ok(commit_status) } @@ -1360,8 +1360,8 @@ impl Pager { connection ._db .schema - .lock() - .map_err(|_| LimboError::SchemaLocked)? + .try_lock() + .ok_or(LimboError::SchemaLocked)? .clone(), ); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 5c6671f76..cf670a84d 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -31,7 +31,6 @@ use crate::{ }; use std::ops::DerefMut; use std::sync::atomic::AtomicUsize; -use std::sync::Mutex; use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -62,7 +61,7 @@ use super::{ CommitState, }; use fallible_iterator::FallibleIterator; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use rand::{thread_rng, Rng}; use turso_sqlite3_parser::ast; use turso_sqlite3_parser::ast::fmt::ToTokens; From 5e5b3ce071f888f735e80bf79eb5c2cab3d397f1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 19:09:58 -0400 Subject: [PATCH 3/6] Fix leak of extension CTX in cli/app --- cli/app.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cli/app.rs b/cli/app.rs index 121cd0c4e..98d80af5a 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -138,10 +138,13 @@ impl Limbo { (io, conn) }; let mut ext_api = conn.build_turso_ext(); - if unsafe { !limbo_completion::register_extension_static(&mut ext_api).is_ok() } { - return Err(anyhow!( - "Failed to register completion extension".to_string() - )); + unsafe { + if !limbo_completion::register_extension_static(&mut ext_api).is_ok() { + return Err(anyhow!( + "Failed to register completion extension".to_string() + )); + } + conn._free_extension_ctx(ext_api); } let interrupt_count = Arc::new(AtomicUsize::new(0)); { From f7ba8efdbd8a5d39d9ba035da006d86024c3c749 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 19:20:17 -0400 Subject: [PATCH 4/6] Switch back to std::mutex because it was an unnecessary change --- cli/app.rs | 2 +- core/ext/mod.rs | 7 ++++--- core/lib.rs | 21 +++++++++++++++------ core/storage/pager.rs | 14 +++++++------- core/vdbe/execute.rs | 8 ++++++-- 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/cli/app.rs b/cli/app.rs index 98d80af5a..4bb389384 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -137,8 +137,8 @@ impl Limbo { let conn = db.connect()?; (io, conn) }; - let mut ext_api = conn.build_turso_ext(); unsafe { + let mut ext_api = conn._build_turso_ext(); if !limbo_completion::register_extension_static(&mut ext_api).is_ok() { return Err(anyhow!( "Failed to register completion extension".to_string() diff --git a/core/ext/mod.rs b/core/ext/mod.rs index da723e8dd..517ca4090 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -8,11 +8,10 @@ use crate::{function::ExternalFunc, Connection, Database, LimboError, IO}; use crate::{vtab::VirtualTable, SymbolTable}; #[cfg(feature = "fs")] pub use dynamic::{add_builtin_vfs_extensions, add_vfs_module, list_vfs_modules, VfsMod}; -use parking_lot::Mutex; use std::{ ffi::{c_char, c_void, CStr, CString}, rc::Rc, - sync::Arc, + sync::{Arc, Mutex}, }; use turso_ext::{ ExtensionApi, InitAggFunction, ResultCode, ScalarFunction, VTabKind, VTabModuleImpl, @@ -61,7 +60,9 @@ unsafe extern "C" fn handle_schema_insert_database( table: *mut c_void, ) -> ResultCode { let mutex = &*(schema_data as *mut Mutex>); - let mut guard = mutex.lock(); + let Ok(mut guard) = mutex.lock() else { + return ResultCode::Error; + }; let schema = Arc::make_mut(&mut *guard); let c_str = CStr::from_ptr(table_name); diff --git a/core/lib.rs b/core/lib.rs index d47c56047..a4c74dc47 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -59,7 +59,7 @@ pub use io::{ Buffer, Completion, CompletionType, File, MemoryIO, OpenFlags, PlatformIO, SyscallIO, WriteCompletion, IO, }; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use schema::Schema; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ @@ -71,7 +71,7 @@ use std::{ num::NonZero, ops::Deref, rc::Rc, - sync::Arc, + sync::{Arc, Mutex}, }; #[cfg(feature = "fs")] use storage::database::DatabaseFile; @@ -149,7 +149,7 @@ impl fmt::Debug for Database { }; debug_struct.field("mv_store", &mv_store_status); - let init_lock_status = if self.init_lock.try_lock().is_some() { + let init_lock_status = if self.init_lock.lock().is_ok() { "unlocked" } else { "locked" @@ -297,7 +297,12 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new(self.schema.lock().clone()), + schema: RefCell::new( + self.schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? + .clone(), + ), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -440,7 +445,7 @@ impl Database { #[inline] pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { - let mut schema_ref = self.schema.try_lock().ok_or(LimboError::SchemaLocked)?; + let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; let schema = Arc::make_mut(&mut *schema_ref); f(schema) } @@ -786,7 +791,11 @@ impl Connection { pub fn maybe_update_schema(&self) -> Result<()> { let current_schema_version = self.schema.borrow().schema_version; - let schema = self._db.schema.try_lock().ok_or(LimboError::SchemaLocked)?; + let schema = self + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)?; if matches!(self.transaction_state.get(), TransactionState::None) && current_schema_version < schema.schema_version { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 2b5aa682c..aca164cb3 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -9,12 +9,12 @@ use crate::types::IOResult; use crate::util::IOExt as _; use crate::{return_if_io, Completion}; use crate::{turso_assert, Buffer, Connection, LimboError, Result}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tracing::{instrument, trace, Level}; use super::btree::{btree_init_page, BTreePage}; @@ -680,7 +680,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn maybe_allocate_page1(&self) -> Result> { if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { - if let Some(_lock) = self.init_lock.try_lock() { + if let Ok(_lock) = self.init_lock.lock() { match ( self.db_state.load(Ordering::SeqCst), self.allocating_page1(), @@ -738,8 +738,8 @@ impl Pager { *connection ._db .schema - .try_lock() - .ok_or(LimboError::SchemaLocked)? = schema; + .lock() + .map_err(|_| LimboError::SchemaLocked)? = schema; } Ok(commit_status) } @@ -1360,8 +1360,8 @@ impl Pager { connection ._db .schema - .try_lock() - .ok_or(LimboError::SchemaLocked)? + .lock() + .map_err(|_| LimboError::SchemaLocked)? .clone(), ); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index cf670a84d..af396384b 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -31,7 +31,11 @@ use crate::{ }; use std::ops::DerefMut; use std::sync::atomic::AtomicUsize; -use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; +use std::{ + borrow::BorrowMut, + rc::Rc, + sync::{Arc, Mutex}, +}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -61,7 +65,7 @@ use super::{ CommitState, }; use fallible_iterator::FallibleIterator; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use rand::{thread_rng, Rng}; use turso_sqlite3_parser::ast; use turso_sqlite3_parser::ast::fmt::ToTokens; From d514304ac2586c231ca7fde9ce21b327f718d042 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 19:24:24 -0400 Subject: [PATCH 5/6] Remove unneeded changes --- core/lib.rs | 2 +- core/storage/pager.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index a4c74dc47..aa139b319 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -149,7 +149,7 @@ impl fmt::Debug for Database { }; debug_struct.field("mv_store", &mv_store_status); - let init_lock_status = if self.init_lock.lock().is_ok() { + let init_lock_status = if self.init_lock.try_lock().is_ok() { "unlocked" } else { "locked" diff --git a/core/storage/pager.rs b/core/storage/pager.rs index aca164cb3..bb3829a54 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -680,7 +680,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn maybe_allocate_page1(&self) -> Result> { if self.db_state.load(Ordering::SeqCst) < DB_STATE_INITIALIZED { - if let Ok(_lock) = self.init_lock.lock() { + if let Ok(_lock) = self.init_lock.try_lock() { match ( self.db_state.load(Ordering::SeqCst), self.allocating_page1(), From a92126961dc829c981d7e2717053900fb278b98a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 21 Jul 2025 20:02:09 -0400 Subject: [PATCH 6/6] Remove duplicate case and just send Mutex along with schema for extension registrations --- core/ext/dynamic.rs | 2 +- core/ext/mod.rs | 106 +++++++++++++------------------------------- 2 files changed, 31 insertions(+), 77 deletions(-) diff --git a/core/ext/dynamic.rs b/core/ext/dynamic.rs index d4d6f7cf6..851e38b67 100644 --- a/core/ext/dynamic.rs +++ b/core/ext/dynamic.rs @@ -35,7 +35,7 @@ impl Connection { ) -> crate::Result<()> { use turso_ext::ExtensionApiRef; - let api = Box::new(self._build_turso_ext()); + let api = Box::new(unsafe { self._build_turso_ext() }); let lib = unsafe { Library::new(path).map_err(|e| LimboError::ExtensionError(e.to_string()))? }; let entry: Symbol = unsafe { diff --git a/core/ext/mod.rs b/core/ext/mod.rs index 517ca4090..52761741b 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -24,55 +24,7 @@ pub use vtab_xconnect::{close, execute, prepare_stmt}; #[repr(C)] pub struct ExtensionCtx { syms: *mut SymbolTable, - schema_data: *mut c_void, - schema_handler: SchemaHandler, -} - -type SchemaHandler = unsafe extern "C" fn( - schema_data: *mut c_void, - table_name: *const c_char, - table: *mut c_void, -) -> ResultCode; - -/// Handler for our Connection that has direct Arc access -/// to register a table from an extension. -unsafe extern "C" fn handle_schema_insert_connection( - schema_data: *mut c_void, - table_name: *const c_char, - table: *mut c_void, -) -> ResultCode { - let schema = &mut *(schema_data as *mut Schema); - let c_str = CStr::from_ptr(table_name); - let name = match c_str.to_str() { - Ok(s) => s.to_string(), - Err(_) => return ResultCode::InvalidArgs, - }; - let table = Box::from_raw(table as *mut Arc
); - schema.tables.insert(name, *table); - ResultCode::OK -} - -/// Handler for Database with Mutex> access to -/// register a table from an extension. -unsafe extern "C" fn handle_schema_insert_database( - schema_data: *mut c_void, - table_name: *const c_char, - table: *mut c_void, -) -> ResultCode { - let mutex = &*(schema_data as *mut Mutex>); - let Ok(mut guard) = mutex.lock() else { - return ResultCode::Error; - }; - let schema = Arc::make_mut(&mut *guard); - - let c_str = CStr::from_ptr(table_name); - let name = match c_str.to_str() { - Ok(s) => s.to_string(), - Err(_) => return ResultCode::InvalidArgs, - }; - let table = Box::from_raw(table as *mut Arc
); - schema.tables.insert(name, *table); - ResultCode::OK + schema: *mut c_void, } pub(crate) unsafe extern "C" fn register_vtab_module( @@ -105,21 +57,13 @@ pub(crate) unsafe extern "C" fn register_vtab_module( if kind == VTabKind::TableValuedFunction { if let Ok(vtab) = VirtualTable::function(&name_str, syms) { // Use the schema handler to insert the table - let table = Box::into_raw(Box::new(Arc::new(Table::Virtual(vtab)))); - let c_name = match CString::new(name_str) { - Ok(s) => s, - Err(_) => return ResultCode::Error, + let table = Arc::new(Table::Virtual(vtab)); + let mutex = &*(ext_ctx.schema as *mut Mutex>); + let Ok(guard) = mutex.lock() else { + return ResultCode::Error; }; - - let result = (ext_ctx.schema_handler)( - ext_ctx.schema_data, - c_name.as_ptr(), - table as *mut c_void, - ); - if result != ResultCode::OK { - let _ = Box::from_raw(table); - return result; - } + let schema_ptr = Arc::as_ptr(&*guard) as *mut Schema; + (*schema_ptr).tables.insert(name_str, table); } else { return ResultCode::Error; } @@ -222,8 +166,7 @@ impl Database { let schema_mutex_ptr = &self.schema as *const Mutex> as *mut Mutex>; let ctx = Box::into_raw(Box::new(ExtensionCtx { syms, - schema_data: schema_mutex_ptr as *mut c_void, - schema_handler: handle_schema_insert_database, + schema: schema_mutex_ptr as *mut c_void, })); let mut ext_api = ExtensionApi { ctx: ctx as *mut c_void, @@ -255,19 +198,30 @@ impl Database { } impl Connection { - /// # Safety: - /// Only to be used when registering a staticly linked extension manually. + /// Build the connection's extension api context for manually registering an extension. /// you probably want to use `Connection::load_extension(path)`. - /// Do not call if you have multiple connection open. - pub fn _build_turso_ext(&self) -> ExtensionApi { - let schema_ptr = self.schema.as_ptr(); - let schema_direct = unsafe { Arc::as_ptr(&*schema_ptr) as *mut Schema }; + /// + /// # Safety + /// Only to be used when registering a staticly linked extension manually. + /// You should only ever call this method on your applications startup, + /// The caller is responsible for calling `_free_extension_ctx` after registering the + /// extension. + /// + /// usage: + /// ```ignore + /// let ext_api = conn._build_turso_ext(); + /// unsafe { + /// my_extension::register_extension(&mut ext_api); + /// conn._free_extension_ctx(ext_api); + /// } + ///``` + pub unsafe fn _build_turso_ext(&self) -> ExtensionApi { + let schema_mutex_ptr = + &self._db.schema as *const Mutex> as *mut Mutex>; let ctx = ExtensionCtx { syms: self.syms.as_ptr(), - schema_data: schema_direct as *mut c_void, - schema_handler: handle_schema_insert_connection, + schema: schema_mutex_ptr as *mut c_void, }; - let ctx = Box::into_raw(Box::new(ctx)) as *mut c_void; ExtensionApi { ctx, @@ -283,9 +237,9 @@ impl Connection { } } - /// # Safety: + /// Free the connection's extension libary context after registering an extension manually. + /// # Safety /// Only to be used if you have previously called Connection::build_turso_ext - /// before registering an extension manually. pub unsafe fn _free_extension_ctx(&self, api: ExtensionApi) { if api.ctx.is_null() { return;