From 4c77d771ff243b1a7d01ebef4d496491f96cc0ad Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Wed, 9 Jul 2025 12:56:05 -0300 Subject: [PATCH 1/3] only copy schema on writes --- core/lib.rs | 36 +++++++++++++++++------------------- core/storage/pager.rs | 19 +++++++------------ core/util.rs | 2 ++ core/vdbe/execute.rs | 33 +++++++++++++-------------------- 4 files changed, 39 insertions(+), 51 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 747ff7a49..6faa064ea 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor; pub struct Database { mv_store: Option>, - schema: Arc>, + schema: RefCell>, db_file: Arc, path: String, io: Arc, @@ -197,11 +197,11 @@ impl Database { }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); - let schema = Arc::new(RwLock::new(Schema::new(enable_indexes))); - let db = Database { + + let db = Arc::new(Database { mv_store, path: path.to_string(), - schema: schema.clone(), + schema: RefCell::new(Arc::new(Schema::new(enable_indexes))), _shared_page_cache: shared_page_cache.clone(), maybe_shared_wal: RwLock::new(maybe_shared_wal), db_file, @@ -209,19 +209,16 @@ impl Database { open_flags: flags, db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), - }; - let db = Arc::new(db); + }); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 if db_state == DB_STATE_INITIALIZED { // parse schema let conn = db.connect()?; - let schema_version = get_schema_version(&conn)?; - schema.write().schema_version = schema_version; - let mut schema = schema - .try_write() - .expect("lock on schema should succeed first try"); + let mut schema_ref = db.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); + schema.schema_version = get_schema_version(&conn)?; let syms = conn.syms.borrow(); let pager = conn.pager.borrow().clone(); @@ -235,6 +232,7 @@ impl Database { Ok(db) } + #[instrument(skip_all, level = Level::INFO)] pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; @@ -246,7 +244,7 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new(self.schema.read().clone()), + schema: RefCell::new(self.schema.borrow().clone()), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -492,7 +490,7 @@ impl CaptureDataChangesMode { pub struct Connection { _db: Arc, pager: RefCell>, - schema: RefCell, + schema: RefCell>, /// Whether to automatically commit transaction auto_commit: Cell, mv_transactions: RefCell>, @@ -729,10 +727,10 @@ impl Connection { pub fn maybe_update_schema(&self) { let current_schema_version = self.schema.borrow().schema_version; if matches!(self.transaction_state.get(), TransactionState::None) - && current_schema_version < self._db.schema.read().schema_version + && current_schema_version < self._db.schema.borrow().schema_version { - let new_schema = self._db.schema.read(); - self.schema.replace(new_schema.clone()); + let new_schema = self._db.schema.borrow().clone(); + self.schema.replace(new_schema); } } @@ -882,11 +880,11 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let rows = self.query("SELECT * FROM sqlite_schema")?; - let mut schema = self.schema.borrow_mut(); + let mut schema_ref = self.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); { let syms = self.syms.borrow(); - if let Err(LimboError::ExtensionError(e)) = - parse_schema_rows(rows, &mut schema, &syms, None) + if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 065809792..550e704f0 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -715,18 +715,12 @@ impl Pager { match commit_status { IOResult::IO => Ok(IOResult::IO), IOResult::Done(_) => { - let maybe_schema_pair = if schema_did_change { - let schema = connection.schema.borrow().clone(); - // Lock first before writing to the database schema in case someone tries to read the schema before it's updated - let db_schema = connection._db.schema.write(); - Some((schema, db_schema)) - } else { - None - }; self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; - if let Some((schema, mut db_schema)) = maybe_schema_pair { - *db_schema = schema; + + if schema_did_change { + let schema = connection.schema.borrow().clone(); + *connection._db.schema.borrow_mut() = schema; } Ok(commit_status) } @@ -1314,8 +1308,9 @@ impl Pager { cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { - let prev_schema = connection._db.schema.read().clone(); - connection.schema.replace(prev_schema); + connection + .schema + .replace(connection._db.schema.borrow().clone()); } self.wal.borrow_mut().rollback()?; diff --git a/core/util.rs b/core/util.rs index d735678cc..341959ff0 100644 --- a/core/util.rs +++ b/core/util.rs @@ -6,6 +6,7 @@ use crate::{ LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, }; use std::{rc::Rc, sync::Arc}; +use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{ self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator, }; @@ -48,6 +49,7 @@ pub struct UnparsedFromSqlIndex { pub sql: String, } +#[instrument(skip_all, level = Level::INFO)] pub fn parse_schema_rows( rows: Option, schema: &mut Schema, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 09120e879..9e11662be 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -216,7 +216,8 @@ pub fn op_drop_index( let Insn::DropIndex { index, db: _ } = insn else { unreachable!("unexpected Insn {:?}", insn) }; - let mut schema = program.connection.schema.borrow_mut(); + let mut schema_ref = program.connection.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); schema.remove_index(index); state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -5719,7 +5720,8 @@ pub fn op_drop_table( } let conn = program.connection.clone(); { - let mut schema = conn.schema.borrow_mut(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); schema.remove_indices_for_table(table_name); schema.remove_table(table_name); } @@ -5804,33 +5806,22 @@ pub fn op_parse_schema( if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; - let mut new_schema = conn.schema.borrow().clone(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); // TODO: This function below is synchronous, make it async { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; } - conn.schema.replace(new_schema); } else { let stmt = conn.prepare("SELECT * FROM sqlite_schema")?; - let mut new_schema = conn.schema.borrow().clone(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); // TODO: This function below is synchronous, make it async { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; } - - conn.schema.replace(new_schema); } conn.auto_commit.set(previous_auto_commit); state.pc += 1; @@ -5903,7 +5894,9 @@ pub fn op_set_cookie( TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), } - program.connection.schema.borrow_mut().schema_version = *value as u32; + let mut schema = program.connection.schema.borrow_mut(); + Arc::make_mut(&mut *schema).schema_version = *value as u32; + header_accessor::set_schema_cookie(pager, *value as u32)?; } cookie => todo!("{cookie:?} is not yet implement for SetCookie"), From d0e26db01ae54708ff9b1d39d0cfba9804438045 Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Tue, 15 Jul 2025 12:09:05 -0300 Subject: [PATCH 2/3] use lock for database schema --- core/ext/mod.rs | 3 ++- core/lib.rs | 31 +++++++++++++++++++++---------- core/storage/pager.rs | 17 +++++++++++++---- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/core/ext/mod.rs b/core/ext/mod.rs index b6ab521e9..a350015e0 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -151,7 +151,8 @@ impl Connection { .insert(name.to_string(), vmodule.into()); if kind == VTabKind::TableValuedFunction { if let Ok(vtab) = VirtualTable::function(name, &self.syms.borrow()) { - self.schema.borrow_mut().add_virtual_table(vtab); + let mut schema_ref = self.schema.borrow_mut(); + Arc::make_mut(&mut *schema_ref).add_virtual_table(vtab); } else { return ResultCode::Error; } diff --git a/core/lib.rs b/core/lib.rs index 6faa064ea..bc838319e 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor; pub struct Database { mv_store: Option>, - schema: RefCell>, + schema: Mutex>, db_file: Arc, path: String, io: Arc, @@ -201,7 +201,7 @@ impl Database { let db = Arc::new(Database { mv_store, path: path.to_string(), - schema: RefCell::new(Arc::new(Schema::new(enable_indexes))), + schema: Mutex::new(Arc::new(Schema::new(enable_indexes))), _shared_page_cache: shared_page_cache.clone(), maybe_shared_wal: RwLock::new(maybe_shared_wal), db_file, @@ -216,7 +216,7 @@ impl Database { // parse schema let conn = db.connect()?; - let mut schema_ref = db.schema.borrow_mut(); + let mut schema_ref = db.schema.lock().map_err(|_| LimboError::SchemaLocked)?; let schema = Arc::make_mut(&mut *schema_ref); schema.schema_version = get_schema_version(&conn)?; @@ -244,7 +244,12 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new(self.schema.borrow().clone()), + schema: RefCell::new( + self.schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? + .clone(), + ), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -532,7 +537,7 @@ impl Connection { let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); - self.maybe_update_schema(); + self.maybe_update_schema()?; let pager = self.pager.borrow().clone(); match cmd { Cmd::Stmt(stmt) => { @@ -638,7 +643,7 @@ impl Connection { let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); - self.maybe_update_schema(); + self.maybe_update_schema()?; match cmd { Cmd::Explain(stmt) => { let program = translate::translate( @@ -724,14 +729,20 @@ impl Connection { self.readonly.replace(readonly); } - pub fn maybe_update_schema(&self) { + pub fn maybe_update_schema(&self) -> Result<()> { let current_schema_version = self.schema.borrow().schema_version; + let schema = self + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)?; if matches!(self.transaction_state.get(), TransactionState::None) - && current_schema_version < self._db.schema.borrow().schema_version + && current_schema_version < schema.schema_version { - let new_schema = self._db.schema.borrow().clone(); - self.schema.replace(new_schema); + self.schema.replace(schema.clone()); } + + Ok(()) } pub fn wal_frame_count(&self) -> Result { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 550e704f0..09cd09b90 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -720,7 +720,11 @@ impl Pager { if schema_did_change { let schema = connection.schema.borrow().clone(); - *connection._db.schema.borrow_mut() = schema; + *connection + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? = schema; } Ok(commit_status) } @@ -1308,9 +1312,14 @@ impl Pager { cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { - connection - .schema - .replace(connection._db.schema.borrow().clone()); + connection.schema.replace( + connection + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? + .clone(), + ); } self.wal.borrow_mut().rollback()?; From 8e8f1682df503d7576961327df8666a0e999dadc Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Wed, 16 Jul 2025 12:11:12 -0300 Subject: [PATCH 3/3] add `with_schema_mut` removes all repeated `Arc::make_mut` --- core/ext/mod.rs | 3 +-- core/lib.rs | 42 ++++++++++++++++++++++++++++-------------- core/vdbe/execute.rs | 41 ++++++++++++++++++----------------------- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/core/ext/mod.rs b/core/ext/mod.rs index a350015e0..89c3e1a61 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -151,8 +151,7 @@ impl Connection { .insert(name.to_string(), vmodule.into()); if kind == VTabKind::TableValuedFunction { if let Ok(vtab) = VirtualTable::function(name, &self.syms.borrow()) { - let mut schema_ref = self.schema.borrow_mut(); - Arc::make_mut(&mut *schema_ref).add_virtual_table(vtab); + self.with_schema_mut(|schema| schema.add_virtual_table(vtab)); } else { return ResultCode::Error; } diff --git a/core/lib.rs b/core/lib.rs index bc838319e..c8666b482 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -216,18 +216,20 @@ impl Database { // parse schema let conn = db.connect()?; - let mut schema_ref = db.schema.lock().map_err(|_| LimboError::SchemaLocked)?; - let schema = Arc::make_mut(&mut *schema_ref); - schema.schema_version = get_schema_version(&conn)?; - let syms = conn.syms.borrow(); let pager = conn.pager.borrow().clone(); - if let Err(LimboError::ExtensionError(e)) = schema.make_from_btree(None, pager, &syms) { - // this means that a vtab exists and we no longer have the module loaded. we print - // a warning to the user to load the module - eprintln!("Warning: {e}"); - } + db.with_schema_mut(|schema| { + schema.schema_version = get_schema_version(&conn)?; + if let Err(LimboError::ExtensionError(e)) = + schema.make_from_btree(None, pager, &syms) + { + // this means that a vtab exists and we no longer have the module loaded. we print + // a warning to the user to load the module + eprintln!("Warning: {e}"); + } + Ok(()) + })?; } Ok(db) } @@ -389,6 +391,13 @@ impl Database { } } } + + #[inline] + pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { + let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; + let schema = Arc::make_mut(&mut *schema_ref); + f(schema) + } } fn get_schema_version(conn: &Arc) -> Result { @@ -891,17 +900,15 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let rows = self.query("SELECT * FROM sqlite_schema")?; - let mut schema_ref = self.schema.borrow_mut(); - let schema = Arc::make_mut(&mut *schema_ref); - { - let syms = self.syms.borrow(); + let syms = self.syms.borrow(); + self.with_schema_mut(|schema| { if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {e}"); } - } + }); Ok(()) } @@ -993,6 +1000,13 @@ impl Connection { Ok(results) } + + #[inline] + pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> T) -> T { + let mut schema_ref = self.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); + f(schema) + } } pub struct Statement { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 9e11662be..1ec6d9f0f 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -216,9 +216,9 @@ pub fn op_drop_index( let Insn::DropIndex { index, db: _ } = insn else { unreachable!("unexpected Insn {:?}", insn) }; - let mut schema_ref = program.connection.schema.borrow_mut(); - let schema = Arc::make_mut(&mut *schema_ref); - schema.remove_index(index); + program + .connection + .with_schema_mut(|schema| schema.remove_index(index)); state.pc += 1; Ok(InsnFunctionStepResult::Step) } @@ -5720,10 +5720,10 @@ pub fn op_drop_table( } let conn = program.connection.clone(); { - let mut schema_ref = conn.schema.borrow_mut(); - let schema = Arc::make_mut(&mut *schema_ref); - schema.remove_indices_for_table(table_name); - schema.remove_table(table_name); + conn.with_schema_mut(|schema| { + schema.remove_indices_for_table(table_name); + schema.remove_table(table_name); + }); } state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -5806,22 +5806,17 @@ pub fn op_parse_schema( if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; - let mut schema_ref = conn.schema.borrow_mut(); - let schema = Arc::make_mut(&mut *schema_ref); - - // TODO: This function below is synchronous, make it async - { - parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; - } + conn.with_schema_mut(|schema| { + // TODO: This function below is synchronous, make it async + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id) + })?; } else { let stmt = conn.prepare("SELECT * FROM sqlite_schema")?; - let mut schema_ref = conn.schema.borrow_mut(); - let schema = Arc::make_mut(&mut *schema_ref); - // TODO: This function below is synchronous, make it async - { - parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; - } + conn.with_schema_mut(|schema| { + // TODO: This function below is synchronous, make it async + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id) + })?; } conn.auto_commit.set(previous_auto_commit); state.pc += 1; @@ -5894,9 +5889,9 @@ pub fn op_set_cookie( TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), } - let mut schema = program.connection.schema.borrow_mut(); - Arc::make_mut(&mut *schema).schema_version = *value as u32; - + program + .connection + .with_schema_mut(|schema| schema.schema_version = *value as u32); header_accessor::set_schema_cookie(pager, *value as u32)?; } cookie => todo!("{cookie:?} is not yet implement for SetCookie"),