Merge 'Update stale in memory wal header after restarting log' from Preston Thorpe

Currently during `Restart|Truncate` modes we properly restart the WAL
header in the `WalFileShared`, but keep a stale one on the current
`WalFile` instance.

Closes #2627
This commit is contained in:
Jussi Saurio
2025-08-17 14:37:48 +03:00
committed by GitHub

View File

@@ -1650,23 +1650,33 @@ impl WalFile {
}
}
let handle_err = |e: &LimboError| {
let unlock = |e: Option<&LimboError>| {
// release all read locks we just acquired, the caller will take care of the others
let shared = self.get_shared();
let shared = unsafe { self.shared.get().as_mut().unwrap() };
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
if let Some(e) = e {
tracing::error!(
"Failed to restart WAL header: {:?}, releasing read locks",
e
);
}
};
// reinitialize inmemory state
self.get_shared()
.restart_wal_header(&self.io, mode)
.inspect_err(|e| {
handle_err(e);
unlock(Some(e));
})?;
let (header, cksm) = {
let shared = self.get_shared();
(*shared.wal_header.lock(), shared.last_checksum)
};
self.last_checksum = cksm;
self.header = header;
self.max_frame = 0;
self.min_frame = 0;
// For TRUNCATE mode: shrink the WAL file to 0B
if matches!(mode, CheckpointMode::Truncate) {
@@ -1675,8 +1685,13 @@ impl WalFile {
});
let shared = self.get_shared();
// for now at least, lets do all this IO syncronously
let c = shared.file.truncate(0, c).inspect_err(handle_err)?;
self.io.wait_for_completion(c).inspect_err(handle_err)?;
let c = shared
.file
.truncate(0, c)
.inspect_err(|e| unlock(Some(e)))?;
self.io
.wait_for_completion(c)
.inspect_err(|e| unlock(Some(e)))?;
// fsync after truncation
self.io
.wait_for_completion(
@@ -1685,22 +1700,13 @@ impl WalFile {
.sync(Completion::new_sync(|_| {
tracing::trace!("WAL file synced after reset/truncation");
}))
.inspect_err(handle_err)?,
.inspect_err(|e| unlock(Some(e)))?,
)
.inspect_err(handle_err)?;
.inspect_err(|e| unlock(Some(e)))?;
}
// release readlocks 1..4
{
let shared = self.get_shared();
for idx in 1..shared.read_locks.len() {
shared.read_locks[idx].unlock();
}
}
self.last_checksum = self.get_shared().last_checksum;
self.max_frame = 0;
self.min_frame = 0;
unlock(None);
Ok(())
}