From 7b2163208bbc55c5ade20b9e5d8b8b7e3fffe983 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 19:01:41 -0400 Subject: [PATCH 01/18] batch backfilling pages when checkpointing --- core/Cargo.toml | 5 +- core/io/io_uring.rs | 88 +++++++++--- core/io/memory.rs | 43 ++++++ core/io/mod.rs | 25 ++++ core/io/unix.rs | 253 ++++++++++++++++++++++++++------- core/storage/database.rs | 41 +++++- core/storage/pager.rs | 2 +- core/storage/sqlite3_ondisk.rs | 56 +++++++- core/storage/wal.rs | 134 ++++++++++++++--- 9 files changed, 551 insertions(+), 96 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 651282010..2adc63372 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,7 +19,7 @@ default = ["fs", "uuid", "time", "json", "series"] fs = ["turso_ext/vfs"] json = [] uuid = ["dep:uuid"] -io_uring = ["dep:io-uring", "rustix/io_uring", "dep:libc"] +io_uring = ["dep:io-uring", "rustix/io_uring"] time = [] fuzz = [] omit_autovacuum = [] @@ -29,10 +29,12 @@ series = [] [target.'cfg(target_os = "linux")'.dependencies] io-uring = { version = "0.7.5", optional = true } +libc = { version = "0.2.172" } [target.'cfg(target_family = "unix")'.dependencies] polling = "3.7.4" rustix = { version = "1.0.5", features = ["fs"] } +libc = { version = "0.2.172" } [target.'cfg(not(target_family = "wasm"))'.dependencies] mimalloc = { version = "0.1.46", default-features = false } @@ -44,7 +46,6 @@ turso_ext = { workspace = true, features = ["core_only"] } cfg_block = "0.1.1" fallible-iterator = "0.3.0" hex = "0.4.3" -libc = { version = "0.2.172", optional = true } turso_sqlite3_parser = { workspace = true } thiserror = "1.0.61" getrandom = { version = "0.2.15" } diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index f33c04db3..51c2c85a8 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -106,28 +106,27 @@ impl WrappedIOUring { fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { trace!("submit_entry({:?})", entry); unsafe { - self.ring - .submission() - .push(entry) - .expect("submission queue is full"); + let mut sub = self.ring.submission_shared(); + match sub.push(entry) { + Ok(_) => self.pending_ops += 1, + Err(e) => { + tracing::error!("Failed to submit entry: {e}"); + self.ring.submit().expect("failed to submit entry"); + sub.push(entry).expect("failed to push entry after submit"); + self.pending_ops += 1; + } + } } - self.pending_ops += 1; } fn wait_for_completion(&mut self) -> Result<()> { - self.ring.submit_and_wait(1)?; - Ok(()) - } - - fn get_completion(&mut self) -> Option { - // NOTE: This works because CompletionQueue's next function pops the head of the queue. 
This is not normal behaviour of iterators - let entry = self.ring.completion().next(); - if entry.is_some() { - trace!("get_completion({:?})", entry); - // consumed an entry from completion queue, update pending_ops - self.pending_ops -= 1; + if self.pending_ops == 0 { + return Ok(()); } - entry + let wants = std::cmp::min(self.pending_ops, 8); + tracing::info!("Waiting for {wants} pending operations to complete"); + self.ring.submit_and_wait(wants)?; + Ok(()) } fn empty(&self) -> bool { @@ -185,7 +184,8 @@ impl IO for UringIO { } ring.wait_for_completion()?; - while let Some(cqe) = ring.get_completion() { + while let Some(cqe) = ring.ring.completion().next() { + ring.pending_ops -= 1; let result = cqe.result(); if result < 0 { return Err(LimboError::UringIOError(format!( @@ -196,6 +196,12 @@ impl IO for UringIO { } let ud = cqe.user_data(); turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); + if ud == 0 { + // we currently don't have any linked timeouts or cancelations, but just in case + // lets guard against this case + tracing::error!("Received completion with user_data 0"); + continue; + } completion_from_key(ud).complete(result); } Ok(()) @@ -350,6 +356,52 @@ impl File for UringFile { Ok(c) } + fn pwritev( + &self, + pos: usize, + bufs: Vec>>, + c: Arc, + ) -> Result> { + // build iovecs + let mut iovs: Vec = Vec::with_capacity(bufs.len()); + for b in &bufs { + let rb = b.borrow(); + iovs.push(libc::iovec { + iov_base: rb.as_ptr() as *mut _, + iov_len: rb.len(), + }); + } + // keep iovecs alive until completion + let boxed_iovs = iovs.into_boxed_slice(); + let iov_ptr = boxed_iovs.as_ptr(); + let iov_len = boxed_iovs.len() as u32; + // leak now, free in completion + let raw_iovs = Box::into_raw(boxed_iovs); + + let comp = { + // wrap original completion to free resources + let orig = c.clone(); + Box::new(move |res: i32| { + // reclaim iovecs + unsafe { + let _ = Box::from_raw(raw_iovs); + } + // forward to user closure + orig.complete(res); + }) + }; + let c = Arc::new(Completion::new_write(comp)); + let mut io = self.io.borrow_mut(); + let e = with_fd!(self, |fd| { + io_uring::opcode::Writev::new(fd, iov_ptr, iov_len) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + }); + io.ring.submit_entry(&e); + Ok(c) + } + fn size(&self) -> Result { Ok(self.file.metadata()?.len()) } diff --git a/core/io/memory.rs b/core/io/memory.rs index 7dbf05d50..4d056aeb4 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -187,6 +187,49 @@ impl File for MemoryFile { Ok(c) } + fn pwritev( + &self, + pos: usize, + buffers: Vec>>, + c: Completion, + ) -> Result { + let mut offset = pos; + let mut total_written = 0; + + for buffer in buffers { + let buf = buffer.borrow(); + let buf_len = buf.len(); + if buf_len == 0 { + continue; + } + + let mut remaining = buf_len; + let mut buf_offset = 0; + let data = &buf.as_slice(); + + while remaining > 0 { + let page_no = offset / PAGE_SIZE; + let page_offset = offset % PAGE_SIZE; + let bytes_to_write = remaining.min(PAGE_SIZE - page_offset); + + { + let page = self.get_or_allocate_page(page_no); + page[page_offset..page_offset + bytes_to_write] + .copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]); + } + + offset += bytes_to_write; + buf_offset += bytes_to_write; + remaining -= bytes_to_write; + } + total_written += buf_len; + } + c.complete(total_written as i32); + self.size + .set(core::cmp::max(pos + total_written, self.size.get())); + Ok(c) + } + fn 
size(&self) -> Result { Ok(self.size.get() as u64) } diff --git a/core/io/mod.rs b/core/io/mod.rs index 82ef51313..ab299ef64 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -18,6 +18,31 @@ pub trait File: Send + Sync { fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result; fn sync(&self, c: Completion) -> Result; + fn pwritev( + &self, + pos: usize, + buffers: Vec>>, + c: Completion, + ) -> Result { + // FIXME: for now, stupid default so i dont have to impl for all backends + let counter = Rc::new(Cell::new(0)); + let len = buffers.len(); + let mut pos = pos; + for buf in buffers { + let _counter = counter.clone(); + let _c = c.clone(); + let default_c = Completion::new_write(move |_| { + _counter.set(_counter.get() + 1); + if _counter.get() == len { + _c.complete(len as i32); // complete the original completion + } + }); + let len = buf.borrow().len(); + self.pwrite(pos, buf, default_c)?; + pos += len; + } + Ok(c) + } fn size(&self) -> Result; fn truncate(&self, len: usize, c: Completion) -> Result; } diff --git a/core/io/unix.rs b/core/io/unix.rs index 9cb50a3f8..82f03ba77 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -1,15 +1,15 @@ +use super::{Completion, File, MemoryIO, OpenFlags, IO}; use crate::error::LimboError; +use crate::io::clock::{Clock, Instant}; use crate::io::common; use crate::Result; - -use super::{Completion, File, MemoryIO, OpenFlags, IO}; -use crate::io::clock::{Clock, Instant}; use polling::{Event, Events, Poller}; use rustix::{ fd::{AsFd, AsRawFd}, fs::{self, FlockOperation, OFlags, OpenOptionsExt}, io::Errno, }; +use std::os::fd::RawFd; use std::{ cell::{RefCell, UnsafeCell}, mem::MaybeUninit, @@ -40,11 +40,6 @@ impl OwnedCallbacks { self.as_mut().inline_count == 0 } - fn get(&self, fd: usize) -> Option<&CompletionCallback> { - let callbacks = unsafe { &mut *self.0.get() }; - callbacks.get(fd) - } - fn remove(&self, fd: usize) -> Option { let callbacks = unsafe { &mut *self.0.get() }; callbacks.remove(fd) @@ -135,16 +130,6 @@ impl Callbacks { } } - fn get(&self, fd: usize) -> Option<&CompletionCallback> { - if let Some(pos) = self.find_inline(fd) { - let (_, callback) = unsafe { self.inline_entries[pos].assume_init_ref() }; - return Some(callback); - } else if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { - return Some(&self.heap_entries[pos].1); - } - None - } - fn remove(&mut self, fd: usize) -> Option { if let Some(pos) = self.find_inline(fd) { let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; @@ -213,6 +198,35 @@ impl Clock for UnixIO { } } +fn try_pwritev_raw( + fd: RawFd, + off: u64, + bufs: &[Arc>], + start_idx: usize, + start_off: usize, +) -> std::io::Result { + const MAX_IOV: usize = 1024; + let iov_len = std::cmp::min(bufs.len() - start_idx, MAX_IOV); + let mut iov = Vec::with_capacity(iov_len); + + for (i, b) in bufs.iter().enumerate().skip(start_idx).take(iov_len) { + let r = b.borrow(); // borrow just to get pointer/len + let s = r.as_slice(); + let s = if i == start_idx { &s[start_off..] 
} else { s }; + iov.push(libc::iovec { + iov_base: s.as_ptr() as *mut _, + iov_len: s.len(), + }); + } + + let n = unsafe { libc::pwritev(fd, iov.as_ptr(), iov.len() as i32, off as i64) }; + if n < 0 { + Err(std::io::Error::last_os_error()) + } else { + Ok(n as usize) + } +} + impl IO for UnixIO { fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { trace!("open_file(path = {})", path); @@ -243,46 +257,129 @@ impl IO for UnixIO { if self.callbacks.is_empty() { return Ok(()); } + self.events.clear(); trace!("run_once() waits for events"); self.poller.wait(self.events.as_mut(), None)?; for event in self.events.iter() { - if let Some(cf) = self.callbacks.get(event.key) { - let result = match cf { - CompletionCallback::Read(ref file, ref c, pos) => { - let file = file.lock().unwrap(); - let r = c.as_read(); - let mut buf = r.buf_mut(); - rustix::io::pread(file.as_fd(), buf.as_mut_slice(), *pos as u64) - } - CompletionCallback::Write(ref file, _, ref buf, pos) => { - let file = file.lock().unwrap(); - let buf = buf.borrow(); - rustix::io::pwrite(file.as_fd(), buf.as_slice(), *pos as u64) - } - }; - match result { - Ok(n) => { - let cf = self - .callbacks - .remove(event.key) - .expect("callback should exist"); - match cf { - CompletionCallback::Read(_, c, _) => c.complete(0), - CompletionCallback::Write(_, c, _, _) => c.complete(n as i32), - } - } - Err(Errno::AGAIN) => (), - Err(e) => { - self.callbacks.remove(event.key); + let key = event.key; + let cb = match self.callbacks.remove(key) { + Some(cb) => cb, + None => continue, // could have been completed/removed already + }; - trace!("run_once() error: {}", e); - return Err(e.into()); + match cb { + CompletionCallback::Read(ref file, c, pos) => { + let f = file + .lock() + .map_err(|e| LimboError::LockingError(e.to_string()))?; + let r = c.as_read(); + let mut buf = r.buf_mut(); + match rustix::io::pread(f.as_fd(), buf.as_mut_slice(), pos as u64) { + Ok(n) => c.complete(n as i32), + Err(Errno::AGAIN) => { + // re-arm + unsafe { self.poller.as_mut().add(&f.as_fd(), Event::readable(key))? }; + self.callbacks.as_mut().insert( + key, + CompletionCallback::Read(file.clone(), c.clone(), pos), + ); + } + Err(e) => return Err(e.into()), + } + } + + CompletionCallback::Write(ref file, c, buf, pos) => { + let f = file + .lock() + .map_err(|e| LimboError::LockingError(e.to_string()))?; + let b = buf.borrow(); + match rustix::io::pwrite(f.as_fd(), b.as_slice(), pos as u64) { + Ok(n) => c.complete(n as i32), + Err(Errno::AGAIN) => { + unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? 
}; + self.callbacks.as_mut().insert( + key, + CompletionCallback::Write(file.clone(), c, buf.clone(), pos), + ); + } + Err(e) => return Err(e.into()), + } + } + + CompletionCallback::Writev(file, c, bufs, mut pos, mut idx, mut off) => { + let f = file + .lock() + .map_err(|e| LimboError::LockingError(e.to_string()))?; + // keep trying until WouldBlock or we're done with this event + match try_pwritev_raw(f.as_raw_fd(), pos as u64, &bufs, idx, off) { + Ok(written) => { + // advance through buffers + let mut rem = written; + while rem > 0 { + let len = { + let r = bufs[idx].borrow(); + r.len() + }; + let left = len - off; + if rem < left { + off += rem; + rem = 0; + } else { + rem -= left; + idx += 1; + off = 0; + if idx == bufs.len() { + break; + } + } + } + pos += written; + + if idx == bufs.len() { + c.complete(pos as i32); + } else { + // Not finished; re-arm and store updated state + unsafe { + self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? + }; + self.callbacks.as_mut().insert( + key, + CompletionCallback::Writev( + file.clone(), + c.clone(), + bufs, + pos, + idx, + off, + ), + ); + } + break; + } + Err(e) if e.kind() == ErrorKind::WouldBlock => { + // re-arm with same state + unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? }; + self.callbacks.as_mut().insert( + key, + CompletionCallback::Writev( + file.clone(), + c.clone(), + bufs, + pos, + idx, + off, + ), + ); + break; + } + Err(e) => return Err(e.into()), } } } } + Ok(()) } @@ -312,6 +409,14 @@ enum CompletionCallback { Arc>, usize, ), + Writev( + Arc>, + Arc, + Vec>>, + usize, // absolute file offset + usize, // buf index + usize, // intra-buf offset + ), } pub struct UnixFile<'io> { @@ -432,7 +537,59 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] +<<<<<<< HEAD fn sync(&self, c: Completion) -> Result { +||||||| parent of 7f48531b (batch backfilling pages when checkpointing) + fn sync(&self, c: Arc) -> Result> { +======= + fn pwritev( + &self, + pos: usize, + buffers: Vec>>, + c: Arc, + ) -> Result> { + let file = self + .file + .lock() + .map_err(|e| LimboError::LockingError(e.to_string()))?; + + match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) { + Ok(written) => { + trace!("pwritev wrote {written}"); + c.complete(written as i32); + Ok(c) + } + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + trace!("pwritev blocks"); + } else { + return Err(e.into()); + } + // Set up state so we can resume later + let fd = file.as_raw_fd(); + self.poller + .add(&file.as_fd(), Event::writable(fd as usize))?; + let buf_idx = 0; + let buf_offset = 0; + self.callbacks.insert( + fd as usize, + CompletionCallback::Writev( + self.file.clone(), + c.clone(), + buffers, + pos, + buf_idx, + buf_offset, + ), + ); + Ok(c) + } + } + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn sync(&self, c: Arc) -> Result> { +>>>>>>> 7f48531b (batch backfilling pages when checkpointing) let file = self.file.lock().unwrap(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/storage/database.rs b/core/storage/database.rs index fd2555b59..ff474a436 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -16,7 +16,14 @@ pub trait DatabaseStorage: Send + Sync { buffer: Arc>, c: Completion, ) -> Result; - fn sync(&self, c: Completion) -> Result; + fn write_pages( + &self, + first_page_idx: usize, + page_size: usize, + buffers: Vec>>, + c: Completion, + ) -> Result; + fn sync(&self, c: Completion) -> Result<()>; fn size(&self) -> 
Result; fn truncate(&self, len: usize, c: Completion) -> Result; } @@ -61,6 +68,22 @@ impl DatabaseStorage for DatabaseFile { self.file.pwrite(pos, buffer, c) } + fn write_pages( + &self, + page_idx: usize, + page_size: usize, + buffers: Vec>>, + c: Completion, + ) -> Result<()> { + assert!(page_idx > 0); + assert!(page_size >= 512); + assert!(page_size <= 65536); + assert_eq!(page_size & (page_size - 1), 0); + let pos = (page_idx - 1) * page_size; + let c = self.file.pwritev(pos, buffers, c)?; + Ok(c) + } + #[instrument(skip_all, level = Level::DEBUG)] fn sync(&self, c: Completion) -> Result { self.file.sync(c) @@ -120,6 +143,22 @@ impl DatabaseStorage for FileMemoryStorage { self.file.pwrite(pos, buffer, c) } + fn write_pages( + &self, + page_idx: usize, + page_size: usize, + buffer: Vec>>, + c: Completion, + ) -> Result<()> { + assert!(page_idx > 0); + assert!(page_size >= 512); + assert!(page_size <= 65536); + assert_eq!(page_size & (page_size - 1), 0); + let pos = (page_idx - 1) * page_size; + let c = self.file.pwritev(pos, buffer, c)?; + Ok(c) + } + #[instrument(skip_all, level = Level::DEBUG)] fn sync(&self, c: Completion) -> Result { self.file.sync(c) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a31094f19..9889988ea 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -346,7 +346,7 @@ pub struct Pager { /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. - page_size: Cell>, + pub(crate) page_size: Cell>, reserved_space: OnceCell, free_page_state: RefCell, } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 829f049b6..78d6a5dd5 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -58,6 +58,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; +use crate::storage::wal::{BatchItem, PendingFlush}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; @@ -853,10 +854,57 @@ pub fn begin_write_btree_page( } #[instrument(skip_all, level = Level::DEBUG)] -pub fn begin_sync( - db_file: Arc, - syncing: Rc>, -) -> Result { +pub fn begin_write_btree_pages_writev( + pager: &Pager, + batch: &[BatchItem], + write_counter: Rc>, +) -> Result { + if batch.is_empty() { + return Ok(PendingFlush::default()); + } + + let mut run = batch.to_vec(); + run.sort_by_key(|b| b.id); + + let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize; + let done = Arc::new(AtomicBool::new(false)); + + let mut all_ids = Vec::with_capacity(run.len()); + let mut start = 0; + while start < run.len() { + let mut end = start + 1; + while end < run.len() && run[end].id == run[end - 1].id + 1 { + end += 1; + } + + // submit contiguous run + let first = run[start].id; + let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect(); + all_ids.extend(run[start..end].iter().map(|b| b.id)); + + *write_counter.borrow_mut() += 1; + let wc = write_counter.clone(); + let done_clone = done.clone(); + + let c = Completion::new_write(move |_| { + // one run finished + *wc.borrow_mut() -= 1; + if wc.borrow().eq(&0) { + // last run of this batch is done + done_clone.store(true, Ordering::Release); + } + 
}); + pager.db_file.write_pages(first, page_sz, bufs, c)?; + start = end; + } + Ok(PendingFlush { + pages: all_ids, + done, + }) +} + +#[instrument(skip_all, level = Level::DEBUG)] +pub fn begin_sync(db_file: Arc, syncing: Rc>) -> Result<()> { assert!(!*syncing.borrow()); *syncing.borrow_mut() = true; let completion = Completion::new_sync(move |_| { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index eb55e9dc2..c4f265e7b 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock; use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ - begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, - WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + begin_read_wal_frame, begin_read_wal_frame_raw, begin_write_btree_pages_writev, + finish_read_page, prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::IOResult; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -31,7 +31,7 @@ use self::sqlite3_ondisk::{checksum_wal, PageContent, WAL_MAGIC_BE, WAL_MAGIC_LE use super::buffer_pool::BufferPool; use super::pager::{PageRef, Pager}; -use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; +use super::sqlite3_ondisk::{self, WalHeader}; pub const READMARK_NOT_USED: u32 = 0xffffffff; @@ -393,11 +393,20 @@ pub enum CheckpointState { Start, ReadFrame, WaitReadFrame, - WritePage, - WaitWritePage, + AccumulatePage, + FlushBatch, + WaitFlush, Done, } +const CKPT_BATCH_PAGES: usize = 256; + +#[derive(Clone)] +pub(super) struct BatchItem { + pub(super) id: usize, + pub(super) buf: Arc>, +} + // Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save // in flight information of the checkpoint in OngoingCheckpoint. page is just a helper Page to do // page operations like reading a frame to a page, and writing a page to disk. This page should not @@ -407,13 +416,37 @@ pub enum CheckpointState { // current_page is a helper to iterate through all the pages that might have a frame in the safe // range. This is inefficient for now. 
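// A standalone sketch of the run-grouping step used by the batching code
// above, assuming 1-based page ids: the batch is sorted by id and split into
// maximal runs of consecutive pages so each run can be submitted as a single
// vectored write. `group_runs` and the (first_page, last_page) pairs are
// illustrative names, not part of this patch.
fn group_runs(mut page_ids: Vec<usize>) -> Vec<(usize, usize)> {
    page_ids.sort_unstable();
    let mut runs = Vec::new();
    let mut start = 0;
    while start < page_ids.len() {
        let mut end = start + 1;
        // extend the current run while page ids stay consecutive
        while end < page_ids.len() && page_ids[end] == page_ids[end - 1] + 1 {
            end += 1;
        }
        runs.push((page_ids[start], page_ids[end - 1]));
        start = end;
    }
    runs
}

#[cfg(test)]
mod run_grouping_sketch {
    use super::group_runs;

    #[test]
    fn splits_non_contiguous_pages_into_runs() {
        // pages 2-3 form one run, 7-9 another, 12 a run of one
        assert_eq!(
            group_runs(vec![8, 2, 12, 7, 3, 9]),
            vec![(2, 3), (7, 9), (12, 12)]
        );
    }
}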
struct OngoingCheckpoint { - page: PageRef, + scratch: PageRef, + batch: Vec, state: CheckpointState, + pending_flushes: Vec, min_frame: u64, max_frame: u64, current_page: u64, } +pub(super) struct PendingFlush { + // page ids to clear + pub(super) pages: Vec, + // completion flag set by IO callback + pub(super) done: Arc, +} + +impl Default for PendingFlush { + fn default() -> Self { + Self::new() + } +} + +impl PendingFlush { + pub fn new() -> Self { + Self { + pages: Vec::with_capacity(CKPT_BATCH_PAGES), + done: Arc::new(AtomicBool::new(false)), + } + } +} + impl fmt::Debug for OngoingCheckpoint { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("OngoingCheckpoint") @@ -665,6 +698,30 @@ impl Drop for CheckpointLocks { } } +fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut Vec) { + // grab id and buffer + let id = scratch.get().id; + let buf = scratch.get_contents().buffer.clone(); // current data + batch.push(BatchItem { id, buf }); + // give scratch a brand-new empty buffer for the next read + reinit_scratch_buffer(scratch, pool); +} + +fn reinit_scratch_buffer(scratch: &PageRef, pool: &Arc) { + let raw = pool.get(); + let pool_clone = pool.clone(); + let drop_fn = Rc::new(move |b| { + pool_clone.put(b); + }); + let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn))); + // replace contents + unsafe { + let inner = &mut *scratch.inner.get(); + inner.contents = Some(PageContent::new(0, new_buf)); + inner.flags.store(0, Ordering::Relaxed); + } +} + impl Wal for WalFile { /// Begin a read transaction. The caller must ensure that there is not already /// an ongoing read transaction. @@ -1204,7 +1261,9 @@ impl WalFile { max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) }, shared, ongoing_checkpoint: OngoingCheckpoint { - page: checkpoint_page, + scratch: checkpoint_page, + batch: Vec::new(), + pending_flushes: Vec::new(), state: CheckpointState::Start, min_frame: 0, max_frame: 0, @@ -1390,27 +1449,58 @@ impl WalFile { if self.ongoing_checkpoint.page.is_locked() { return Ok(IOResult::IO); } else { - self.ongoing_checkpoint.state = CheckpointState::WritePage; + self.ongoing_checkpoint.state = CheckpointState::AccumulatePage; } } - CheckpointState::WritePage => { - self.ongoing_checkpoint.page.set_dirty(); - let _ = begin_write_btree_page( - pager, - &self.ongoing_checkpoint.page, - write_counter.clone(), - )?; - self.ongoing_checkpoint.state = CheckpointState::WaitWritePage; + CheckpointState::AccumulatePage => { + // mark before batching + self.ongoing_checkpoint.scratch.set_dirty(); + take_page_into_batch( + &self.ongoing_checkpoint.scratch, + &self.buffer_pool, + &mut self.ongoing_checkpoint.batch, + ); + + let more_pages = (self.ongoing_checkpoint.current_page as usize) + < self.get_shared().pages_in_frames.lock().len() - 1; + + if self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES && more_pages { + self.ongoing_checkpoint.current_page += 1; + self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + } else { + self.ongoing_checkpoint.state = CheckpointState::FlushBatch; + } } - CheckpointState::WaitWritePage => { - if *write_counter.borrow() > 0 { + CheckpointState::FlushBatch => { + self.ongoing_checkpoint + .pending_flushes + .push(begin_write_btree_pages_writev( + pager, + &self.ongoing_checkpoint.batch, + write_counter.clone(), + )?); + // batch is queued + self.ongoing_checkpoint.batch.clear(); + self.ongoing_checkpoint.state = CheckpointState::WaitFlush; + return Ok(IOResult::IO); + } + 
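// Condensed sketch of the accumulate/flush cycle implemented by the
// AccumulatePage and FlushBatch arms above, under the simplifying assumption
// that pages arrive as a plain slice: fill a batch up to a fixed capacity,
// flush it, and flush whatever remains at the end. `collect_batches` and its
// parameters are illustrative stand-ins for the state-machine fields.
fn collect_batches(pages: &[usize], batch_capacity: usize) -> Vec<Vec<usize>> {
    let mut batches = Vec::new();
    let mut batch = Vec::with_capacity(batch_capacity);
    for &page in pages {
        batch.push(page);
        if batch.len() == batch_capacity {
            // FlushBatch: submit this batch and start a fresh one
            batches.push(std::mem::take(&mut batch));
        }
    }
    if !batch.is_empty() {
        // final partial batch once no more pages remain
        batches.push(batch);
    }
    batches
}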
CheckpointState::WaitFlush => { + if self + .ongoing_checkpoint + .pending_flushes + .iter() + .any(|pf| !pf.done.load(Ordering::Acquire)) + { return Ok(IOResult::IO); } - // If page was in cache clear it. - if let Some(page) = pager.cache_get(self.ongoing_checkpoint.page.get().id) { - page.clear_dirty(); + for pf in self.ongoing_checkpoint.pending_flushes.drain(..) { + for id in pf.pages { + if let Some(p) = pager.cache_get(id) { + p.clear_dirty(); + } + } } - self.ongoing_checkpoint.page.clear_dirty(); + // done with batch let shared = self.get_shared(); if (self.ongoing_checkpoint.current_page as usize) < shared.pages_in_frames.lock().len() From d189f66328328c174c07c12ea3900f45a2452e7e Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 19:05:52 -0400 Subject: [PATCH 02/18] Add pwritev to wasm/js api --- bindings/javascript/src/lib.rs | 9 +++++++++ testing/cli_tests/vfs_bench.py | 5 +++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index e15cdaf7f..02911294e 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -718,6 +718,15 @@ impl turso_core::DatabaseStorage for DatabaseFile { let pos = (page_idx - 1) * size; self.file.pwrite(pos, buffer, c) } + fn write_pages( + &self, + _first_page_idx: usize, + _page_size: usize, + _buffers: Vec>>, + _c: turso_core::Completion, + ) -> turso_core::Result<()> { + todo!(); + } fn sync(&self, c: turso_core::Completion) -> turso_core::Result { self.file.sync(c) diff --git a/testing/cli_tests/vfs_bench.py b/testing/cli_tests/vfs_bench.py index b54ababf3..88abb6e6a 100644 --- a/testing/cli_tests/vfs_bench.py +++ b/testing/cli_tests/vfs_bench.py @@ -13,7 +13,7 @@ from typing import Dict from cli_tests.console import error, info, test from cli_tests.test_turso_cli import TestTursoShell -LIMBO_BIN = Path("./target/release/tursodb") +LIMBO_BIN = Path("./target/debug/tursodb") DB_FILE = Path("testing/temp.db") vfs_list = ["syscall"] if platform.system() == "Linux": @@ -79,11 +79,13 @@ def main() -> None: averages: Dict[str, float] = {} for vfs in vfs_list: + setup_temp_db() test(f"\n### VFS: {vfs} ###") times = bench_one(vfs, sql, iterations) info(f"All times ({vfs}):", " ".join(f"{t:.6f}" for t in times)) avg = statistics.mean(times) averages[vfs] = avg + cleanup_temp_db() info("\n" + "-" * 60) info("Average runtime per VFS") @@ -106,7 +108,6 @@ def main() -> None: faster_slower = "slower" if pct > 0 else "faster" info(f"{vfs:<{name_pad}} : {avg:.6f} ({abs(pct):.1f}% {faster_slower} than {baseline})") info("-" * 60) - cleanup_temp_db() if __name__ == "__main__": From 62f004c8986333cb029dc52fb8ee95321fbbdf69 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 20:31:23 -0400 Subject: [PATCH 03/18] Fix write counter for writev batching in checkpoint --- core/storage/pager.rs | 5 +++-- core/storage/sqlite3_ondisk.rs | 26 +++++++++++++++++++------- core/storage/wal.rs | 6 +++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 9889988ea..768b8c6c1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1303,11 +1303,12 @@ impl Pager { return Ok(CheckpointResult::default()); } - let counter = Rc::new(RefCell::new(0)); - let mut checkpoint_result = self.io.block(|| { + let write_counter = Rc::new(RefCell::new(0)); + let checkpoint_result = self.io.block(|| { self.wal .borrow_mut() .checkpoint(self, counter.clone(), mode) + .map_err(|err| panic!("error 
while clearing cache {err}")) })?; if checkpoint_result.everything_backfilled() diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 78d6a5dd5..b1c07a86a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -61,7 +61,7 @@ use crate::storage::pager::Pager; use crate::storage::wal::{BatchItem, PendingFlush}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; -use std::cell::{RefCell, UnsafeCell}; +use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashMap; use std::mem::MaybeUninit; use std::pin::Pin; @@ -857,7 +857,8 @@ pub fn begin_write_btree_page( pub fn begin_write_btree_pages_writev( pager: &Pager, batch: &[BatchItem], - write_counter: Rc>, + // track writes for each flush series + write_counter: Rc>, ) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); @@ -878,23 +879,34 @@ pub fn begin_write_btree_pages_writev( } // submit contiguous run - let first = run[start].id; + let first_id = run[start].id; let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect(); all_ids.extend(run[start..end].iter().map(|b| b.id)); - *write_counter.borrow_mut() += 1; + write_counter.set(write_counter.get() + 1); let wc = write_counter.clone(); let done_clone = done.clone(); let c = Completion::new_write(move |_| { // one run finished - *wc.borrow_mut() -= 1; - if wc.borrow().eq(&0) { + wc.set(wc.get() - 1); + if wc.get().eq(&0) { // last run of this batch is done done_clone.store(true, Ordering::Release); } }); - pager.db_file.write_pages(first, page_sz, bufs, c)?; + pager + .db_file + .write_pages(first_id, page_sz, bufs, c) + .inspect_err(|e| { + tracing::error!( + "Failed to write pages {}-{}: {}", + first_id, + first_id + (end - start) - 1, + e + ); + write_counter.set(write_counter.get() - 1); + })?; start = end; } Ok(PendingFlush { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c4f265e7b..9e345a59c 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,7 +399,7 @@ pub enum CheckpointState { Done, } -const CKPT_BATCH_PAGES: usize = 256; +const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem { @@ -416,7 +416,7 @@ pub(super) struct BatchItem { // current_page is a helper to iterate through all the pages that might have a frame in the safe // range. This is inefficient for now. 
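// Sketch of the offset arithmetic behind `write_pages`, mirroring the asserts
// placed on it earlier in this series: page numbers are 1-based, so page N
// starts at (N - 1) * page_size bytes, with the page size a power of two
// between 512 and 65536. `page_byte_offset` is an illustrative helper, not a
// name introduced by this patch.
fn page_byte_offset(page_idx: usize, page_size: usize) -> usize {
    assert!(page_idx > 0, "page numbers are 1-based");
    assert!((512..=65536).contains(&page_size));
    assert_eq!(page_size & (page_size - 1), 0, "page size must be a power of two");
    (page_idx - 1) * page_size
}

#[cfg(test)]
mod page_offset_sketch {
    use super::page_byte_offset;

    #[test]
    fn pages_map_to_page_size_aligned_offsets() {
        assert_eq!(page_byte_offset(1, 4096), 0);
        assert_eq!(page_byte_offset(3, 4096), 8192);
    }
}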
struct OngoingCheckpoint { - scratch: PageRef, + scratch_page: PageRef, batch: Vec, state: CheckpointState, pending_flushes: Vec, @@ -1261,7 +1261,7 @@ impl WalFile { max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) }, shared, ongoing_checkpoint: OngoingCheckpoint { - scratch: checkpoint_page, + scratch_page: checkpoint_page, batch: Vec::new(), pending_flushes: Vec::new(), state: CheckpointState::Start, From 5f01eaae3531da286969a1f9696efa5bf1f0311e Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 22:09:12 -0400 Subject: [PATCH 04/18] Fix default io:;File::pwritev impl --- core/io/mod.rs | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index ab299ef64..f7766ef84 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -24,21 +24,36 @@ pub trait File: Send + Sync { buffers: Vec>>, c: Completion, ) -> Result { - // FIXME: for now, stupid default so i dont have to impl for all backends - let counter = Rc::new(Cell::new(0)); - let len = buffers.len(); + use std::sync::atomic::{AtomicUsize, Ordering}; + if buffers.is_empty() { + c.complete(0); + return Ok(c); + } + // naive default implementation can be overridden on backends where it makes sense to let mut pos = pos; + let outstanding = Arc::new(AtomicUsize::new(buffers.len())); + let total_written = Arc::new(AtomicUsize::new(0)); + for buf in buffers { - let _counter = counter.clone(); - let _c = c.clone(); - let default_c = Completion::new_write(move |_| { - _counter.set(_counter.get() + 1); - if _counter.get() == len { - _c.complete(len as i32); // complete the original completion - } - }); let len = buf.borrow().len(); - self.pwrite(pos, buf, default_c)?; + let child_c = { + let c_main = c.clone(); + let outstanding = outstanding.clone(); + let total_written = total_written.clone(); + Completion::new_write(move |n| { + // accumulate bytes actually reported by the backend + total_written.fetch_add(n as usize, Ordering::Relaxed); + if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 { + // last one finished + c_main.complete(total_written.load(Ordering::Acquire) as i32); + } + }) + }; + if let Err(e) = self.pwrite(pos, buf.clone(), child_c) { + // best-effort: mark as done so caller won't wait forever + c.complete(-1); + return Err(e); + } pos += len; } Ok(c) From daec8aeb22fb69a6527a89f31616a51d5162c1e3 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 22:29:15 -0400 Subject: [PATCH 05/18] impl pwritev for simulator file --- simulator/runner/file.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index ba3680333..9ed80e34c 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -222,6 +222,34 @@ impl File for SimulatorFile { Ok(c) } + fn pwritev( + &self, + pos: usize, + buffers: Vec>>, + c: turso_core::Completion, + ) -> Result { + self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); + if self.fault.get() { + tracing::debug!("pwritev fault"); + self.nr_pwrite_faults.set(self.nr_pwrite_faults.get() + 1); + return Err(turso_core::LimboError::InternalError( + FAULT_ERROR_MSG.into(), + )); + } + if let Some(latency) = self.generate_latency_duration() { + let cloned_c = c.clone(); + let op = + Box::new(move |file: &SimulatorFile| file.inner.pwritev(pos, buffers, cloned_c)); + self.queued_io + .borrow_mut() + .push(DelayedIo { time: latency, op }); + Ok(c) + } else { + let c = self.inner.pwritev(pos, buffers, 
c)?; + Ok(c) + } + } + fn size(&self) -> Result { self.inner.size() } From 88445328a586ddb712109a18d68d4e2357fd001f Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 23:56:08 -0400 Subject: [PATCH 06/18] Handle partial writes for pwritev calls in io_uring and fix JS bindings --- bindings/javascript/src/lib.rs | 12 +- core/io/io_uring.rs | 274 +++++++++++++++++++++------------ core/storage/wal.rs | 3 +- 3 files changed, 187 insertions(+), 102 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 02911294e..6b4b1992f 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -720,12 +720,14 @@ impl turso_core::DatabaseStorage for DatabaseFile { } fn write_pages( &self, - _first_page_idx: usize, - _page_size: usize, - _buffers: Vec>>, - _c: turso_core::Completion, + page_idx: usize, + page_size: usize, + buffers: Vec>>, + c: turso_core::Completion, ) -> turso_core::Result<()> { - todo!(); + let pos = (page_idx - 1) * page_size; + self.file.pwritev(pos, buffers, c.into())?; + Ok(()) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 51c2c85a8..963e2fe28 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -5,36 +5,18 @@ use crate::io::clock::{Clock, Instant}; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use std::cell::RefCell; -use std::collections::VecDeque; -use std::fmt; +use std::collections::{HashMap, VecDeque}; use std::io::ErrorKind; use std::os::fd::AsFd; use std::os::unix::io::AsRawFd; use std::rc::Rc; use std::sync::Arc; -use thiserror::Error; use tracing::{debug, trace}; const ENTRIES: u32 = 512; const SQPOLL_IDLE: u32 = 1000; const FILES: u32 = 8; -#[derive(Debug, Error)] -enum UringIOError { - IOUringCQError(i32), -} - -impl fmt::Display for UringIOError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - UringIOError::IOUringCQError(code) => write!( - f, - "IOUring completion queue error occurred with code {code}", - ), - } - } -} - pub struct UringIO { inner: Rc>, } @@ -45,6 +27,7 @@ unsafe impl Sync for UringIO {} struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, + writev_states: HashMap, } struct InnerUringIO { @@ -69,6 +52,7 @@ impl UringIO { ring: WrappedIOUring { ring, pending_ops: 0, + writev_states: HashMap::new(), }, free_files: (0..FILES).collect(), }; @@ -79,6 +63,86 @@ impl UringIO { } } +macro_rules! 
with_fd { + ($file:expr, |$fd:ident| $body:expr) => { + match $file.id { + Some(id) => { + let $fd = io_uring::types::Fixed(id); + $body + } + None => { + let $fd = io_uring::types::Fd($file.file.as_raw_fd()); + $body + } + } + }; +} + +struct WritevState { + // fixed fd slot or 0xFFFF_FFFF if none + file_id: u32, + // absolute file offset for next submit + pos: usize, + // current buffer index in `bufs` + idx: usize, + // intra-buffer offset + off: usize, + bufs: Vec>>, + // completion returned to caller + user_c: Arc, + // we keep the last iovec allocation alive until CQE: + // raw ptr to Box<[iovec]> + last_iov: *mut libc::iovec, + last_iov_len: usize, +} + +impl WritevState { + fn remaining(&self) -> usize { + let mut total = 0; + for (i, b) in self.bufs.iter().enumerate().skip(self.idx) { + let r = b.borrow(); + let len = r.len(); + total += if i == self.idx { len - self.off } else { len }; + } + total + } + + /// Advance (idx, off, pos) after written bytes + fn advance(&mut self, written: usize) { + let mut rem = written; + while rem > 0 { + let len = { + let r = self.bufs[self.idx].borrow(); + r.len() + }; + let left = len - self.off; + if rem < left { + self.off += rem; + self.pos += rem; + rem = 0; + } else { + rem -= left; + self.pos += left; + self.idx += 1; + self.off = 0; + } + } + } + + fn free_last_iov(&mut self) { + if !self.last_iov.is_null() { + unsafe { + drop(Box::from_raw(core::slice::from_raw_parts_mut( + self.last_iov, + self.last_iov_len, + ))) + }; + self.last_iov = core::ptr::null_mut(); + self.last_iov_len = 0; + } + } +} + impl InnerUringIO { fn register_file(&mut self, fd: i32) -> Result { if let Some(slot) = self.free_files.pop_front() { @@ -132,6 +196,41 @@ impl WrappedIOUring { fn empty(&self) -> bool { self.pending_ops == 0 } + + fn submit_writev(&mut self, key: u64) { + let st = self.writev_states.get_mut(&key).expect("state must exist"); + // build iovecs for the remaining slice (respect UIO_MAXIOV) + const MAX_IOV: usize = libc::UIO_MAXIOV as usize; + let mut iov = Vec::with_capacity(MAX_IOV); + for (i, b) in st.bufs.iter().enumerate().skip(st.idx).take(MAX_IOV) { + let r = b.borrow(); + let s = r.as_slice(); + let slice = if i == st.idx { &s[st.off..] 
} else { s }; + if slice.is_empty() { + continue; + } + iov.push(libc::iovec { + iov_base: slice.as_ptr() as *mut _, + iov_len: slice.len(), + }); + } + + // keep iov alive until CQE + let boxed = iov.into_boxed_slice(); + let ptr = boxed.as_ptr() as *mut libc::iovec; + let len = boxed.len(); + st.free_last_iov(); + st.last_iov = ptr; + st.last_iov_len = len; + // leak; freed when CQE processed + let _ = Box::into_raw(boxed); + let entry = + io_uring::opcode::Writev::new(io_uring::types::Fixed(st.file_id), ptr, len as u32) + .offset(st.pos as u64) + .build() + .user_data(key); + self.submit_entry(&entry); + } } impl IO for UringIO { @@ -175,35 +274,53 @@ impl IO for UringIO { } fn run_once(&self) -> Result<()> { - trace!("run_once()"); - let mut inner = self.inner.borrow_mut(); - let ring = &mut inner.ring; + { + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; - if ring.empty() { - return Ok(()); - } - - ring.wait_for_completion()?; - while let Some(cqe) = ring.ring.completion().next() { - ring.pending_ops -= 1; - let result = cqe.result(); - if result < 0 { - return Err(LimboError::UringIOError(format!( - "{} cqe: {:?}", - UringIOError::IOUringCQError(result), - cqe - ))); + if ring.empty() { + return Ok(()); } let ud = cqe.user_data(); turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); - if ud == 0 { - // we currently don't have any linked timeouts or cancelations, but just in case - // lets guard against this case - tracing::error!("Received completion with user_data 0"); - continue; - } - completion_from_key(ud).complete(result); + ring.wait_for_completion()?; } + loop { + let (had, user_data, result) = { + let mut inner = self.inner.borrow_mut(); + let mut cq = inner.ring.ring.completion(); + if let Some(cqe) = cq.next() { + (true, cqe.user_data(), cqe.result()) + } else { + (false, 0, 0) + } + }; + if !had { + break; + } + self.inner.borrow_mut().ring.pending_ops -= 1; + + let mut inner = self.inner.borrow_mut(); + if let Some(mut st) = inner.ring.writev_states.remove(&user_data) { + if result < 0 { + st.free_last_iov(); + st.user_c.complete(result); + } else { + let written = result as usize; + st.free_last_iov(); + st.advance(written); + if st.remaining() == 0 { + st.user_c.complete(st.pos as i32); + } else { + inner.ring.writev_states.insert(user_data, st); + inner.ring.submit_writev(user_data); // safe: no CQ borrow alive + } + } + } else { + completion_from_key(user_data).complete(result); + } + } + Ok(()) } @@ -251,21 +368,6 @@ pub struct UringFile { unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} -macro_rules! 
with_fd { - ($file:expr, |$fd:ident| $body:expr) => { - match $file.id { - Some(id) => { - let $fd = io_uring::types::Fixed(id); - $body - } - None => { - let $fd = io_uring::types::Fd($file.file.as_raw_fd()); - $body - } - } - }; -} - impl File for UringFile { fn lock_file(&self, exclusive: bool) -> Result<()> { let fd = self.file.as_fd(); @@ -360,46 +462,26 @@ impl File for UringFile { &self, pos: usize, bufs: Vec>>, - c: Arc, + user_c: Arc, ) -> Result> { - // build iovecs - let mut iovs: Vec = Vec::with_capacity(bufs.len()); - for b in &bufs { - let rb = b.borrow(); - iovs.push(libc::iovec { - iov_base: rb.as_ptr() as *mut _, - iov_len: rb.len(), - }); - } - // keep iovecs alive until completion - let boxed_iovs = iovs.into_boxed_slice(); - let iov_ptr = boxed_iovs.as_ptr(); - let iov_len = boxed_iovs.len() as u32; - // leak now, free in completion - let raw_iovs = Box::into_raw(boxed_iovs); - - let comp = { - // wrap original completion to free resources - let orig = c.clone(); - Box::new(move |res: i32| { - // reclaim iovecs - unsafe { - let _ = Box::from_raw(raw_iovs); - } - // forward to user closure - orig.complete(res); - }) - }; - let c = Arc::new(Completion::new_write(comp)); + // create state + let key = get_key(user_c.clone()); let mut io = self.io.borrow_mut(); - let e = with_fd!(self, |fd| { - io_uring::opcode::Writev::new(fd, iov_ptr, iov_len) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) - }); - io.ring.submit_entry(&e); - Ok(c) + + let state = WritevState { + file_id: self.id.unwrap_or(u32::MAX), + pos, + idx: 0, + off: 0, + bufs, + user_c: user_c.clone(), + last_iov: core::ptr::null_mut(), + last_iov_len: 0, + }; + io.ring.writev_states.insert(key, state); + io.ring.submit_writev(key); + + Ok(user_c.clone()) } fn size(&self) -> Result { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9e345a59c..7577849c2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -701,7 +701,8 @@ impl Drop for CheckpointLocks { fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut Vec) { // grab id and buffer let id = scratch.get().id; - let buf = scratch.get_contents().buffer.clone(); // current data + let buf = scratch.get_contents().buffer.clone(); + scratch.pin(); // ensure it isnt evicted batch.push(BatchItem { id, buf }); // give scratch a brand-new empty buffer for the next read reinit_scratch_buffer(scratch, pool); From 0f94cdef034194d3cb2dda12c358c19eca037f13 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 15:36:33 -0400 Subject: [PATCH 07/18] Fix io_uring pwritev to properly handle partial writes --- core/io/io_uring.rs | 221 +++++++++++++++++++++++++++++--------------- core/storage/wal.rs | 2 +- 2 files changed, 145 insertions(+), 78 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 963e2fe28..ccf213e12 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -2,15 +2,18 @@ use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; +use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; -use std::cell::RefCell; -use std::collections::{HashMap, VecDeque}; -use std::io::ErrorKind; -use std::os::fd::AsFd; -use std::os::unix::io::AsRawFd; -use std::rc::Rc; -use std::sync::Arc; +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + io::ErrorKind, + ops::Deref, + os::{fd::AsFd, unix::io::AsRawFd}, + rc::Rc, + sync::Arc, +}; use 
tracing::{debug, trace}; const ENTRIES: u32 = 512; @@ -65,31 +68,50 @@ impl UringIO { macro_rules! with_fd { ($file:expr, |$fd:ident| $body:expr) => { - match $file.id { + match $file.id() { Some(id) => { let $fd = io_uring::types::Fixed(id); $body } None => { - let $fd = io_uring::types::Fd($file.file.as_raw_fd()); + let $fd = io_uring::types::Fd($file.as_raw_fd()); $body } } }; } +enum Fd { + Fixed(u32), + RawFd(i32), +} + +impl Fd { + fn as_raw_fd(&self) -> i32 { + match self { + Fd::RawFd(fd) => *fd, + _ => unreachable!("only to be called on RawFd variant"), + } + } + fn id(&self) -> Option { + match self { + Fd::Fixed(id) => Some(*id), + Fd::RawFd(_) => None, + } + } +} + struct WritevState { - // fixed fd slot or 0xFFFF_FFFF if none - file_id: u32, + // fixed fd slot + file_id: Fd, // absolute file offset for next submit - pos: usize, + file_pos: usize, // current buffer index in `bufs` - idx: usize, + current_buffer_idx: usize, // intra-buffer offset - off: usize, + current_buffer_offset: usize, + total_written: usize, bufs: Vec>>, - // completion returned to caller - user_c: Arc, // we keep the last iovec allocation alive until CQE: // raw ptr to Box<[iovec]> last_iov: *mut libc::iovec, @@ -97,12 +119,32 @@ struct WritevState { } impl WritevState { + fn new(file: &UringFile, pos: usize, bufs: Vec>>) -> Self { + let file_id = match file.id() { + Some(id) => Fd::Fixed(id), + None => Fd::RawFd(file.as_raw_fd()), + }; + Self { + file_id, + file_pos: pos, + current_buffer_idx: 0, + current_buffer_offset: 0, + total_written: 0, + bufs, + last_iov: core::ptr::null_mut(), + last_iov_len: 0, + } + } fn remaining(&self) -> usize { let mut total = 0; - for (i, b) in self.bufs.iter().enumerate().skip(self.idx) { + for (i, b) in self.bufs.iter().enumerate().skip(self.current_buffer_idx) { let r = b.borrow(); let len = r.len(); - total += if i == self.idx { len - self.off } else { len }; + total += if i == self.current_buffer_idx { + len - self.current_buffer_offset + } else { + len + }; } total } @@ -112,21 +154,22 @@ impl WritevState { let mut rem = written; while rem > 0 { let len = { - let r = self.bufs[self.idx].borrow(); + let r = self.bufs[self.current_buffer_idx].borrow(); r.len() }; - let left = len - self.off; + let left = len - self.current_buffer_offset; if rem < left { - self.off += rem; - self.pos += rem; + self.current_buffer_offset += rem; + self.file_pos += rem; rem = 0; } else { rem -= left; - self.pos += left; - self.idx += 1; - self.off = 0; + self.file_pos += left; + self.current_buffer_idx += 1; + self.current_buffer_offset = 0; } } + self.total_written += written; } fn free_last_iov(&mut self) { @@ -188,7 +231,7 @@ impl WrappedIOUring { return Ok(()); } let wants = std::cmp::min(self.pending_ops, 8); - tracing::info!("Waiting for {wants} pending operations to complete"); + tracing::trace!("submit_and_wait for {wants} pending operations to complete"); self.ring.submit_and_wait(wants)?; Ok(()) } @@ -199,13 +242,23 @@ impl WrappedIOUring { fn submit_writev(&mut self, key: u64) { let st = self.writev_states.get_mut(&key).expect("state must exist"); - // build iovecs for the remaining slice (respect UIO_MAXIOV) - const MAX_IOV: usize = libc::UIO_MAXIOV as usize; - let mut iov = Vec::with_capacity(MAX_IOV); - for (i, b) in st.bufs.iter().enumerate().skip(st.idx).take(MAX_IOV) { + // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half + let max = CKPT_BATCH_PAGES / 2; + let mut iov = Vec::with_capacity(max); + for (i, b) in 
st + .bufs + .iter() + .enumerate() + .skip(st.current_buffer_idx) + .take(max) + { let r = b.borrow(); let s = r.as_slice(); - let slice = if i == st.idx { &s[st.off..] } else { s }; + let slice = if i == st.current_buffer_idx { + &s[st.current_buffer_offset..] + } else { + s + }; if slice.is_empty() { continue; } @@ -224,11 +277,12 @@ impl WrappedIOUring { st.last_iov_len = len; // leak; freed when CQE processed let _ = Box::into_raw(boxed); - let entry = - io_uring::opcode::Writev::new(io_uring::types::Fixed(st.file_id), ptr, len as u32) - .offset(st.pos as u64) + let entry = with_fd!(st.file_id, |fd| { + io_uring::opcode::Writev::new(fd, ptr, len as u32) + .offset(st.file_pos as u64) .build() - .user_data(key); + .user_data(key) + }); self.submit_entry(&entry); } } @@ -274,53 +328,58 @@ impl IO for UringIO { } fn run_once(&self) -> Result<()> { - { - let mut inner = self.inner.borrow_mut(); - let ring = &mut inner.ring; + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; - if ring.empty() { - return Ok(()); - } - let ud = cqe.user_data(); - turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); - ring.wait_for_completion()?; + if ring.empty() { + return Ok(()); } - loop { - let (had, user_data, result) = { - let mut inner = self.inner.borrow_mut(); - let mut cq = inner.ring.ring.completion(); - if let Some(cqe) = cq.next() { - (true, cqe.user_data(), cqe.result()) - } else { - (false, 0, 0) + ring.wait_for_completion()?; + const MAX: usize = ENTRIES as usize; + // to circumvent borrowing rules, collect everything without the heap + let mut uds = [0u64; MAX]; + let mut ress = [0i32; MAX]; + let mut count = 0; + { + let cq = ring.ring.completion(); + for cqe in cq { + ring.pending_ops -= 1; + uds[count] = cqe.user_data(); + ress[count] = cqe.result(); + count += 1; + if count == MAX { + break; } - }; - if !had { - break; } - self.inner.borrow_mut().ring.pending_ops -= 1; + } - let mut inner = self.inner.borrow_mut(); - if let Some(mut st) = inner.ring.writev_states.remove(&user_data) { + for i in 0..count { + ring.pending_ops -= 1; + let user_data = uds[i]; + let result = ress[i]; + turso_assert!( + user_data != 0, + "user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this" + ); + if let Some(mut st) = ring.writev_states.remove(&user_data) { if result < 0 { st.free_last_iov(); - st.user_c.complete(result); + completion_from_key(user_data).complete(result); } else { let written = result as usize; st.free_last_iov(); st.advance(written); if st.remaining() == 0 { - st.user_c.complete(st.pos as i32); + completion_from_key(user_data).complete(st.total_written as i32); } else { - inner.ring.writev_states.insert(user_data, st); - inner.ring.submit_writev(user_data); // safe: no CQ borrow alive + ring.writev_states.insert(user_data, st); + ring.submit_writev(user_data); } } } else { completion_from_key(user_data).complete(result); } } - Ok(()) } @@ -365,6 +424,19 @@ pub struct UringFile { id: Option, } +impl Deref for UringFile { + type Target = std::fs::File; + fn deref(&self) -> &Self::Target { + &self.file + } +} + +impl UringFile { + fn id(&self) -> Option { + self.id + } +} + unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} @@ -462,26 +534,21 @@ impl File for UringFile { &self, pos: usize, bufs: Vec>>, - user_c: Arc, + c: Arc, ) -> Result> { + // for a single buffer use pwrite directly + if bufs.len().eq(&1) { + return self.pwrite(pos, 
bufs[0].clone(), c.clone()); + } + tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); // create state - let key = get_key(user_c.clone()); + let key = get_key(c.clone()); let mut io = self.io.borrow_mut(); - let state = WritevState { - file_id: self.id.unwrap_or(u32::MAX), - pos, - idx: 0, - off: 0, - bufs, - user_c: user_c.clone(), - last_iov: core::ptr::null_mut(), - last_iov_len: 0, - }; + let state = WritevState::new(self, pos, bufs); io.ring.writev_states.insert(key, state); io.ring.submit_writev(key); - - Ok(user_c.clone()) + Ok(c.clone()) } fn size(&self) -> Result { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7577849c2..5aa325af3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,7 +399,7 @@ pub enum CheckpointState { Done, } -const CKPT_BATCH_PAGES: usize = 512; +pub const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem { From b04128b5850c42a23a911bff5f14dd06d2eea28e Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 15:40:43 -0400 Subject: [PATCH 08/18] Fix write_pages_vectored to properly track completion --- core/storage/sqlite3_ondisk.rs | 62 +++++++++++++++------------------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index b1c07a86a..d434ec255 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -61,12 +61,12 @@ use crate::storage::pager::Pager; use crate::storage::wal::{BatchItem, PendingFlush}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; -use std::cell::{Cell, RefCell, UnsafeCell}; +use std::cell::{RefCell, UnsafeCell}; use std::collections::HashMap; use std::mem::MaybeUninit; use std::pin::Pin; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; /// The size of the database header in bytes. 
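// Standalone sketch of the completion-tracking pattern the following hunk
// switches to: one callback per contiguous run, a shared counter of runs still
// in flight, and a `done` flag flipped by whichever run finishes last.
// `make_flush_tracker` is an illustrative helper; the real code builds the
// callback with Completion::new_write and hands it to write_pages.
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

fn make_flush_tracker(runs: usize) -> (Arc<AtomicBool>, Vec<Box<dyn FnOnce() + Send>>) {
    let done = Arc::new(AtomicBool::new(false));
    let runs_left = Arc::new(AtomicUsize::new(runs));
    let callbacks = (0..runs)
        .map(|_| {
            let done = done.clone();
            let runs_left = runs_left.clone();
            Box::new(move || {
                // fetch_sub returns the previous value, so 1 means this run was the last
                if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                    done.store(true, Ordering::Release);
                }
            }) as Box<dyn FnOnce() + Send>
        })
        .collect();
    (done, callbacks)
}

#[cfg(test)]
mod flush_tracker_sketch {
    use super::make_flush_tracker;
    use std::sync::atomic::Ordering;

    #[test]
    fn done_flips_only_after_the_last_run_completes() {
        let (done, callbacks) = make_flush_tracker(3);
        for cb in callbacks {
            assert!(!done.load(Ordering::Acquire));
            cb();
        }
        assert!(done.load(Ordering::Acquire));
    }
}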
@@ -854,61 +854,55 @@ pub fn begin_write_btree_page( } #[instrument(skip_all, level = Level::DEBUG)] -pub fn begin_write_btree_pages_writev( - pager: &Pager, - batch: &[BatchItem], - // track writes for each flush series - write_counter: Rc>, -) -> Result { +pub fn write_pages_vectored(pager: &Pager, batch: &[BatchItem]) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); } - let mut run = batch.to_vec(); run.sort_by_key(|b| b.id); let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize; - let done = Arc::new(AtomicBool::new(false)); - let mut all_ids = Vec::with_capacity(run.len()); + // count runs + let mut starts = Vec::with_capacity(5); // arbitrary initialization let mut start = 0; while start < run.len() { let mut end = start + 1; while end < run.len() && run[end].id == run[end - 1].id + 1 { end += 1; } - - // submit contiguous run + starts.push((start, end)); + start = end; + } + let runs = starts.len(); + let runs_left = Arc::new(AtomicUsize::new(runs)); + let done = Arc::new(AtomicBool::new(false)); + for (start, end) in starts { let first_id = run[start].id; let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect(); all_ids.extend(run[start..end].iter().map(|b| b.id)); - write_counter.set(write_counter.get() + 1); - let wc = write_counter.clone(); - let done_clone = done.clone(); + let runs_left_cl = runs_left.clone(); + let done_cl = done.clone(); let c = Completion::new_write(move |_| { - // one run finished - wc.set(wc.get() - 1); - if wc.get().eq(&0) { - // last run of this batch is done - done_clone.store(true, Ordering::Release); + if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 { + done_cl.store(true, Ordering::Release); } }); - pager - .db_file - .write_pages(first_id, page_sz, bufs, c) - .inspect_err(|e| { - tracing::error!( - "Failed to write pages {}-{}: {}", - first_id, - first_id + (end - start) - 1, - e - ); - write_counter.set(write_counter.get() - 1); - })?; - start = end; + // submit, roll back on error + if let Err(e) = pager.db_file.write_pages(first_id, page_sz, bufs, c) { + if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { + done.store(true, Ordering::Release); + } + return Err(e); + } } + tracing::debug!( + "write_pages_vectored: {} pages to write, runs: {runs}", + all_ids.len() + ); + Ok(PendingFlush { pages: all_ids, done, From b8e6cd5ae244515d02cd103e7c3c1f230c713c8d Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 15:49:20 -0400 Subject: [PATCH 09/18] Fix taking page content from cached pages in checkpoint loop --- core/storage/wal.rs | 64 ++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 5aa325af3..d48d7dfad 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock; use crate::io::{File, IO}; use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ - begin_read_wal_frame, begin_read_wal_frame_raw, begin_write_btree_pages_writev, - finish_read_page, prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, + begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, + write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::types::IOResult; use crate::{turso_assert, Buffer, LimboError, Result}; @@ -399,7 +399,7 @@ pub enum CheckpointState { Done, } -pub const CKPT_BATCH_PAGES: usize = 512; +pub const CKPT_BATCH_PAGES: usize = 1024; #[derive(Clone)] pub(super) struct 
BatchItem { @@ -699,27 +699,28 @@ impl Drop for CheckpointLocks { } fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut Vec) { - // grab id and buffer - let id = scratch.get().id; - let buf = scratch.get_contents().buffer.clone(); - scratch.pin(); // ensure it isnt evicted - batch.push(BatchItem { id, buf }); - // give scratch a brand-new empty buffer for the next read - reinit_scratch_buffer(scratch, pool); -} + let (id, buf_clone) = unsafe { + let inner = &*scratch.inner.get(); + let id = inner.id; + let contents = inner.contents.as_ref().expect("scratch has contents"); + let buf = contents.buffer.clone(); + (id, buf) + }; -fn reinit_scratch_buffer(scratch: &PageRef, pool: &Arc) { + // Push into batch + batch.push(BatchItem { id, buf: buf_clone }); + + // Re-initialize scratch with a fresh buffer let raw = pool.get(); let pool_clone = pool.clone(); - let drop_fn = Rc::new(move |b| { - pool_clone.put(b); - }); + let drop_fn = Rc::new(move |b| pool_clone.put(b)); let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn))); - // replace contents + unsafe { let inner = &mut *scratch.inner.get(); inner.contents = Some(PageContent::new(0, new_buf)); - inner.flags.store(0, Ordering::Relaxed); + // reset flags on scratch so it won't be cleared later with the real page + inner.flags.store(0, Ordering::SeqCst); } } @@ -1128,8 +1129,8 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn should_checkpoint(&self) -> bool { let shared = self.get_shared(); - let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize; let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize; + let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize; frame_id > self.checkpoint_threshold + nbackfills } @@ -1137,7 +1138,7 @@ impl Wal for WalFile { fn checkpoint( &mut self, pager: &Pager, - write_counter: Rc>, + _write_counter: Rc>, mode: CheckpointMode, ) -> Result> { if matches!(mode, CheckpointMode::Full) { @@ -1323,6 +1324,8 @@ impl WalFile { self.ongoing_checkpoint.max_frame = 0; self.ongoing_checkpoint.current_page = 0; self.max_frame_read_lock_index.set(NO_LOCK_HELD); + self.ongoing_checkpoint.batch.clear(); + self.ongoing_checkpoint.pending_flushes.clear(); self.sync_state.set(SyncState::NotSyncing); self.syncing.set(false); } @@ -1455,17 +1458,16 @@ impl WalFile { } CheckpointState::AccumulatePage => { // mark before batching - self.ongoing_checkpoint.scratch.set_dirty(); + self.ongoing_checkpoint.scratch_page.set_dirty(); take_page_into_batch( - &self.ongoing_checkpoint.scratch, + &self.ongoing_checkpoint.scratch_page, &self.buffer_pool, &mut self.ongoing_checkpoint.batch, ); - let more_pages = (self.ongoing_checkpoint.current_page as usize) < self.get_shared().pages_in_frames.lock().len() - 1; - if self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES && more_pages { + if more_pages { self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { @@ -1473,17 +1475,13 @@ impl WalFile { } } CheckpointState::FlushBatch => { + tracing::trace!("started checkpoint backfilling batch"); self.ongoing_checkpoint .pending_flushes - .push(begin_write_btree_pages_writev( - pager, - &self.ongoing_checkpoint.batch, - write_counter.clone(), - )?); + .push(write_pages_vectored(pager, &self.ongoing_checkpoint.batch)?); // batch is queued self.ongoing_checkpoint.batch.clear(); self.ongoing_checkpoint.state = CheckpointState::WaitFlush; - return Ok(IOResult::IO); } CheckpointState::WaitFlush => { if self @@ -1494,7 
+1492,12 @@ impl WalFile { { return Ok(IOResult::IO); } - for pf in self.ongoing_checkpoint.pending_flushes.drain(..) { + tracing::debug!("finished checkpoint backfilling batch"); + for pf in self + .ongoing_checkpoint + .pending_flushes + .drain(std::ops::RangeFull) + { for id in pf.pages { if let Some(p) = pager.cache_get(id) { p.clear_dirty(); @@ -1509,6 +1512,7 @@ impl WalFile { self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { + tracing::info!("transitioning checkpoint to done"); self.ongoing_checkpoint.state = CheckpointState::Done; } } From c0800ecc296da5aff05561e79881832798e3bd5a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 16:21:44 -0400 Subject: [PATCH 10/18] Update test to match cacheflush behavior --- sqlite3/tests/compat/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index 700fa6910..d3aa58001 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -207,6 +207,11 @@ mod tests { #[cfg(not(feature = "sqlite3"))] mod libsql_ext { +<<<<<<< HEAD +||||||| parent of 7f61fbb8 (Update test to match cacheflush behavior) +======= + use limbo_sqlite3::sqlite3_close_v2; +>>>>>>> 7f61fbb8 (Update test to match cacheflush behavior) use super::*; From 689007cb74c1dbf9cb5a383bbcafa9eaf2c50f2f Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 16:52:47 -0400 Subject: [PATCH 11/18] Remove unrelated io_uring changes --- core/io/io_uring.rs | 100 +++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 42 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index ccf213e12..0c88fc4c7 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -31,6 +31,8 @@ struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, writev_states: HashMap, + pending: [Option>; ENTRIES as usize + 1], + key: u64, } struct InnerUringIO { @@ -56,6 +58,8 @@ impl UringIO { ring, pending_ops: 0, writev_states: HashMap::new(), + pending: [const { None }; ENTRIES as usize + 1], + key: 0, }, free_files: (0..FILES).collect(), }; @@ -111,6 +115,7 @@ struct WritevState { // intra-buffer offset current_buffer_offset: usize, total_written: usize, + total_len: usize, bufs: Vec>>, // we keep the last iovec allocation alive until CQE: // raw ptr to Box<[iovec]> @@ -133,37 +138,31 @@ impl WritevState { bufs, last_iov: core::ptr::null_mut(), last_iov_len: 0, + total_len: bufs.iter().map(|b| b.borrow().len()).sum(), } } + + #[inline(always)] fn remaining(&self) -> usize { - let mut total = 0; - for (i, b) in self.bufs.iter().enumerate().skip(self.current_buffer_idx) { - let r = b.borrow(); - let len = r.len(); - total += if i == self.current_buffer_idx { - len - self.current_buffer_offset - } else { - len - }; - } - total + self.total_len - self.total_written } /// Advance (idx, off, pos) after written bytes + #[inline(always)] fn advance(&mut self, written: usize) { - let mut rem = written; - while rem > 0 { - let len = { + let mut remaining = written; + while remaining > 0 { + let current_buf_len = { let r = self.bufs[self.current_buffer_idx].borrow(); r.len() }; - let left = len - self.current_buffer_offset; - if rem < left { - self.current_buffer_offset += rem; - self.file_pos += rem; - rem = 0; + let left = current_buf_len - self.current_buffer_offset; + if remaining < left { + self.current_buffer_offset += remaining; + self.file_pos += remaining; + remaining = 0; } else { - rem -= left; + 
remaining -= left; self.file_pos += left; self.current_buffer_idx += 1; self.current_buffer_offset = 0; @@ -172,6 +171,8 @@ impl WritevState { self.total_written += written; } + /// Free the allocation that keeps the iovec array alive while writev is ongoing + #[inline(always)] fn free_last_iov(&mut self) { if !self.last_iov.is_null() { unsafe { @@ -210,8 +211,9 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc) { trace!("submit_entry({:?})", entry); + self.pending[entry.get_user_data() as usize] = Some(c); unsafe { let mut sub = self.ring.submission_shared(); match sub.push(entry) { @@ -240,8 +242,10 @@ impl WrappedIOUring { self.pending_ops == 0 } - fn submit_writev(&mut self, key: u64) { - let st = self.writev_states.get_mut(&key).expect("state must exist"); + /// Submit a writev operation for the given key. WritevState MUST exist in the map + /// of `writev_states` + fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc) { + self.writev_states.insert(key, st); // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half let max = CKPT_BATCH_PAGES / 2; let mut iov = Vec::with_capacity(max); @@ -275,7 +279,7 @@ impl WrappedIOUring { st.free_last_iov(); st.last_iov = ptr; st.last_iov_len = len; - // leak; freed when CQE processed + // leak the iovec array, will be freed when CQE processed let _ = Box::into_raw(boxed); let entry = with_fd!(st.file_id, |fd| { io_uring::opcode::Writev::new(fd, ptr, len as u32) @@ -283,10 +287,23 @@ impl WrappedIOUring { .build() .user_data(key) }); - self.submit_entry(&entry); + self.submit_entry(&entry, c.clone()); } } +#[inline(always)] +/// use the callback pointer as the user_data for the operation as is +/// common practice for io_uring to prevent more indirection +fn get_key(c: Arc) -> u64 { + Arc::into_raw(c) as u64 +} + +#[inline(always)] +/// convert the user_data back to an Arc pointer +fn completion_from_key(key: u64) -> Arc { + unsafe { Arc::from_raw(key as *const Completion) } +} + impl IO for UringIO { fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); @@ -364,21 +381,22 @@ impl IO for UringIO { if let Some(mut st) = ring.writev_states.remove(&user_data) { if result < 0 { st.free_last_iov(); - completion_from_key(user_data).complete(result); + completion_from_key(ud).complete(result); } else { let written = result as usize; st.free_last_iov(); st.advance(written); if st.remaining() == 0 { - completion_from_key(user_data).complete(st.total_written as i32); + // write complete + c.complete(st.total_written as i32); } else { - ring.writev_states.insert(user_data, st); - ring.submit_writev(user_data); + // partial write, submit next + ring.submit_writev(user_data, st, c.clone()); } } - } else { - completion_from_key(user_data).complete(result); + continue; } + completion_from_key(user_data).complete(result) } Ok(()) } @@ -490,10 +508,10 @@ impl File for UringFile { io_uring::opcode::Read::new(fd, buf, len as u32) .offset(pos as u64) .build() - .user_data(get_key(c.clone())) + .user_data(io.ring.get_key()) }) }; - io.ring.submit_entry(&read_e); + io.ring.submit_entry(&read_e, c.clone()); Ok(c) } @@ -511,10 +529,10 @@ impl File for UringFile { io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32) .offset(pos as u64) .build() - .user_data(get_key(c.clone())) + 
.user_data(io.ring.get_key()) }) }; - io.ring.submit_entry(&write); + io.ring.submit_entry(&write, c.clone()); Ok(c) } @@ -526,7 +544,7 @@ impl File for UringFile { .build() .user_data(get_key(c.clone())) }); - io.ring.submit_entry(&sync); + io.ring.submit_entry(&sync, c.clone()); Ok(c) } @@ -541,14 +559,12 @@ impl File for UringFile { return self.pwrite(pos, bufs[0].clone(), c.clone()); } tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); - // create state - let key = get_key(c.clone()); let mut io = self.io.borrow_mut(); - + let key = io.ring.get_key(); + // create state to track ongoing writev operation let state = WritevState::new(self, pos, bufs); - io.ring.writev_states.insert(key, state); - io.ring.submit_writev(key); - Ok(c.clone()) + io.ring.submit_writev(key, state, c.clone()); + Ok(c) } fn size(&self) -> Result { From efcffd380def6aef8795ed79d6d1fff8973f6ce6 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 18:37:40 -0400 Subject: [PATCH 12/18] Clean up io_uring writev implementation, add iovec and cqe cache --- core/io/io_uring.rs | 260 ++++++++++++++++++++------------- core/storage/wal.rs | 3 +- sqlite3/tests/compat/mod.rs | 6 - testing/cli_tests/vfs_bench.py | 2 +- 4 files changed, 164 insertions(+), 107 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 0c88fc4c7..e29ffd95c 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -19,6 +19,9 @@ use tracing::{debug, trace}; const ENTRIES: u32 = 512; const SQPOLL_IDLE: u32 = 1000; const FILES: u32 = 8; +const IOVEC_POOL_SIZE: usize = 64; +const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; +const MAX_WAIT: usize = 8; pub struct UringIO { inner: Rc>, @@ -31,8 +34,8 @@ struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, writev_states: HashMap, - pending: [Option>; ENTRIES as usize + 1], - key: u64, + iov_pool: IovecPool, + cqes: [Cqe; ENTRIES as usize + 1], } struct InnerUringIO { @@ -40,6 +43,38 @@ struct InnerUringIO { free_files: VecDeque, } +/// preallocated vec of iovec arrays to avoid allocations during writev operations +struct IovecPool { + pool: Vec>, +} + +impl IovecPool { + fn new() -> Self { + let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE); + for _ in 0..IOVEC_POOL_SIZE { + pool.push(Box::new( + [libc::iovec { + iov_base: std::ptr::null_mut(), + iov_len: 0, + }; MAX_IOVEC_ENTRIES], + )); + } + Self { pool } + } + + #[inline(always)] + fn acquire(&mut self) -> Option> { + self.pool.pop() + } + + #[inline(always)] + fn release(&mut self, iovec: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) { + if self.pool.len() < IOVEC_POOL_SIZE { + self.pool.push(iovec); + } + } +} + impl UringIO { pub fn new() -> Result { let ring = match io_uring::IoUring::builder() @@ -58,8 +93,11 @@ impl UringIO { ring, pending_ops: 0, writev_states: HashMap::new(), - pending: [const { None }; ENTRIES as usize + 1], - key: 0, + iov_pool: IovecPool::new(), + cqes: [Cqe { + user_data: 0, + result: 0, + }; ENTRIES as usize + 1], }, free_files: (0..FILES).collect(), }; @@ -114,13 +152,14 @@ struct WritevState { current_buffer_idx: usize, // intra-buffer offset current_buffer_offset: usize, + // total bytes written so far total_written: usize, + // cache the sum of all buffer lengths total_len: usize, bufs: Vec>>, - // we keep the last iovec allocation alive until CQE: - // raw ptr to Box<[iovec]> - last_iov: *mut libc::iovec, - last_iov_len: usize, + // we keep the last iovec allocation alive until CQE. 
+ // pointer to the beginning of the iovec array + last_iov_allocation: Option>, } impl WritevState { @@ -129,6 +168,7 @@ impl WritevState { Some(id) => Fd::Fixed(id), None => Fd::RawFd(file.as_raw_fd()), }; + let total_len = bufs.iter().map(|b| b.borrow().len()).sum(); Self { file_id, file_pos: pos, @@ -136,9 +176,8 @@ impl WritevState { current_buffer_offset: 0, total_written: 0, bufs, - last_iov: core::ptr::null_mut(), - last_iov_len: 0, - total_len: bufs.iter().map(|b| b.borrow().len()).sum(), + last_iov_allocation: None, + total_len, } } @@ -171,22 +210,21 @@ impl WritevState { self.total_written += written; } - /// Free the allocation that keeps the iovec array alive while writev is ongoing #[inline(always)] - fn free_last_iov(&mut self) { - if !self.last_iov.is_null() { - unsafe { - drop(Box::from_raw(core::slice::from_raw_parts_mut( - self.last_iov, - self.last_iov_len, - ))) - }; - self.last_iov = core::ptr::null_mut(); - self.last_iov_len = 0; + /// Free the allocation that keeps the iovec array alive while writev is ongoing + fn free_last_iov(&mut self, pool: &mut IovecPool) { + if let Some(allocation) = self.last_iov_allocation.take() { + pool.release(allocation); } } } +#[derive(Clone, Copy)] +struct Cqe { + user_data: u64, + result: i32, +} + impl InnerUringIO { fn register_file(&mut self, fd: i32) -> Result { if let Some(slot) = self.free_files.pop_front() { @@ -211,9 +249,8 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { trace!("submit_entry({:?})", entry); - self.pending[entry.get_user_data() as usize] = Some(c); unsafe { let mut sub = self.ring.submission_shared(); match sub.push(entry) { @@ -228,11 +265,11 @@ impl WrappedIOUring { } } - fn wait_for_completion(&mut self) -> Result<()> { + fn submit_and_wait(&mut self) -> Result<()> { if self.pending_ops == 0 { return Ok(()); } - let wants = std::cmp::min(self.pending_ops, 8); + let wants = std::cmp::min(self.pending_ops, MAX_WAIT); tracing::trace!("submit_and_wait for {wants} pending operations to complete"); self.ring.submit_and_wait(wants)?; Ok(()) @@ -242,52 +279,111 @@ impl WrappedIOUring { self.pending_ops == 0 } - /// Submit a writev operation for the given key. WritevState MUST exist in the map - /// of `writev_states` - fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc) { - self.writev_states.insert(key, st); - // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half - let max = CKPT_BATCH_PAGES / 2; - let mut iov = Vec::with_capacity(max); - for (i, b) in st + /// Submit or resubmit a writev operation + fn submit_writev(&mut self, key: u64, mut st: WritevState) { + st.free_last_iov(&mut self.iov_pool); + let mut iov_allocation = match self.iov_pool.acquire() { + Some(alloc) => alloc, + None => { + // Fallback: allocate a new one if pool is exhausted + Box::new( + [libc::iovec { + iov_base: std::ptr::null_mut(), + iov_len: 0, + }; MAX_IOVEC_ENTRIES], + ) + } + }; + let mut iov_count = 0; + for (idx, buffer) in st .bufs .iter() .enumerate() .skip(st.current_buffer_idx) - .take(max) + .take(MAX_IOVEC_ENTRIES) { - let r = b.borrow(); - let s = r.as_slice(); - let slice = if i == st.current_buffer_idx { - &s[st.current_buffer_offset..] + let buf = buffer.borrow(); + let buf_slice = buf.as_slice(); + let slice = if idx == st.current_buffer_idx { + &buf_slice[st.current_buffer_offset..] 
} else { - s + buf_slice }; if slice.is_empty() { continue; } - iov.push(libc::iovec { + iov_allocation[iov_count] = libc::iovec { iov_base: slice.as_ptr() as *mut _, iov_len: slice.len(), - }); + }; + iov_count += 1; } + // Store the allocation and get the pointer + let ptr = iov_allocation.as_ptr() as *mut libc::iovec; + st.last_iov_allocation = Some(iov_allocation); - // keep iov alive until CQE - let boxed = iov.into_boxed_slice(); - let ptr = boxed.as_ptr() as *mut libc::iovec; - let len = boxed.len(); - st.free_last_iov(); - st.last_iov = ptr; - st.last_iov_len = len; - // leak the iovec array, will be freed when CQE processed - let _ = Box::into_raw(boxed); let entry = with_fd!(st.file_id, |fd| { - io_uring::opcode::Writev::new(fd, ptr, len as u32) + io_uring::opcode::Writev::new(fd, ptr, iov_count as u32) .offset(st.file_pos as u64) .build() .user_data(key) }); - self.submit_entry(&entry, c.clone()); + self.writev_states.insert(key, st); + self.submit_entry(&entry); + } + + // to circumvent borrowing rules, collect everything into preallocated array + // and return the number of completed operations + fn reap_cqes(&mut self) -> usize { + let mut count = 0; + { + for cqe in self.ring.completion() { + self.pending_ops -= 1; + self.cqes[count] = Cqe { + user_data: cqe.user_data(), + result: cqe.result(), + }; + count += 1; + if count == ENTRIES as usize { + break; + } + } + } + count + } + + fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) { + if result < 0 { + tracing::error!( + "writev operation failed for user_data {}: {}", + user_data, + std::io::Error::from_raw_os_error(result) + ); + // error: free iov allocation and call completion with error code + st.free_last_iov(&mut self.iov_pool); + completion_from_key(user_data).complete(result); + } else { + let written = result as usize; + st.advance(written); + if st.remaining() == 0 { + tracing::info!( + "writev operation completed: wrote {} bytes", + st.total_written + ); + // write complete, return iovec to pool + st.free_last_iov(&mut self.iov_pool); + completion_from_key(user_data).complete(st.total_written as i32); + } else { + tracing::trace!( + "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}", + user_data, + written, + st.remaining() + ); + // partial write, submit next + self.submit_writev(user_data, st); + } + } } } @@ -345,55 +441,22 @@ impl IO for UringIO { } fn run_once(&self) -> Result<()> { + trace!("run_once()"); let mut inner = self.inner.borrow_mut(); let ring = &mut inner.ring; - if ring.empty() { return Ok(()); } - ring.wait_for_completion()?; - const MAX: usize = ENTRIES as usize; - // to circumvent borrowing rules, collect everything without the heap - let mut uds = [0u64; MAX]; - let mut ress = [0i32; MAX]; - let mut count = 0; - { - let cq = ring.ring.completion(); - for cqe in cq { - ring.pending_ops -= 1; - uds[count] = cqe.user_data(); - ress[count] = cqe.result(); - count += 1; - if count == MAX { - break; - } - } - } - + ring.submit_and_wait()?; + let count = ring.reap_cqes(); for i in 0..count { - ring.pending_ops -= 1; - let user_data = uds[i]; - let result = ress[i]; + let Cqe { user_data, result } = ring.cqes[i]; turso_assert!( user_data != 0, "user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this" ); - if let Some(mut st) = ring.writev_states.remove(&user_data) { - if result < 0 { - st.free_last_iov(); - completion_from_key(ud).complete(result); - } else { - let written = result as usize; - 
st.free_last_iov(); - st.advance(written); - if st.remaining() == 0 { - // write complete - c.complete(st.total_written as i32); - } else { - // partial write, submit next - ring.submit_writev(user_data, st, c.clone()); - } - } + if let Some(state) = ring.writev_states.remove(&user_data) { + ring.handle_writev_completion(state, user_data, result); continue; } completion_from_key(user_data).complete(result) @@ -508,10 +571,10 @@ impl File for UringFile { io_uring::opcode::Read::new(fd, buf, len as u32) .offset(pos as u64) .build() - .user_data(io.ring.get_key()) + .user_data(get_key(c.clone())) }) }; - io.ring.submit_entry(&read_e, c.clone()); + io.ring.submit_entry(&read_e); Ok(c) } @@ -529,10 +592,10 @@ impl File for UringFile { io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32) .offset(pos as u64) .build() - .user_data(io.ring.get_key()) + .user_data(get_key(c.clone())) }) }; - io.ring.submit_entry(&write, c.clone()); + io.ring.submit_entry(&write); Ok(c) } @@ -544,7 +607,7 @@ impl File for UringFile { .build() .user_data(get_key(c.clone())) }); - io.ring.submit_entry(&sync, c.clone()); + io.ring.submit_entry(&sync); Ok(c) } @@ -560,10 +623,9 @@ impl File for UringFile { } tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); let mut io = self.io.borrow_mut(); - let key = io.ring.get_key(); // create state to track ongoing writev operation let state = WritevState::new(self, pos, bufs); - io.ring.submit_writev(key, state, c.clone()); + io.ring.submit_writev(get_key(c.clone()), state); Ok(c) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index d48d7dfad..f27fb9bab 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,6 +399,7 @@ pub enum CheckpointState { Done, } +/// IOV_MAX is 1024 on most systems pub const CKPT_BATCH_PAGES: usize = 1024; #[derive(Clone)] @@ -1129,8 +1130,8 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn should_checkpoint(&self) -> bool { let shared = self.get_shared(); - let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize; let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize; + let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize; frame_id > self.checkpoint_threshold + nbackfills } diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index d3aa58001..e504c2a38 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -207,12 +207,6 @@ mod tests { #[cfg(not(feature = "sqlite3"))] mod libsql_ext { -<<<<<<< HEAD -||||||| parent of 7f61fbb8 (Update test to match cacheflush behavior) -======= - use limbo_sqlite3::sqlite3_close_v2; ->>>>>>> 7f61fbb8 (Update test to match cacheflush behavior) - use super::*; #[test] diff --git a/testing/cli_tests/vfs_bench.py b/testing/cli_tests/vfs_bench.py index 88abb6e6a..d081b7526 100644 --- a/testing/cli_tests/vfs_bench.py +++ b/testing/cli_tests/vfs_bench.py @@ -13,7 +13,7 @@ from typing import Dict from cli_tests.console import error, info, test from cli_tests.test_turso_cli import TestTursoShell -LIMBO_BIN = Path("./target/debug/tursodb") +LIMBO_BIN = Path("./target/release/tursodb") DB_FILE = Path("testing/temp.db") vfs_list = ["syscall"] if platform.system() == "Linux": From 28283e4d1c1f5721a5f5d9391c45c83bf79f9ada Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 27 Jul 2025 16:20:16 -0400 Subject: [PATCH 13/18] Fix bench_vfs python script to use fresh db for each run --- core/io/mod.rs | 6 +++--- sqlite3/tests/compat/mod.rs | 1 + testing/cli_tests/vfs_bench.py | 11 
++++++++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index f7766ef84..6518157e8 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,4 +1,4 @@ -use crate::Result; +use crate::{turso_assert, Result}; use bitflags::bitflags; use cfg_block::cfg_block; use std::fmt; @@ -344,10 +344,10 @@ cfg_block! { pub use unix::UnixIO as PlatformIO; } - #[cfg(target_os = "windows")] { + #[cfg(target_os = "windows")] { mod windows; pub use windows::WindowsIO as PlatformIO; - pub use PlatformIO as SyscallIO; + pub use PlatformIO as SyscallIO; } #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "android", target_os = "ios")))] { diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs index e504c2a38..700fa6910 100644 --- a/sqlite3/tests/compat/mod.rs +++ b/sqlite3/tests/compat/mod.rs @@ -207,6 +207,7 @@ mod tests { #[cfg(not(feature = "sqlite3"))] mod libsql_ext { + use super::*; #[test] diff --git a/testing/cli_tests/vfs_bench.py b/testing/cli_tests/vfs_bench.py index d081b7526..dc637c37b 100644 --- a/testing/cli_tests/vfs_bench.py +++ b/testing/cli_tests/vfs_bench.py @@ -48,6 +48,9 @@ def bench_one(vfs: str, sql: str, iterations: int) -> list[float]: def setup_temp_db() -> None: + # make sure we start fresh, otherwise we could end up with + # one having to checkpoint the others from the previous run + cleanup_temp_db() cmd = ["sqlite3", "testing/testing.db", ".clone testing/temp.db"] proc = subprocess.run(cmd, check=True) proc.check_returncode() @@ -57,7 +60,9 @@ def setup_temp_db() -> None: def cleanup_temp_db() -> None: if DB_FILE.exists(): DB_FILE.unlink() - os.remove("testing/temp.db-wal") + wal_file = DB_FILE.with_suffix(".db-wal") + if wal_file.exists(): + os.remove(wal_file) def main() -> None: @@ -65,7 +70,6 @@ def main() -> None: parser.add_argument("sql", help="SQL statement to execute (quote it)") parser.add_argument("iterations", type=int, help="number of repetitions") args = parser.parse_args() - setup_temp_db() sql, iterations = args.sql, args.iterations if iterations <= 0: @@ -85,7 +89,8 @@ def main() -> None: info(f"All times ({vfs}):", " ".join(f"{t:.6f}" for t in times)) avg = statistics.mean(times) averages[vfs] = avg - cleanup_temp_db() + + cleanup_temp_db() info("\n" + "-" * 60) info("Average runtime per VFS") From 73882b97d6109d02a30cf99444bfc294653f651e Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 28 Jul 2025 14:38:49 -0400 Subject: [PATCH 14/18] Remove unnecessary collecting CQEs into an array in run_once, comments --- core/io/io_uring.rs | 90 ++++++++++++++++++++++----------------------- core/storage/wal.rs | 9 ++++- 2 files changed, 51 insertions(+), 48 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index e29ffd95c..f27bcba1f 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -16,12 +16,28 @@ use std::{ }; use tracing::{debug, trace}; +/// Size of the io_uring submission and completion queues const ENTRIES: u32 = 512; + +/// Idle timeout for the sqpoll kernel thread before it needs +/// to be woken back up by a call to IORING_ENTER const SQPOLL_IDLE: u32 = 1000; + +/// Number of file descriptors we preallocate for io_uring. +/// NOTE: we may need to increase this when `attach` is fully implemented. const FILES: u32 = 8; + +/// Number of Vec> we preallocate on initialization const IOVEC_POOL_SIZE: usize = 64; + +/// Maximum number of iovec entries per writev operation. 
+/// IOV_MAX is typically 1024, but we limit it to a smaller number const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; -const MAX_WAIT: usize = 8; + +/// Maximum number of I/O operations to wait for in a single run, +/// waiting for > 1 can reduce the amount of IOURING_ENTER syscalls we +/// make, but can increase single operation latency. +const MAX_WAIT: usize = 4; pub struct UringIO { inner: Rc>, @@ -35,7 +51,6 @@ struct WrappedIOUring { pending_ops: usize, writev_states: HashMap, iov_pool: IovecPool, - cqes: [Cqe; ENTRIES as usize + 1], } struct InnerUringIO { @@ -94,10 +109,6 @@ impl UringIO { pending_ops: 0, writev_states: HashMap::new(), iov_pool: IovecPool::new(), - cqes: [Cqe { - user_data: 0, - result: 0, - }; ENTRIES as usize + 1], }, free_files: (0..FILES).collect(), }; @@ -108,6 +119,8 @@ impl UringIO { } } +/// io_uring crate decides not to export their `UseFixed` trait, so we +/// are forced to use a macro here to handle either fixed or raw file descriptors. macro_rules! with_fd { ($file:expr, |$fd:ident| $body:expr) => { match $file.id() { @@ -123,6 +136,8 @@ macro_rules! with_fd { }; } +/// wrapper type to represent a possibly registered file desriptor, +/// only used in WritevState enum Fd { Fixed(u32), RawFd(i32), @@ -143,22 +158,24 @@ impl Fd { } } +/// State to track an ongoing writev operation in +/// the case of a partial write. struct WritevState { - // fixed fd slot + /// File descriptor/id of the file we are writing to file_id: Fd, - // absolute file offset for next submit + /// absolute file offset for next submit file_pos: usize, - // current buffer index in `bufs` + /// current buffer index in `bufs` current_buffer_idx: usize, - // intra-buffer offset + /// intra-buffer offset current_buffer_offset: usize, - // total bytes written so far + /// total bytes written so far total_written: usize, - // cache the sum of all buffer lengths + /// cache the sum of all buffer lengths for the total expected write total_len: usize, + /// buffers to write bufs: Vec>>, - // we keep the last iovec allocation alive until CQE. - // pointer to the beginning of the iovec array + /// we keep the last iovec allocation alive until final CQE last_iov_allocation: Option>, } @@ -219,12 +236,6 @@ impl WritevState { } } -#[derive(Clone, Copy)] -struct Cqe { - user_data: u64, - result: i32, -} - impl InnerUringIO { fn register_file(&mut self, fd: i32) -> Result { if let Some(slot) = self.free_files.pop_front() { @@ -266,7 +277,7 @@ impl WrappedIOUring { } fn submit_and_wait(&mut self) -> Result<()> { - if self.pending_ops == 0 { + if self.empty() { return Ok(()); } let wants = std::cmp::min(self.pending_ops, MAX_WAIT); @@ -304,6 +315,7 @@ impl WrappedIOUring { { let buf = buffer.borrow(); let buf_slice = buf.as_slice(); + // ensure we are providing a pointer to the proper offset in the buffer let slice = if idx == st.current_buffer_idx { &buf_slice[st.current_buffer_offset..] 
} else { @@ -318,7 +330,8 @@ impl WrappedIOUring { }; iov_count += 1; } - // Store the allocation and get the pointer + // Store the pointers and get the pointer to the iovec array that we pass + // to the writev operation, and keep the array itself alive let ptr = iov_allocation.as_ptr() as *mut libc::iovec; st.last_iov_allocation = Some(iov_allocation); @@ -328,30 +341,11 @@ impl WrappedIOUring { .build() .user_data(key) }); + // track the current state in case we get a partial write self.writev_states.insert(key, st); self.submit_entry(&entry); } - // to circumvent borrowing rules, collect everything into preallocated array - // and return the number of completed operations - fn reap_cqes(&mut self) -> usize { - let mut count = 0; - { - for cqe in self.ring.completion() { - self.pending_ops -= 1; - self.cqes[count] = Cqe { - user_data: cqe.user_data(), - result: cqe.result(), - }; - count += 1; - if count == ENTRIES as usize { - break; - } - } - } - count - } - fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) { if result < 0 { tracing::error!( @@ -448,20 +442,24 @@ impl IO for UringIO { return Ok(()); } ring.submit_and_wait()?; - let count = ring.reap_cqes(); - for i in 0..count { - let Cqe { user_data, result } = ring.cqes[i]; + loop { + let Some(cqe) = ring.ring.completion().next() else { + return Ok(()); + }; + ring.pending_ops -= 1; + let user_data = cqe.user_data(); + let result = cqe.result(); turso_assert!( user_data != 0, "user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this" ); if let Some(state) = ring.writev_states.remove(&user_data) { + // if we have ongoing writev state, handle it separately and don't call completion ring.handle_writev_completion(state, user_data, result); continue; } completion_from_key(user_data).complete(result) } - Ok(()) } fn generate_random_number(&self) -> i64 { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f27fb9bab..399af7746 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,8 +399,8 @@ pub enum CheckpointState { Done, } -/// IOV_MAX is 1024 on most systems -pub const CKPT_BATCH_PAGES: usize = 1024; +/// IOV_MAX is 1024 on most systems, lets use 512 to be safe +pub const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem { @@ -1587,6 +1587,11 @@ impl WalFile { } else { let _ = self.checkpoint_guard.take(); } + self.ongoing_checkpoint.scratch_page.clear_dirty(); + self.ongoing_checkpoint.scratch_page.get().id = 0; + self.ongoing_checkpoint.scratch_page.get().contents = None; + let _ = self.ongoing_checkpoint.pending_flush.take(); + self.ongoing_checkpoint.batch.clear(); self.ongoing_checkpoint.state = CheckpointState::Start; return Ok(IOResult::Done(checkpoint_result)); } From ef69df72587eb63da520b89e9e890fd17640cd80 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 29 Jul 2025 19:38:48 -0400 Subject: [PATCH 15/18] Apply review suggestions --- bindings/javascript/src/lib.rs | 8 +-- core/io/io_uring.rs | 115 +++++++++++++++------------------ core/io/mod.rs | 2 +- core/io/unix.rs | 14 ++-- core/storage/database.rs | 6 +- core/storage/pager.rs | 5 +- core/storage/sqlite3_ondisk.rs | 103 +++++++++++++++++++---------- core/storage/wal.rs | 95 +++++++++++++++------------ 8 files changed, 188 insertions(+), 160 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 6b4b1992f..aa0c4772b 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -724,10 
+724,10 @@ impl turso_core::DatabaseStorage for DatabaseFile { page_size: usize, buffers: Vec>>, c: turso_core::Completion, - ) -> turso_core::Result<()> { - let pos = (page_idx - 1) * page_size; - self.file.pwritev(pos, buffers, c.into())?; - Ok(()) + ) -> turso_core::Result { + let pos = page_idx.saturating_sub(1) * page_size; + let c = self.file.pwritev(pos, buffers, c)?; + Ok(c) } fn sync(&self, c: turso_core::Completion) -> turso_core::Result { diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index f27bcba1f..ed0a0f7d8 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -65,15 +65,16 @@ struct IovecPool { impl IovecPool { fn new() -> Self { - let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE); - for _ in 0..IOVEC_POOL_SIZE { - pool.push(Box::new( - [libc::iovec { - iov_base: std::ptr::null_mut(), - iov_len: 0, - }; MAX_IOVEC_ENTRIES], - )); - } + let pool = (0..IOVEC_POOL_SIZE) + .map(|_| { + Box::new( + [libc::iovec { + iov_base: std::ptr::null_mut(), + iov_len: 0, + }; MAX_IOVEC_ENTRIES], + ) + }) + .collect(); Self { pool } } @@ -144,18 +145,20 @@ enum Fd { } impl Fd { - fn as_raw_fd(&self) -> i32 { - match self { - Fd::RawFd(fd) => *fd, - _ => unreachable!("only to be called on RawFd variant"), - } - } + /// to match the behavior of the File, we need to implement the same methods fn id(&self) -> Option { match self { Fd::Fixed(id) => Some(*id), Fd::RawFd(_) => None, } } + /// ONLY to be called by the macro, in the case where id() is None + fn as_raw_fd(&self) -> i32 { + match self { + Fd::RawFd(fd) => *fd, + _ => panic!("Cannot call as_raw_fd on a Fixed Fd"), + } + } } /// State to track an ongoing writev operation in @@ -181,10 +184,10 @@ struct WritevState { impl WritevState { fn new(file: &UringFile, pos: usize, bufs: Vec>>) -> Self { - let file_id = match file.id() { - Some(id) => Fd::Fixed(id), - None => Fd::RawFd(file.as_raw_fd()), - }; + let file_id = file + .id() + .map(Fd::Fixed) + .unwrap_or_else(|| Fd::RawFd(file.as_raw_fd())); let total_len = bufs.iter().map(|b| b.borrow().len()).sum(); Self { file_id, @@ -293,18 +296,15 @@ impl WrappedIOUring { /// Submit or resubmit a writev operation fn submit_writev(&mut self, key: u64, mut st: WritevState) { st.free_last_iov(&mut self.iov_pool); - let mut iov_allocation = match self.iov_pool.acquire() { - Some(alloc) => alloc, - None => { - // Fallback: allocate a new one if pool is exhausted - Box::new( - [libc::iovec { - iov_base: std::ptr::null_mut(), - iov_len: 0, - }; MAX_IOVEC_ENTRIES], - ) - } - }; + let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| { + // Fallback: allocate a new one if pool is exhausted + Box::new( + [libc::iovec { + iov_base: std::ptr::null_mut(), + iov_len: 0, + }; MAX_IOVEC_ENTRIES], + ) + }); let mut iov_count = 0; for (idx, buffer) in st .bufs @@ -346,54 +346,41 @@ impl WrappedIOUring { self.submit_entry(&entry); } - fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) { + fn handle_writev_completion(&mut self, mut state: WritevState, user_data: u64, result: i32) { if result < 0 { - tracing::error!( - "writev operation failed for user_data {}: {}", - user_data, - std::io::Error::from_raw_os_error(result) - ); - // error: free iov allocation and call completion with error code - st.free_last_iov(&mut self.iov_pool); + let err = std::io::Error::from_raw_os_error(result); + tracing::error!("writev failed (user_data: {}): {}", user_data, err); + state.free_last_iov(&mut self.iov_pool); completion_from_key(user_data).complete(result); - 
} else { - let written = result as usize; - st.advance(written); - if st.remaining() == 0 { + return; + } + + let written = result as usize; + state.advance(written); + match state.remaining() { + 0 => { tracing::info!( "writev operation completed: wrote {} bytes", - st.total_written + state.total_written ); // write complete, return iovec to pool - st.free_last_iov(&mut self.iov_pool); - completion_from_key(user_data).complete(st.total_written as i32); - } else { + state.free_last_iov(&mut self.iov_pool); + completion_from_key(user_data).complete(state.total_written as i32); + } + remaining => { tracing::trace!( "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}", user_data, written, - st.remaining() + remaining ); // partial write, submit next - self.submit_writev(user_data, st); + self.submit_writev(user_data, state); } } } } -#[inline(always)] -/// use the callback pointer as the user_data for the operation as is -/// common practice for io_uring to prevent more indirection -fn get_key(c: Arc) -> u64 { - Arc::into_raw(c) as u64 -} - -#[inline(always)] -/// convert the user_data back to an Arc pointer -fn completion_from_key(key: u64) -> Arc { - unsafe { Arc::from_raw(key as *const Completion) } -} - impl IO for UringIO { fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); @@ -613,8 +600,8 @@ impl File for UringFile { &self, pos: usize, bufs: Vec>>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { // for a single buffer use pwrite directly if bufs.len().eq(&1) { return self.pwrite(pos, bufs[0].clone(), c.clone()); diff --git a/core/io/mod.rs b/core/io/mod.rs index 6518157e8..8560216e8 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,4 +1,4 @@ -use crate::{turso_assert, Result}; +use crate::Result; use bitflags::bitflags; use cfg_block::cfg_block; use std::fmt; diff --git a/core/io/unix.rs b/core/io/unix.rs index 82f03ba77..7e73e6904 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -411,7 +411,7 @@ enum CompletionCallback { ), Writev( Arc>, - Arc, + Completion, Vec>>, usize, // absolute file offset usize, // buf index @@ -537,17 +537,12 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] -<<<<<<< HEAD - fn sync(&self, c: Completion) -> Result { -||||||| parent of 7f48531b (batch backfilling pages when checkpointing) - fn sync(&self, c: Arc) -> Result> { -======= fn pwritev( &self, pos: usize, buffers: Vec>>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let file = self .file .lock() @@ -588,8 +583,7 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn sync(&self, c: Arc) -> Result> { ->>>>>>> 7f48531b (batch backfilling pages when checkpointing) + fn sync(&self, c: Completion) -> Result { let file = self.file.lock().unwrap(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/storage/database.rs b/core/storage/database.rs index ff474a436..0370d398c 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -23,7 +23,7 @@ pub trait DatabaseStorage: Send + Sync { buffers: Vec>>, c: Completion, ) -> Result; - fn sync(&self, c: Completion) -> Result<()>; + fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; fn truncate(&self, len: usize, c: Completion) -> Result; } @@ -74,7 +74,7 @@ impl DatabaseStorage for DatabaseFile { page_size: usize, buffers: Vec>>, c: Completion, - ) -> Result<()> { + ) -> Result { assert!(page_idx > 0); assert!(page_size >= 
512); assert!(page_size <= 65536); @@ -149,7 +149,7 @@ impl DatabaseStorage for FileMemoryStorage { page_size: usize, buffer: Vec>>, c: Completion, - ) -> Result<()> { + ) -> Result { assert!(page_idx > 0); assert!(page_size >= 512); assert!(page_size <= 65536); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 768b8c6c1..90fcb2893 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1304,11 +1304,10 @@ impl Pager { } let write_counter = Rc::new(RefCell::new(0)); - let checkpoint_result = self.io.block(|| { + let mut checkpoint_result = self.io.block(|| { self.wal .borrow_mut() - .checkpoint(self, counter.clone(), mode) - .map_err(|err| panic!("error while clearing cache {err}")) + .checkpoint(self, write_counter.clone(), mode) })?; if checkpoint_result.everything_backfilled() diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d434ec255..1d58f444e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -62,7 +62,7 @@ use crate::storage::wal::{BatchItem, PendingFlush}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; use std::pin::Pin; use std::rc::Rc; @@ -854,52 +854,89 @@ pub fn begin_write_btree_page( } #[instrument(skip_all, level = Level::DEBUG)] -pub fn write_pages_vectored(pager: &Pager, batch: &[BatchItem]) -> Result { +pub fn write_pages_vectored( + pager: &Pager, + batch: BTreeMap, +) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); } - let mut run = batch.to_vec(); - run.sort_by_key(|b| b.id); + + // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's + // to submit as `writev`/write_pages calls. 
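// A minimal sketch of the grouping described above, with hypothetical names
// (contiguous_runs is not a function in the codebase): because the BTreeMap keeps
// the batch sorted by page id, contiguous runs fall out of a single linear scan,
// and each (start_id, len) pair maps to one write_pages/writev submission instead
// of one write per page.
use std::collections::BTreeMap;

fn contiguous_runs(batch: &BTreeMap<usize, Vec<u8>>) -> Vec<(usize, usize)> {
    // Returns (start_id, run_len) for each run of consecutive page ids.
    let mut runs = Vec::new();
    let mut current: Option<(usize, usize)> = None; // (start, last)
    for &id in batch.keys() {
        match current {
            Some((start, last)) if id == last + 1 => current = Some((start, id)),
            Some((start, last)) => {
                runs.push((start, last - start + 1));
                current = Some((id, id));
            }
            None => current = Some((id, id)),
        }
    }
    if let Some((start, last)) = current {
        runs.push((start, last - start + 1));
    }
    runs
}

fn main() {
    let mut batch = BTreeMap::new();
    for id in [1usize, 2, 3, 6, 7, 9, 10, 11, 12] {
        batch.insert(id, vec![0u8; 4096]); // placeholder page buffers
    }
    // Three vectored submissions instead of nine single-page writes.
    assert_eq!(contiguous_runs(&batch), vec![(1, 3), (6, 2), (9, 4)]);
}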
let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize; - let mut all_ids = Vec::with_capacity(run.len()); - // count runs - let mut starts = Vec::with_capacity(5); // arbitrary initialization - let mut start = 0; - while start < run.len() { - let mut end = start + 1; - while end < run.len() && run[end].id == run[end - 1].id + 1 { - end += 1; + let mut all_ids = Vec::with_capacity(batch.len()); + + // Count expected number of runs to create the atomic counter we need to track each batch + let mut run_count = 0; + let mut prev_id = None; + for &id in batch.keys() { + if let Some(prev) = prev_id { + if id != prev + 1 { + run_count += 1; + } + } else { + run_count = 1; // First run } - starts.push((start, end)); - start = end; + prev_id = Some(id); } - let runs = starts.len(); - let runs_left = Arc::new(AtomicUsize::new(runs)); + + // Create the atomic counters + let runs_left = Arc::new(AtomicUsize::new(run_count)); let done = Arc::new(AtomicBool::new(false)); - for (start, end) in starts { - let first_id = run[start].id; - let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect(); - all_ids.extend(run[start..end].iter().map(|b| b.id)); + let mut run_start_id = None; + // we know how many runs, but we don't know how many buffers per run, so we can only give an + // estimate of the capacity + const EST_BUFF_CAPACITY: usize = 32; + let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY); + let mut run_ids = Vec::with_capacity(EST_BUFF_CAPACITY); - let runs_left_cl = runs_left.clone(); - let done_cl = done.clone(); + // Iterate through the batch, submitting each run as soon as it ends + let mut iter = batch.iter().peekable(); + while let Some((&id, item)) = iter.next() { + if run_start_id.is_none() { + run_start_id = Some(id); + } - let c = Completion::new_write(move |_| { - if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 { - done_cl.store(true, Ordering::Release); + run_bufs.push(item.buf.clone()); + run_ids.push(id); + + // Check if this is the end of a run, either the next key is not consecutive or this is the last entry + let is_end_of_run = match iter.peek() { + Some((&next_id, _)) => next_id != id + 1, + None => true, // Last item is always end of a run + }; + + if is_end_of_run { + // Submit this run immediately + let start_id = run_start_id.unwrap(); + let runs_left_cl = runs_left.clone(); + let done_cl = done.clone(); + let c = Completion::new_write(move |_| { + if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 { + done_cl.store(true, Ordering::Release); + } + }); + + // Submit and decrement the runs_left counter on error + if let Err(e) = pager.db_file.write_pages(start_id, page_sz, run_bufs, c) { + if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { + done.store(true, Ordering::Release); + } + return Err(e); } - }); - // submit, roll back on error - if let Err(e) = pager.db_file.write_pages(first_id, page_sz, bufs, c) { - if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { - done.store(true, Ordering::Release); - } - return Err(e); + // Add IDs to the all_ids list and prepare for the next run + all_ids.extend(run_ids); + run_start_id = None; + // .clear() will cause borrowing issue, unfortunately we have to reassign + run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY); + run_ids = Vec::with_capacity(EST_BUFF_CAPACITY); } } + tracing::debug!( - "write_pages_vectored: {} pages to write, runs: {runs}", + "write_pages_vectored: {} pages to write, runs: {run_count}", all_ids.len() ); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 
399af7746..b37e3af54 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -3,7 +3,7 @@ use std::array; use std::cell::UnsafeCell; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use strum::EnumString; use tracing::{instrument, Level}; @@ -404,7 +404,6 @@ pub const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem { - pub(super) id: usize, pub(super) buf: Arc>, } @@ -418,9 +417,9 @@ pub(super) struct BatchItem { // range. This is inefficient for now. struct OngoingCheckpoint { scratch_page: PageRef, - batch: Vec, + batch: BTreeMap, state: CheckpointState, - pending_flushes: Vec, + pending_flush: Option, min_frame: u64, max_frame: u64, current_page: u64, @@ -446,6 +445,14 @@ impl PendingFlush { done: Arc::new(AtomicBool::new(false)), } } + // clear the dirty flag of all pages in the pending flush batch + fn clear_dirty(&self, pager: &Pager) { + for id in &self.pages { + if let Some(p) = pager.cache_get(*id) { + p.clear_dirty(); + } + } + } } impl fmt::Debug for OngoingCheckpoint { @@ -699,7 +706,11 @@ impl Drop for CheckpointLocks { } } -fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut Vec) { +fn take_page_into_batch( + scratch: &PageRef, + pool: &Arc, + batch: &mut BTreeMap, +) { let (id, buf_clone) = unsafe { let inner = &*scratch.inner.get(); let id = inner.id; @@ -707,9 +718,8 @@ fn take_page_into_batch(scratch: &PageRef, pool: &Arc, batch: &mut V let buf = contents.buffer.clone(); (id, buf) }; - - // Push into batch - batch.push(BatchItem { id, buf: buf_clone }); + // Insert the new batch item at the correct position + batch.insert(id, BatchItem { buf: buf_clone }); // Re-initialize scratch with a fresh buffer let raw = pool.get(); @@ -1147,7 +1157,7 @@ impl Wal for WalFile { "Full checkpoint mode is not implemented yet".into(), )); } - self.checkpoint_inner(pager, write_counter, mode) + self.checkpoint_inner(pager, _write_counter, mode) .inspect_err(|_| { let _ = self.checkpoint_guard.take(); }) @@ -1265,8 +1275,8 @@ impl WalFile { shared, ongoing_checkpoint: OngoingCheckpoint { scratch_page: checkpoint_page, - batch: Vec::new(), - pending_flushes: Vec::new(), + batch: BTreeMap::new(), + pending_flush: None, state: CheckpointState::Start, min_frame: 0, max_frame: 0, @@ -1326,7 +1336,7 @@ impl WalFile { self.ongoing_checkpoint.current_page = 0; self.max_frame_read_lock_index.set(NO_LOCK_HELD); self.ongoing_checkpoint.batch.clear(); - self.ongoing_checkpoint.pending_flushes.clear(); + let _ = self.ongoing_checkpoint.pending_flush.take(); self.sync_state.set(SyncState::NotSyncing); self.syncing.set(false); } @@ -1375,7 +1385,7 @@ impl WalFile { fn checkpoint_inner( &mut self, pager: &Pager, - write_counter: Rc>, + _write_counter: Rc>, mode: CheckpointMode, ) -> Result> { 'checkpoint_loop: loop { @@ -1438,10 +1448,10 @@ impl WalFile { page, *frame ); - self.ongoing_checkpoint.page.get().id = page as usize; + self.ongoing_checkpoint.scratch_page.get().id = page as usize; let _ = self.read_frame( *frame, - self.ongoing_checkpoint.page.clone(), + self.ongoing_checkpoint.scratch_page.clone(), self.buffer_pool.clone(), )?; self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; @@ -1451,7 +1461,7 @@ impl WalFile { self.ongoing_checkpoint.current_page += 1; } CheckpointState::WaitReadFrame => { - if self.ongoing_checkpoint.page.is_locked() { + if self.ongoing_checkpoint.scratch_page.is_locked() { return Ok(IOResult::IO); } else { self.ongoing_checkpoint.state = CheckpointState::AccumulatePage; @@ -1460,14 
+1470,15 @@ impl WalFile { CheckpointState::AccumulatePage => { // mark before batching self.ongoing_checkpoint.scratch_page.set_dirty(); + // we read the frame into memory, add it to our batch take_page_into_batch( &self.ongoing_checkpoint.scratch_page, &self.buffer_pool, &mut self.ongoing_checkpoint.batch, ); let more_pages = (self.ongoing_checkpoint.current_page as usize) - < self.get_shared().pages_in_frames.lock().len() - 1; - + < self.get_shared().pages_in_frames.lock().len() - 1 + && self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES; if more_pages { self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; @@ -1477,34 +1488,30 @@ impl WalFile { } CheckpointState::FlushBatch => { tracing::trace!("started checkpoint backfilling batch"); - self.ongoing_checkpoint - .pending_flushes - .push(write_pages_vectored(pager, &self.ongoing_checkpoint.batch)?); + self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored( + pager, + std::mem::take(&mut self.ongoing_checkpoint.batch), + )?); // batch is queued self.ongoing_checkpoint.batch.clear(); self.ongoing_checkpoint.state = CheckpointState::WaitFlush; } CheckpointState::WaitFlush => { - if self - .ongoing_checkpoint - .pending_flushes - .iter() - .any(|pf| !pf.done.load(Ordering::Acquire)) - { - return Ok(IOResult::IO); + match self.ongoing_checkpoint.pending_flush.as_ref() { + Some(pf) if pf.done.load(Ordering::SeqCst) => { + // flush is done, we can continue + tracing::trace!("checkpoint backfilling batch done"); + } + Some(_) => return Ok(IOResult::IO), + None => panic!("we should have a pending flush here"), } tracing::debug!("finished checkpoint backfilling batch"); - for pf in self + let pf = self .ongoing_checkpoint - .pending_flushes - .drain(std::ops::RangeFull) - { - for id in pf.pages { - if let Some(p) = pager.cache_get(id) { - p.clear_dirty(); - } - } - } + .pending_flush + .as_ref() + .expect("we should have a pending flush here"); + pf.clear_dirty(pager); // done with batch let shared = self.get_shared(); if (self.ongoing_checkpoint.current_page as usize) @@ -1513,7 +1520,7 @@ impl WalFile { self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { - tracing::info!("transitioning checkpoint to done"); + tracing::debug!("WaitFlush transitioning checkpoint to Done"); self.ongoing_checkpoint.state = CheckpointState::Done; } } @@ -1522,9 +1529,13 @@ impl WalFile { // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file // Release all locks and return the current num of wal frames and the amount we backfilled CheckpointState::Done => { - if *write_counter.borrow() > 0 { - return Ok(IOResult::IO); - } + turso_assert!( + self.ongoing_checkpoint + .pending_flush + .as_ref() + .is_some_and(|pf| pf.done.load(Ordering::Relaxed)), + "checkpoint pending flush must have finished" + ); let mut checkpoint_result = { let shared = self.get_shared(); let current_mx = shared.max_frame.load(Ordering::SeqCst); From 693b71449e3d542d3196506b1b7868f2f6d92ab4 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 29 Jul 2025 21:36:39 -0400 Subject: [PATCH 16/18] Clean up writev batching and apply suggestions --- core/io/io_uring.rs | 10 +-- core/storage/sqlite3_ondisk.rs | 60 ++++++++++------- core/storage/wal.rs | 115 +++++++++++++++++++++------------ 3 files changed, 117 insertions(+), 68 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index ed0a0f7d8..b2afeb652 100644 --- 
a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -20,7 +20,8 @@ use tracing::{debug, trace}; const ENTRIES: u32 = 512; /// Idle timeout for the sqpoll kernel thread before it needs -/// to be woken back up by a call to IORING_ENTER +/// to be woken back up by a call IORING_ENTER_SQ_WAKEUP flag. +/// (handled by the io_uring crate in `submit_and_wait`) const SQPOLL_IDLE: u32 = 1000; /// Number of file descriptors we preallocate for io_uring. @@ -35,7 +36,7 @@ const IOVEC_POOL_SIZE: usize = 64; const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; /// Maximum number of I/O operations to wait for in a single run, -/// waiting for > 1 can reduce the amount of IOURING_ENTER syscalls we +/// waiting for > 1 can reduce the amount of `io_uring_enter` syscalls we /// make, but can increase single operation latency. const MAX_WAIT: usize = 4; @@ -137,8 +138,9 @@ macro_rules! with_fd { }; } -/// wrapper type to represent a possibly registered file desriptor, -/// only used in WritevState +/// wrapper type to represent a possibly registered file descriptor, +/// only used in WritevState, and piggy-backs on the available methods from +/// `UringFile`, so we don't have to store the file on `WritevState`. enum Fd { Fixed(u32), RawFd(i32), diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 1d58f444e..a57734a54 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -58,7 +58,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::Pager; -use crate::storage::wal::{BatchItem, PendingFlush}; +use crate::storage::wal::PendingFlush; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; use std::cell::{RefCell, UnsafeCell}; @@ -854,9 +854,19 @@ pub fn begin_write_btree_page( } #[instrument(skip_all, level = Level::DEBUG)] +/// Write a batch of pages to the database file. +/// +/// we have a batch of pages to write, lets say the following: +/// (they are already sorted by id thanks to BTreeMap) +/// [1,2,3,6,7,9,10,11,12] +// +/// we want to collect this into runs of: +/// [1,2,3], [6,7], [9,10,11,12] +/// and submit each run as a `writev` call, +/// for 3 total syscalls instead of 9. pub fn write_pages_vectored( pager: &Pager, - batch: BTreeMap, + batch: BTreeMap>>, ) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); @@ -866,7 +876,6 @@ pub fn write_pages_vectored( // to submit as `writev`/write_pages calls. 
let page_sz = pager.page_size.get().unwrap_or(DEFAULT_PAGE_SIZE) as usize; - let mut all_ids = Vec::with_capacity(batch.len()); // Count expected number of runs to create the atomic counter we need to track each batch let mut run_count = 0; @@ -885,53 +894,60 @@ pub fn write_pages_vectored( // Create the atomic counters let runs_left = Arc::new(AtomicUsize::new(run_count)); let done = Arc::new(AtomicBool::new(false)); - let mut run_start_id = None; // we know how many runs, but we don't know how many buffers per run, so we can only give an // estimate of the capacity const EST_BUFF_CAPACITY: usize = 32; - let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY); - let mut run_ids = Vec::with_capacity(EST_BUFF_CAPACITY); // Iterate through the batch, submitting each run as soon as it ends - let mut iter = batch.iter().peekable(); - while let Some((&id, item)) = iter.next() { + // We can reuse this across runs without reallocating + let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY); + let mut run_start_id: Option = None; + let mut all_ids = Vec::with_capacity(batch.len()); + + // Iterate through the batch + let mut iter = batch.into_iter().peekable(); + + while let Some((id, item)) = iter.next() { + // Track the start of the run if run_start_id.is_none() { run_start_id = Some(id); } - run_bufs.push(item.buf.clone()); - run_ids.push(id); + // Add this page to the current run + run_bufs.push(item); + all_ids.push(id); - // Check if this is the end of a run, either the next key is not consecutive or this is the last entry + // Check if this is the end of a run let is_end_of_run = match iter.peek() { - Some((&next_id, _)) => next_id != id + 1, - None => true, // Last item is always end of a run + Some(&(next_id, _)) => next_id != id + 1, + None => true, }; if is_end_of_run { - // Submit this run immediately - let start_id = run_start_id.unwrap(); + let start_id = run_start_id.expect("should have a start id"); let runs_left_cl = runs_left.clone(); let done_cl = done.clone(); + let c = Completion::new_write(move |_| { if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 { done_cl.store(true, Ordering::Release); } }); - // Submit and decrement the runs_left counter on error - if let Err(e) = pager.db_file.write_pages(start_id, page_sz, run_bufs, c) { + // Submit write operation for this run, decrementing the counter if we error + if let Err(e) = pager + .db_file + .write_pages(start_id, page_sz, run_bufs.clone(), c) + { if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { done.store(true, Ordering::Release); } return Err(e); } - // Add IDs to the all_ids list and prepare for the next run - all_ids.extend(run_ids); + + // Reset for next run + run_bufs.clear(); run_start_id = None; - // .clear() will cause borrowing issue, unfortunately we have to reassign - run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY); - run_ids = Vec::with_capacity(EST_BUFF_CAPACITY); } } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b37e3af54..db9f80cd8 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -401,10 +401,56 @@ pub enum CheckpointState { /// IOV_MAX is 1024 on most systems, lets use 512 to be safe pub const CKPT_BATCH_PAGES: usize = 512; +type PageId = usize; -#[derive(Clone)] -pub(super) struct BatchItem { - pub(super) buf: Arc>, +/// Batch is a collection of pages that are being checkpointed together. It is used to +/// aggregate contiguous pages into a single write operation to the database file. 
+pub(super) struct Batch { + items: BTreeMap>>, +} +// TODO(preston): implement the same thing for `readv` +impl Batch { + fn new() -> Self { + Self { + items: BTreeMap::new(), + } + } + fn add_to_batch(&mut self, scratch: &PageRef, pool: &Arc) { + let (id, buf_clone) = unsafe { + let inner = &*scratch.inner.get(); + let id = inner.id; + let contents = inner.contents.as_ref().expect("scratch has contents"); + let buf = contents.buffer.clone(); + (id, buf) + }; + // Insert the new batch item at the correct position + self.items.insert(id, buf_clone); + + // Re-initialize scratch with a fresh buffer + let raw = pool.get(); + let pool_clone = pool.clone(); + let drop_fn = Rc::new(move |b| pool_clone.put(b)); + let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn))); + + unsafe { + let inner = &mut *scratch.inner.get(); + inner.contents = Some(PageContent::new(0, new_buf)); + // reset flags on scratch so it won't be cleared later with the real page + inner.flags.store(0, Ordering::SeqCst); + } + } +} + +impl std::ops::Deref for Batch { + type Target = BTreeMap>>; + fn deref(&self) -> &Self::Target { + &self.items + } +} +impl std::ops::DerefMut for Batch { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.items + } } // Checkpointing is a state machine that has multiple steps. Since there are multiple steps we save @@ -417,7 +463,7 @@ pub(super) struct BatchItem { // range. This is inefficient for now. struct OngoingCheckpoint { scratch_page: PageRef, - batch: BTreeMap, + batch: Batch, state: CheckpointState, pending_flush: Option, min_frame: u64, @@ -706,35 +752,6 @@ impl Drop for CheckpointLocks { } } -fn take_page_into_batch( - scratch: &PageRef, - pool: &Arc, - batch: &mut BTreeMap, -) { - let (id, buf_clone) = unsafe { - let inner = &*scratch.inner.get(); - let id = inner.id; - let contents = inner.contents.as_ref().expect("scratch has contents"); - let buf = contents.buffer.clone(); - (id, buf) - }; - // Insert the new batch item at the correct position - batch.insert(id, BatchItem { buf: buf_clone }); - - // Re-initialize scratch with a fresh buffer - let raw = pool.get(); - let pool_clone = pool.clone(); - let drop_fn = Rc::new(move |b| pool_clone.put(b)); - let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn))); - - unsafe { - let inner = &mut *scratch.inner.get(); - inner.contents = Some(PageContent::new(0, new_buf)); - // reset flags on scratch so it won't be cleared later with the real page - inner.flags.store(0, Ordering::SeqCst); - } -} - impl Wal for WalFile { /// Begin a read transaction. The caller must ensure that there is not already /// an ongoing read transaction. 
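// --- illustrative sketch, not part of the patch ------------------------------------------
// The batched checkpoint flush relies on a "last completion flips the flag" pattern:
// write_pages_vectored creates one completion per contiguous run, each completion
// decrements the shared `runs_left` counter, and whichever completion brings it to zero
// marks the PendingFlush as `done` so the WaitFlush state can proceed. A minimal
// standalone version of that pattern; `completion_for_run` is a hypothetical helper name,
// while `runs_left`/`done` mirror the counters created in write_pages_vectored.
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

fn completion_for_run(runs_left: &Arc<AtomicUsize>, done: &Arc<AtomicBool>) -> impl Fn() {
    let (runs_left, done) = (runs_left.clone(), done.clone());
    move || {
        // fetch_sub returns the previous value, so 1 means this was the final outstanding run
        if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
            done.store(true, Ordering::Release);
        }
    }
}
// ------------------------------------------------------------------------------------------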
@@ -1275,7 +1292,7 @@ impl WalFile { shared, ongoing_checkpoint: OngoingCheckpoint { scratch_page: checkpoint_page, - batch: BTreeMap::new(), + batch: Batch::new(), pending_flush: None, state: CheckpointState::Start, min_frame: 0, @@ -1432,7 +1449,14 @@ impl WalFile { let frame_cache = frame_cache.lock(); assert!(self.ongoing_checkpoint.current_page as usize <= pages_in_frames.len()); if self.ongoing_checkpoint.current_page as usize == pages_in_frames.len() { - self.ongoing_checkpoint.state = CheckpointState::Done; + if self.ongoing_checkpoint.batch.is_empty() { + // no more pages to checkpoint, we are done + tracing::info!("checkpoint done, no more pages to checkpoint"); + self.ongoing_checkpoint.state = CheckpointState::Done; + } else { + // flush the batch + self.ongoing_checkpoint.state = CheckpointState::FlushBatch; + } continue 'checkpoint_loop; } let page = pages_in_frames[self.ongoing_checkpoint.current_page as usize]; @@ -1471,18 +1495,25 @@ impl WalFile { // mark before batching self.ongoing_checkpoint.scratch_page.set_dirty(); // we read the frame into memory, add it to our batch - take_page_into_batch( - &self.ongoing_checkpoint.scratch_page, - &self.buffer_pool, - &mut self.ongoing_checkpoint.batch, - ); + self.ongoing_checkpoint + .batch + .add_to_batch(&self.ongoing_checkpoint.scratch_page, &self.buffer_pool); + let more_pages = (self.ongoing_checkpoint.current_page as usize) - < self.get_shared().pages_in_frames.lock().len() - 1 - && self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES; + < self + .get_shared() + .pages_in_frames + .lock() + .len() + .saturating_sub(1) + && !self.ongoing_checkpoint.batch.is_full(); + + // if we can read more pages, continue reading and accumulating pages if more_pages { self.ongoing_checkpoint.current_page += 1; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; } else { + // if we have enough pages in the batch, flush it self.ongoing_checkpoint.state = CheckpointState::FlushBatch; } } From ade1c182ded88aa5cde5799c0250cb1caedd195d Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 29 Jul 2025 22:07:49 -0400 Subject: [PATCH 17/18] Add is_full method to checkpoint batch --- core/storage/sqlite3_ondisk.rs | 5 ++++- core/storage/wal.rs | 16 +++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index a57734a54..e196c2ae5 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -963,7 +963,10 @@ pub fn write_pages_vectored( } #[instrument(skip_all, level = Level::DEBUG)] -pub fn begin_sync(db_file: Arc, syncing: Rc>) -> Result<()> { +pub fn begin_sync( + db_file: Arc, + syncing: Rc>, +) -> Result { assert!(!*syncing.borrow()); *syncing.borrow_mut() = true; let completion = Completion::new_sync(move |_| { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index db9f80cd8..cbc178992 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -415,6 +415,9 @@ impl Batch { items: BTreeMap::new(), } } + fn is_full(&self) -> bool { + self.items.len() >= CKPT_BATCH_PAGES + } fn add_to_batch(&mut self, scratch: &PageRef, pool: &Arc) { let (id, buf_clone) = unsafe { let inner = &*scratch.inner.get(); @@ -1560,13 +1563,12 @@ impl WalFile { // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file // Release all locks and return the current num of wal frames and the amount we backfilled CheckpointState::Done => { - turso_assert!( - self.ongoing_checkpoint - .pending_flush - .as_ref() - 
.is_some_and(|pf| pf.done.load(Ordering::Relaxed)), - "checkpoint pending flush must have finished" - ); + if let Some(pf) = self.ongoing_checkpoint.pending_flush.as_ref() { + turso_assert!( + pf.done.load(Ordering::Relaxed), + "checkpoint pending flush must have finished" + ); + } let mut checkpoint_result = { let shared = self.get_shared(); let current_mx = shared.max_frame.load(Ordering::SeqCst); From 2e741641e68cf9c38c62428d608eb318f927bd03 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 30 Jul 2025 19:42:38 -0400 Subject: [PATCH 18/18] Add test to assert we are backfilling all the rows properly with vectored writes --- core/storage/wal.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index cbc178992..90e407cd9 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -2063,6 +2063,25 @@ pub mod test { } } + fn count_test_table(conn: &Arc) -> i64 { + let mut stmt = conn.prepare("select count(*) from test").unwrap(); + loop { + match stmt.step() { + Ok(StepResult::Row) => { + break; + } + Ok(StepResult::IO) => { + stmt.run_once().unwrap(); + } + _ => { + panic!("Failed to step through the statement"); + } + } + } + let count: i64 = stmt.row().unwrap().get(0).unwrap(); + count + } + fn run_checkpoint_until_done( wal: &mut dyn Wal, pager: &crate::Pager, @@ -2641,6 +2660,75 @@ pub mod test { std::fs::remove_dir_all(path).unwrap(); } + #[test] + fn test_wal_checkpoint_truncate_db_file_contains_data() { + let (db, path) = get_database(); + let conn = db.connect().unwrap(); + + let walpath = { + let mut p = path.clone().into_os_string().into_string().unwrap(); + p.push_str("/test.db-wal"); + std::path::PathBuf::from(p) + }; + + conn.execute("create table test(id integer primary key, value text)") + .unwrap(); + bulk_inserts(&conn, 10, 100); + + // Get size before checkpoint + let size_before = std::fs::metadata(&walpath).unwrap().len(); + assert!(size_before > 0, "WAL file should have content"); + + // Do a TRUNCATE checkpoint + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Truncate); + } + + // Check file size after truncate + let size_after = std::fs::metadata(&walpath).unwrap().len(); + assert_eq!(size_after, 0, "WAL file should be truncated to 0 bytes"); + + // Verify we can still write to the database + conn.execute("INSERT INTO test VALUES (1001, 'after-truncate')") + .unwrap(); + + // Check WAL has new content + let new_size = std::fs::metadata(&walpath).unwrap().len(); + assert!(new_size >= 32, "WAL file too small"); + let hdr = read_wal_header(&walpath); + let expected_magic = if cfg!(target_endian = "big") { + sqlite3_ondisk::WAL_MAGIC_BE + } else { + sqlite3_ondisk::WAL_MAGIC_LE + }; + assert!( + hdr.magic == expected_magic, + "bad WAL magic: {:#X}, expected: {:#X}", + hdr.magic, + sqlite3_ondisk::WAL_MAGIC_BE + ); + assert_eq!(hdr.file_format, 3007000); + assert_eq!(hdr.page_size, 4096, "invalid page size"); + assert_eq!(hdr.checkpoint_seq, 1, "invalid checkpoint_seq"); + { + let pager = conn.pager.borrow(); + let mut wal = pager.wal.borrow_mut(); + run_checkpoint_until_done(&mut *wal, &pager, CheckpointMode::Passive); + } + // delete the WAL file so we can read right from db and assert + // that everything was backfilled properly + std::fs::remove_file(&walpath).unwrap(); + + let count = count_test_table(&conn); + assert_eq!( + count, 1001, + "we should have 1001 rows in the table all 
together" + ); + std::fs::remove_dir_all(path).unwrap(); + } + fn read_wal_header(path: &std::path::Path) -> sqlite3_ondisk::WalHeader { use std::{fs::File, io::Read}; let mut hdr = [0u8; 32];