diff --git a/.gitignore b/.gitignore
index 6175dc26c..896698406 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,8 +18,6 @@
 env
 .venv
 dist/
 .tmp/
-# perf
-/Mobibench
 *.db
 **/*.db-wal
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index d6bc3aace..74c19a3b9 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -61,7 +61,7 @@ use crate::storage::buffer_pool::BufferPool;
 use crate::storage::database::DatabaseStorage;
 use crate::storage::encryption::EncryptionKey;
 use crate::storage::pager::Pager;
-use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
+use crate::storage::wal::READMARK_NOT_USED;
 use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
 use crate::{bail_corrupt_error, turso_assert, CompletionError, File, Result, WalFileShared};
 use std::cell::{Cell, UnsafeCell};
@@ -964,10 +964,11 @@ pub fn begin_write_btree_page(pager: &Pager, page: &PageRef) -> Result<Completion> {
 pub fn write_pages_vectored(
     pager: &Pager,
     batch: BTreeMap<usize, Arc<Buffer>>,
-    flush: &PendingFlush,
+    done_flag: Arc<AtomicBool>,
     encryption_key: Option<&EncryptionKey>,
 ) -> Result<Vec<Completion>> {
     if batch.is_empty() {
+        done_flag.store(true, Ordering::Relaxed);
         return Ok(Vec::new());
     }
@@ -992,8 +993,7 @@ pub fn write_pages_vectored(
     // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
-    flush.new_flush();
-    let done = flush.done.clone();
+    let done = done_flag.clone();
     // we know how many runs, but we don't know how many buffers per run, so we can only give an
     // estimate of the capacity
     const EST_BUFF_CAPACITY: usize = 32;
@@ -1004,21 +1004,21 @@ pub fn write_pages_vectored(
     let mut run_start_id: Option<usize> = None;
     // Iterate through the batch
-    let mut iter = batch.into_iter().peekable();
+    let mut iter = batch.iter().peekable();
     let mut completions = Vec::new();
     while let Some((id, item)) = iter.next() {
         // Track the start of the run
         if run_start_id.is_none() {
-            run_start_id = Some(id);
+            run_start_id = Some(*id);
         }
         // Add this page to the current run
-        run_bufs.push(item);
+        run_bufs.push(item.clone());
         // Check if this is the end of a run
         let is_end_of_run = match iter.peek() {
-            Some(&(next_id, _)) => next_id != id + 1,
+            Some(&(next_id, _)) => *next_id != id + 1,
             None => true,
         };
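With `PendingFlush` gone, `write_pages_vectored` signals completion through a plain atomic flag, and the early return now sets it even for an empty batch, so a checkpoint can never wait on a write that was never submitted. The run-splitting idea the loop implements is easy to show in isolation; this is a minimal sketch under assumed names (`contiguous_runs` is not part of the codebase):

```rust
use std::collections::BTreeMap;

/// Split an ordered page batch into runs of contiguous page ids.
/// Each run can then be submitted as one vectored (writev-style) write.
fn contiguous_runs<V>(batch: &BTreeMap<usize, V>) -> Vec<Vec<usize>> {
    let mut runs: Vec<Vec<usize>> = Vec::new();
    for &id in batch.keys() {
        match runs.last_mut() {
            // Extend the current run when this id directly follows the previous one.
            Some(run) if run.last().copied() == id.checked_sub(1) => run.push(id),
            // Otherwise start a new run.
            _ => runs.push(vec![id]),
        }
    }
    runs
}

fn main() {
    let batch: BTreeMap<usize, ()> =
        [3, 4, 5, 9, 10, 42].into_iter().map(|id| (id, ())).collect();
    // Three vectored submissions instead of six single-page writes.
    assert_eq!(contiguous_runs(&batch), vec![vec![3, 4, 5], vec![9, 10], vec![42]]);
}
```

Because the batch is a `BTreeMap`, iteration is already key-ordered, so a run boundary is simply a key that does not follow its predecessor.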
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index e94258112..42f65d854 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -82,6 +82,8 @@ impl CheckpointMode {
     fn should_restart_log(&self) -> bool {
         matches!(self, CheckpointMode::Truncate | CheckpointMode::Restart)
     }
+    /// All modes other than Passive require a complete backfilling of all available frames
+    /// from `shared.nbackfills + 1 -> shared.max_frame`.
     fn require_all_backfilled(&self) -> bool {
         !matches!(self, CheckpointMode::Passive)
     }
@@ -301,66 +303,137 @@ pub enum CheckpointState {
 }
 
 /// IOV_MAX is 1024 on most systems
-pub const CKPT_BATCH_PAGES: usize = 512;
-type PageId = usize;
-
-const MAX_INFLIGHT_READS: usize = 64;
+pub const CKPT_BATCH_PAGES: usize = 1024;
+
+/// TODO: *ALL* of these need to be tuned for perf. It is tricky to figure out
+/// ideal numbers here that work well together concurrently.
+const MIN_AVG_RUN_FOR_FLUSH: f32 = 32.0;
+const MIN_BATCH_LEN_FOR_FLUSH: usize = 512;
+const MAX_INFLIGHT_WRITES: usize = 64;
+const MAX_INFLIGHT_READS: usize = 512;
+
+type PageId = usize;
 
 struct InflightRead {
+    completion: Completion,
     page_id: PageId,
-    done: Arc<AtomicBool>,
+    /// Buffer slot to contain the page content from the WAL read.
     buf: Arc<SpinLock<Option<Arc<Buffer>>>>,
 }
 
-/// Batch is a collection of pages that are being checkpointed together. It is used to
+/// WriteBatch is a collection of pages that are being checkpointed together. It is used to
 /// aggregate contiguous pages into a single write operation to the database file.
-pub(super) struct Batch {
+#[derive(Default)]
+struct WriteBatch {
+    /// BTreeMap keeps pages sorted on insertion, which helps create more efficient `writev` operations.
     items: BTreeMap<PageId, Arc<Buffer>>,
+    /// total number of `runs`, each representing a contiguous group of `PageId`s
+    run_count: usize,
 }
 
-// TODO(preston): implement the same thing for `readv`
-impl Batch {
+impl WriteBatch {
     fn new() -> Self {
         Self {
             items: BTreeMap::new(),
+            run_count: 0,
         }
     }
-    fn insert_raw(&mut self, page_id: usize, buf: Arc<Buffer>) {
+
+    #[inline]
+    /// Add a page id + buffer to the batch of writes to be submitted.
+    fn insert(&mut self, page_id: PageId, buf: Arc<Buffer>) {
+        if let std::collections::btree_map::Entry::Occupied(mut entry) = self.items.entry(page_id) {
+            entry.insert(buf);
+            return;
+        }
+        let left = page_id
+            .checked_sub(1)
+            .is_some_and(|p| self.items.contains_key(&p));
+        let right = page_id
+            .checked_add(1)
+            .is_some_and(|p| self.items.contains_key(&p));
+        match (left, right) {
+            (false, false) => {
+                // new singleton run
+                self.run_count += 1;
+            }
+            (true, false) | (false, true) => {
+                // extends an existing run, run_count unchanged
+            }
+            (true, true) => {
+                // merges two runs into one
+                self.run_count = self.run_count.saturating_sub(1);
+            }
+        }
         self.items.insert(page_id, buf);
     }
+
+    #[inline]
+    fn len(&self) -> usize {
+        self.items.len()
+    }
+    #[inline]
+    fn is_empty(&self) -> bool {
+        self.items.is_empty()
+    }
+    #[inline]
     fn is_full(&self) -> bool {
         self.items.len() >= CKPT_BATCH_PAGES
     }
+
+    #[inline]
+    fn avg_run_len(&self) -> f32 {
+        if self.run_count == 0 {
+            0.0
+        } else {
+            self.items.len() as f32 / self.run_count as f32
+        }
+    }
+
+    #[inline]
+    fn take(&mut self) -> BTreeMap<PageId, Arc<Buffer>> {
+        self.run_count = 0;
+        std::mem::take(&mut self.items)
+    }
+
+    #[inline]
+    fn clear(&mut self) {
+        self.items.clear();
+        self.run_count = 0;
+    }
 }
 
-impl std::ops::Deref for Batch {
+impl std::ops::Deref for WriteBatch {
     type Target = BTreeMap<PageId, Arc<Buffer>>;
     fn deref(&self) -> &Self::Target {
         &self.items
     }
 }
 
-impl std::ops::DerefMut for Batch {
+impl std::ops::DerefMut for WriteBatch {
     fn deref_mut(&mut self) -> &mut Self::Target {
         &mut self.items
     }
 }
 
-// Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save
-// in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do
-// page operations like reading a frame to a page, and writing a page to disk. This page should not
-// be placed back in pager page cache or anything, it's just a helper.
-// min_frame and max_frame is the range of frames that can be safely transferred from WAL to db
-// file.
-// current_page is a helper to iterate through all the pages that might have a frame in the safe
-// range. This is inefficient for now.
+/// Information and in-flight state for an ongoing checkpoint. Checkpointing is a
+/// state machine with multiple steps, so everything the steps share lives here.
 struct OngoingCheckpoint {
-    batch: Batch,
-    state: CheckpointState,
-    pending_flush: PendingFlush,
+    /// Used for benchmarking/debugging a checkpoint operation.
+    time: std::time::Instant,
+    /// minimum frame number to be backfilled by this checkpoint operation.
     min_frame: u64,
+    /// maximum safe frame number that will be backfilled by this checkpoint operation.
     max_frame: u64,
+    /// cursor used to iterate through all the pages that might have a frame in the safe range
     current_page: u64,
+    /// State of the checkpoint
+    state: CheckpointState,
+    /// Batch represents a collection of pages to be backfilled to the DB file.
+    pending_writes: WriteBatch,
+    /// Array of in-flight WAL frame reads, each holding its completion and destination buffer.
+    inflight_reads: Vec<InflightRead>,
+    /// Array of atomic counters representing write operations that are currently in flight.
+    inflight_writes: Vec<Arc<AtomicBool>>,
+    /// List of all (page_id, frame_id) combinations to be backfilled.
     pages_to_checkpoint: Vec<(u64, u64)>,
-    inflight: Vec<InflightRead>,
 }
 
 impl OngoingCheckpoint {
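`WriteBatch::insert` keeps `run_count` exact by inspecting only the two neighboring page ids: a page with no occupied neighbor starts a new run, one neighbor extends a run, and two neighbors merge two runs into one. A self-contained sketch of the same bookkeeping (hypothetical `RunCounter`, logic mirrored from the diff above):

```rust
use std::collections::BTreeMap;

/// Checking only the two adjacent keys on every insert keeps an exact count
/// of contiguous runs without ever rescanning the whole map.
#[derive(Default)]
struct RunCounter {
    items: BTreeMap<usize, ()>,
    run_count: usize,
}

impl RunCounter {
    fn insert(&mut self, page_id: usize) {
        if self.items.contains_key(&page_id) {
            return; // overwrite of an existing page: run structure is unchanged
        }
        let left = page_id
            .checked_sub(1)
            .is_some_and(|p| self.items.contains_key(&p));
        let right = page_id
            .checked_add(1)
            .is_some_and(|p| self.items.contains_key(&p));
        match (left, right) {
            (false, false) => self.run_count += 1, // new singleton run
            (true, true) => self.run_count -= 1,   // bridges two runs into one
            _ => {}                                // extends one neighboring run
        }
        self.items.insert(page_id, ());
    }
}

fn main() {
    let mut rc = RunCounter::default();
    for id in [10, 12, 11, 20] {
        rc.insert(id);
    }
    // 10 and 12 start as singletons (2 runs), 11 bridges them (1 run), 20 adds one.
    assert_eq!(rc.run_count, 2);
}
```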
@@ -1142,6 +1215,7 @@
             page_buf
         };
 
+        let seq = header.checkpoint_seq;
         let (frame_checksums, frame_bytes) = prepare_wal_frame(
             &self.buffer_pool,
             &header,
@@ -1318,20 +1392,24 @@ impl WalFile {
         let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
         let last_checksum = unsafe { (*shared.get()).last_checksum };
+        let disable_checkpoint_cache =
+            std::env::var("TURSO_DISABLE_CHECKPOINT_CACHE").unwrap_or_default() != "";
+        set_disable_ckpt_cache(disable_checkpoint_cache);
         Self {
             io,
             // default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
             max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) },
             shared,
             ongoing_checkpoint: OngoingCheckpoint {
-                batch: Batch::new(),
-                pending_flush: PendingFlush::new(),
+                time: std::time::Instant::now(),
+                pending_writes: WriteBatch::new(),
+                inflight_writes: Vec::new(),
                 state: CheckpointState::Start,
                 min_frame: 0,
                 max_frame: 0,
                 current_page: 0,
                 pages_to_checkpoint: Vec::new(),
-                inflight: Vec::with_capacity(MAX_INFLIGHT_READS),
+                inflight_reads: Vec::with_capacity(MAX_INFLIGHT_READS),
             },
             checkpoint_threshold: 1000,
             buffer_pool,
@@ -1392,6 +1470,8 @@ impl WalFile {
         self.max_frame_read_lock_index.set(NO_LOCK_HELD);
-        self.ongoing_checkpoint.batch.clear();
-        self.ongoing_checkpoint.pending_flush.clear();
+        self.ongoing_checkpoint.pending_writes.clear();
+        self.ongoing_checkpoint.inflight_writes.clear();
+        self.sync_state.set(SyncState::NotSyncing);
+        self.ongoing_checkpoint.pages_to_checkpoint.clear();
         self.syncing.set(false);
     }
@@ -1516,7 +1596,7 @@
             // if at all possible, at the cost of some batching potential.
             CheckpointState::Processing => {
-                // Gather I/O completions, estimate with MAX_PENDING_WRITES to prevent realloc
-                let mut completions = Vec::with_capacity(MAX_PENDING_WRITES);
+                // Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc
+                let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES);
 
                 // Check and clean any completed writes from pending flush
                 if self.ongoing_checkpoint.process_pending_writes() {
@@ -1577,7 +1657,12 @@
                 let batch_map = self.ongoing_checkpoint.pending_writes.take();
                 if !batch_map.is_empty() {
                     let done_flag = self.ongoing_checkpoint.add_write();
-                    completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
+                    completions.extend(write_pages_vectored(
+                        pager,
+                        batch_map,
+                        done_flag,
+                        self.encryption_key.borrow().as_ref(),
+                    )?);
                 }
             }
@@ -1598,7 +1683,7 @@
                 }
                 self.ongoing_checkpoint.state = CheckpointState::Done;
             } else if !completions.is_empty() {
-                return Ok(IOResult::IO(IOCompletions::Many(completions)));
+                io_yield_many!(completions);
             }
         }
         // All eligible frames copied to the db file
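The new tuning constants only make sense together: the hunks above do not show the exact flush condition, but the names suggest `Processing` submits a batch early once it is full, long enough (`MIN_BATCH_LEN_FOR_FLUSH`), or contiguous enough that each `writev` covers long runs (`MIN_AVG_RUN_FOR_FLUSH`). A hypothetical predicate showing how such thresholds could combine, illustrative only and not the PR's actual logic:

```rust
/// Constants mirroring the diff above (their final values are an open TODO there).
const CKPT_BATCH_PAGES: usize = 1024;
const MIN_AVG_RUN_FOR_FLUSH: f32 = 32.0;
const MIN_BATCH_LEN_FOR_FLUSH: usize = 512;

/// Hypothetical flush predicate: submit the pending batch once it is full,
/// large enough, or contiguous enough that vectored writes amortize well.
fn should_flush(len: usize, run_count: usize) -> bool {
    let avg_run = if run_count == 0 {
        0.0
    } else {
        len as f32 / run_count as f32
    };
    len >= CKPT_BATCH_PAGES
        || len >= MIN_BATCH_LEN_FOR_FLUSH
        || avg_run >= MIN_AVG_RUN_FOR_FLUSH
}

fn main() {
    assert!(should_flush(1024, 900)); // full batch always flushes
    assert!(should_flush(128, 4));    // 128 pages in 4 runs: avg run length 32
    assert!(!should_flush(100, 50));  // short, fragmented batch keeps accumulating
}
```

The tension the TODO describes is visible here: flushing a fragmented batch early wastes the batching opportunity, while holding a highly contiguous batch too long stalls the read pipeline behind it.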
@@ -1858,11 +1943,9 @@ impl WalFile {
     fn issue_wal_read_into_buffer(&self, page_id: usize, frame_id: u64) -> Result<InflightRead> {
         let offset = self.frame_offset(frame_id);
 
-        let done = Arc::new(AtomicBool::new(false));
         let buf_slot = Arc::new(SpinLock::new(None));
 
         let complete = {
-            let done = done.clone();
             let buf_slot = buf_slot.clone();
             Box::new(move |buf: Arc<Buffer>, bytes_read: i32| {
                 let buf_len = buf.len();
@@ -1871,11 +1954,10 @@
                     "read({bytes_read}) != expected({buf_len}): frame_id={frame_id}"
                 );
                 *buf_slot.lock() = Some(buf);
-                done.store(true, Ordering::Release);
             })
         };
         // schedule read of the page payload
-        let _c = begin_read_wal_frame(
+        let c = begin_read_wal_frame(
             &self.get_shared().file,
             offset + WAL_FRAME_HEADER_SIZE,
             self.buffer_pool.clone(),
@@ -1883,8 +1965,8 @@
         )?;
 
         Ok(InflightRead {
+            completion: c,
             page_id,
-            done,
             buf: buf_slot,
         })
     }
@@ -1897,7 +1979,7 @@
-        self.ongoing_checkpoint.inflight.retain(|slot| {
-            if slot.done.load(Ordering::Acquire) {
+        self.ongoing_checkpoint.inflight_reads.retain(|slot| {
+            if slot.completion.succeeded() {
                 if let Some(buf) = slot.buf.lock().take() {
-                    self.ongoing_checkpoint.batch.insert_raw(slot.page_id, buf);
+                    self.ongoing_checkpoint.pending_writes.insert(slot.page_id, buf);
                     moved = true;
                 }
                 false // drop this slot
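The read path now keys completion off the stored `Completion` instead of a separate `done` flag, while the write path still publishes completion through the `Arc<AtomicBool>` handed out by `add_write`. The drain pattern in the last hunk (poll, move finished buffers into the batch, retain the rest) works with either signal; here is a standalone sketch of the flag-based variant, with all names illustrative rather than turso's API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Illustrative in-flight slot: a shared flag plus a slot for the finished buffer.
struct Slot {
    page_id: usize,
    done: Arc<AtomicBool>,
    payload: Arc<Mutex<Option<Vec<u8>>>>,
}

/// Poll in-flight operations, moving finished buffers into `batch` and
/// retaining only the slots that are still outstanding.
fn drain_completed(inflight: &mut Vec<Slot>, batch: &mut Vec<(usize, Vec<u8>)>) -> bool {
    let mut moved = false;
    inflight.retain(|slot| {
        if slot.done.load(Ordering::Acquire) {
            // Acquire pairs with the Release store in the I/O callback, so the
            // buffer contents are visible before we move them into the batch.
            if let Some(buf) = slot.payload.lock().unwrap().take() {
                batch.push((slot.page_id, buf));
                moved = true;
            }
            false // drop the finished slot
        } else {
            true // still in flight
        }
    });
    moved
}

fn main() {
    let done = Arc::new(AtomicBool::new(false));
    let payload = Arc::new(Mutex::new(None));
    let mut inflight = vec![Slot {
        page_id: 7,
        done: done.clone(),
        payload: payload.clone(),
    }];
    // Simulate the I/O callback: publish the buffer, then set the flag (Release).
    *payload.lock().unwrap() = Some(vec![0u8; 4096]);
    done.store(true, Ordering::Release);

    let mut batch = Vec::new();
    assert!(drain_completed(&mut inflight, &mut batch));
    assert!(inflight.is_empty());
    assert_eq!(batch[0].0, 7);
}
```

Note the ordering asymmetry in the diff itself: the write-side `done_flag` is stored with `Ordering::Relaxed` because it only gates progress of the state machine, whereas the old read-side flag used Release/Acquire because the polling thread went on to read the buffer that the callback had just published.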