update schema version for internal schema represenation in maybe_reparse_schema

This commit is contained in:
Nikita Sivukhin
2025-07-24 22:43:07 +04:00
parent 8b0e5b151e
commit e05660133b

View File

@@ -644,31 +644,81 @@ impl Connection {
}
/// Parse schema from scratch if version of schema for the connection differs from the schema cookie in the root page
/// This function must be called outside of any transaction because internally it will start transaction session by itself
fn maybe_reparse_schema(self: &Arc<Connection>) -> Result<()> {
let pager = self.pager.borrow().clone();
// first, quickly read schema_version from the root page in order to check if schema changed
pager.begin_read_tx()?;
let db_cookie = get_schema_cookie(&pager);
let db_schema_version = get_schema_cookie(&pager);
pager.end_read_tx().expect("read txn must be finished");
let db_cookie = db_cookie?;
let connection_cookie = self.schema.borrow().schema_version;
let db_schema_version = db_schema_version?;
let conn_schema_version = self.schema.borrow().schema_version;
turso_assert!(
connection_cookie <= db_cookie,
"connection cookie can't be larger than db cookie: {} vs {}",
connection_cookie,
db_cookie
conn_schema_version <= db_schema_version,
"connection schema_version can't be larger than db schema_version: {} vs {}",
conn_schema_version,
db_schema_version
);
if self.schema.borrow().schema_version != db_cookie {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
self.with_schema_mut(|schema| {
// TODO: This function below is synchronous, make it async
parse_schema_rows(Some(stmt), schema, &self.syms.borrow(), None)
})?;
let schema = self.schema.borrow().clone();
self._db.update_schema_if_newer(schema)?;
// if schema_versions matches - exit early
if conn_schema_version == db_schema_version {
return Ok(());
}
// maybe_reparse_schema must be called outside of any transaction
turso_assert!(
self.transaction_state.get() == TransactionState::None,
"unexpected start transaction"
);
// reparse logic extracted to the function in order to not accidentally propagate error from it before closing transaction
let reparse = || -> Result<()> {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
self.with_schema_mut(|schema| -> Result<()> {
// create fresh schema as some objects can be deleted
let mut fresh = Schema::new(false); // todo: indices!
// read cookie before consuming statement program - otherwise we can end up reading cookie with closed transaction state
let cookie = get_schema_cookie(&pager)?;
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, &mut fresh, &self.syms.borrow(), None)?;
*schema = fresh;
schema.schema_version = cookie;
Result::Ok(())
})?;
Result::Ok(())
};
// start read transaction manually, because we will read schema cookie once again and
// we must be sure that it will consistent with schema content
//
// from now on we must be very careful with errors propagation
// in order to not accidentally keep read transaction opened
pager.begin_read_tx()?;
self.transaction_state.replace(TransactionState::Read);
let reparse_result = reparse();
let previous = self.transaction_state.replace(TransactionState::None);
turso_assert!(
matches!(previous, TransactionState::None | TransactionState::Read),
"unexpected end transaction state"
);
// close opened transaction if it was kept open
// (in most cases, it will be automatically closed if stmt was executed properly)
if previous == TransactionState::Read {
pager.end_read_tx().expect("read txn must be finished");
}
// now we can safely propagate error after ensured that transaction state is reset
let _ = reparse_result?;
let schema = self.schema.borrow().clone();
self._db.update_schema_if_newer(schema)?;
Ok(())
}
@@ -1059,7 +1109,9 @@ impl Connection {
if self.closed.get() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
let rows = self.query("SELECT * FROM sqlite_schema")?;
let rows = self
.query("SELECT * FROM sqlite_schema")?
.expect("query must be parsed to statement");
let syms = self.syms.borrow();
self.with_schema_mut(|schema| {
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(rows, schema, &syms, None)