diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7fff12b7a..8f0879339 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1076,7 +1076,7 @@ impl Wal for WalFile { syncing.set(false); }); let shared = self.get_shared(); - let c = shared.file.sync(completion)?; + let _c = shared.file.sync(completion)?; self.sync_state.set(SyncState::Syncing); Ok(IOResult::IO) } @@ -1279,28 +1279,51 @@ impl WalFile { for idx in 1..shared.read_locks.len() { let lock = &mut shared.read_locks[idx]; if !lock.write() { + // release everything we got so far + for j in 1..idx { + shared.read_locks[j].unlock(); + } // Reader is active, cannot proceed return Err(LimboError::Busy); } lock.value.store(READMARK_NOT_USED, Ordering::SeqCst); - lock.unlock(); } } + let handle_err = |e: &LimboError| { + // release all read locks we just acquired, the caller will take care of the others + let shared = self.get_shared(); + for idx in 1..shared.read_locks.len() { + shared.read_locks[idx].unlock(); + } + tracing::error!( + "Failed to restart WAL header: {:?}, releasing read locks", + e + ); + }; // reinitialize in‑memory state - ensure_unlock!(self, self.get_shared().restart_wal_header(&self.io, mode)); + self.get_shared() + .restart_wal_header(&self.io, mode) + .inspect_err(|e| { + handle_err(e); + })?; // For TRUNCATE: physically shrink the WAL to 0 B if matches!(mode, CheckpointMode::Truncate) { let c = Completion::new_trunc(|_| { tracing::trace!("WAL file truncated to 0 B"); }); - ensure_unlock!(self, self.get_shared().file.truncate(0, c)); + let shared = self.get_shared(); + shared.file.truncate(0, c.clone()).inspect_err(|e| { + handle_err(e); + })?; let c = Completion::new_sync(|_| { tracing::trace!("WAL file synced after truncation"); }); // fsync after truncation - self.get_shared().file.sync(c.into())?; + shared.file.sync(c).inspect_err(|e| { + handle_err(e); + })?; } // release read‑locks 1..4 @@ -1485,8 +1508,8 @@ impl WalFileShared { pub mod test { use crate::{ result::LimboResult, storage::sqlite3_ondisk::WAL_HEADER_SIZE, types::IOResult, - CheckpointMode, CheckpointResult, Completion, Connection, Database, PlatformIO, Wal, - WalFileShared, IO, + CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO, + Wal, WalFileShared, IO, }; use std::{ cell::{Cell, RefCell, UnsafeCell}, @@ -1534,7 +1557,7 @@ pub mod test { } #[test] - fn test_truncate_checkpoint() { + fn test_wal_truncate_checkpoint() { let (db, path) = get_database(); let mut walpath = path.clone().into_os_string().into_string().unwrap(); walpath.push_str("/test.db-wal"); @@ -1620,7 +1643,7 @@ pub mod test { } #[test] - fn test_full_checkpoint_mode() { + fn test_wal_full_checkpoint_mode() { let (db, path) = get_database(); let conn = db.connect().unwrap(); conn.execute("create table test(id integer primary key, value text)") @@ -1727,7 +1750,7 @@ pub mod test { } #[test] - fn test_restart_checkpoint_resets_sequence() { + fn test_wal_restart_checkpoint_resets_sequence() { let (db, path) = get_database(); let mut walpath = path.clone().into_os_string().into_string().unwrap(); @@ -1815,7 +1838,7 @@ pub mod test { } #[test] - fn test_passive_partial_then_complete() { + fn test_wal_passive_partial_then_complete() { let (db, _tmp) = get_database(); let conn1 = db.connect().unwrap(); let conn2 = db.connect().unwrap(); @@ -1830,7 +1853,7 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let mut wal2 = pager.wal.borrow_mut(); - assert!(matches!(wal2.begin_read_tx().unwrap(), LimboResult::Ok)); + assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok)); } // generate more frames that the reader will not see. @@ -1861,7 +1884,7 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let wal2 = pager.wal.borrow_mut(); - wal2.end_read_tx().unwrap(); + wal2.end_read_tx(); } // Second passive checkpoint should finish @@ -1873,4 +1896,28 @@ pub mod test { "Second checkpoint completes remaining frames" ); } + + #[test] + fn test_wal_restart_blocks_readers() { + let (db, _) = get_database(); + let conn1 = db.connect().unwrap(); + let conn2 = db.connect().unwrap(); + + // Start a read transaction + conn2 + .pager + .borrow_mut() + .wal + .borrow_mut() + .begin_read_tx() + .unwrap(); + + // checkpoint should fail + let result = { + let p = conn1.pager.borrow(); + let mut w = p.wal.borrow_mut(); + w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart) + }; + assert!(matches!(result, Err(LimboError::Busy))); + } }