Merge 'fix/wal: only rollback WAL if txn was write + fix start state for WalFile' from Jussi Saurio

Closes #2363
## What
The following sequence of actions is possible:
```
Some committed frames already exist in the WAL. shared.pages_in_frames.len() > 0.

Brand new connection does this:
BEGIN
^-- deferred, no read tx started yet, so its `self.start_pages_in_frames` is `0`
       because it's a brand new WalFile instance

ROLLBACK   <-- calls `wal.rollback()` and truncates `shared.pages_in_frames` to length `0`

PRAGMA wal_checkpoint();
^-- because `pages_in_frames` is empty, it doesnt actually
checkpoint anything but still sets shared.max_frame to 0, causing effectively data loss
```
## Fix
- Only call `wal.rollback()` for write transactions
- Set `start_pages_in_frames` correctly so that this doesn't happen even
if a regression starts calling `wal.rollback()` again

Reviewed-by: Preston Thorpe (@PThorpe92)

Closes #2366
This commit is contained in:
Jussi Saurio
2025-07-31 16:16:20 +03:00
3 changed files with 19 additions and 7 deletions

View File

@@ -1190,7 +1190,7 @@ impl Connection {
wal.end_read_tx();
}
// remove all non-commited changes in case if WAL session left some suffix without commit frame
pager.rollback(false, self)?;
pager.rollback(false, self, true)?;
}
// let's re-parse schema from scratch if schema cookie changed compared to the our in-memory view of schema

View File

@@ -815,14 +815,15 @@ impl Pager {
) -> Result<IOResult<PagerCommitResult>> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
if matches!(
let is_write = matches!(
connection.transaction_state.get(),
TransactionState::Write { .. }
) {
);
if is_write {
self.wal.borrow().end_write_tx();
}
self.wal.borrow().end_read_tx();
self.rollback(schema_did_change, connection)?;
self.rollback(schema_did_change, connection, is_write)?;
return Ok(IOResult::Done(PagerCommitResult::Rollback));
}
let commit_status = self.commit_dirty_pages(wal_checkpoint_disabled)?;
@@ -1799,9 +1800,17 @@ impl Pager {
&self,
schema_did_change: bool,
connection: &Connection,
is_write: bool,
) -> Result<(), LimboError> {
tracing::debug!(schema_did_change);
self.dirty_pages.borrow_mut().clear();
if is_write {
self.dirty_pages.borrow_mut().clear();
} else {
turso_assert!(
self.dirty_pages.borrow().is_empty(),
"dirty pages should be empty for read txn"
);
}
let mut cache = self.page_cache.write();
self.reset_internal_states();
@@ -1811,7 +1820,9 @@ impl Pager {
if schema_did_change {
connection.schema.replace(connection._db.clone_schema()?);
}
self.wal.borrow_mut().rollback()?;
if is_write {
self.wal.borrow_mut().rollback()?;
}
Ok(())
}

View File

@@ -1290,6 +1290,7 @@ impl WalFile {
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
let last_checksum = unsafe { (*shared.get()).last_checksum };
let start_pages_in_frames = unsafe { (*shared.get()).pages_in_frames.lock().len() };
Self {
io,
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
@@ -1313,7 +1314,7 @@ impl WalFile {
last_checksum,
prev_checkpoint: CheckpointResult::default(),
checkpoint_guard: None,
start_pages_in_frames: 0,
start_pages_in_frames,
header: *header,
}
}