mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-26 03:14:23 +01:00
rename change_schema to schema_did_change
This commit is contained in:
10
core/lib.rs
10
core/lib.rs
@@ -97,17 +97,17 @@ pub type Result<T, E = LimboError> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
enum TransactionState {
|
||||
Write { change_schema: bool },
|
||||
Write { schema_did_change: bool },
|
||||
Read,
|
||||
None,
|
||||
}
|
||||
|
||||
impl TransactionState {
|
||||
fn change_schema(&self) -> bool {
|
||||
fn schema_did_change(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
TransactionState::Write {
|
||||
change_schema: true
|
||||
schema_did_change: true
|
||||
}
|
||||
)
|
||||
}
|
||||
@@ -633,7 +633,7 @@ impl Connection {
|
||||
let res = self._db.io.run_once();
|
||||
if res.is_err() {
|
||||
let state = self.transaction_state.get();
|
||||
self.pager.rollback(state.change_schema(), self)?;
|
||||
self.pager.rollback(state.schema_did_change(), self)?;
|
||||
}
|
||||
res
|
||||
}
|
||||
@@ -907,7 +907,7 @@ impl Statement {
|
||||
if res.is_err() {
|
||||
let state = self.program.connection.transaction_state.get();
|
||||
self.pager
|
||||
.rollback(state.change_schema(), &self.program.connection)?;
|
||||
.rollback(state.schema_did_change(), &self.program.connection)?;
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
@@ -639,7 +639,7 @@ impl Pager {
|
||||
pub fn end_tx(
|
||||
&self,
|
||||
rollback: bool,
|
||||
change_schema: bool,
|
||||
schema_did_change: bool,
|
||||
connection: &Connection,
|
||||
wal_checkpoint_disabled: bool,
|
||||
) -> Result<PagerCacheflushStatus> {
|
||||
@@ -653,7 +653,7 @@ impl Pager {
|
||||
match cacheflush_status {
|
||||
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
|
||||
PagerCacheflushStatus::Done(_) => {
|
||||
let maybe_schema_pair = if change_schema {
|
||||
let maybe_schema_pair = if schema_did_change {
|
||||
let schema = connection.schema.borrow().clone();
|
||||
// Lock first before writing to the database schema in case someone tries to read the schema before it's updated
|
||||
let db_schema = connection._db.schema.write();
|
||||
@@ -1206,13 +1206,17 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::INFO)]
|
||||
pub fn rollback(&self, change_schema: bool, connection: &Connection) -> Result<(), LimboError> {
|
||||
tracing::debug!(change_schema);
|
||||
pub fn rollback(
|
||||
&self,
|
||||
schema_did_change: bool,
|
||||
connection: &Connection,
|
||||
) -> Result<(), LimboError> {
|
||||
tracing::debug!(schema_did_change);
|
||||
self.dirty_pages.borrow_mut().clear();
|
||||
let mut cache = self.page_cache.write();
|
||||
cache.unset_dirty_all_pages();
|
||||
cache.clear().expect("failed to clear page cache");
|
||||
if change_schema {
|
||||
if schema_did_change {
|
||||
let prev_schema = connection._db.schema.read().clone();
|
||||
connection.schema.replace(prev_schema);
|
||||
}
|
||||
|
||||
@@ -1699,22 +1699,22 @@ pub fn op_transaction(
|
||||
} else {
|
||||
let current_state = conn.transaction_state.get();
|
||||
let (new_transaction_state, updated) = match (current_state, write) {
|
||||
(TransactionState::Write { change_schema }, true) => {
|
||||
(TransactionState::Write { change_schema }, false)
|
||||
(TransactionState::Write { schema_did_change }, true) => {
|
||||
(TransactionState::Write { schema_did_change }, false)
|
||||
}
|
||||
(TransactionState::Write { change_schema }, false) => {
|
||||
(TransactionState::Write { change_schema }, false)
|
||||
(TransactionState::Write { schema_did_change }, false) => {
|
||||
(TransactionState::Write { schema_did_change }, false)
|
||||
}
|
||||
(TransactionState::Read, true) => (
|
||||
TransactionState::Write {
|
||||
change_schema: false,
|
||||
schema_did_change: false,
|
||||
},
|
||||
true,
|
||||
),
|
||||
(TransactionState::Read, false) => (TransactionState::Read, false),
|
||||
(TransactionState::None, true) => (
|
||||
TransactionState::Write {
|
||||
change_schema: false,
|
||||
schema_did_change: false,
|
||||
},
|
||||
true,
|
||||
),
|
||||
@@ -1766,9 +1766,9 @@ pub fn op_auto_commit(
|
||||
super::StepResult::Busy => Ok(InsnFunctionStepResult::Busy),
|
||||
};
|
||||
}
|
||||
let change_schema =
|
||||
if let TransactionState::Write { change_schema } = conn.transaction_state.get() {
|
||||
change_schema
|
||||
let schema_did_change =
|
||||
if let TransactionState::Write { schema_did_change } = conn.transaction_state.get() {
|
||||
schema_did_change
|
||||
} else {
|
||||
false
|
||||
};
|
||||
@@ -1776,7 +1776,7 @@ pub fn op_auto_commit(
|
||||
if *auto_commit != conn.auto_commit.get() {
|
||||
if *rollback {
|
||||
// TODO(pere): add rollback I/O logic once we implement rollback journal
|
||||
pager.rollback(change_schema, &conn)?;
|
||||
pager.rollback(schema_did_change, &conn)?;
|
||||
conn.auto_commit.replace(true);
|
||||
} else {
|
||||
conn.auto_commit.replace(*auto_commit);
|
||||
@@ -5063,8 +5063,8 @@ pub fn op_set_cookie(
|
||||
Cookie::SchemaVersion => {
|
||||
// we update transaction state to indicate that the schema has changed
|
||||
match program.connection.transaction_state.get() {
|
||||
TransactionState::Write { change_schema } => {
|
||||
program.connection.transaction_state.set(TransactionState::Write { change_schema: true });
|
||||
TransactionState::Write { schema_did_change } => {
|
||||
program.connection.transaction_state.set(TransactionState::Write { schema_did_change: true });
|
||||
},
|
||||
TransactionState::Read => unreachable!("invalid transaction state for SetCookie: TransactionState::Read, should be write"),
|
||||
TransactionState::None => unreachable!("invalid transaction state for SetCookie: TransactionState::None, should be write"),
|
||||
|
||||
@@ -386,7 +386,7 @@ impl Program {
|
||||
let res = insn_function(self, state, insn, &pager, mv_store.as_ref());
|
||||
if res.is_err() {
|
||||
let state = self.connection.transaction_state.get();
|
||||
pager.rollback(state.change_schema(), &self.connection)?
|
||||
pager.rollback(state.schema_did_change(), &self.connection)?
|
||||
}
|
||||
match res? {
|
||||
InsnFunctionStepResult::Step => {}
|
||||
@@ -427,7 +427,8 @@ impl Program {
|
||||
program_state.commit_state
|
||||
);
|
||||
if program_state.commit_state == CommitState::Committing {
|
||||
let TransactionState::Write { change_schema } = connection.transaction_state.get()
|
||||
let TransactionState::Write { schema_did_change } =
|
||||
connection.transaction_state.get()
|
||||
else {
|
||||
unreachable!("invalid state for write commit step")
|
||||
};
|
||||
@@ -436,18 +437,18 @@ impl Program {
|
||||
&mut program_state.commit_state,
|
||||
&connection,
|
||||
rollback,
|
||||
change_schema,
|
||||
schema_did_change,
|
||||
)
|
||||
} else if auto_commit {
|
||||
let current_state = connection.transaction_state.get();
|
||||
tracing::trace!("Auto-commit state: {:?}", current_state);
|
||||
match current_state {
|
||||
TransactionState::Write { change_schema } => self.step_end_write_txn(
|
||||
TransactionState::Write { schema_did_change } => self.step_end_write_txn(
|
||||
&pager,
|
||||
&mut program_state.commit_state,
|
||||
&connection,
|
||||
rollback,
|
||||
change_schema,
|
||||
schema_did_change,
|
||||
),
|
||||
TransactionState::Read => {
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
@@ -472,11 +473,11 @@ impl Program {
|
||||
commit_state: &mut CommitState,
|
||||
connection: &Connection,
|
||||
rollback: bool,
|
||||
change_schema: bool,
|
||||
schema_did_change: bool,
|
||||
) -> Result<StepResult> {
|
||||
let cacheflush_status = pager.end_tx(
|
||||
rollback,
|
||||
change_schema,
|
||||
schema_did_change,
|
||||
connection,
|
||||
connection.wal_checkpoint_disabled.get(),
|
||||
)?;
|
||||
@@ -489,7 +490,7 @@ impl Program {
|
||||
status,
|
||||
crate::storage::pager::PagerCacheflushResult::Rollback
|
||||
) {
|
||||
pager.rollback(change_schema, connection)?;
|
||||
pager.rollback(schema_did_change, connection)?;
|
||||
}
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
*commit_state = CommitState::Ready;
|
||||
|
||||
Reference in New Issue
Block a user