diff --git a/core/ext/mod.rs b/core/ext/mod.rs index b6ab521e9..89c3e1a61 100644 --- a/core/ext/mod.rs +++ b/core/ext/mod.rs @@ -151,7 +151,7 @@ impl Connection { .insert(name.to_string(), vmodule.into()); if kind == VTabKind::TableValuedFunction { if let Ok(vtab) = VirtualTable::function(name, &self.syms.borrow()) { - self.schema.borrow_mut().add_virtual_table(vtab); + self.with_schema_mut(|schema| schema.add_virtual_table(vtab)); } else { return ResultCode::Error; } diff --git a/core/lib.rs b/core/lib.rs index 747ff7a49..c8666b482 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor; pub struct Database { mv_store: Option>, - schema: Arc>, + schema: Mutex>, db_file: Arc, path: String, io: Arc, @@ -197,11 +197,11 @@ impl Database { }; let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); - let schema = Arc::new(RwLock::new(Schema::new(enable_indexes))); - let db = Database { + + let db = Arc::new(Database { mv_store, path: path.to_string(), - schema: schema.clone(), + schema: Mutex::new(Arc::new(Schema::new(enable_indexes))), _shared_page_cache: shared_page_cache.clone(), maybe_shared_wal: RwLock::new(maybe_shared_wal), db_file, @@ -209,32 +209,32 @@ impl Database { open_flags: flags, db_state: Arc::new(AtomicUsize::new(db_state)), init_lock: Arc::new(Mutex::new(())), - }; - let db = Arc::new(db); + }); // Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123 if db_state == DB_STATE_INITIALIZED { // parse schema let conn = db.connect()?; - let schema_version = get_schema_version(&conn)?; - schema.write().schema_version = schema_version; - - let mut schema = schema - .try_write() - .expect("lock on schema should succeed first try"); let syms = conn.syms.borrow(); let pager = conn.pager.borrow().clone(); - if let Err(LimboError::ExtensionError(e)) = schema.make_from_btree(None, pager, &syms) { - // this means that a vtab exists and we no longer have the module loaded. we print - // a warning to the user to load the module - eprintln!("Warning: {e}"); - } + db.with_schema_mut(|schema| { + schema.schema_version = get_schema_version(&conn)?; + if let Err(LimboError::ExtensionError(e)) = + schema.make_from_btree(None, pager, &syms) + { + // this means that a vtab exists and we no longer have the module loaded. we print + // a warning to the user to load the module + eprintln!("Warning: {e}"); + } + Ok(()) + })?; } Ok(db) } + #[instrument(skip_all, level = Level::INFO)] pub fn connect(self: &Arc) -> Result> { let pager = self.init_pager(None)?; @@ -246,7 +246,12 @@ impl Database { let conn = Arc::new(Connection { _db: self.clone(), pager: RefCell::new(Rc::new(pager)), - schema: RefCell::new(self.schema.read().clone()), + schema: RefCell::new( + self.schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? + .clone(), + ), auto_commit: Cell::new(true), mv_transactions: RefCell::new(Vec::new()), transaction_state: Cell::new(TransactionState::None), @@ -386,6 +391,13 @@ impl Database { } } } + + #[inline] + pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { + let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; + let schema = Arc::make_mut(&mut *schema_ref); + f(schema) + } } fn get_schema_version(conn: &Arc) -> Result { @@ -492,7 +504,7 @@ impl CaptureDataChangesMode { pub struct Connection { _db: Arc, pager: RefCell>, - schema: RefCell, + schema: RefCell>, /// Whether to automatically commit transaction auto_commit: Cell, mv_transactions: RefCell>, @@ -534,7 +546,7 @@ impl Connection { let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); - self.maybe_update_schema(); + self.maybe_update_schema()?; let pager = self.pager.borrow().clone(); match cmd { Cmd::Stmt(stmt) => { @@ -640,7 +652,7 @@ impl Connection { let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); - self.maybe_update_schema(); + self.maybe_update_schema()?; match cmd { Cmd::Explain(stmt) => { let program = translate::translate( @@ -726,14 +738,20 @@ impl Connection { self.readonly.replace(readonly); } - pub fn maybe_update_schema(&self) { + pub fn maybe_update_schema(&self) -> Result<()> { let current_schema_version = self.schema.borrow().schema_version; + let schema = self + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)?; if matches!(self.transaction_state.get(), TransactionState::None) - && current_schema_version < self._db.schema.read().schema_version + && current_schema_version < schema.schema_version { - let new_schema = self._db.schema.read(); - self.schema.replace(new_schema.clone()); + self.schema.replace(schema.clone()); } + + Ok(()) } pub fn wal_frame_count(&self) -> Result { @@ -882,17 +900,15 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let rows = self.query("SELECT * FROM sqlite_schema")?; - let mut schema = self.schema.borrow_mut(); - { - let syms = self.syms.borrow(); - if let Err(LimboError::ExtensionError(e)) = - parse_schema_rows(rows, &mut schema, &syms, None) + let syms = self.syms.borrow(); + self.with_schema_mut(|schema| { + if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {e}"); } - } + }); Ok(()) } @@ -984,6 +1000,13 @@ impl Connection { Ok(results) } + + #[inline] + pub fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> T) -> T { + let mut schema_ref = self.schema.borrow_mut(); + let schema = Arc::make_mut(&mut *schema_ref); + f(schema) + } } pub struct Statement { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 065809792..09cd09b90 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -715,18 +715,16 @@ impl Pager { match commit_status { IOResult::IO => Ok(IOResult::IO), IOResult::Done(_) => { - let maybe_schema_pair = if schema_did_change { - let schema = connection.schema.borrow().clone(); - // Lock first before writing to the database schema in case someone tries to read the schema before it's updated - let db_schema = connection._db.schema.write(); - Some((schema, db_schema)) - } else { - None - }; self.wal.borrow().end_write_tx()?; self.wal.borrow().end_read_tx()?; - if let Some((schema, mut db_schema)) = maybe_schema_pair { - *db_schema = schema; + + if schema_did_change { + let schema = connection.schema.borrow().clone(); + *connection + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? = schema; } Ok(commit_status) } @@ -1314,8 +1312,14 @@ impl Pager { cache.unset_dirty_all_pages(); cache.clear().expect("failed to clear page cache"); if schema_did_change { - let prev_schema = connection._db.schema.read().clone(); - connection.schema.replace(prev_schema); + connection.schema.replace( + connection + ._db + .schema + .lock() + .map_err(|_| LimboError::SchemaLocked)? + .clone(), + ); } self.wal.borrow_mut().rollback()?; diff --git a/core/util.rs b/core/util.rs index d735678cc..341959ff0 100644 --- a/core/util.rs +++ b/core/util.rs @@ -6,6 +6,7 @@ use crate::{ LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, }; use std::{rc::Rc, sync::Arc}; +use tracing::{instrument, Level}; use turso_sqlite3_parser::ast::{ self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator, }; @@ -48,6 +49,7 @@ pub struct UnparsedFromSqlIndex { pub sql: String, } +#[instrument(skip_all, level = Level::INFO)] pub fn parse_schema_rows( rows: Option, schema: &mut Schema, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 09120e879..1ec6d9f0f 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -216,8 +216,9 @@ pub fn op_drop_index( let Insn::DropIndex { index, db: _ } = insn else { unreachable!("unexpected Insn {:?}", insn) }; - let mut schema = program.connection.schema.borrow_mut(); - schema.remove_index(index); + program + .connection + .with_schema_mut(|schema| schema.remove_index(index)); state.pc += 1; Ok(InsnFunctionStepResult::Step) } @@ -5719,9 +5720,10 @@ pub fn op_drop_table( } let conn = program.connection.clone(); { - let mut schema = conn.schema.borrow_mut(); - schema.remove_indices_for_table(table_name); - schema.remove_table(table_name); + conn.with_schema_mut(|schema| { + schema.remove_indices_for_table(table_name); + schema.remove_table(table_name); + }); } state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -5804,33 +5806,17 @@ pub fn op_parse_schema( if let Some(where_clause) = where_clause { let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?; - let mut new_schema = conn.schema.borrow().clone(); - - // TODO: This function below is synchronous, make it async - { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; - } - conn.schema.replace(new_schema); + conn.with_schema_mut(|schema| { + // TODO: This function below is synchronous, make it async + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id) + })?; } else { let stmt = conn.prepare("SELECT * FROM sqlite_schema")?; - let mut new_schema = conn.schema.borrow().clone(); - // TODO: This function below is synchronous, make it async - { - parse_schema_rows( - Some(stmt), - &mut new_schema, - &conn.syms.borrow(), - state.mv_tx_id, - )?; - } - - conn.schema.replace(new_schema); + conn.with_schema_mut(|schema| { + // TODO: This function below is synchronous, make it async + parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id) + })?; } conn.auto_commit.set(previous_auto_commit); state.pc += 1; @@ -5903,7 +5889,9 @@ pub fn op_set_cookie( TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"), } - program.connection.schema.borrow_mut().schema_version = *value as u32; + program + .connection + .with_schema_mut(|schema| schema.schema_version = *value as u32); header_accessor::set_schema_cookie(pager, *value as u32)?; } cookie => todo!("{cookie:?} is not yet implement for SetCookie"),