mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 02:04:22 +01:00
Fix write counter for writev batching in checkpoint
This commit is contained in:
@@ -1303,11 +1303,12 @@ impl Pager {
|
||||
return Ok(CheckpointResult::default());
|
||||
}
|
||||
|
||||
let counter = Rc::new(RefCell::new(0));
|
||||
let mut checkpoint_result = self.io.block(|| {
|
||||
let write_counter = Rc::new(RefCell::new(0));
|
||||
let checkpoint_result = self.io.block(|| {
|
||||
self.wal
|
||||
.borrow_mut()
|
||||
.checkpoint(self, counter.clone(), mode)
|
||||
.map_err(|err| panic!("error while clearing cache {err}"))
|
||||
})?;
|
||||
|
||||
if checkpoint_result.everything_backfilled()
|
||||
|
||||
@@ -61,7 +61,7 @@ use crate::storage::pager::Pager;
|
||||
use crate::storage::wal::{BatchItem, PendingFlush};
|
||||
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
|
||||
use crate::{turso_assert, File, Result, WalFileShared};
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::cell::{Cell, RefCell, UnsafeCell};
|
||||
use std::collections::HashMap;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::pin::Pin;
|
||||
@@ -857,7 +857,8 @@ pub fn begin_write_btree_page(
|
||||
pub fn begin_write_btree_pages_writev(
|
||||
pager: &Pager,
|
||||
batch: &[BatchItem],
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
// track writes for each flush series
|
||||
write_counter: Rc<Cell<usize>>,
|
||||
) -> Result<PendingFlush> {
|
||||
if batch.is_empty() {
|
||||
return Ok(PendingFlush::default());
|
||||
@@ -878,23 +879,34 @@ pub fn begin_write_btree_pages_writev(
|
||||
}
|
||||
|
||||
// submit contiguous run
|
||||
let first = run[start].id;
|
||||
let first_id = run[start].id;
|
||||
let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect();
|
||||
all_ids.extend(run[start..end].iter().map(|b| b.id));
|
||||
|
||||
*write_counter.borrow_mut() += 1;
|
||||
write_counter.set(write_counter.get() + 1);
|
||||
let wc = write_counter.clone();
|
||||
let done_clone = done.clone();
|
||||
|
||||
let c = Completion::new_write(move |_| {
|
||||
// one run finished
|
||||
*wc.borrow_mut() -= 1;
|
||||
if wc.borrow().eq(&0) {
|
||||
wc.set(wc.get() - 1);
|
||||
if wc.get().eq(&0) {
|
||||
// last run of this batch is done
|
||||
done_clone.store(true, Ordering::Release);
|
||||
}
|
||||
});
|
||||
pager.db_file.write_pages(first, page_sz, bufs, c)?;
|
||||
pager
|
||||
.db_file
|
||||
.write_pages(first_id, page_sz, bufs, c)
|
||||
.inspect_err(|e| {
|
||||
tracing::error!(
|
||||
"Failed to write pages {}-{}: {}",
|
||||
first_id,
|
||||
first_id + (end - start) - 1,
|
||||
e
|
||||
);
|
||||
write_counter.set(write_counter.get() - 1);
|
||||
})?;
|
||||
start = end;
|
||||
}
|
||||
Ok(PendingFlush {
|
||||
|
||||
@@ -399,7 +399,7 @@ pub enum CheckpointState {
|
||||
Done,
|
||||
}
|
||||
|
||||
const CKPT_BATCH_PAGES: usize = 256;
|
||||
const CKPT_BATCH_PAGES: usize = 512;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(super) struct BatchItem {
|
||||
@@ -416,7 +416,7 @@ pub(super) struct BatchItem {
|
||||
// current_page is a helper to iterate through all the pages that might have a frame in the safe
|
||||
// range. This is inefficient for now.
|
||||
struct OngoingCheckpoint {
|
||||
scratch: PageRef,
|
||||
scratch_page: PageRef,
|
||||
batch: Vec<BatchItem>,
|
||||
state: CheckpointState,
|
||||
pending_flushes: Vec<PendingFlush>,
|
||||
@@ -1261,7 +1261,7 @@ impl WalFile {
|
||||
max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) },
|
||||
shared,
|
||||
ongoing_checkpoint: OngoingCheckpoint {
|
||||
scratch: checkpoint_page,
|
||||
scratch_page: checkpoint_page,
|
||||
batch: Vec::new(),
|
||||
pending_flushes: Vec::new(),
|
||||
state: CheckpointState::Start,
|
||||
|
||||
Reference in New Issue
Block a user