From 4c77d771ff243b1a7d01ebef4d496491f96cc0ad Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Wed, 9 Jul 2025 12:56:05 -0300 Subject: [PATCH] only copy schema on writes --- core/lib.rs | 36 +++++++++++++++++------------------- core/storage/pager.rs | 19 +++++++------------ core/util.rs | 2 ++ core/vdbe/execute.rs | 33 +++++++++++++-------------------- 4 files changed, 39 insertions(+), 51 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 747ff7a49..6faa064ea 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor; pub struct Database { mv_store: Option>, - schema: Arc>, + schema: RefCell>, db_file: Arc, path: String, io: Arc, @@ -197,11 +197,11 @@ impl Database { }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); - let schema = Arc::new(RwLock::new(Schema::new(enable_indexes))); - let db = Database { + + let db = Arc::new(Database { mv_store, path: path.to_string(), - schema: schema.clone(), + schema: RefCell::new(Arc::new(Schema::new(enable_indexes))), _shared_page_cache: shared_page_cache.clone(), maybe_shared_wal: RwLock::new(maybe_shared_wal), db_file, @@ -209,19 +209,16 @@ impl Database { open_flags: flags, db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), - }; - let db = Arc::new(db); + }); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 if db_state == DB_STATE_INITIALIZED { // parse schema let conn = db.connect()?; - let schema_version = get_schema_version(&conn)?; - schema.write().schema_version = schema_version; - let mut schema = schema - .try_write() - .expect("lock on schema should succeed first try"); + let mut schema_ref = db.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); + schema.schema_version = get_schema_version(&conn)?; let syms = conn.syms.borrow(); let pager = conn.pager.borrow().clone(); @@ -235,6 +232,7 @@ impl Database { Ok(db) } + #[instrument(skip_all, level = Level::INFO)] pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; @@ -246,7 +244,7 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new(self.schema.read().clone()), + schema: RefCell::new(self.schema.borrow().clone()), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -492,7 +490,7 @@ impl CaptureDataChangesMode { pub struct Connection { _db: Arc, pager: RefCell>, - schema: RefCell, + schema: RefCell>, /// Whether to automatically commit transaction auto_commit: Cell, mv_transactions: RefCell>, @@ -729,10 +727,10 @@ impl Connection { pub fn maybe_update_schema(&self) { let current_schema_version = self.schema.borrow().schema_version; if matches!(self.transaction_state.get(), TransactionState::None) - && current_schema_version < self._db.schema.read().schema_version + && current_schema_version < self._db.schema.borrow().schema_version { - let new_schema = self._db.schema.read(); - self.schema.replace(new_schema.clone()); + let new_schema = self._db.schema.borrow().clone(); + self.schema.replace(new_schema); } } @@ -882,11 +880,11 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let rows = self.query("SELECT * FROM sqlite_schema")?; - let mut schema = self.schema.borrow_mut(); + let mut schema_ref = self.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); { let syms = self.syms.borrow(); - if let Err(LimboError::ExtensionError(e)) = - parse_schema_rows(rows, &mut schema, &syms, None) + if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 065809792..550e704f0 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -715,18 +715,12 @@ impl Pager { match commit_status { IOResult::IO => Ok(IOResult::IO), IOResult::Done(_) => { - let maybe_schema_pair = if schema_did_change { - let schema = connection.schema.borrow().clone(); - // Lock first before writing to the database schema in case someone tries to read the schema before it's updated - let db_schema = connection._db.schema.write(); - Some((schema, db_schema)) - } else { - None - }; self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; - if let Some((schema, mut db_schema)) = maybe_schema_pair { - *db_schema = schema; + + if schema_did_change { + let schema = connection.schema.borrow().clone(); + *connection._db.schema.borrow_mut() = schema; } Ok(commit_status) } @@ -1314,8 +1308,9 @@ impl Pager { cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { - let prev_schema = connection._db.schema.read().clone(); - connection.schema.replace(prev_schema); + connection + .schema + .replace(connection._db.schema.borrow().clone()); } self.wal.borrow_mut().rollback()?; diff --git a/core/util.rs b/core/util.rs index d735678cc..341959ff0 100644 --- a/core/util.rs +++ b/core/util.rs @@ -6,6 +6,7 @@ use crate::{ LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, }; use std::{rc::Rc, sync::Arc}; +use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{ self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator, }; @@ -48,6 +49,7 @@ pub struct UnparsedFromSqlIndex { pub sql: String, } +#[instrument(skip_all, level = Level::INFO)] pub fn parse_schema_rows( rows: Option, schema: &mut Schema, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 09120e879..9e11662be 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -216,7 +216,8 @@ pub fn op_drop_index( let Insn::DropIndex { index, db: _ } = insn else { unreachable!("unexpected Insn {:?}", insn) }; - let mut schema = program.connection.schema.borrow_mut(); + let mut schema_ref = program.connection.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); schema.remove_index(index); state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -5719,7 +5720,8 @@ pub fn op_drop_table( } let conn = program.connection.clone(); { - let mut schema = conn.schema.borrow_mut(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); schema.remove_indices_for_table(table_name); schema.remove_table(table_name); } @@ -5804,33 +5806,22 @@ pub fn op_parse_schema( if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; - let mut new_schema = conn.schema.borrow().clone(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); // TODO: This function below is synchronous, make it async { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; } - conn.schema.replace(new_schema); } else { let stmt = conn.prepare("SELECT * FROM sqlite_schema")?; - let mut new_schema = conn.schema.borrow().clone(); + let mut schema_ref = conn.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); // TODO: This function below is synchronous, make it async { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?; } - - conn.schema.replace(new_schema); } conn.auto_commit.set(previous_auto_commit); state.pc += 1; @@ -5903,7 +5894,9 @@ pub fn op_set_cookie( TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), } - program.connection.schema.borrow_mut().schema_version = *value as u32; + let mut schema = program.connection.schema.borrow_mut(); + Arc::make_mut(&mut *schema).schema_version = *value as u32; + header_accessor::set_schema_cookie(pager, *value as u32)?; } cookie => todo!("{cookie:?} is not yet implement for SetCookie"),