mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 10:14:21 +01:00
pass schema to epilogue for schema_version checking + do not Pragma Schema Version in open_with_flags to avoid infinite loop in reprepare. Just access the database header directly
This commit is contained in:
47
core/lib.rs
47
core/lib.rs
@@ -310,7 +310,7 @@ impl Database {
|
||||
let pager = conn.pager.borrow().clone();
|
||||
|
||||
db.with_schema_mut(|schema| {
|
||||
schema.schema_version = get_schema_version(&conn)?;
|
||||
schema.schema_version = header_accessor::get_schema_cookie(&conn.pager.borrow())?;
|
||||
let result = schema
|
||||
.make_from_btree(None, pager.clone(), &syms)
|
||||
.or_else(|e| {
|
||||
@@ -505,7 +505,6 @@ impl Database {
|
||||
let schema = Arc::make_mut(&mut *schema_ref);
|
||||
f(schema)
|
||||
}
|
||||
|
||||
pub(crate) fn clone_schema(&self) -> Result<Arc<Schema>> {
|
||||
let schema = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?;
|
||||
Ok(schema.clone())
|
||||
@@ -535,50 +534,6 @@ impl Database {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_schema_version(conn: &Arc<Connection>) -> Result<u32> {
|
||||
let mut rows = conn
|
||||
.query("PRAGMA schema_version")?
|
||||
.ok_or(LimboError::InternalError(
|
||||
"failed to parse pragma schema_version on initialization".to_string(),
|
||||
))?;
|
||||
let mut schema_version = None;
|
||||
loop {
|
||||
match rows.step()? {
|
||||
StepResult::Row => {
|
||||
let row = rows.row().unwrap();
|
||||
if schema_version.is_some() {
|
||||
return Err(LimboError::InternalError(
|
||||
"PRAGMA schema_version; returned more that one row".to_string(),
|
||||
));
|
||||
}
|
||||
schema_version = Some(row.get::<i64>(0)? as u32);
|
||||
}
|
||||
StepResult::IO => {
|
||||
rows.run_once()?;
|
||||
}
|
||||
StepResult::Interrupt => {
|
||||
return Err(LimboError::InternalError(
|
||||
"PRAGMA schema_version; returned more that one row".to_string(),
|
||||
));
|
||||
}
|
||||
StepResult::Done => {
|
||||
if let Some(version) = schema_version {
|
||||
return Ok(version);
|
||||
} else {
|
||||
return Err(LimboError::InternalError(
|
||||
"failed to get schema_version".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
StepResult::Busy => {
|
||||
return Err(LimboError::InternalError(
|
||||
"PRAGMA schema_version; returned more that one row".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum CaptureDataChangesMode {
|
||||
Off,
|
||||
|
||||
@@ -100,7 +100,7 @@ pub fn translate(
|
||||
stmt => translate_inner(schema, stmt, syms, program, &connection)?,
|
||||
};
|
||||
|
||||
program.epilogue();
|
||||
program.epilogue(schema);
|
||||
|
||||
Ok(program.build(connection, change_cnt_on, input))
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use turso_sqlite3_parser::ast::{self, TableInternalId};
|
||||
use crate::{
|
||||
numeric::Numeric,
|
||||
parameters::Parameters,
|
||||
schema::{BTreeTable, Index, PseudoCursorType, Table},
|
||||
schema::{BTreeTable, Index, PseudoCursorType, Schema, Table},
|
||||
translate::{
|
||||
collate::CollationSeq,
|
||||
emitter::TransactionMode,
|
||||
@@ -778,7 +778,7 @@ impl ProgramBuilder {
|
||||
/// Clean up and finalize the program, resolving any remaining labels
|
||||
/// Note that although these are the final instructions, typically an SQLite
|
||||
/// query will jump to the Transaction instruction via init_label.
|
||||
pub fn epilogue(&mut self) {
|
||||
pub fn epilogue(&mut self, schema: &Schema) {
|
||||
if self.nested_level == 0 {
|
||||
// "rollback" flag is used to determine if halt should rollback the transaction.
|
||||
self.emit_halt(self.rollback);
|
||||
@@ -788,7 +788,7 @@ impl ProgramBuilder {
|
||||
self.emit_insn(Insn::Transaction {
|
||||
db: 0,
|
||||
write: matches!(self.txn_mode, TransactionMode::Write),
|
||||
schema_cookie: 0, // TODO: placeholder until we have epilogue being called only in one place
|
||||
schema_cookie: schema.schema_version,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1997,10 +1997,6 @@ pub fn op_transaction(
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
};
|
||||
let header_schema_cookie = header_accessor::get_schema_cookie(pager)?;
|
||||
if header_schema_cookie != *schema_cookie {
|
||||
return Err(LimboError::SchemaUpdated);
|
||||
}
|
||||
let conn = program.connection.clone();
|
||||
if *write && conn._db.open_flags.contains(OpenFlags::ReadOnly) {
|
||||
return Err(LimboError::ReadOnly);
|
||||
@@ -2012,6 +2008,10 @@ pub fn op_transaction(
|
||||
if state.mv_tx_id.is_none() {
|
||||
// We allocate the first page lazily in the first transaction.
|
||||
return_if_io!(pager.maybe_allocate_page1());
|
||||
let header_schema_cookie = header_accessor::get_schema_cookie(pager)?;
|
||||
if header_schema_cookie != *schema_cookie {
|
||||
return Err(LimboError::SchemaUpdated);
|
||||
}
|
||||
let tx_id = mv_store.begin_tx(pager.clone());
|
||||
conn.mv_transactions.borrow_mut().push(tx_id);
|
||||
state.mv_tx_id = Some(tx_id);
|
||||
@@ -2081,6 +2081,15 @@ pub fn op_transaction(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Can only read header if page 1 has been allocated already
|
||||
// begin_write_tx and begin_read_tx guarantee that happens
|
||||
let header_schema_cookie = header_accessor::get_schema_cookie(pager)?;
|
||||
tracing::info!(header_schema_cookie, schema_cookie);
|
||||
if header_schema_cookie != *schema_cookie {
|
||||
return Err(LimboError::SchemaUpdated);
|
||||
}
|
||||
|
||||
if updated {
|
||||
conn.transaction_state.replace(new_transaction_state);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user