Merge 'core: Copy-on-write for in-memory schema' from Levy A.

<img height="400" alt="image" src="https://github.com/user-
attachments/assets/bdd5c0a8-1bbb-4199-9026-57f0e5202d73" />
<img height="400" alt="image" src="https://github.com/user-
attachments/assets/7ea63e58-2ab7-4132-b29e-b20597c7093f" />
We were copying the schema preemptively on each `Database::connect`, now
the schema is shared until a change needs to be made by sharing a single
`Arc` and mutating it via `Arc::make_mut`. This is faster as reduces
memory usage.

Closes #2022
This commit is contained in:
Pekka Enberg
2025-07-17 10:46:46 +03:00
5 changed files with 91 additions and 74 deletions

View File

@@ -151,7 +151,7 @@ impl Connection {
.insert(name.to_string(), vmodule.into());
if kind == VTabKind::TableValuedFunction {
if let Ok(vtab) = VirtualTable::function(name, &self.syms.borrow()) {
self.schema.borrow_mut().add_virtual_table(vtab);
self.with_schema_mut(|schema| schema.add_virtual_table(vtab));
} else {
return ResultCode::Error;
}

View File

@@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
pub struct Database {
mv_store: Option<Rc<MvStore>>,
schema: Arc<RwLock<Schema>>,
schema: Mutex<Arc<Schema>>,
db_file: Arc<dyn DatabaseStorage>,
path: String,
io: Arc<dyn IO>,
@@ -197,11 +197,11 @@ impl Database {
};
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
let schema = Arc::new(RwLock::new(Schema::new(enable_indexes)));
let db = Database {
let db = Arc::new(Database {
mv_store,
path: path.to_string(),
schema: schema.clone(),
schema: Mutex::new(Arc::new(Schema::new(enable_indexes))),
_shared_page_cache: shared_page_cache.clone(),
maybe_shared_wal: RwLock::new(maybe_shared_wal),
db_file,
@@ -209,32 +209,32 @@ impl Database {
open_flags: flags,
db_state: Arc::new(AtomicUsize::new(db_state)),
init_lock: Arc::new(Mutex::new(())),
};
let db = Arc::new(db);
});
// Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123
if db_state == DB_STATE_INITIALIZED {
// parse schema
let conn = db.connect()?;
let schema_version = get_schema_version(&conn)?;
schema.write().schema_version = schema_version;
let mut schema = schema
.try_write()
.expect("lock on schema should succeed first try");
let syms = conn.syms.borrow();
let pager = conn.pager.borrow().clone();
if let Err(LimboError::ExtensionError(e)) = schema.make_from_btree(None, pager, &syms) {
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");
}
db.with_schema_mut(|schema| {
schema.schema_version = get_schema_version(&conn)?;
if let Err(LimboError::ExtensionError(e)) =
schema.make_from_btree(None, pager, &syms)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");
}
Ok(())
})?;
}
Ok(db)
}
#[instrument(skip_all, level = Level::INFO)]
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
let pager = self.init_pager(None)?;
@@ -246,7 +246,12 @@ impl Database {
let conn = Arc::new(Connection {
_db: self.clone(),
pager: RefCell::new(Rc::new(pager)),
schema: RefCell::new(self.schema.read().clone()),
schema: RefCell::new(
self.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.clone(),
),
auto_commit: Cell::new(true),
mv_transactions: RefCell::new(Vec::new()),
transaction_state: Cell::new(TransactionState::None),
@@ -386,6 +391,13 @@ impl Database {
}
}
}
#[inline]
pub fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> Result<T>) -> Result<T> {
let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
}
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
@@ -492,7 +504,7 @@ impl CaptureDataChangesMode {
pub struct Connection {
_db: Arc<Database>,
pager: RefCell<Rc<Pager>>,
schema: RefCell<Schema>,
schema: RefCell<Arc<Schema>>,
/// Whether to automatically commit transaction
auto_commit: Cell<bool>,
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
@@ -534,7 +546,7 @@ impl Connection {
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
.unwrap()
.trim();
self.maybe_update_schema();
self.maybe_update_schema()?;
let pager = self.pager.borrow().clone();
match cmd {
Cmd::Stmt(stmt) => {
@@ -640,7 +652,7 @@ impl Connection {
let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end])
.unwrap()
.trim();
self.maybe_update_schema();
self.maybe_update_schema()?;
match cmd {
Cmd::Explain(stmt) => {
let program = translate::translate(
@@ -726,14 +738,20 @@ impl Connection {
self.readonly.replace(readonly);
}
pub fn maybe_update_schema(&self) {
pub fn maybe_update_schema(&self) -> Result<()> {
let current_schema_version = self.schema.borrow().schema_version;
let schema = self
._db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?;
if matches!(self.transaction_state.get(), TransactionState::None)
&& current_schema_version < self._db.schema.read().schema_version
&& current_schema_version < schema.schema_version
{
let new_schema = self._db.schema.read();
self.schema.replace(new_schema.clone());
self.schema.replace(schema.clone());
}
Ok(())
}
pub fn wal_frame_count(&self) -> Result<u64> {
@@ -882,17 +900,15 @@ impl Connection {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let rows = self.query("SELECT * FROM sqlite_schema")?;
let mut schema = self.schema.borrow_mut();
{
let syms = self.syms.borrow();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, &mut schema, &syms, None)
let syms = self.syms.borrow();
self.with_schema_mut(|schema| {
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None)
{
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");
}
}
});
Ok(())
}
@@ -984,6 +1000,13 @@ impl Connection {
Ok(results)
}
#[inline]
pub fn with_schema_mut<T>(&self, f: impl FnOnce(&mut Schema) -> T) -> T {
let mut schema_ref = self.schema.borrow_mut();
let schema = Arc::make_mut(&mut *schema_ref);
f(schema)
}
}
pub struct Statement {

View File

@@ -715,18 +715,16 @@ impl Pager {
match commit_status {
IOResult::IO => Ok(IOResult::IO),
IOResult::Done(_) => {
let maybe_schema_pair = if schema_did_change {
let schema = connection.schema.borrow().clone();
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
let db_schema = connection._db.schema.write();
Some((schema, db_schema))
} else {
None
};
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
if let Some((schema, mut db_schema)) = maybe_schema_pair {
*db_schema = schema;
if schema_did_change {
let schema = connection.schema.borrow().clone();
*connection
._db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)? = schema;
}
Ok(commit_status)
}
@@ -1314,8 +1312,14 @@ impl Pager {
cache.unset_dirty_all_pages();
cache.clear().expect("failed to clear page cache");
if schema_did_change {
let prev_schema = connection._db.schema.read().clone();
connection.schema.replace(prev_schema);
connection.schema.replace(
connection
._db
.schema
.lock()
.map_err(|_| LimboError::SchemaLocked)?
.clone(),
);
}
self.wal.borrow_mut().rollback()?;

View File

@@ -6,6 +6,7 @@ use crate::{
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
};
use std::{rc::Rc, sync::Arc};
use tracing::{instrument, Level};
use turso_sqlite3_parser::ast::{
self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator,
};
@@ -48,6 +49,7 @@ pub struct UnparsedFromSqlIndex {
pub sql: String,
}
#[instrument(skip_all, level = Level::INFO)]
pub fn parse_schema_rows(
rows: Option<Statement>,
schema: &mut Schema,

View File

@@ -216,8 +216,9 @@ pub fn op_drop_index(
let Insn::DropIndex { index, db: _ } = insn else {
unreachable!("unexpected Insn {:?}", insn)
};
let mut schema = program.connection.schema.borrow_mut();
schema.remove_index(index);
program
.connection
.with_schema_mut(|schema| schema.remove_index(index));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
@@ -5719,9 +5720,10 @@ pub fn op_drop_table(
}
let conn = program.connection.clone();
{
let mut schema = conn.schema.borrow_mut();
schema.remove_indices_for_table(table_name);
schema.remove_table(table_name);
conn.with_schema_mut(|schema| {
schema.remove_indices_for_table(table_name);
schema.remove_table(table_name);
});
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -5804,33 +5806,17 @@ pub fn op_parse_schema(
if let Some(where_clause) = where_clause {
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
let mut new_schema = conn.schema.borrow().clone();
// TODO: This function below is synchronous, make it async
{
parse_schema_rows(
Some(stmt),
&mut new_schema,
&conn.syms.borrow(),
state.mv_tx_id,
)?;
}
conn.schema.replace(new_schema);
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)
})?;
} else {
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
let mut new_schema = conn.schema.borrow().clone();
// TODO: This function below is synchronous, make it async
{
parse_schema_rows(
Some(stmt),
&mut new_schema,
&conn.syms.borrow(),
state.mv_tx_id,
)?;
}
conn.schema.replace(new_schema);
conn.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)
})?;
}
conn.auto_commit.set(previous_auto_commit);
state.pc += 1;
@@ -5903,7 +5889,9 @@ pub fn op_set_cookie(
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
}
program.connection.schema.borrow_mut().schema_version = *value as u32;
program
.connection
.with_schema_mut(|schema| schema.schema_version = *value as u32);
header_accessor::set_schema_cookie(pager, *value as u32)?;
}
cookie => todo!("{cookie:?} is not yet implement for SetCookie"),