mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-28 12:24:23 +01:00
Performance improvements to checkpointing. prevent serializing I/O
This commit is contained in:
@@ -61,7 +61,7 @@ use crate::storage::buffer_pool::BufferPool;
|
||||
use crate::storage::database::DatabaseStorage;
|
||||
use crate::storage::encryption::EncryptionKey;
|
||||
use crate::storage::pager::Pager;
|
||||
use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
|
||||
use crate::storage::wal::READMARK_NOT_USED;
|
||||
use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
|
||||
use crate::{bail_corrupt_error, turso_assert, CompletionError, File, Result, WalFileShared};
|
||||
use std::cell::{Cell, UnsafeCell};
|
||||
@@ -964,10 +964,11 @@ pub fn begin_write_btree_page(pager: &Pager, page: &PageRef) -> Result<Completio
|
||||
pub fn write_pages_vectored(
|
||||
pager: &Pager,
|
||||
batch: BTreeMap<usize, Arc<Buffer>>,
|
||||
flush: &PendingFlush,
|
||||
done_flag: Arc<AtomicBool>,
|
||||
encryption_key: Option<&EncryptionKey>,
|
||||
) -> Result<Vec<Completion>> {
|
||||
if batch.is_empty() {
|
||||
done_flag.store(true, Ordering::Relaxed);
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
@@ -992,8 +993,7 @@ pub fn write_pages_vectored(
|
||||
|
||||
// Create the atomic counters
|
||||
let runs_left = Arc::new(AtomicUsize::new(run_count));
|
||||
flush.new_flush();
|
||||
let done = flush.done.clone();
|
||||
let done = done_flag.clone();
|
||||
// we know how many runs, but we don't know how many buffers per run, so we can only give an
|
||||
// estimate of the capacity
|
||||
const EST_BUFF_CAPACITY: usize = 32;
|
||||
@@ -1004,21 +1004,21 @@ pub fn write_pages_vectored(
|
||||
let mut run_start_id: Option<usize> = None;
|
||||
|
||||
// Iterate through the batch
|
||||
let mut iter = batch.into_iter().peekable();
|
||||
let mut iter = batch.iter().peekable();
|
||||
|
||||
let mut completions = Vec::new();
|
||||
while let Some((id, item)) = iter.next() {
|
||||
// Track the start of the run
|
||||
if run_start_id.is_none() {
|
||||
run_start_id = Some(id);
|
||||
run_start_id = Some(*id);
|
||||
}
|
||||
|
||||
// Add this page to the current run
|
||||
run_bufs.push(item);
|
||||
run_bufs.push(item.clone());
|
||||
|
||||
// Check if this is the end of a run
|
||||
let is_end_of_run = match iter.peek() {
|
||||
Some(&(next_id, _)) => next_id != id + 1,
|
||||
Some(&(next_id, _)) => *next_id != id + 1,
|
||||
None => true,
|
||||
};
|
||||
|
||||
|
||||
@@ -82,6 +82,8 @@ impl CheckpointMode {
|
||||
fn should_restart_log(&self) -> bool {
|
||||
matches!(self, CheckpointMode::Truncate | CheckpointMode::Restart)
|
||||
}
|
||||
/// All modes other than Passive require a complete backfilling of all available frames
|
||||
/// from `shared.nbackfills + 1 -> shared.max_frame`
|
||||
fn require_all_backfilled(&self) -> bool {
|
||||
!matches!(self, CheckpointMode::Passive)
|
||||
}
|
||||
@@ -301,66 +303,137 @@ pub enum CheckpointState {
|
||||
}
|
||||
|
||||
/// IOV_MAX is 1024 on most systems
|
||||
pub const CKPT_BATCH_PAGES: usize = 512;
|
||||
type PageId = usize;
|
||||
pub const CKPT_BATCH_PAGES: usize = 1024;
|
||||
|
||||
const MAX_INFLIGHT_READS: usize = 64;
|
||||
/// TODO: *ALL* of these need to be tuned for perf. It is tricky
|
||||
/// trying to figure out the ideal numbers here to work together concurrently
|
||||
const MIN_AVG_RUN_FOR_FLUSH: f32 = 32.0;
|
||||
const MIN_BATCH_LEN_FOR_FLUSH: usize = 512;
|
||||
const MAX_INFLIGHT_WRITES: usize = 64;
|
||||
const MAX_INFLIGHT_READS: usize = 512;
|
||||
|
||||
type PageId = usize;
|
||||
struct InflightRead {
|
||||
completion: Completion,
|
||||
page_id: PageId,
|
||||
done: Arc<AtomicBool>,
|
||||
/// Buffer slot to contain the page content from the WAL read.
|
||||
buf: Arc<SpinLock<Option<Arc<Buffer>>>>,
|
||||
}
|
||||
|
||||
/// Batch is a collection of pages that are being checkpointed together. It is used to
|
||||
/// WriteBatch is a collection of pages that are being checkpointed together. It is used to
|
||||
/// aggregate contiguous pages into a single write operation to the database file.
|
||||
pub(super) struct Batch {
|
||||
#[derive(Default)]
|
||||
struct WriteBatch {
|
||||
/// BTreeMap for sorting during insertion, helps create more efficient `writev` operations.
|
||||
items: BTreeMap<PageId, Arc<Buffer>>,
|
||||
/// total number of `runs`, each representing a contiguous group of `PageId`s
|
||||
run_count: usize,
|
||||
}
|
||||
// TODO(preston): implement the same thing for `readv`
|
||||
impl Batch {
|
||||
|
||||
impl WriteBatch {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
items: BTreeMap::new(),
|
||||
run_count: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_raw(&mut self, page_id: usize, buf: Arc<Buffer>) {
|
||||
#[inline]
|
||||
/// Add a pageId + Buffer to the batch of Writes to be submitted.
|
||||
fn insert(&mut self, page_id: PageId, buf: Arc<Buffer>) {
|
||||
if let std::collections::btree_map::Entry::Occupied(mut entry) = self.items.entry(page_id) {
|
||||
entry.insert(buf);
|
||||
return;
|
||||
}
|
||||
let left = page_id
|
||||
.checked_sub(1)
|
||||
.is_some_and(|p| self.items.contains_key(&p));
|
||||
let right = page_id
|
||||
.checked_add(1)
|
||||
.is_some_and(|p| self.items.contains_key(&p));
|
||||
match (left, right) {
|
||||
(false, false) => {
|
||||
// new singleton run
|
||||
self.run_count += 1;
|
||||
}
|
||||
(true, false) | (false, true) => {
|
||||
// extends an existing run, run_count unchanged
|
||||
}
|
||||
(true, true) => {
|
||||
// merges two runs into one
|
||||
self.run_count = self.run_count.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
self.items.insert(page_id, buf);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn len(&self) -> usize {
|
||||
self.items.len()
|
||||
}
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.items.is_empty()
|
||||
}
|
||||
#[inline]
|
||||
fn is_full(&self) -> bool {
|
||||
self.items.len() >= CKPT_BATCH_PAGES
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn avg_run_len(&self) -> f32 {
|
||||
if self.run_count == 0 {
|
||||
0.0
|
||||
} else {
|
||||
self.items.len() as f32 / self.run_count as f32
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn take(&mut self) -> BTreeMap<PageId, Arc<Buffer>> {
|
||||
self.run_count = 0;
|
||||
std::mem::take(&mut self.items)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn clear(&mut self) {
|
||||
self.items.clear();
|
||||
self.run_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Batch {
|
||||
impl std::ops::Deref for WriteBatch {
|
||||
type Target = BTreeMap<PageId, Arc<Buffer>>;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.items
|
||||
}
|
||||
}
|
||||
impl std::ops::DerefMut for Batch {
|
||||
impl std::ops::DerefMut for WriteBatch {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.items
|
||||
}
|
||||
}
|
||||
|
||||
// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
|
||||
// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
|
||||
// page operations like reading a frame to a page, and writing a page to disk. This page should not
|
||||
// be placed back in pager page cache or anything, it's just a helper.
|
||||
// min_frame and max_frame are the range of frames that can be safely transferred from WAL to db
|
||||
// file.
|
||||
// current_page is a helper to iterate through all the pages that might have a frame in the safe
|
||||
// range. This is inefficient for now.
|
||||
/// In-flight information and state of an ongoing checkpoint operation.
|
||||
struct OngoingCheckpoint {
|
||||
batch: Batch,
|
||||
state: CheckpointState,
|
||||
pending_flush: PendingFlush,
|
||||
/// Used for benchmarking/debugging a checkpoint operation.
|
||||
time: std::time::Instant,
|
||||
/// minimum frame number to be backfilled by this checkpoint operation.
|
||||
min_frame: u64,
|
||||
/// maximum safe frame number that will be backfilled by this checkpoint operation.
|
||||
max_frame: u64,
|
||||
/// cursor used to iterate through all the pages that might have a frame in the safe range
|
||||
current_page: u64,
|
||||
/// State of the checkpoint
|
||||
state: CheckpointState,
|
||||
/// Batch represents a collection of pages to be backfilled to the DB file.
|
||||
pending_writes: WriteBatch,
|
||||
/// Array of WAL frame reads that are currently in flight.
|
||||
inflight_reads: Vec<InflightRead>,
|
||||
/// Array of atomic flags, one per write operation that is currently in flight.
|
||||
inflight_writes: Vec<Arc<AtomicBool>>,
|
||||
/// List of all (page_id, frame_id) combinations to be backfilled.
|
||||
pages_to_checkpoint: Vec<(u64, u64)>,
|
||||
inflight: Vec<InflightRead>,
|
||||
}
|
||||
|
||||
impl OngoingCheckpoint {
|
||||
@@ -1142,6 +1215,7 @@ impl Wal for WalFile {
|
||||
page_buf
|
||||
};
|
||||
|
||||
let seq = header.checkpoint_seq;
|
||||
let (frame_checksums, frame_bytes) = prepare_wal_frame(
|
||||
&self.buffer_pool,
|
||||
&header,
|
||||
@@ -1318,20 +1392,24 @@ impl WalFile {
|
||||
|
||||
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
|
||||
let last_checksum = unsafe { (*shared.get()).last_checksum };
|
||||
let disable_checkpoint_cache =
|
||||
std::env::var("TURSO_DISABLE_CHECKPOINT_CACHE").unwrap_or_default() != "";
|
||||
set_disable_ckpt_cache(disable_checkpoint_cache);
|
||||
Self {
|
||||
io,
|
||||
// default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
|
||||
max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) },
|
||||
shared,
|
||||
ongoing_checkpoint: OngoingCheckpoint {
|
||||
batch: Batch::new(),
|
||||
pending_flush: PendingFlush::new(),
|
||||
time: std::time::Instant::now(),
|
||||
pending_writes: WriteBatch::new(),
|
||||
inflight_writes: Vec::new(),
|
||||
state: CheckpointState::Start,
|
||||
min_frame: 0,
|
||||
max_frame: 0,
|
||||
current_page: 0,
|
||||
pages_to_checkpoint: Vec::new(),
|
||||
inflight: Vec::with_capacity(MAX_INFLIGHT_READS),
|
||||
inflight_reads: Vec::with_capacity(MAX_INFLIGHT_READS),
|
||||
},
|
||||
checkpoint_threshold: 1000,
|
||||
buffer_pool,
|
||||
@@ -1392,6 +1470,8 @@ impl WalFile {
|
||||
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
|
||||
self.ongoing_checkpoint.batch.clear();
|
||||
self.ongoing_checkpoint.pending_flush.clear();
|
||||
self.sync_state.set(SyncState::NotSyncing);
|
||||
self.ongoing_checkpoint.pages_to_checkpoint.clear();
|
||||
self.syncing.set(false);
|
||||
}
|
||||
|
||||
@@ -1516,7 +1596,7 @@ impl WalFile {
|
||||
// if at all possible, at the cost of some batching potential.
|
||||
CheckpointState::Processing => {
|
||||
// Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc
|
||||
let mut completions = Vec::with_capacity(MAX_PENDING_WRITES);
|
||||
let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES);
|
||||
|
||||
// Check and clean any completed writes from pending flush
|
||||
if self.ongoing_checkpoint.process_pending_writes() {
|
||||
@@ -1577,7 +1657,12 @@ impl WalFile {
|
||||
let batch_map = self.ongoing_checkpoint.pending_writes.take();
|
||||
if !batch_map.is_empty() {
|
||||
let done_flag = self.ongoing_checkpoint.add_write();
|
||||
completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
|
||||
completions.extend(write_pages_vectored(
|
||||
pager,
|
||||
batch_map,
|
||||
done_flag,
|
||||
self.encryption_key.borrow().as_ref(),
|
||||
)?);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1598,7 +1683,7 @@ impl WalFile {
|
||||
}
|
||||
self.ongoing_checkpoint.state = CheckpointState::Done;
|
||||
} else if !completions.is_empty() {
|
||||
return Ok(IOResult::IO(IOCompletions::Many(completions)));
|
||||
io_yield_many!(completions);
|
||||
}
|
||||
}
|
||||
// All eligible frames copied to the db file
|
||||
@@ -1858,11 +1943,9 @@ impl WalFile {
|
||||
|
||||
fn issue_wal_read_into_buffer(&self, page_id: usize, frame_id: u64) -> Result<InflightRead> {
|
||||
let offset = self.frame_offset(frame_id);
|
||||
let done = Arc::new(AtomicBool::new(false));
|
||||
let buf_slot = Arc::new(SpinLock::new(None));
|
||||
|
||||
let complete = {
|
||||
let done = done.clone();
|
||||
let buf_slot = buf_slot.clone();
|
||||
Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
|
||||
let buf_len = buf.len();
|
||||
@@ -1871,11 +1954,10 @@ impl WalFile {
|
||||
"read({bytes_read}) != expected({buf_len}): frame_id={frame_id}"
|
||||
);
|
||||
*buf_slot.lock() = Some(buf);
|
||||
done.store(true, Ordering::Release);
|
||||
})
|
||||
};
|
||||
// schedule read of the page payload
|
||||
let _c = begin_read_wal_frame(
|
||||
let c = begin_read_wal_frame(
|
||||
&self.get_shared().file,
|
||||
offset + WAL_FRAME_HEADER_SIZE,
|
||||
self.buffer_pool.clone(),
|
||||
@@ -1883,8 +1965,8 @@ impl WalFile {
|
||||
)?;
|
||||
|
||||
Ok(InflightRead {
|
||||
completion: c,
|
||||
page_id,
|
||||
done,
|
||||
buf: buf_slot,
|
||||
})
|
||||
}
|
||||
@@ -1897,7 +1979,7 @@ impl WalFile {
|
||||
self.ongoing_checkpoint.inflight.retain(|slot| {
|
||||
if slot.done.load(Ordering::Acquire) {
|
||||
if let Some(buf) = slot.buf.lock().take() {
|
||||
self.ongoing_checkpoint.batch.insert_raw(slot.page_id, buf);
|
||||
self.ongoing_checkpoint.batch.insert(slot.page_id, buf);
|
||||
moved = true;
|
||||
}
|
||||
false // drop this slot
|
||||
|
||||
Reference in New Issue
Block a user