mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Ensure we properly hold and release read locks in log restart method and fix tests
This commit is contained in:
@@ -1076,7 +1076,7 @@ impl Wal for WalFile {
|
||||
syncing.set(false);
|
||||
});
|
||||
let shared = self.get_shared();
|
||||
let c = shared.file.sync(completion)?;
|
||||
let _c = shared.file.sync(completion)?;
|
||||
self.sync_state.set(SyncState::Syncing);
|
||||
Ok(IOResult::IO)
|
||||
}
|
||||
@@ -1279,28 +1279,51 @@ impl WalFile {
|
||||
for idx in 1..shared.read_locks.len() {
|
||||
let lock = &mut shared.read_locks[idx];
|
||||
if !lock.write() {
|
||||
// release everything we got so far
|
||||
for j in 1..idx {
|
||||
shared.read_locks[j].unlock();
|
||||
}
|
||||
// Reader is active, cannot proceed
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
lock.value.store(READMARK_NOT_USED, Ordering::SeqCst);
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
let handle_err = |e: &LimboError| {
|
||||
// release all read locks we just acquired, the caller will take care of the others
|
||||
let shared = self.get_shared();
|
||||
for idx in 1..shared.read_locks.len() {
|
||||
shared.read_locks[idx].unlock();
|
||||
}
|
||||
tracing::error!(
|
||||
"Failed to restart WAL header: {:?}, releasing read locks",
|
||||
e
|
||||
);
|
||||
};
|
||||
// reinitialize in‑memory state
|
||||
ensure_unlock!(self, self.get_shared().restart_wal_header(&self.io, mode));
|
||||
self.get_shared()
|
||||
.restart_wal_header(&self.io, mode)
|
||||
.inspect_err(|e| {
|
||||
handle_err(e);
|
||||
})?;
|
||||
// For TRUNCATE: physically shrink the WAL to 0 B
|
||||
if matches!(mode, CheckpointMode::Truncate) {
|
||||
let c = Completion::new_trunc(|_| {
|
||||
tracing::trace!("WAL file truncated to 0 B");
|
||||
});
|
||||
ensure_unlock!(self, self.get_shared().file.truncate(0, c));
|
||||
let shared = self.get_shared();
|
||||
shared.file.truncate(0, c.clone()).inspect_err(|e| {
|
||||
handle_err(e);
|
||||
})?;
|
||||
|
||||
let c = Completion::new_sync(|_| {
|
||||
tracing::trace!("WAL file synced after truncation");
|
||||
});
|
||||
// fsync after truncation
|
||||
self.get_shared().file.sync(c.into())?;
|
||||
shared.file.sync(c).inspect_err(|e| {
|
||||
handle_err(e);
|
||||
})?;
|
||||
}
|
||||
|
||||
// release read‑locks 1..4
|
||||
@@ -1485,8 +1508,8 @@ impl WalFileShared {
|
||||
pub mod test {
|
||||
use crate::{
|
||||
result::LimboResult, storage::sqlite3_ondisk::WAL_HEADER_SIZE, types::IOResult,
|
||||
CheckpointMode, CheckpointResult, Completion, Connection, Database, PlatformIO, Wal,
|
||||
WalFileShared, IO,
|
||||
CheckpointMode, CheckpointResult, Completion, Connection, Database, LimboError, PlatformIO,
|
||||
Wal, WalFileShared, IO,
|
||||
};
|
||||
use std::{
|
||||
cell::{Cell, RefCell, UnsafeCell},
|
||||
@@ -1534,7 +1557,7 @@ pub mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_truncate_checkpoint() {
|
||||
fn test_wal_truncate_checkpoint() {
|
||||
let (db, path) = get_database();
|
||||
let mut walpath = path.clone().into_os_string().into_string().unwrap();
|
||||
walpath.push_str("/test.db-wal");
|
||||
@@ -1620,7 +1643,7 @@ pub mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_checkpoint_mode() {
|
||||
fn test_wal_full_checkpoint_mode() {
|
||||
let (db, path) = get_database();
|
||||
let conn = db.connect().unwrap();
|
||||
conn.execute("create table test(id integer primary key, value text)")
|
||||
@@ -1727,7 +1750,7 @@ pub mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_restart_checkpoint_resets_sequence() {
|
||||
fn test_wal_restart_checkpoint_resets_sequence() {
|
||||
let (db, path) = get_database();
|
||||
|
||||
let mut walpath = path.clone().into_os_string().into_string().unwrap();
|
||||
@@ -1815,7 +1838,7 @@ pub mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_passive_partial_then_complete() {
|
||||
fn test_wal_passive_partial_then_complete() {
|
||||
let (db, _tmp) = get_database();
|
||||
let conn1 = db.connect().unwrap();
|
||||
let conn2 = db.connect().unwrap();
|
||||
@@ -1830,7 +1853,7 @@ pub mod test {
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let mut wal2 = pager.wal.borrow_mut();
|
||||
assert!(matches!(wal2.begin_read_tx().unwrap(), LimboResult::Ok));
|
||||
assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok));
|
||||
}
|
||||
|
||||
// generate more frames that the reader will not see.
|
||||
@@ -1861,7 +1884,7 @@ pub mod test {
|
||||
{
|
||||
let pager = conn2.pager.borrow_mut();
|
||||
let wal2 = pager.wal.borrow_mut();
|
||||
wal2.end_read_tx().unwrap();
|
||||
wal2.end_read_tx();
|
||||
}
|
||||
|
||||
// Second passive checkpoint should finish
|
||||
@@ -1873,4 +1896,28 @@ pub mod test {
|
||||
"Second checkpoint completes remaining frames"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_wal_restart_blocks_readers() {
|
||||
let (db, _) = get_database();
|
||||
let conn1 = db.connect().unwrap();
|
||||
let conn2 = db.connect().unwrap();
|
||||
|
||||
// Start a read transaction
|
||||
conn2
|
||||
.pager
|
||||
.borrow_mut()
|
||||
.wal
|
||||
.borrow_mut()
|
||||
.begin_read_tx()
|
||||
.unwrap();
|
||||
|
||||
// checkpoint should fail
|
||||
let result = {
|
||||
let p = conn1.pager.borrow();
|
||||
let mut w = p.wal.borrow_mut();
|
||||
w.checkpoint(&p, Rc::new(RefCell::new(0)), CheckpointMode::Restart)
|
||||
};
|
||||
assert!(matches!(result, Err(LimboError::Busy)));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user