Merge 'core: Wrap symbol table with RwLock' from Pekka Enberg

Make it Send.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3168
This commit is contained in:
Pekka Enberg
2025-09-17 11:47:40 +03:00
committed by GitHub
3 changed files with 21 additions and 24 deletions

View File

@@ -162,7 +162,7 @@ impl Database {
/// Register any built-in extensions that can be stored on the Database so we do not have
/// to register these once-per-connection, and the connection can just extend its symbol table
pub fn register_global_builtin_extensions(&self) -> Result<(), String> {
let syms = self.builtin_syms.as_ptr();
let syms = self.builtin_syms.data_ptr();
// Pass the mutex pointer and the appropriate handler
let schema_mutex_ptr = &self.schema as *const Mutex<Arc<Schema>> as *mut Mutex<Arc<Schema>>;
let ctx = Box::into_raw(Box::new(ExtensionCtx {
@@ -221,7 +221,7 @@ impl Connection {
let schema_mutex_ptr =
&self._db.schema as *const Mutex<Arc<Schema>> as *mut Mutex<Arc<Schema>>;
let ctx = ExtensionCtx {
syms: self.syms.as_ptr(),
syms: self.syms.data_ptr(),
schema: schema_mutex_ptr as *mut c_void,
};
let ctx = Box::into_raw(Box::new(ctx)) as *mut c_void;

View File

@@ -203,7 +203,7 @@ pub struct Database {
db_state: Arc<AtomicDbState>,
init_lock: Arc<Mutex<()>>,
open_flags: OpenFlags,
builtin_syms: RefCell<SymbolTable>,
builtin_syms: RwLock<SymbolTable>,
opts: DatabaseOpts,
n_connections: AtomicUsize,
}
@@ -445,7 +445,7 @@ impl Database {
// parse schema
let conn = db.connect()?;
let syms = conn.syms.borrow();
let syms = conn.syms.read();
let pager = conn.pager.borrow().clone();
if let Some(encryption_opts) = encryption_opts {
@@ -502,7 +502,7 @@ impl Database {
last_insert_rowid: Cell::new(0),
last_change: Cell::new(0),
total_changes: Cell::new(0),
syms: RefCell::new(SymbolTable::new()),
syms: RwLock::new(SymbolTable::new()),
_shared_cache: false,
cache_size: Cell::new(default_cache_size),
page_size: Cell::new(page_size),
@@ -523,9 +523,9 @@ impl Database {
});
self.n_connections
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let builtin_syms = self.builtin_syms.borrow();
let builtin_syms = self.builtin_syms.read();
// add built-in extensions symbols to the connection to prevent having to load each time
conn.syms.borrow_mut().extend(&builtin_syms);
conn.syms.write().extend(&builtin_syms);
Ok(conn)
}
@@ -983,7 +983,7 @@ pub struct Connection {
last_insert_rowid: Cell<i64>,
last_change: Cell<i64>,
total_changes: Cell<i64>,
syms: RefCell<SymbolTable>,
syms: RwLock<SymbolTable>,
_shared_cache: bool,
cache_size: Cell<i32>,
/// page size used for an uninitialized database or the next vacuum command.
@@ -1042,7 +1042,7 @@ impl Connection {
tracing::trace!("Preparing: {}", sql);
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
let syms = self.syms.borrow();
let syms = self.syms.read();
let cmd = cmd.expect("Successful parse on nonempty input string should produce a command");
let byte_offset_end = parser.offset();
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
@@ -1166,7 +1166,7 @@ impl Connection {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, &mut fresh, &self.syms.borrow(), None, existing_views)?;
parse_schema_rows(stmt, &mut fresh, &self.syms.read(), None, existing_views)?;
tracing::debug!(
"reparse_schema: schema_version={}, tables={:?}",
@@ -1194,7 +1194,7 @@ impl Connection {
tracing::trace!("Preparing and executing batch: {}", sql);
let mut parser = Parser::new(sql.as_bytes());
while let Some(cmd) = parser.next_cmd()? {
let syms = self.syms.borrow();
let syms = self.syms.read();
let pager = self.pager.borrow().clone();
let byte_offset_end = parser.offset();
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
@@ -1246,7 +1246,7 @@ impl Connection {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let syms = self.syms.borrow();
let syms = self.syms.read();
let pager = self.pager.borrow().clone();
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
@@ -1278,7 +1278,7 @@ impl Connection {
self.maybe_update_schema()?;
let mut parser = Parser::new(sql.as_bytes());
while let Some(cmd) = parser.next_cmd()? {
let syms = self.syms.borrow();
let syms = self.syms.read();
let pager = self.pager.borrow().clone();
let byte_offset_end = parser.offset();
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
@@ -1783,7 +1783,7 @@ impl Connection {
let rows = self
.query("SELECT * FROM sqlite_schema")?
.expect("query must be parsed to statement");
let syms = self.syms.borrow();
let syms = self.syms.read();
self.with_schema_mut(|schema| {
let existing_views = schema.incremental_views.clone();
if let Err(LimboError::ExtensionError(e)) =
@@ -2122,7 +2122,7 @@ impl Connection {
/// Creates a HashSet of modules that have been loaded
pub fn get_syms_vtab_mods(&self) -> std::collections::HashSet<String> {
self.syms.borrow().vtab_modules.keys().cloned().collect()
self.syms.read().vtab_modules.keys().cloned().collect()
}
pub fn set_encryption_key(&self, key: EncryptionKey) -> Result<()> {
@@ -2405,7 +2405,7 @@ impl Statement {
let cmd = parser.next_cmd()?;
let cmd = cmd.expect("Same SQL string should be able to be parsed");
let syms = conn.syms.borrow();
let syms = conn.syms.read();
let mode = self.query_mode;
debug_assert_eq!(QueryMode::new(&cmd), mode,);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;

View File

@@ -1078,12 +1078,9 @@ pub fn op_vcreate(
};
let conn = program.connection.clone();
let table =
crate::VirtualTable::table(Some(&table_name), &module_name, args, &conn.syms.borrow())?;
crate::VirtualTable::table(Some(&table_name), &module_name, args, &conn.syms.read())?;
{
conn.syms
.borrow_mut()
.vtabs
.insert(table_name, table.clone());
conn.syms.write().vtabs.insert(table_name, table.clone());
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -1261,7 +1258,7 @@ pub fn op_vdestroy(
load_insn!(VDestroy { db, table_name }, insn);
let conn = program.connection.clone();
{
let Some(vtab) = conn.syms.borrow_mut().vtabs.remove(table_name) else {
let Some(vtab) = conn.syms.write().vtabs.remove(table_name) else {
return Err(crate::LimboError::InternalError(
"Could not find Virtual Table to Destroy".to_string(),
));
@@ -6693,7 +6690,7 @@ pub fn op_parse_schema(
parse_schema_rows(
stmt,
schema,
&conn.syms.borrow(),
&conn.syms.read(),
program.connection.mv_tx.get(),
existing_views,
)
@@ -6708,7 +6705,7 @@ pub fn op_parse_schema(
parse_schema_rows(
stmt,
schema,
&conn.syms.borrow(),
&conn.syms.read(),
program.connection.mv_tx.get(),
existing_views,
)