From b94aa22499cedb94e96000de7de8cd59646a2986 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Mon, 22 Sep 2025 10:55:57 +0300 Subject: [PATCH] core: Wrap Connection::schema in RwLock --- core/incremental/cursor.rs | 2 +- core/lib.rs | 32 ++++++++++++++++---------------- core/mvcc/database/mod.rs | 2 +- core/storage/pager.rs | 4 ++-- core/vdbe/execute.rs | 22 +++++++++++----------- core/vdbe/mod.rs | 4 ++-- 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/core/incremental/cursor.rs b/core/incremental/cursor.rs index 963d813dd..20bce4205 100644 --- a/core/incremental/cursor.rs +++ b/core/incremental/cursor.rs @@ -342,7 +342,7 @@ mod tests { // Get the schema and view let view_mutex = conn .schema - .borrow() + .read() .get_materialized_view("test_view") .ok_or(crate::LimboError::InternalError( "View not found".to_string(), diff --git a/core/lib.rs b/core/lib.rs index f4dd9afe1..957445bc9 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -493,7 +493,7 @@ impl Database { let conn = Arc::new(Connection { db: self.clone(), pager: RwLock::new(Arc::new(pager)), - schema: RefCell::new( + schema: RwLock::new( self.schema .lock() .map_err(|_| LimboError::SchemaLocked)? @@ -980,7 +980,7 @@ impl DatabaseCatalog { pub struct Connection { db: Arc, pager: RwLock>, - schema: RefCell>, + schema: RwLock>, /// Per-database schema cache (database_index -> schema) /// Loaded lazily to avoid copying all schemas on connection open database_schemas: RefCell>>, @@ -1061,7 +1061,7 @@ impl Connection { let mode = QueryMode::new(&cmd); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; let program = translate::translate( - self.schema.borrow().deref(), + self.schema.read().deref(), stmt, pager.clone(), self.clone(), @@ -1141,7 +1141,7 @@ impl Connection { reparse_result?; - let schema = self.schema.borrow().clone(); + let schema = self.schema.read().clone(); self.db.update_schema_if_newer(schema) } @@ -1155,12 +1155,12 @@ impl Connection { .get(); // create fresh schema as some objects can be deleted - let mut fresh = Schema::new(self.schema.borrow().indexes_enabled); + let mut fresh = Schema::new(self.schema.read().indexes_enabled); fresh.schema_version = cookie; // Preserve existing views to avoid expensive repopulation. // TODO: We may not need to do this if we materialize our views. - let existing_views = self.schema.borrow().incremental_views.clone(); + let existing_views = self.schema.read().incremental_views.clone(); // TODO: this is hack to avoid a cyclical problem with schema reprepare // The problem here is that we prepare a statement here, but when the statement tries @@ -1218,7 +1218,7 @@ impl Connection { let mode = QueryMode::new(&cmd); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; let program = translate::translate( - self.schema.borrow().deref(), + self.schema.read().deref(), stmt, pager.clone(), self.clone(), @@ -1266,7 +1266,7 @@ impl Connection { let mode = QueryMode::new(&cmd); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; let program = translate::translate( - self.schema.borrow().deref(), + self.schema.read().deref(), stmt, pager.clone(), self.clone(), @@ -1302,7 +1302,7 @@ impl Connection { let mode = QueryMode::new(&cmd); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; let program = translate::translate( - self.schema.borrow().deref(), + self.schema.read().deref(), stmt, pager.clone(), self.clone(), @@ -1411,7 +1411,7 @@ impl Connection { } pub fn maybe_update_schema(&self) -> Result<()> { - let current_schema_version = self.schema.borrow().schema_version; + let current_schema_version = self.schema.read().schema_version; let schema = self .db .schema @@ -1420,7 +1420,7 @@ impl Connection { if matches!(self.transaction_state.get(), TransactionState::None) && current_schema_version != schema.schema_version { - self.schema.replace(schema.clone()); + *self.schema.write() = schema.clone(); } Ok(()) @@ -1879,7 +1879,7 @@ impl Connection { #[inline] pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> T) -> T { - let mut schema_ref = self.schema.borrow_mut(); + let mut schema_ref = self.schema.write(); let schema = Arc::make_mut(&mut *schema_ref); f(schema) } @@ -2029,11 +2029,11 @@ impl Connection { pub(crate) fn with_schema(&self, database_id: usize, f: impl FnOnce(&Schema) -> T) -> T { if database_id == 0 { // Main database - use connection's schema which should be kept in sync - let schema = self.schema.borrow(); + let schema = self.schema.read(); f(&schema) } else if database_id == 1 { // Temp database - uses same schema as main for now, but this will change later. - let schema = self.schema.borrow(); + let schema = self.schema.read(); f(&schema) } else { // Attached database - check cache first, then load from database @@ -2430,7 +2430,7 @@ impl Statement { fn reprepare(&mut self) -> Result<()> { tracing::trace!("repreparing statement"); let conn = self.program.connection.clone(); - *conn.schema.borrow_mut() = conn.db.clone_schema()?; + *conn.schema.write() = conn.db.clone_schema()?; self.program = { let mut parser = Parser::new(self.program.sql.as_bytes()); let cmd = parser.next_cmd()?; @@ -2441,7 +2441,7 @@ impl Statement { debug_assert_eq!(QueryMode::new(&cmd), mode,); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; translate::translate( - conn.schema.borrow().deref(), + conn.schema.read().deref(), stmt, self.pager.clone(), conn.clone(), diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 1b2df0ebb..071c70cc8 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -869,7 +869,7 @@ impl StateTransition for CommitStateMachine { _ => false, }; if schema_did_change { - let schema = connection.schema.borrow().clone(); + let schema = connection.schema.read().clone(); connection.db.update_schema_if_newer(schema)?; } let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ce47f2c5d..fafffe31d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1153,7 +1153,7 @@ impl Pager { wal.borrow().end_read_tx(); if schema_did_change { - let schema = connection.schema.borrow().clone(); + let schema = connection.schema.read().clone(); connection.db.update_schema_if_newer(schema)?; } Ok(IOResult::Done(commit_status)) @@ -2324,7 +2324,7 @@ impl Pager { } self.reset_internal_states(); if schema_did_change { - connection.schema.replace(connection.db.clone_schema()?); + *connection.schema.write() = connection.db.clone_schema()?; } if is_write { if let Some(wal) = self.wal.as_ref() { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 9fa0dc431..aa91b0ceb 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4817,7 +4817,7 @@ pub fn op_function( )); }; let table = { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); match schema.get_table(table.as_str()) { Some(table) => table, None => { @@ -5483,7 +5483,7 @@ pub fn op_insert( loop { match &state.op_insert_state.sub_state { OpInsertSubState::MaybeCaptureRecord => { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); // If there are no dependent views, we don't need to capture the old record. // We also don't need to do it if the rowid of the UPDATEd row was changed, because that means @@ -5593,7 +5593,7 @@ pub fn op_insert( if root_page != 1 { state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid; } else { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); if !dependent_views.is_empty() { state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange; @@ -5614,7 +5614,7 @@ pub fn op_insert( let prev_changes = program.n_change.get(); program.n_change.set(prev_changes + 1); } - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); if !dependent_views.is_empty() { state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange; @@ -5623,7 +5623,7 @@ pub fn op_insert( break; } OpInsertSubState::ApplyViewChange => { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); assert!(!dependent_views.is_empty()); @@ -5658,7 +5658,7 @@ pub fn op_insert( .collect::>(); // Fix rowid alias columns: replace Null with actual rowid value - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); if let Some(table) = schema.get_table(table_name) { for (i, col) in table.columns().iter().enumerate() { if col.is_rowid_alias && i < new_values.len() { @@ -5751,7 +5751,7 @@ pub fn op_delete( loop { match &state.op_delete_state.sub_state { OpDeleteSubState::MaybeCaptureRecord => { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); if dependent_views.is_empty() { state.op_delete_state.sub_state = OpDeleteSubState::Delete; @@ -5800,7 +5800,7 @@ pub fn op_delete( } // Increment metrics for row write (DELETE is a write operation) state.metrics.rows_written = state.metrics.rows_written.saturating_add(1); - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); if dependent_views.is_empty() { break; @@ -5809,7 +5809,7 @@ pub fn op_delete( continue; } OpDeleteSubState::ApplyViewChange => { - let schema = program.connection.schema.borrow(); + let schema = program.connection.schema.read(); let dependent_views = schema.get_dependent_materialized_views(table_name); assert!(!dependent_views.is_empty()); let maybe_deleted_record = state.op_delete_state.deleted_record.take(); @@ -6509,7 +6509,7 @@ pub fn op_open_write( }; if let Some(index) = maybe_index { let conn = program.connection.clone(); - let schema = conn.schema.borrow(); + let schema = conn.schema.read(); let table = schema .get_table(&index.table_name) .and_then(|table| table.btree()); @@ -6868,7 +6868,7 @@ pub fn op_populate_materialized_views( // Now populate the views (after releasing the schema borrow) for (view_name, _root_page, cursor_id) in view_info { - let schema = conn.schema.borrow(); + let schema = conn.schema.read(); if let Some(view) = schema.get_materialized_view(&view_name) { let mut view = view.lock().unwrap(); // Drop the schema borrow before calling populate_from_table diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 203fb1d55..6b4f2223d 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -718,7 +718,7 @@ impl Program { } // Not a rollback - proceed with processing - let schema = self.connection.schema.borrow(); + let schema = self.connection.schema.read(); // Collect materialized views - they should all have storage let mut views = Vec::new(); @@ -764,7 +764,7 @@ impl Program { .unwrap() .get_table_deltas(); - let schema = self.connection.schema.borrow(); + let schema = self.connection.schema.read(); if let Some(view_mutex) = schema.get_materialized_view(view_name) { let mut view = view_mutex.lock().unwrap();