mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-18 14:35:14 +01:00
remove lock from private schema copy
This commit is contained in:
44
core/lib.rs
44
core/lib.rs
@@ -267,7 +267,7 @@ impl Database {
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: pager.clone(),
|
||||
schema: self.schema.clone(),
|
||||
schema: RefCell::new(self.schema.read().clone()),
|
||||
last_insert_rowid: Cell::new(0),
|
||||
auto_commit: Cell::new(true),
|
||||
mv_transactions: RefCell::new(Vec::new()),
|
||||
@@ -318,7 +318,7 @@ impl Database {
|
||||
let conn = Arc::new(Connection {
|
||||
_db: self.clone(),
|
||||
pager: Rc::new(pager),
|
||||
schema: self.schema.clone(),
|
||||
schema: RefCell::new(self.schema.read().clone()),
|
||||
auto_commit: Cell::new(true),
|
||||
mv_transactions: RefCell::new(Vec::new()),
|
||||
transaction_state: Cell::new(TransactionState::None),
|
||||
@@ -436,7 +436,7 @@ fn get_schema_version(conn: &Arc<Connection>, io: &Arc<dyn IO>) -> Result<u32> {
|
||||
pub struct Connection {
|
||||
_db: Arc<Database>,
|
||||
pager: Rc<Pager>,
|
||||
schema: Arc<RwLock<Schema>>,
|
||||
schema: RefCell<Schema>,
|
||||
/// Whether to automatically commit transaction
|
||||
auto_commit: Cell<bool>,
|
||||
mv_transactions: RefCell<Vec<crate::mvcc::database::TxID>>,
|
||||
@@ -472,10 +472,7 @@ impl Connection {
|
||||
match cmd {
|
||||
Cmd::Stmt(stmt) => {
|
||||
let program = Rc::new(translate::translate(
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.clone(),
|
||||
@@ -520,10 +517,7 @@ impl Connection {
|
||||
match cmd {
|
||||
Cmd::Stmt(ref stmt) | Cmd::Explain(ref stmt) => {
|
||||
let program = translate::translate(
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
self.schema.borrow().deref(),
|
||||
stmt.clone(),
|
||||
self.pager.clone(),
|
||||
self.clone(),
|
||||
@@ -543,23 +537,14 @@ impl Connection {
|
||||
match stmt {
|
||||
ast::Stmt::Select(select) => {
|
||||
let mut plan = prepare_select_plan(
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
self.schema.borrow().deref(),
|
||||
*select,
|
||||
&syms,
|
||||
&[],
|
||||
&mut table_ref_counter,
|
||||
translate::plan::QueryDestination::ResultRows,
|
||||
)?;
|
||||
optimize_plan(
|
||||
&mut plan,
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
)?;
|
||||
optimize_plan(&mut plan, self.schema.borrow().deref())?;
|
||||
let _ = std::io::stdout().write_all(plan.to_string().as_bytes());
|
||||
}
|
||||
_ => todo!(),
|
||||
@@ -588,10 +573,7 @@ impl Connection {
|
||||
match cmd {
|
||||
Cmd::Explain(stmt) => {
|
||||
let program = translate::translate(
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.clone(),
|
||||
@@ -604,10 +586,7 @@ impl Connection {
|
||||
Cmd::ExplainQueryPlan(_stmt) => todo!(),
|
||||
Cmd::Stmt(stmt) => {
|
||||
let program = translate::translate(
|
||||
self.schema
|
||||
.try_read()
|
||||
.ok_or(LimboError::SchemaLocked)?
|
||||
.deref(),
|
||||
self.schema.borrow().deref(),
|
||||
stmt,
|
||||
self.pager.clone(),
|
||||
self.clone(),
|
||||
@@ -753,10 +732,7 @@ impl Connection {
|
||||
|
||||
pub fn parse_schema_rows(self: &Arc<Connection>) -> Result<()> {
|
||||
let rows = self.query("SELECT * FROM sqlite_schema")?;
|
||||
let mut schema = self
|
||||
.schema
|
||||
.try_write()
|
||||
.expect("lock on schema should succeed first try");
|
||||
let mut schema = self.schema.borrow_mut();
|
||||
{
|
||||
let syms = self.syms.borrow();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
|
||||
@@ -640,7 +640,8 @@ impl Pager {
|
||||
tracing::trace!("end_tx(rollback={})", rollback);
|
||||
if rollback {
|
||||
let maybe_schema_pair = if change_schema {
|
||||
let schema = connection.schema.clone().write().clone();
|
||||
let schema = connection.schema.borrow().clone();
|
||||
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
|
||||
let db_schema = connection._db.schema.write();
|
||||
Some((schema, db_schema))
|
||||
} else {
|
||||
|
||||
@@ -216,7 +216,7 @@ pub fn op_drop_index(
|
||||
let Insn::DropIndex { index, db: _ } = insn else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
let mut schema = program.connection.schema.write();
|
||||
let mut schema = program.connection.schema.borrow_mut();
|
||||
schema.remove_index(index);
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -901,7 +901,7 @@ pub fn op_open_read(
|
||||
}
|
||||
CursorType::BTreeIndex(index) => {
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.try_read().ok_or(LimboError::SchemaLocked)?;
|
||||
let schema = conn.schema.borrow();
|
||||
let table = schema
|
||||
.get_table(&index.table_name)
|
||||
.and_then(|table| table.btree());
|
||||
@@ -4725,7 +4725,7 @@ pub fn op_open_write(
|
||||
};
|
||||
if let Some(index) = maybe_index {
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.try_read().ok_or(LimboError::SchemaLocked)?;
|
||||
let schema = conn.schema.borrow();
|
||||
let table = schema
|
||||
.get_table(&index.table_name)
|
||||
.and_then(|table| table.btree());
|
||||
@@ -4855,7 +4855,7 @@ pub fn op_drop_table(
|
||||
}
|
||||
let conn = program.connection.clone();
|
||||
{
|
||||
let mut schema = conn.schema.write();
|
||||
let mut schema = conn.schema.borrow_mut();
|
||||
schema.remove_indices_for_table(table_name);
|
||||
schema.remove_table(table_name);
|
||||
}
|
||||
@@ -4939,7 +4939,7 @@ pub fn op_parse_schema(
|
||||
where_clause
|
||||
))?;
|
||||
|
||||
let mut schema = conn.schema.write();
|
||||
let mut schema = conn.schema.borrow_mut();
|
||||
|
||||
// TODO: This function below is synchronous, make it async
|
||||
{
|
||||
@@ -4953,7 +4953,7 @@ pub fn op_parse_schema(
|
||||
}
|
||||
} else {
|
||||
let stmt = conn.prepare("SELECT * FROM sqlite_schema")?;
|
||||
let mut new = Schema::new(conn.schema.read().indexes_enabled());
|
||||
let mut new = Schema::new(conn.schema.borrow().indexes_enabled());
|
||||
|
||||
// TODO: This function below is synchronous, make it async
|
||||
{
|
||||
@@ -4966,7 +4966,7 @@ pub fn op_parse_schema(
|
||||
)?;
|
||||
}
|
||||
|
||||
let mut schema = conn.schema.write();
|
||||
let mut schema = conn.schema.borrow_mut();
|
||||
*schema = new;
|
||||
}
|
||||
state.pc += 1;
|
||||
|
||||
Reference in New Issue
Block a user