mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-06 16:54:23 +01:00
Link all writes to the fsync barrier, not just the commit frame
This commit is contained in:
@@ -996,7 +996,6 @@ pub fn write_pages_vectored(
|
||||
pager: &Pager,
|
||||
batch: BTreeMap<usize, Arc<Buffer>>,
|
||||
done_flag: Arc<AtomicBool>,
|
||||
final_write: bool,
|
||||
) -> Result<Vec<Completion>> {
|
||||
if batch.is_empty() {
|
||||
done_flag.store(true, Ordering::Relaxed);
|
||||
@@ -1004,90 +1003,54 @@ pub fn write_pages_vectored(
|
||||
}
|
||||
|
||||
let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
|
||||
// Count expected number of runs to create the atomic counter we need to track each batch
|
||||
let mut run_count = 0;
|
||||
let mut prev_id = None;
|
||||
for &id in batch.keys() {
|
||||
if let Some(prev) = prev_id {
|
||||
if id != prev + 1 {
|
||||
run_count += 1;
|
||||
}
|
||||
} else {
|
||||
run_count = 1; // First run
|
||||
}
|
||||
prev_id = Some(id);
|
||||
}
|
||||
|
||||
// Create the atomic counters
|
||||
let run_count = batch
|
||||
.keys()
|
||||
.zip(batch.keys().skip(1))
|
||||
.filter(|(&curr, &next)| next != curr + 1)
|
||||
.count()
|
||||
+ 1;
|
||||
let runs_left = Arc::new(AtomicUsize::new(run_count));
|
||||
let done = done_flag.clone();
|
||||
const EST_BUFF_CAPACITY: usize = 32;
|
||||
|
||||
const EST_BUFF_CAPACITY: usize = 32;
|
||||
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
|
||||
let mut run_start_id: Option<usize> = None;
|
||||
|
||||
// Track which run we're on to identify the last one
|
||||
let mut current_run = 0;
|
||||
let mut iter = batch.iter().peekable();
|
||||
let mut completions = Vec::new();
|
||||
|
||||
while let Some((id, item)) = iter.next() {
|
||||
// Track the start of the run
|
||||
let mut iter = batch.iter().peekable();
|
||||
while let Some((id, buffer)) = iter.next() {
|
||||
if run_start_id.is_none() {
|
||||
run_start_id = Some(*id);
|
||||
}
|
||||
run_bufs.push(item.clone());
|
||||
run_bufs.push(buffer.clone());
|
||||
|
||||
// Check if this is the end of a run
|
||||
let is_end_of_run = match iter.peek() {
|
||||
Some(&(next_id, _)) => *next_id != id + 1,
|
||||
None => true,
|
||||
};
|
||||
|
||||
if is_end_of_run {
|
||||
current_run += 1;
|
||||
if iter.peek().is_none_or(|(next_id, _)| **next_id != id + 1) {
|
||||
let start_id = run_start_id.expect("should have a start id");
|
||||
let runs_left_cl = runs_left.clone();
|
||||
let done_cl = done.clone();
|
||||
|
||||
// This is the last chunk if it's the last run AND final_write is true
|
||||
let is_last_chunk = current_run == run_count && final_write;
|
||||
|
||||
let done_cl = done_flag.clone();
|
||||
let total_sz = (page_sz * run_bufs.len()) as i32;
|
||||
let cmp = move |res| {
|
||||
let Ok(res) = res else {
|
||||
return;
|
||||
};
|
||||
|
||||
let cmp = Completion::new_write_linked(move |res| {
|
||||
let Ok(res) = res else { return };
|
||||
turso_assert!(total_sz == res, "failed to write expected size");
|
||||
if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
|
||||
done_cl.store(true, Ordering::Release);
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
let c = if is_last_chunk {
|
||||
Completion::new_write_linked(cmp)
|
||||
} else {
|
||||
Completion::new_write(cmp)
|
||||
};
|
||||
|
||||
// Submit write operation for this run
|
||||
let io_ctx = &pager.io_ctx.borrow();
|
||||
match pager.db_file.write_pages(
|
||||
start_id,
|
||||
page_sz,
|
||||
std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
|
||||
io_ctx,
|
||||
c,
|
||||
cmp,
|
||||
) {
|
||||
Ok(c) => {
|
||||
completions.push(c);
|
||||
}
|
||||
Ok(c) => completions.push(c),
|
||||
Err(e) => {
|
||||
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
|
||||
done.store(true, Ordering::Release);
|
||||
done_flag.store(true, Ordering::Release);
|
||||
}
|
||||
pager.io.cancel(&completions)?;
|
||||
// cancel any submitted completions and drain the IO before returning an error
|
||||
pager.io.drain()?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
@@ -471,13 +471,6 @@ impl OngoingCheckpoint {
|
||||
self.state = CheckpointState::Start;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Reports whether all three of the following hold at once:
/// `current_page` (cast to `usize`) has reached or passed the length of
/// `pages_to_checkpoint`, `inflight_reads` is empty, and `pending_writes`
/// is not empty.
///
/// NOTE(review): presumably this identifies the last batch of checkpoint
/// writes still waiting to be flushed — confirm against the caller.
fn is_final_write(&self) -> bool {
|
||||
self.current_page as usize >= self.pages_to_checkpoint.len()
|
||||
&& self.inflight_reads.is_empty()
|
||||
&& !self.pending_writes.is_empty()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Whether or not new reads should be issued during checkpoint processing.
|
||||
fn should_issue_reads(&self) -> bool {
|
||||
@@ -1585,11 +1578,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
};
|
||||
|
||||
let c = if db_size_on_commit.is_some() {
|
||||
Completion::new_write_linked(cmp)
|
||||
} else {
|
||||
Completion::new_write(cmp)
|
||||
};
|
||||
let c = Completion::new_write_linked(cmp);
|
||||
|
||||
let shared = self.get_shared();
|
||||
assert!(
|
||||
@@ -1930,10 +1919,7 @@ impl WalFile {
|
||||
let batch_map = self.ongoing_checkpoint.pending_writes.take();
|
||||
if !batch_map.is_empty() {
|
||||
let done_flag = self.ongoing_checkpoint.add_write();
|
||||
let is_final = self.ongoing_checkpoint.is_final_write();
|
||||
completions.extend(write_pages_vectored(
|
||||
pager, batch_map, done_flag, is_final,
|
||||
)?);
|
||||
completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user