mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-10 18:54:22 +01:00
only copy schema on writes
This commit is contained in:
36
core/lib.rs
36
core/lib.rs
@@ -110,7 +110,7 @@ pub(crate) type MvCursor = mvcc::cursor::ScanCursor<mvcc::LocalClock>;
|
||||
|
||||
pub struct Database {
|
||||
mv_store: Option<Rc<MvStore>>,
|
||||
schema: Arc<RwLock<Schema>>,
|
||||
schema: RefCell<Arc<Schema>>,
|
||||
db_file: Arc<dyn DatabaseStorage>,
|
||||
path: String,
|
||||
io: Arc<dyn IO>,
|
||||
@@ -197,11 +197,11 @@ impl Database {
|
||||
};
|
||||
|
||||
let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
|
||||
let schema = Arc::new(RwLock::new(Schema::new(enable_indexes)));
|
||||
let db = Database {
|
||||
|
||||
let db = Arc::new(Database {
|
||||
mv_store,
|
||||
path: path.to_string(),
|
||||
schema: schema.clone(),
|
||||
schema: RefCell::new(Arc::new(Schema::new(enable_indexes))),
|
||||
_shared_page_cache: shared_page_cache.clone(),
|
||||
maybe_shared_wal: RwLock::new(maybe_shared_wal),
|
||||
db_file,
|
||||
@@ -209,19 +209,16 @@ impl Database {
|
||||
open_flags: flags,
|
||||
db_state: Arc::new(AtomicUsize::new(db_state)),
|
||||
init_lock: Arc::new(Mutex::new(())),
|
||||
};
|
||||
let db = Arc::new(db);
|
||||
});
|
||||
|
||||
// Check: https://github.com/tursodatabase/turso/pull/1761#discussion_r2154013123
|
||||
if db_state == DB_STATE_INITIALIZED {
|
||||
// parse schema
|
||||
let conn = db.connect()?;
|
||||
let schema_version = get_schema_version(&conn)?;
|
||||
schema.write().schema_version = schema_version;
|
||||
|
||||
let mut schema = schema
|
||||
.try_write()
|
||||
.expect("lock on schema should succeed first try");
|
||||
let mut schema_ref = db.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
schema.schema_version = get_schema_version(&conn)?;
|
||||
|
||||
let syms = conn.syms.borrow();
|
||||
let pager = conn.pager.borrow().clone();
|
||||
@@ -235,6 +232,7 @@ impl Database {
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn connect(self: &Arc<Database>) -> Result<Arc<Connection>> {
|
||||
let pager = self.init_pager(None)?;
|
||||
|
||||
@@ -246,7 +244,7 @@ impl Database {
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: RefCell::new(Rc::new(pager)),
|
||||
schema: RefCell::new(self.schema.read().clone()),
|
||||
schema: RefCell::new(self.schema.borrow().clone()),
|
||||
auto_commit: Cell::new(true),
|
||||
mv_transactions: RefCell::new(Vec::new()),
|
||||
transaction_state: Cell::new(TransactionState::None),
|
||||
@@ -492,7 +490,7 @@ impl CaptureDataChangesMode {
|
||||
pub struct Connection {
|
||||
_db: Arc<Database>,
|
||||
pager: RefCell<Rc<Pager>>,
|
||||
schema: RefCell<Schema>,
|
||||
schema: RefCell<Arc<Schema>>,
|
||||
/// Whether to automatically commit transaction
|
||||
auto_commit: Cell<bool>,
|
||||
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
|
||||
@@ -729,10 +727,10 @@ impl Connection {
|
||||
pub fn maybe_update_schema(&self) {
|
||||
let current_schema_version = self.schema.borrow().schema_version;
|
||||
if matches!(self.transaction_state.get(), TransactionState::None)
|
||||
&& current_schema_version < self._db.schema.read().schema_version
|
||||
&& current_schema_version < self._db.schema.borrow().schema_version
|
||||
{
|
||||
let new_schema = self._db.schema.read();
|
||||
self.schema.replace(new_schema.clone());
|
||||
let new_schema = self._db.schema.borrow().clone();
|
||||
self.schema.replace(new_schema);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -882,11 +880,11 @@ impl Connection {
|
||||
return Err(LimboError::InternalError("Connection closed".to_string()));
|
||||
}
|
||||
let rows = self.query("SELECT * FROM sqlite_schema")?;
|
||||
let mut schema = self.schema.borrow_mut();
|
||||
let mut schema_ref = self.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
{
|
||||
let syms = self.syms.borrow();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
parse_schema_rows(rows, &mut schema, &syms, None)
|
||||
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None)
|
||||
{
|
||||
// this means that a vtab exists and we no longer have the module loaded. we print
|
||||
// a warning to the user to load the module
|
||||
|
||||
@@ -715,18 +715,12 @@ impl Pager {
|
||||
match commit_status {
|
||||
IOResult::IO => Ok(IOResult::IO),
|
||||
IOResult::Done(_) => {
|
||||
let maybe_schema_pair = if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
|
||||
let db_schema = connection._db.schema.write();
|
||||
Some((schema, db_schema))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
self.wal.borrow().end_write_tx()?;
|
||||
self.wal.borrow().end_read_tx()?;
|
||||
if let Some((schema, mut db_schema)) = maybe_schema_pair {
|
||||
*db_schema = schema;
|
||||
|
||||
if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
*connection._db.schema.borrow_mut() = schema;
|
||||
}
|
||||
Ok(commit_status)
|
||||
}
|
||||
@@ -1314,8 +1308,9 @@ impl Pager {
|
||||
cache.unset_dirty_all_pages();
|
||||
cache.clear().expect("failed to clear page cache");
|
||||
if schema_did_change {
|
||||
let prev_schema = connection._db.schema.read().clone();
|
||||
connection.schema.replace(prev_schema);
|
||||
connection
|
||||
.schema
|
||||
.replace(connection._db.schema.borrow().clone());
|
||||
}
|
||||
self.wal.borrow_mut().rollback()?;
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::{
|
||||
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
|
||||
};
|
||||
use std::{rc::Rc, sync::Arc};
|
||||
use tracing::{instrument, Level};
|
||||
use turso_sqlite3_parser::ast::{
|
||||
self, CreateTableBody, Expr, FunctionTail, Literal, UnaryOperator,
|
||||
};
|
||||
@@ -48,6 +49,7 @@ pub struct UnparsedFromSqlIndex {
|
||||
pub sql: String,
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn parse_schema_rows(
|
||||
rows: Option<Statement>,
|
||||
schema: &mut Schema,
|
||||
|
||||
@@ -216,7 +216,8 @@ pub fn op_drop_index(
|
||||
let Insn::DropIndex { index, db: _ } = insn else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
let mut schema = program.connection.schema.borrow_mut();
|
||||
let mut schema_ref = program.connection.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
schema.remove_index(index);
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -5719,7 +5720,8 @@ pub fn op_drop_table(
|
||||
}
|
||||
let conn = program.connection.clone();
|
||||
{
|
||||
let mut schema = conn.schema.borrow_mut();
|
||||
let mut schema_ref = conn.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
schema.remove_indices_for_table(table_name);
|
||||
schema.remove_table(table_name);
|
||||
}
|
||||
@@ -5804,33 +5806,22 @@ pub fn op_parse_schema(
|
||||
if let Some(where_clause) = where_clause {
|
||||
let stmt = conn.prepare(format!("SELECT * FROM sqlite_schema WHERE {where_clause}"))?;
|
||||
|
||||
let mut new_schema = conn.schema.borrow().clone();
|
||||
let mut schema_ref = conn.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
|
||||
// TODO: This function below is synchronous, make it async
|
||||
{
|
||||
parse_schema_rows(
|
||||
Some(stmt),
|
||||
&mut new_schema,
|
||||
&conn.syms.borrow(),
|
||||
state.mv_tx_id,
|
||||
)?;
|
||||
parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?;
|
||||
}
|
||||
conn.schema.replace(new_schema);
|
||||
} else {
|
||||
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
|
||||
let mut new_schema = conn.schema.borrow().clone();
|
||||
let mut schema_ref = conn.schema.borrow_mut();
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
|
||||
// TODO: This function below is synchronous, make it async
|
||||
{
|
||||
parse_schema_rows(
|
||||
Some(stmt),
|
||||
&mut new_schema,
|
||||
&conn.syms.borrow(),
|
||||
state.mv_tx_id,
|
||||
)?;
|
||||
parse_schema_rows(Some(stmt), schema, &conn.syms.borrow(), state.mv_tx_id)?;
|
||||
}
|
||||
|
||||
conn.schema.replace(new_schema);
|
||||
}
|
||||
conn.auto_commit.set(previous_auto_commit);
|
||||
state.pc += 1;
|
||||
@@ -5903,7 +5894,9 @@ pub fn op_set_cookie(
|
||||
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
|
||||
}
|
||||
|
||||
program.connection.schema.borrow_mut().schema_version = *value as u32;
|
||||
let mut schema = program.connection.schema.borrow_mut();
|
||||
Arc::make_mut(&mut *schema).schema_version = *value as u32;
|
||||
|
||||
header_accessor::set_schema_cookie(pager, *value as u32)?;
|
||||
}
|
||||
cookie => todo!("{cookie:?} is not yet implement for SetCookie"),
|
||||
|
||||
Reference in New Issue
Block a user