mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-30 22:44:21 +01:00
Remove serialization of normal write/commit path
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -45,4 +45,5 @@ simulator-output/
|
||||
|
||||
&1
|
||||
bisected.sql
|
||||
*.log
|
||||
*.log
|
||||
|
||||
|
||||
@@ -327,8 +327,6 @@ enum CommitState {
|
||||
Checkpoint,
|
||||
/// Fsync the database file.
|
||||
SyncDbFile,
|
||||
/// After database file is fsynced.
|
||||
AfterSyncDbFile,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@@ -359,6 +357,7 @@ pub enum BtreePageAllocMode {
|
||||
|
||||
/// This will keep track of the state of current cache commit in order to not repeat work
|
||||
struct CommitInfo {
|
||||
completions: RefCell<Vec<Completion>>,
|
||||
state: Cell<CommitState>,
|
||||
time: Cell<crate::io::clock::Instant>,
|
||||
}
|
||||
@@ -586,6 +585,7 @@ impl Pager {
|
||||
hash::BuildHasherDefault::new(),
|
||||
))),
|
||||
commit_info: CommitInfo {
|
||||
completions: RefCell::new(Vec::new()),
|
||||
state: CommitState::Start.into(),
|
||||
time: now.into(),
|
||||
},
|
||||
@@ -1345,11 +1345,11 @@ impl Pager {
|
||||
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
|
||||
}
|
||||
|
||||
let mut completions = self.commit_info.completions.borrow_mut();
|
||||
completions.clear();
|
||||
let page_sz = self.page_size.get().expect("page size not set");
|
||||
let mut completions: Vec<Completion> = Vec::new();
|
||||
let mut pages: Vec<PageRef> = Vec::with_capacity(dirty_ids.len().min(IOV_MAX));
|
||||
let total = dirty_ids.len();
|
||||
|
||||
let mut cache = self.page_cache.write();
|
||||
for (i, page_id) in dirty_ids.into_iter().enumerate() {
|
||||
let page = {
|
||||
@@ -1400,9 +1400,6 @@ impl Pager {
|
||||
self.commit_info.state.set(CommitState::SyncWal);
|
||||
}
|
||||
}
|
||||
if !completions.iter().all(|c| c.is_completed()) {
|
||||
io_yield_many!(completions);
|
||||
}
|
||||
}
|
||||
CommitState::SyncWal => {
|
||||
self.commit_info.state.set(CommitState::AfterSyncWal);
|
||||
@@ -1414,12 +1411,10 @@ impl Pager {
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
if !c.is_completed() {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
self.commit_info.completions.borrow_mut().push(c);
|
||||
}
|
||||
CommitState::AfterSyncWal => {
|
||||
turso_assert!(!wal.borrow().is_syncing(), "wal should have synced");
|
||||
// turso_assert!(!wal.borrow().is_syncing(), "wal should have synced");
|
||||
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
break PagerCommitResult::WalWritten;
|
||||
@@ -1427,12 +1422,30 @@ impl Pager {
|
||||
self.commit_info.state.set(CommitState::Checkpoint);
|
||||
}
|
||||
CommitState::Checkpoint => {
|
||||
checkpoint_result = return_if_io!(self.checkpoint());
|
||||
// Skip sync if synchronous mode is OFF
|
||||
if sync_mode == crate::SyncMode::Off {
|
||||
self.commit_info.state.set(CommitState::AfterSyncDbFile);
|
||||
} else {
|
||||
self.commit_info.state.set(CommitState::SyncDbFile);
|
||||
let mut completions = self.commit_info.completions.borrow_mut();
|
||||
match self.checkpoint()? {
|
||||
IOResult::IO(cmp) => {
|
||||
match cmp {
|
||||
IOCompletions::Single(c) => {
|
||||
completions.push(c);
|
||||
}
|
||||
IOCompletions::Many(c) => {
|
||||
completions.extend(c);
|
||||
}
|
||||
}
|
||||
// TODO: remove serialization of checkpoint path
|
||||
io_yield_many!(std::mem::take(&mut *completions));
|
||||
}
|
||||
IOResult::Done(res) => {
|
||||
checkpoint_result = res;
|
||||
// Skip sync if synchronous mode is OFF
|
||||
if sync_mode == crate::SyncMode::Off {
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
break PagerCommitResult::Checkpointed(checkpoint_result);
|
||||
} else {
|
||||
self.commit_info.state.set(CommitState::SyncDbFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
CommitState::SyncDbFile => {
|
||||
@@ -1445,13 +1458,7 @@ impl Pager {
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
self.commit_info.state.set(CommitState::AfterSyncDbFile);
|
||||
if !c.is_completed() {
|
||||
io_yield_one!(c);
|
||||
}
|
||||
}
|
||||
CommitState::AfterSyncDbFile => {
|
||||
turso_assert!(!self.syncing.get(), "should have finished syncing");
|
||||
self.commit_info.completions.borrow_mut().push(c);
|
||||
self.commit_info.state.set(CommitState::Start);
|
||||
break PagerCommitResult::Checkpointed(checkpoint_result);
|
||||
}
|
||||
@@ -1466,8 +1473,13 @@ impl Pager {
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
);
|
||||
wal.borrow_mut().finish_append_frames_commit()?;
|
||||
Ok(IOResult::Done(res))
|
||||
let mut completions = self.commit_info.completions.borrow_mut();
|
||||
if completions.is_empty() || completions.iter().all(|c| c.is_completed()) {
|
||||
completions.clear();
|
||||
wal.borrow_mut().finish_append_frames_commit()?;
|
||||
return Ok(IOResult::Done(res));
|
||||
}
|
||||
io_yield_many!(std::mem::take(&mut completions));
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
|
||||
Reference in New Issue
Block a user