mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-01 23:44:19 +01:00
fix bug after making checkpoint async
This commit is contained in:
@@ -321,7 +321,7 @@ enum CommitState {
|
||||
/// Sync WAL header after prepare
|
||||
PrepareWalSync,
|
||||
/// Appends all frames to the WAL.
|
||||
Start,
|
||||
PrepareFrames,
|
||||
/// Fsync the on-disk WAL.
|
||||
SyncWal,
|
||||
/// Checkpoint the WAL to the database file (if needed).
|
||||
@@ -1263,7 +1263,8 @@ impl Pager {
|
||||
let mut pages = Vec::with_capacity(len);
|
||||
let page_sz = self.page_size.get().unwrap_or_default();
|
||||
|
||||
if let Some(c) = wal.borrow_mut().prepare_wal_start(page_sz)? {
|
||||
let prepare = wal.borrow_mut().prepare_wal_start(page_sz)?;
|
||||
if let Some(c) = prepare {
|
||||
self.io.wait_for_completion(c)?;
|
||||
let c = wal.borrow_mut().prepare_wal_finish()?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
@@ -1340,7 +1341,7 @@ impl Pager {
|
||||
let page_sz = self.page_size.get().expect("page size not set");
|
||||
let c = wal.borrow_mut().prepare_wal_start(page_sz)?;
|
||||
let Some(c) = c else {
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
self.commit_info.state.set(CommitState::PrepareFrames);
|
||||
continue;
|
||||
};
|
||||
self.commit_info.state.set(CommitState::PrepareWalSync);
|
||||
@@ -1350,12 +1351,12 @@ impl Pager {
|
||||
}
|
||||
CommitState::PrepareWalSync => {
|
||||
let c = wal.borrow_mut().prepare_wal_finish()?;
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
self.commit_info.state.set(CommitState::PrepareFrames);
|
||||
if !c.is_completed() {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
}
|
||||
CommitState::Start => {
|
||||
CommitState::PrepareFrames => {
|
||||
let now = self.io.now();
|
||||
self.commit_info.time.set(now);
|
||||
let db_size_after = {
|
||||
@@ -1503,7 +1504,7 @@ impl Pager {
|
||||
let mut completions = self.commit_info.completions.borrow_mut();
|
||||
if completions.iter().all(|c| c.is_completed()) {
|
||||
completions.clear();
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
self.commit_info.state.set(CommitState::PrepareWal);
|
||||
wal.borrow_mut().finish_append_frames_commit()?;
|
||||
let result = self.commit_info.result.borrow_mut().take();
|
||||
return Ok(IOResult::Done(result.expect("commit result should be set")));
|
||||
@@ -2229,7 +2230,7 @@ impl Pager {
|
||||
fn reset_internal_states(&self) {
|
||||
*self.checkpoint_state.write() = CheckpointState::Checkpoint;
|
||||
self.syncing.replace(false);
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
self.commit_info.state.set(CommitState::PrepareWal);
|
||||
self.commit_info.time.set(self.io.now());
|
||||
self.allocate_page_state.replace(AllocatePageState::Start);
|
||||
self.free_page_state.replace(FreePageState::Start);
|
||||
|
||||
@@ -1936,7 +1936,9 @@ impl WalFile {
|
||||
if mode.require_all_backfilled() && !checkpoint_result.everything_backfilled() {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
self.restart_log_if_needed(mode)?;
|
||||
if mode.should_restart_log() {
|
||||
self.restart_log(mode)?;
|
||||
}
|
||||
self.ongoing_checkpoint.state = CheckpointState::Truncate {
|
||||
checkpoint_result: Some(checkpoint_result),
|
||||
truncate_sent: false,
|
||||
@@ -1944,7 +1946,12 @@ impl WalFile {
|
||||
};
|
||||
}
|
||||
CheckpointState::Truncate { .. } => {
|
||||
return_if_io!(self.truncate_log_if_needed(mode));
|
||||
if matches!(mode, CheckpointMode::Truncate { .. }) {
|
||||
return_if_io!(self.truncate_log());
|
||||
}
|
||||
if mode.should_restart_log() {
|
||||
Self::unlock(&self.shared, None);
|
||||
}
|
||||
let mut checkpoint_result = {
|
||||
let CheckpointState::Truncate {
|
||||
checkpoint_result, ..
|
||||
@@ -2051,10 +2058,11 @@ impl WalFile {
|
||||
|
||||
/// Called once the entire WAL has been back‑filled in RESTART or TRUNCATE mode.
|
||||
/// Must be invoked while writer and checkpoint locks are still held.
|
||||
fn restart_log_if_needed(&mut self, mode: CheckpointMode) -> Result<()> {
|
||||
if !mode.should_restart_log() {
|
||||
return Ok(());
|
||||
}
|
||||
fn restart_log(&mut self, mode: CheckpointMode) -> Result<()> {
|
||||
turso_assert!(
|
||||
mode.should_restart_log(),
|
||||
"CheckpointMode must be Restart or Truncate"
|
||||
);
|
||||
turso_assert!(
|
||||
matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })),
|
||||
"We must hold writer and checkpoint locks to restart the log, found: {:?}",
|
||||
@@ -2089,11 +2097,7 @@ impl WalFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn truncate_log_if_needed(&mut self, mode: CheckpointMode) -> Result<IOResult<()>> {
|
||||
if !matches!(mode, CheckpointMode::Truncate { .. }) {
|
||||
Self::unlock(&self.shared, None);
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
fn truncate_log(&mut self) -> Result<IOResult<()>> {
|
||||
let file = {
|
||||
let shared = self.get_shared();
|
||||
assert!(
|
||||
@@ -2143,8 +2147,6 @@ impl WalFile {
|
||||
.inspect_err(|e| Self::unlock(&self.shared, Some(e)))?;
|
||||
*sync_sent = true;
|
||||
io_yield_one!(c);
|
||||
} else {
|
||||
Self::unlock(&self.shared, None);
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user