mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-19 23:15:28 +01:00
end_tx does not need schema_did_change variable
This commit is contained in:
21
core/lib.rs
21
core/lib.rs
@@ -1357,10 +1357,12 @@ impl Connection {
|
||||
self.closed.set(true);
|
||||
|
||||
match self.transaction_state.get() {
|
||||
TransactionState::Write { schema_did_change } => {
|
||||
TransactionState::None => {
|
||||
// No active transaction
|
||||
}
|
||||
_ => {
|
||||
while let IOResult::IO = self.pager.borrow().end_tx(
|
||||
true, // rollback = true for close
|
||||
schema_did_change,
|
||||
self,
|
||||
self.wal_checkpoint_disabled.get(),
|
||||
)? {
|
||||
@@ -1368,13 +1370,6 @@ impl Connection {
|
||||
}
|
||||
self.transaction_state.set(TransactionState::None);
|
||||
}
|
||||
TransactionState::PendingUpgrade | TransactionState::Read => {
|
||||
self.pager.borrow().end_read_tx()?;
|
||||
self.transaction_state.set(TransactionState::None);
|
||||
}
|
||||
TransactionState::None => {
|
||||
// No active transaction
|
||||
}
|
||||
}
|
||||
|
||||
self.pager
|
||||
@@ -1933,7 +1928,9 @@ impl Statement {
|
||||
res
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn reprepare(&mut self) -> Result<()> {
|
||||
tracing::trace!("repreparing statement");
|
||||
let conn = self.program.connection.clone();
|
||||
*conn.schema.borrow_mut() = conn._db.clone_schema()?;
|
||||
self.program = {
|
||||
@@ -1969,10 +1966,8 @@ impl Statement {
|
||||
let res = self.pager.io.run_once();
|
||||
if res.is_err() {
|
||||
let state = self.program.connection.transaction_state.get();
|
||||
if let TransactionState::Write { schema_did_change } = state {
|
||||
let end_tx_res =
|
||||
self.pager
|
||||
.end_tx(true, schema_did_change, &self.program.connection, true)?;
|
||||
if let TransactionState::Write { .. } = state {
|
||||
let end_tx_res = self.pager.end_tx(true, &self.program.connection, true)?;
|
||||
self.program
|
||||
.connection
|
||||
.transaction_state
|
||||
|
||||
@@ -514,7 +514,6 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
.pager
|
||||
.end_tx(
|
||||
false, // rollback = false since we're committing
|
||||
false, // schema_did_change = false for now (could be improved)
|
||||
&self.connection,
|
||||
self.connection.wal_checkpoint_disabled.get(),
|
||||
)
|
||||
|
||||
@@ -7824,7 +7824,7 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
@@ -7982,7 +7982,7 @@ mod tests {
|
||||
.unwrap();
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
@@ -8200,7 +8200,7 @@ mod tests {
|
||||
|
||||
let _c = cursor.move_to_root().unwrap();
|
||||
loop {
|
||||
match pager.end_tx(false, false, &conn, false).unwrap() {
|
||||
match pager.end_tx(false, &conn, false).unwrap() {
|
||||
IOResult::Done(_) => break,
|
||||
IOResult::IO => {
|
||||
pager.io.run_once().unwrap();
|
||||
|
||||
@@ -1009,7 +1009,6 @@ impl Pager {
|
||||
pub fn end_tx(
|
||||
&self,
|
||||
rollback: bool,
|
||||
schema_did_change: bool,
|
||||
connection: &Connection,
|
||||
wal_checkpoint_disabled: bool,
|
||||
) -> Result<IOResult<PagerCommitResult>> {
|
||||
@@ -1018,11 +1017,11 @@ impl Pager {
|
||||
// TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes.
|
||||
return Ok(IOResult::Done(PagerCommitResult::Rollback));
|
||||
};
|
||||
let (is_write, schema_did_change) = match connection.transaction_state.get() {
|
||||
TransactionState::Write { schema_did_change } => (true, schema_did_change),
|
||||
_ => (false, false),
|
||||
};
|
||||
if rollback {
|
||||
let is_write = matches!(
|
||||
connection.transaction_state.get(),
|
||||
TransactionState::Write { .. }
|
||||
);
|
||||
if is_write {
|
||||
wal.borrow().end_write_tx();
|
||||
}
|
||||
|
||||
@@ -2071,6 +2071,7 @@ pub fn op_transaction(
|
||||
match pager.begin_write_tx()? {
|
||||
IOResult::Done(r) => {
|
||||
if let LimboResult::Busy = r {
|
||||
tracing::error!("connection is busy");
|
||||
pager.end_read_tx()?;
|
||||
conn.transaction_state.replace(TransactionState::None);
|
||||
conn.auto_commit.replace(true);
|
||||
@@ -2136,17 +2137,11 @@ pub fn op_auto_commit(
|
||||
.commit_txn(pager.clone(), state, mv_store, *rollback)
|
||||
.map(Into::into);
|
||||
}
|
||||
let schema_did_change =
|
||||
if let TransactionState::Write { schema_did_change } = conn.transaction_state.get() {
|
||||
schema_did_change
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if *auto_commit != conn.auto_commit.get() {
|
||||
if *rollback {
|
||||
// TODO(pere): add rollback I/O logic once we implement rollback journal
|
||||
return_if_io!(pager.end_tx(true, schema_did_change, &conn, false));
|
||||
return_if_io!(pager.end_tx(true, &conn, false));
|
||||
conn.transaction_state.replace(TransactionState::None);
|
||||
conn.auto_commit.replace(true);
|
||||
} else {
|
||||
|
||||
@@ -412,8 +412,8 @@ impl Program {
|
||||
if self.connection.closed.get() {
|
||||
// Connection is closed for whatever reason, rollback the transaction.
|
||||
let state = self.connection.transaction_state.get();
|
||||
if let TransactionState::Write { schema_did_change } = state {
|
||||
match pager.end_tx(true, schema_did_change, &self.connection, false)? {
|
||||
if let TransactionState::Write { .. } = state {
|
||||
match pager.end_tx(true, &self.connection, false)? {
|
||||
IOResult::IO => return Ok(StepResult::IO),
|
||||
IOResult::Done(_) => {}
|
||||
}
|
||||
@@ -502,9 +502,7 @@ impl Program {
|
||||
program_state.commit_state
|
||||
);
|
||||
if program_state.commit_state == CommitState::Committing {
|
||||
let TransactionState::Write { schema_did_change } =
|
||||
connection.transaction_state.get()
|
||||
else {
|
||||
let TransactionState::Write { .. } = connection.transaction_state.get() else {
|
||||
unreachable!("invalid state for write commit step")
|
||||
};
|
||||
self.step_end_write_txn(
|
||||
@@ -512,18 +510,16 @@ impl Program {
|
||||
&mut program_state.commit_state,
|
||||
&connection,
|
||||
rollback,
|
||||
schema_did_change,
|
||||
)
|
||||
} else if auto_commit {
|
||||
let current_state = connection.transaction_state.get();
|
||||
tracing::trace!("Auto-commit state: {:?}", current_state);
|
||||
match current_state {
|
||||
TransactionState::Write { schema_did_change } => self.step_end_write_txn(
|
||||
TransactionState::Write { .. } => self.step_end_write_txn(
|
||||
&pager,
|
||||
&mut program_state.commit_state,
|
||||
&connection,
|
||||
rollback,
|
||||
schema_did_change,
|
||||
),
|
||||
TransactionState::Read => {
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
@@ -551,11 +547,9 @@ impl Program {
|
||||
commit_state: &mut CommitState,
|
||||
connection: &Connection,
|
||||
rollback: bool,
|
||||
schema_did_change: bool,
|
||||
) -> Result<StepResult> {
|
||||
let cacheflush_status = pager.end_tx(
|
||||
rollback,
|
||||
schema_did_change,
|
||||
connection,
|
||||
connection.wal_checkpoint_disabled.get(),
|
||||
)?;
|
||||
@@ -809,20 +803,15 @@ pub fn handle_program_error(
|
||||
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
|
||||
LimboError::TableLocked => {}
|
||||
_ => {
|
||||
let state = connection.transaction_state.get();
|
||||
if let TransactionState::Write { schema_did_change } = state {
|
||||
loop {
|
||||
match pager.end_tx(true, schema_did_change, connection, false) {
|
||||
Ok(IOResult::IO) => connection.run_once()?,
|
||||
Ok(IOResult::Done(_)) => break,
|
||||
Err(e) => {
|
||||
tracing::error!("end_tx failed: {e}");
|
||||
break;
|
||||
}
|
||||
loop {
|
||||
match pager.end_tx(true, connection, false) {
|
||||
Ok(IOResult::IO) => connection.run_once()?,
|
||||
Ok(IOResult::Done(_)) => break,
|
||||
Err(e) => {
|
||||
tracing::error!("end_tx failed: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if let Err(e) = pager.end_read_tx() {
|
||||
tracing::error!("end_read_tx failed: {e}");
|
||||
}
|
||||
connection.transaction_state.replace(TransactionState::None);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user