diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs
index 6b4b1992f..aa0c4772b 100644
--- a/bindings/javascript/src/lib.rs
+++ b/bindings/javascript/src/lib.rs
@@ -724,10 +724,10 @@ impl turso_core::DatabaseStorage for DatabaseFile {
         page_size: usize,
         buffers: Vec<Arc<RefCell<turso_core::Buffer>>>,
         c: turso_core::Completion,
-    ) -> turso_core::Result<()> {
-        let pos = (page_idx - 1) * page_size;
-        self.file.pwritev(pos, buffers, c.into())?;
-        Ok(())
+    ) -> turso_core::Result<turso_core::Completion> {
+        let pos = page_idx.saturating_sub(1) * page_size;
+        let c = self.file.pwritev(pos, buffers, c)?;
+        Ok(c)
     }
 
     fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs
index f27bcba1f..ed0a0f7d8 100644
--- a/core/io/io_uring.rs
+++ b/core/io/io_uring.rs
@@ -65,15 +65,16 @@ struct IovecPool {
 
 impl IovecPool {
     fn new() -> Self {
-        let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE);
-        for _ in 0..IOVEC_POOL_SIZE {
-            pool.push(Box::new(
-                [libc::iovec {
-                    iov_base: std::ptr::null_mut(),
-                    iov_len: 0,
-                }; MAX_IOVEC_ENTRIES],
-            ));
-        }
+        let pool = (0..IOVEC_POOL_SIZE)
+            .map(|_| {
+                Box::new(
+                    [libc::iovec {
+                        iov_base: std::ptr::null_mut(),
+                        iov_len: 0,
+                    }; MAX_IOVEC_ENTRIES],
+                )
+            })
+            .collect();
         Self { pool }
     }
 
@@ -144,18 +145,20 @@ enum Fd {
 }
 
 impl Fd {
-    fn as_raw_fd(&self) -> i32 {
-        match self {
-            Fd::RawFd(fd) => *fd,
-            _ => unreachable!("only to be called on RawFd variant"),
-        }
-    }
+    /// to match the behavior of the File, we need to implement the same methods
     fn id(&self) -> Option<u32> {
         match self {
             Fd::Fixed(id) => Some(*id),
             Fd::RawFd(_) => None,
         }
     }
+    /// ONLY to be called by the macro, in the case where id() is None
+    fn as_raw_fd(&self) -> i32 {
+        match self {
+            Fd::RawFd(fd) => *fd,
+            _ => panic!("Cannot call as_raw_fd on a Fixed Fd"),
+        }
+    }
 }
 
 /// State to track an ongoing writev operation in
@@ -181,10 +184,10 @@ struct WritevState {
 
 impl WritevState {
     fn new(file: &UringFile, pos: usize, bufs: Vec<Arc<RefCell<Buffer>>>) -> Self {
-        let file_id = match file.id() {
-            Some(id) => Fd::Fixed(id),
-            None => Fd::RawFd(file.as_raw_fd()),
-        };
+        let file_id = file
+            .id()
+            .map(Fd::Fixed)
+            .unwrap_or_else(|| Fd::RawFd(file.as_raw_fd()));
         let total_len = bufs.iter().map(|b| b.borrow().len()).sum();
         Self {
             file_id,
@@ -293,18 +296,15 @@ impl WrappedIOUring {
     /// Submit or resubmit a writev operation
     fn submit_writev(&mut self, key: u64, mut st: WritevState) {
         st.free_last_iov(&mut self.iov_pool);
-        let mut iov_allocation = match self.iov_pool.acquire() {
-            Some(alloc) => alloc,
-            None => {
-                // Fallback: allocate a new one if pool is exhausted
-                Box::new(
-                    [libc::iovec {
-                        iov_base: std::ptr::null_mut(),
-                        iov_len: 0,
-                    }; MAX_IOVEC_ENTRIES],
-                )
-            }
-        };
+        let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
+            // Fallback: allocate a new one if pool is exhausted
+            Box::new(
+                [libc::iovec {
+                    iov_base: std::ptr::null_mut(),
+                    iov_len: 0,
+                }; MAX_IOVEC_ENTRIES],
+            )
+        });
         let mut iov_count = 0;
         for (idx, buffer) in st
             .bufs
@@ -346,54 +346,41 @@ impl WrappedIOUring {
         self.submit_entry(&entry);
     }
 
-    fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) {
+    fn handle_writev_completion(&mut self, mut state: WritevState, user_data: u64, result: i32) {
         if result < 0 {
-            tracing::error!(
-                "writev operation failed for user_data {}: {}",
-                user_data,
-                std::io::Error::from_raw_os_error(result)
-            );
-            // error: free iov allocation and call completion with error code
-            st.free_last_iov(&mut self.iov_pool);
+            let err = std::io::Error::from_raw_os_error(result);
+            tracing::error!("writev failed (user_data: {}): {}", user_data, err);
+            state.free_last_iov(&mut self.iov_pool);
             completion_from_key(user_data).complete(result);
-        } else {
-            let written = result as usize;
-            st.advance(written);
-            if st.remaining() == 0 {
+            return;
+        }
+
+        let written = result as usize;
+        state.advance(written);
+        match state.remaining() {
+            0 => {
                 tracing::info!(
                     "writev operation completed: wrote {} bytes",
-                    st.total_written
+                    state.total_written
                 );
                 // write complete, return iovec to pool
-                st.free_last_iov(&mut self.iov_pool);
-                completion_from_key(user_data).complete(st.total_written as i32);
-            } else {
+                state.free_last_iov(&mut self.iov_pool);
+                completion_from_key(user_data).complete(state.total_written as i32);
+            }
+            remaining => {
                 tracing::trace!(
                     "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
                     user_data,
                     written,
-                    st.remaining()
+                    remaining
                 );
                 // partial write, submit next
-                self.submit_writev(user_data, st);
+                self.submit_writev(user_data, state);
            }
         }
     }
 }
 
-#[inline(always)]
-/// use the callback pointer as the user_data for the operation as is
-/// common practice for io_uring to prevent more indirection
-fn get_key(c: Arc<Completion>) -> u64 {
-    Arc::into_raw(c) as u64
-}
-
-#[inline(always)]
-/// convert the user_data back to an Arc pointer
-fn completion_from_key(key: u64) -> Arc<Completion> {
-    unsafe { Arc::from_raw(key as *const Completion) }
-}
-
 impl IO for UringIO {
     fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
         trace!("open_file(path = {})", path);
@@ -613,8 +600,8 @@ impl File for UringFile {
         &self,
         pos: usize,
         bufs: Vec<Arc<RefCell<Buffer>>>,
-        c: Arc<Completion>,
-    ) -> Result<Arc<Completion>> {
+        c: Completion,
+    ) -> Result<Completion> {
         // for a single buffer use pwrite directly
         if bufs.len().eq(&1) {
             return self.pwrite(pos, bufs[0].clone(), c.clone());
diff --git a/core/io/mod.rs b/core/io/mod.rs
index 6518157e8..8560216e8 100644
--- a/core/io/mod.rs
+++ b/core/io/mod.rs
@@ -1,4 +1,4 @@
-use crate::{turso_assert, Result};
+use crate::Result;
 use bitflags::bitflags;
 use cfg_block::cfg_block;
 use std::fmt;
diff --git a/core/io/unix.rs b/core/io/unix.rs
index 82f03ba77..7e73e6904 100644
--- a/core/io/unix.rs
+++ b/core/io/unix.rs
@@ -411,7 +411,7 @@ enum CompletionCallback {
     ),
     Writev(
         Arc<Mutex<std::fs::File>>,
-        Arc<Completion>,
+        Completion,
         Vec<Arc<RefCell<Buffer>>>,
         usize, // absolute file offset
         usize, // buf index
@@ -537,17 +537,12 @@ impl File for UnixFile<'_> {
     }
 
     #[instrument(err, skip_all, level = Level::TRACE)]
-<<<<<<< HEAD
-    fn sync(&self, c: Completion) -> Result<Completion> {
-||||||| parent of 7f48531b (batch backfilling pages when checkpointing)
-    fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
-=======
     fn pwritev(
         &self,
         pos: usize,
         buffers: Vec<Arc<RefCell<Buffer>>>,
-        c: Arc<Completion>,
-    ) -> Result<Arc<Completion>> {
+        c: Completion,
+    ) -> Result<Completion> {
         let file = self
             .file
             .lock()
@@ -588,8 +583,7 @@ impl File for UnixFile<'_> {
     }
 
     #[instrument(err, skip_all, level = Level::TRACE)]
-    fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
->>>>>>> 7f48531b (batch backfilling pages when checkpointing)
+    fn sync(&self, c: Completion) -> Result<Completion> {
         let file = self.file.lock().unwrap();
         let result = fs::fsync(file.as_fd());
         match result {
diff --git a/core/storage/database.rs b/core/storage/database.rs
index ff474a436..0370d398c 100644
--- a/core/storage/database.rs
+++ b/core/storage/database.rs
@@ -23,7 +23,7 @@ pub trait DatabaseStorage: Send + Sync {
         buffers: Vec<Arc<RefCell<Buffer>>>,
         c: Completion,
     ) -> Result<Completion>;
-    fn sync(&self, c: Completion) -> Result<()>;
+    fn sync(&self, c: Completion) -> Result<Completion>;
     fn size(&self) -> Result<u64>;
     fn truncate(&self, len: usize, c: Completion) -> Result<Completion>;
 }
@@ -74,7 +74,7 @@ impl DatabaseStorage for DatabaseFile {
         page_size: usize,
         buffers: Vec<Arc<RefCell<Buffer>>>,
         c: Completion,
-    ) -> Result<()> {
+    ) -> Result<Completion> {
         assert!(page_idx > 0);
         assert!(page_size >= 512);
         assert!(page_size <= 65536);
@@ -149,7 +149,7 @@ impl DatabaseStorage for FileMemoryStorage {
         page_size: usize,
         buffer: Vec<Arc<RefCell<Buffer>>>,
         c: Completion,
-    ) -> Result<()> {
+    ) -> Result<Completion> {
         assert!(page_idx > 0);
         assert!(page_size >= 512);
         assert!(page_size <= 65536);
diff --git a/core/storage/pager.rs b/core/storage/pager.rs
index 768b8c6c1..90fcb2893 100644
--- a/core/storage/pager.rs
+++ b/core/storage/pager.rs
@@ -1304,11 +1304,10 @@ impl Pager {
         }
 
         let write_counter = Rc::new(RefCell::new(0));
-        let checkpoint_result = self.io.block(|| {
+        let mut checkpoint_result = self.io.block(|| {
             self.wal
                 .borrow_mut()
-                .checkpoint(self, counter.clone(), mode)
-                .map_err(|err| panic!("error while clearing cache {err}"))
+                .checkpoint(self, write_counter.clone(), mode)
         })?;
 
         if checkpoint_result.everything_backfilled()
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index d434ec255..1d58f444e 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -62,7 +62,7 @@ use crate::storage::wal::{BatchItem, PendingFlush};
 use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
 use crate::{turso_assert, File, Result, WalFileShared};
 use std::cell::{RefCell, UnsafeCell};
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::mem::MaybeUninit;
 use std::pin::Pin;
 use std::rc::Rc;
@@ -854,52 +854,89 @@ pub fn begin_write_btree_page(
 }
 
 #[instrument(skip_all, level = Level::DEBUG)]
-pub fn write_pages_vectored(pager: &Pager, batch: &[BatchItem]) -> Result<PendingFlush> {
+pub fn write_pages_vectored(
+    pager: &Pager,
+    batch: BTreeMap<usize, BatchItem>,
+) -> Result<PendingFlush> {
     if batch.is_empty() {
         return Ok(PendingFlush::default());
     }
-    let mut run = batch.to_vec();
-    run.sort_by_key(|b| b.id);
+
+    // the batch is a BTreeMap keyed by page id, so it is already sorted; we just need to find
+    // contiguous ranges of page ids to submit as `writev`/write_pages calls.
     let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize;
-    let mut all_ids = Vec::with_capacity(run.len());
-    // count runs
-    let mut starts = Vec::with_capacity(5); // arbitrary initialization
-    let mut start = 0;
-    while start < run.len() {
-        let mut end = start + 1;
-        while end < run.len() && run[end].id == run[end - 1].id + 1 {
-            end += 1;
+    let mut all_ids = Vec::with_capacity(batch.len());
+
+    // Count the expected number of runs so we can create the atomic counter used to track them
+    let mut run_count = 0;
+    let mut prev_id = None;
+    for &id in batch.keys() {
+        if let Some(prev) = prev_id {
+            if id != prev + 1 {
+                run_count += 1;
+            }
+        } else {
+            run_count = 1; // First run
         }
-        starts.push((start, end));
-        start = end;
+        prev_id = Some(id);
     }
-    let runs = starts.len();
-    let runs_left = Arc::new(AtomicUsize::new(runs));
+
+    // Create the atomic counters
+    let runs_left = Arc::new(AtomicUsize::new(run_count));
     let done = Arc::new(AtomicBool::new(false));
 
-    for (start, end) in starts {
-        let first_id = run[start].id;
-        let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect();
-        all_ids.extend(run[start..end].iter().map(|b| b.id));
+    let mut run_start_id = None;
+    // we know how many runs, but we don't know how many buffers per run, so we can only give an
+    // estimate of the capacity
+    const EST_BUFF_CAPACITY: usize = 32;
+    let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
+    let mut run_ids = Vec::with_capacity(EST_BUFF_CAPACITY);
 
-        let runs_left_cl = runs_left.clone();
-        let done_cl = done.clone();
+    // Iterate through the batch, submitting each run as soon as it ends
+    let mut iter = batch.iter().peekable();
+    while let Some((&id, item)) = iter.next() {
+        if run_start_id.is_none() {
+            run_start_id = Some(id);
+        }
 
-        let c = Completion::new_write(move |_| {
-            if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
-                done_cl.store(true, Ordering::Release);
+        run_bufs.push(item.buf.clone());
+        run_ids.push(id);
+
+        // Check if this is the end of a run, either the next key is not consecutive or this is the last entry
+        let is_end_of_run = match iter.peek() {
+            Some((&next_id, _)) => next_id != id + 1,
+            None => true, // Last item is always end of a run
+        };
+
+        if is_end_of_run {
+            // Submit this run immediately
+            let start_id = run_start_id.unwrap();
+            let runs_left_cl = runs_left.clone();
+            let done_cl = done.clone();
+            let c = Completion::new_write(move |_| {
+                if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
+                    done_cl.store(true, Ordering::Release);
+                }
+            });
+
+            // Submit the run; on error, decrement the runs_left counter and bail out
+            if let Err(e) = pager.db_file.write_pages(start_id, page_sz, run_bufs, c) {
+                if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
+                    done.store(true, Ordering::Release);
+                }
+                return Err(e);
             }
-        });
-        // submit, roll back on error
-        if let Err(e) = pager.db_file.write_pages(first_id, page_sz, bufs, c) {
-            if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
-                done.store(true, Ordering::Release);
-            }
-            return Err(e);
+            // Add IDs to the all_ids list and prepare for the next run
+            all_ids.extend(run_ids);
+            run_start_id = None;
+            // .clear() would cause a borrowing issue here, so unfortunately we have to reassign
+            run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
+            run_ids = Vec::with_capacity(EST_BUFF_CAPACITY);
         }
     }
+
     tracing::debug!(
-        "write_pages_vectored: {} pages to write, runs: {runs}",
+        "write_pages_vectored: {} pages to write, runs: {run_count}",
         all_ids.len()
     );
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 399af7746..b37e3af54 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -3,7 +3,7 @@
 use std::array;
 use std::cell::UnsafeCell;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use strum::EnumString;
 use tracing::{instrument, Level};
@@ -404,7 +404,6 @@ pub const CKPT_BATCH_PAGES: usize = 512;
 
 #[derive(Clone)]
 pub(super) struct BatchItem {
-    pub(super) id: usize,
     pub(super) buf: Arc<RefCell<Buffer>>,
 }
 
@@ -418,9 +417,9 @@ pub(super) struct BatchItem {
 // range. This is inefficient for now.
 struct OngoingCheckpoint {
     scratch_page: PageRef,
-    batch: Vec<BatchItem>,
+    batch: BTreeMap<usize, BatchItem>,
     state: CheckpointState,
-    pending_flushes: Vec<PendingFlush>,
+    pending_flush: Option<PendingFlush>,
     min_frame: u64,
     max_frame: u64,
     current_page: u64,
@@ -446,6 +445,14 @@ impl PendingFlush {
             done: Arc::new(AtomicBool::new(false)),
         }
     }
+    // clear the dirty flag of all pages in the pending flush batch
+    fn clear_dirty(&self, pager: &Pager) {
+        for id in &self.pages {
+            if let Some(p) = pager.cache_get(*id) {
+                p.clear_dirty();
+            }
+        }
+    }
 }
 
 impl fmt::Debug for OngoingCheckpoint {
@@ -699,7 +706,11 @@ impl Drop for CheckpointLocks {
     }
 }
 
-fn take_page_into_batch(scratch: &PageRef, pool: &Arc<BufferPool>, batch: &mut Vec<BatchItem>) {
+fn take_page_into_batch(
+    scratch: &PageRef,
+    pool: &Arc<BufferPool>,
+    batch: &mut BTreeMap<usize, BatchItem>,
+) {
     let (id, buf_clone) = unsafe {
         let inner = &*scratch.inner.get();
         let id = inner.id;
@@ -707,9 +718,8 @@
         let buf = contents.buffer.clone();
         (id, buf)
     };
-
-    // Push into batch
-    batch.push(BatchItem { id, buf: buf_clone });
+    // Insert the new batch item at the correct position
+    batch.insert(id, BatchItem { buf: buf_clone });
 
     // Re-initialize scratch with a fresh buffer
     let raw = pool.get();
@@ -1147,7 +1157,7 @@ impl Wal for WalFile {
                 "Full checkpoint mode is not implemented yet".into(),
             ));
         }
-        self.checkpoint_inner(pager, write_counter, mode)
+        self.checkpoint_inner(pager, _write_counter, mode)
             .inspect_err(|_| {
                 let _ = self.checkpoint_guard.take();
             })
@@ -1265,8 +1275,8 @@ impl WalFile {
             shared,
             ongoing_checkpoint: OngoingCheckpoint {
                 scratch_page: checkpoint_page,
-                batch: Vec::new(),
-                pending_flushes: Vec::new(),
+                batch: BTreeMap::new(),
+                pending_flush: None,
                 state: CheckpointState::Start,
                 min_frame: 0,
                 max_frame: 0,
@@ -1326,7 +1336,7 @@ impl WalFile {
         self.ongoing_checkpoint.current_page = 0;
         self.max_frame_read_lock_index.set(NO_LOCK_HELD);
         self.ongoing_checkpoint.batch.clear();
-        self.ongoing_checkpoint.pending_flushes.clear();
+        let _ = self.ongoing_checkpoint.pending_flush.take();
         self.sync_state.set(SyncState::NotSyncing);
         self.syncing.set(false);
     }
@@ -1375,7 +1385,7 @@ impl WalFile {
     fn checkpoint_inner(
         &mut self,
         pager: &Pager,
-        write_counter: Rc<RefCell<usize>>,
+        _write_counter: Rc<RefCell<usize>>,
         mode: CheckpointMode,
     ) -> Result<IOResult<CheckpointResult>> {
         'checkpoint_loop: loop {
@@ -1438,10 +1448,10 @@ impl WalFile {
                         page,
                         *frame
                     );
-                    self.ongoing_checkpoint.page.get().id = page as usize;
+                    self.ongoing_checkpoint.scratch_page.get().id = page as usize;
                     let _ = self.read_frame(
                         *frame,
-                        self.ongoing_checkpoint.page.clone(),
+                        self.ongoing_checkpoint.scratch_page.clone(),
                         self.buffer_pool.clone(),
                     )?;
                     self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
@@ -1451,7 +1461,7 @@ impl WalFile {
                     self.ongoing_checkpoint.current_page += 1;
                 }
                 CheckpointState::WaitReadFrame => {
-                    if self.ongoing_checkpoint.page.is_locked() {
+                    if self.ongoing_checkpoint.scratch_page.is_locked() {
                         return Ok(IOResult::IO);
                     } else {
                         self.ongoing_checkpoint.state = CheckpointState::AccumulatePage;
@@ -1460,14 +1470,15 @@ impl WalFile {
                 CheckpointState::AccumulatePage => {
                     // mark before batching
                     self.ongoing_checkpoint.scratch_page.set_dirty();
+                    // we read the frame into memory, so add it to our batch
                    take_page_into_batch(
                         &self.ongoing_checkpoint.scratch_page,
                         &self.buffer_pool,
                         &mut self.ongoing_checkpoint.batch,
                     );
                     let more_pages = (self.ongoing_checkpoint.current_page as usize)
-                        < self.get_shared().pages_in_frames.lock().len() - 1;
-
+                        < self.get_shared().pages_in_frames.lock().len() - 1
+                        && self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES;
                     if more_pages {
                         self.ongoing_checkpoint.current_page += 1;
                         self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
@@ -1477,34 +1488,30 @@
                 }
                 CheckpointState::FlushBatch => {
                     tracing::trace!("started checkpoint backfilling batch");
-                    self.ongoing_checkpoint
-                        .pending_flushes
-                        .push(write_pages_vectored(pager, &self.ongoing_checkpoint.batch)?);
+                    self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored(
+                        pager,
+                        std::mem::take(&mut self.ongoing_checkpoint.batch),
+                    )?);
                     // batch is queued
                     self.ongoing_checkpoint.batch.clear();
                     self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
                 }
                 CheckpointState::WaitFlush => {
-                    if self
-                        .ongoing_checkpoint
-                        .pending_flushes
-                        .iter()
-                        .any(|pf| !pf.done.load(Ordering::Acquire))
-                    {
-                        return Ok(IOResult::IO);
+                    match self.ongoing_checkpoint.pending_flush.as_ref() {
+                        Some(pf) if pf.done.load(Ordering::SeqCst) => {
+                            // flush is done, we can continue
+                            tracing::trace!("checkpoint backfilling batch done");
+                        }
+                        Some(_) => return Ok(IOResult::IO),
+                        None => panic!("we should have a pending flush here"),
                     }
                     tracing::debug!("finished checkpoint backfilling batch");
-                    for pf in self
+                    let pf = self
                         .ongoing_checkpoint
-                        .pending_flushes
-                        .drain(std::ops::RangeFull)
-                    {
-                        for id in pf.pages {
-                            if let Some(p) = pager.cache_get(id) {
-                                p.clear_dirty();
-                            }
-                        }
-                    }
+                        .pending_flush
+                        .as_ref()
+                        .expect("we should have a pending flush here");
+                    pf.clear_dirty(pager);
                     // done with batch
                     let shared = self.get_shared();
                     if (self.ongoing_checkpoint.current_page as usize)
@@ -1513,7 +1520,7 @@
                         self.ongoing_checkpoint.current_page += 1;
                         self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
                     } else {
-                        tracing::info!("transitioning checkpoint to done");
+                        tracing::debug!("WaitFlush transitioning checkpoint to Done");
                         self.ongoing_checkpoint.state = CheckpointState::Done;
                     }
                 }
@@ -1522,9 +1529,13 @@
                 // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
                 // Release all locks and return the current num of wal frames and the amount we backfilled
                 CheckpointState::Done => {
-                    if *write_counter.borrow() > 0 {
-                        return Ok(IOResult::IO);
-                    }
+                    turso_assert!(
+                        self.ongoing_checkpoint
+                            .pending_flush
+                            .as_ref()
+                            .is_some_and(|pf| pf.done.load(Ordering::Relaxed)),
+                        "checkpoint pending flush must have finished"
+                    );
                     let mut checkpoint_result = {
                         let shared = self.get_shared();
                         let current_mx = shared.max_frame.load(Ordering::SeqCst);