Merge 'core: Wrap Connection::schema in RwLock' from Pekka Enberg

Closes #3261
This commit is contained in:
Pekka Enberg
2025-09-23 11:50:24 +03:00
committed by GitHub
6 changed files with 33 additions and 33 deletions

View File

@@ -342,7 +342,7 @@ mod tests {
// Get the schema and view
let view_mutex = conn
.schema
.borrow()
.read()
.get_materialized_view("test_view")
.ok_or(crate::LimboError::InternalError(
"View not found".to_string(),

View File

@@ -493,7 +493,7 @@ impl Database {
let conn = Arc::new(Connection {
db: self.clone(),
pager: RwLock::new(Arc::new(pager)),
schema: RefCell::new(
schema: RwLock::new(
self.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
@@ -980,7 +980,7 @@ impl DatabaseCatalog {
pub struct Connection {
db: Arc<Database>,
pager: RwLock<Arc<Pager>>,
schema: RefCell<Arc<Schema>>,
schema: RwLock<Arc<Schema>>,
/// Per-database schema cache (database_index -> schema)
/// Loaded lazily to avoid copying all schemas on connection open
database_schemas: RefCell<std::collections::HashMap<usize, Arc<Schema>>>,
@@ -1061,7 +1061,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1141,7 +1141,7 @@ impl Connection {
reparse_result?;
let schema = self.schema.borrow().clone();
let schema = self.schema.read().clone();
self.db.update_schema_if_newer(schema)
}
@@ -1155,12 +1155,12 @@ impl Connection {
.get();
// create fresh schema as some objects can be deleted
let mut fresh = Schema::new(self.schema.borrow().indexes_enabled);
let mut fresh = Schema::new(self.schema.read().indexes_enabled);
fresh.schema_version = cookie;
// Preserve existing views to avoid expensive repopulation.
// TODO: We may not need to do this if we materialize our views.
let existing_views = self.schema.borrow().incremental_views.clone();
let existing_views = self.schema.read().incremental_views.clone();
// TODO: this is hack to avoid a cyclical problem with schema reprepare
// The problem here is that we prepare a statement here, but when the statement tries
@@ -1218,7 +1218,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1266,7 +1266,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1302,7 +1302,7 @@ impl Connection {
let mode = QueryMode::new(&cmd);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
self.schema.read().deref(),
stmt,
pager.clone(),
self.clone(),
@@ -1411,7 +1411,7 @@ impl Connection {
}
pub fn maybe_update_schema(&self) -> Result<()> {
let current_schema_version = self.schema.borrow().schema_version;
let current_schema_version = self.schema.read().schema_version;
let schema = self
.db
.schema
@@ -1420,7 +1420,7 @@ impl Connection {
if matches!(self.transaction_state.get(), TransactionState::None)
&& current_schema_version != schema.schema_version
{
self.schema.replace(schema.clone());
*self.schema.write() = schema.clone();
}
Ok(())
@@ -1879,7 +1879,7 @@ impl Connection {
#[inline]
pub fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> T) -> T {
let mut schema_ref = self.schema.borrow_mut();
let mut schema_ref = self.schema.write();
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
@@ -2029,11 +2029,11 @@ impl Connection {
pub(crate) fn with_schema<T>(&self, database_id: usize, f: impl FnOnce(&Schema) -> T) -> T {
if database_id == 0 {
// Main database - use connection's schema which should be kept in sync
let schema = self.schema.borrow();
let schema = self.schema.read();
f(&schema)
} else if database_id == 1 {
// Temp database - uses same schema as main for now, but this will change later.
let schema = self.schema.borrow();
let schema = self.schema.read();
f(&schema)
} else {
// Attached database - check cache first, then load from database
@@ -2430,7 +2430,7 @@ impl Statement {
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();
*conn.schema.borrow_mut() = conn.db.clone_schema()?;
*conn.schema.write() = conn.db.clone_schema()?;
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -2441,7 +2441,7 @@ impl Statement {
debug_assert_eq!(QueryMode::new(&cmd), mode,);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
translate::translate(
conn.schema.borrow().deref(),
conn.schema.read().deref(),
stmt,
self.pager.clone(),
conn.clone(),

View File

@@ -869,7 +869,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
_ => false,
};
if schema_did_change {
let schema = connection.schema.borrow().clone();
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
}
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();

View File

@@ -1153,7 +1153,7 @@ impl Pager {
wal.borrow().end_read_tx();
if schema_did_change {
let schema = connection.schema.borrow().clone();
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
}
Ok(IOResult::Done(commit_status))
@@ -2324,7 +2324,7 @@ impl Pager {
}
self.reset_internal_states();
if schema_did_change {
connection.schema.replace(connection.db.clone_schema()?);
*connection.schema.write() = connection.db.clone_schema()?;
}
if is_write {
if let Some(wal) = self.wal.as_ref() {

View File

@@ -4817,7 +4817,7 @@ pub fn op_function(
));
};
let table = {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
match schema.get_table(table.as_str()) {
Some(table) => table,
None => {
@@ -5483,7 +5483,7 @@ pub fn op_insert(
loop {
match &state.op_insert_state.sub_state {
OpInsertSubState::MaybeCaptureRecord => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
// If there are no dependent views, we don't need to capture the old record.
// We also don't need to do it if the rowid of the UPDATEd row was changed, because that means
@@ -5593,7 +5593,7 @@ pub fn op_insert(
if root_page != 1 {
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
} else {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
@@ -5614,7 +5614,7 @@ pub fn op_insert(
let prev_changes = program.n_change.get();
program.n_change.set(prev_changes + 1);
}
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if !dependent_views.is_empty() {
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
@@ -5623,7 +5623,7 @@ pub fn op_insert(
break;
}
OpInsertSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
assert!(!dependent_views.is_empty());
@@ -5658,7 +5658,7 @@ pub fn op_insert(
.collect::<Vec<_>>();
// Fix rowid alias columns: replace Null with actual rowid value
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
if let Some(table) = schema.get_table(table_name) {
for (i, col) in table.columns().iter().enumerate() {
if col.is_rowid_alias && i < new_values.len() {
@@ -5751,7 +5751,7 @@ pub fn op_delete(
loop {
match &state.op_delete_state.sub_state {
OpDeleteSubState::MaybeCaptureRecord => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if dependent_views.is_empty() {
state.op_delete_state.sub_state = OpDeleteSubState::Delete;
@@ -5800,7 +5800,7 @@ pub fn op_delete(
}
// Increment metrics for row write (DELETE is a write operation)
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
if dependent_views.is_empty() {
break;
@@ -5809,7 +5809,7 @@ pub fn op_delete(
continue;
}
OpDeleteSubState::ApplyViewChange => {
let schema = program.connection.schema.borrow();
let schema = program.connection.schema.read();
let dependent_views = schema.get_dependent_materialized_views(table_name);
assert!(!dependent_views.is_empty());
let maybe_deleted_record = state.op_delete_state.deleted_record.take();
@@ -6507,7 +6507,7 @@ pub fn op_open_write(
};
if let Some(index) = maybe_index {
let conn = program.connection.clone();
let schema = conn.schema.borrow();
let schema = conn.schema.read();
let table = schema
.get_table(&index.table_name)
.and_then(|table| table.btree());
@@ -6866,7 +6866,7 @@ pub fn op_populate_materialized_views(
// Now populate the views (after releasing the schema borrow)
for (view_name, _root_page, cursor_id) in view_info {
let schema = conn.schema.borrow();
let schema = conn.schema.read();
if let Some(view) = schema.get_materialized_view(&view_name) {
let mut view = view.lock().unwrap();
// Drop the schema borrow before calling populate_from_table

View File

@@ -718,7 +718,7 @@ impl Program {
}
// Not a rollback - proceed with processing
let schema = self.connection.schema.borrow();
let schema = self.connection.schema.read();
// Collect materialized views - they should all have storage
let mut views = Vec::new();
@@ -764,7 +764,7 @@ impl Program {
.unwrap()
.get_table_deltas();
let schema = self.connection.schema.borrow();
let schema = self.connection.schema.read();
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
let mut view = view_mutex.lock().unwrap();