diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs
index 0c88fc4c7..e29ffd95c 100644
--- a/core/io/io_uring.rs
+++ b/core/io/io_uring.rs
@@ -19,6 +19,9 @@ use tracing::{debug, trace};
 const ENTRIES: u32 = 512;
 const SQPOLL_IDLE: u32 = 1000;
 const FILES: u32 = 8;
+const IOVEC_POOL_SIZE: usize = 64;
+const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
+const MAX_WAIT: usize = 8;

 pub struct UringIO {
     inner: Rc<RefCell<InnerUringIO>>,
@@ -31,8 +34,8 @@ struct WrappedIOUring {
     ring: io_uring::IoUring,
     pending_ops: usize,
     writev_states: HashMap<u64, WritevState>,
-    pending: [Option<Arc<Completion>>; ENTRIES as usize + 1],
-    key: u64,
+    iov_pool: IovecPool,
+    cqes: [Cqe; ENTRIES as usize + 1],
 }

 struct InnerUringIO {
@@ -40,6 +43,38 @@ struct InnerUringIO {
     free_files: VecDeque<u32>,
 }

+/// preallocated vec of iovec arrays to avoid allocations during writev operations
+struct IovecPool {
+    pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
+}
+
+impl IovecPool {
+    fn new() -> Self {
+        let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE);
+        for _ in 0..IOVEC_POOL_SIZE {
+            pool.push(Box::new(
+                [libc::iovec {
+                    iov_base: std::ptr::null_mut(),
+                    iov_len: 0,
+                }; MAX_IOVEC_ENTRIES],
+            ));
+        }
+        Self { pool }
+    }
+
+    #[inline(always)]
+    fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
+        self.pool.pop()
+    }
+
+    #[inline(always)]
+    fn release(&mut self, iovec: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
+        if self.pool.len() < IOVEC_POOL_SIZE {
+            self.pool.push(iovec);
+        }
+    }
+}
+
 impl UringIO {
     pub fn new() -> Result<Self> {
         let ring = match io_uring::IoUring::builder()
@@ -58,8 +93,11 @@ impl UringIO {
                 ring,
                 pending_ops: 0,
                 writev_states: HashMap::new(),
-                pending: [const { None }; ENTRIES as usize + 1],
-                key: 0,
+                iov_pool: IovecPool::new(),
+                cqes: [Cqe {
+                    user_data: 0,
+                    result: 0,
+                }; ENTRIES as usize + 1],
             },
             free_files: (0..FILES).collect(),
         };
@@ -114,13 +152,14 @@ struct WritevState {
     current_buffer_idx: usize,
     // intra-buffer offset
     current_buffer_offset: usize,
+    // total bytes written so far
     total_written: usize,
+    // cache the sum of all buffer lengths
     total_len: usize,
     bufs: Vec<Arc<RefCell<Buffer>>>,
-    // we keep the last iovec allocation alive until CQE:
-    // raw ptr to Box<[iovec]>
-    last_iov: *mut libc::iovec,
-    last_iov_len: usize,
+    // we keep the last iovec allocation alive until CQE.
+    // pointer to the beginning of the iovec array
+    last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
 }

 impl WritevState {
@@ -129,6 +168,7 @@
             Some(id) => Fd::Fixed(id),
             None => Fd::RawFd(file.as_raw_fd()),
         };
+        let total_len = bufs.iter().map(|b| b.borrow().len()).sum();
         Self {
             file_id,
             file_pos: pos,
@@ -136,9 +176,8 @@
             current_buffer_offset: 0,
             total_written: 0,
             bufs,
-            last_iov: core::ptr::null_mut(),
-            last_iov_len: 0,
-            total_len: bufs.iter().map(|b| b.borrow().len()).sum(),
+            last_iov_allocation: None,
+            total_len,
         }
     }

@@ -171,22 +210,21 @@
         self.total_written += written;
     }

-    /// Free the allocation that keeps the iovec array alive while writev is ongoing
     #[inline(always)]
-    fn free_last_iov(&mut self) {
-        if !self.last_iov.is_null() {
-            unsafe {
-                drop(Box::from_raw(core::slice::from_raw_parts_mut(
-                    self.last_iov,
-                    self.last_iov_len,
-                )))
-            };
-            self.last_iov = core::ptr::null_mut();
-            self.last_iov_len = 0;
+    /// Free the allocation that keeps the iovec array alive while writev is ongoing
+    fn free_last_iov(&mut self, pool: &mut IovecPool) {
+        if let Some(allocation) = self.last_iov_allocation.take() {
+            pool.release(allocation);
         }
     }
 }

+#[derive(Clone, Copy)]
+struct Cqe {
+    user_data: u64,
+    result: i32,
+}
+
 impl InnerUringIO {
     fn register_file(&mut self, fd: i32) -> Result<u32> {
         if let Some(slot) = self.free_files.pop_front() {
@@ -211,9 +249,8 @@
 }

 impl WrappedIOUring {
-    fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc<Completion>) {
+    fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
         trace!("submit_entry({:?})", entry);
-        self.pending[entry.get_user_data() as usize] = Some(c);
         unsafe {
             let mut sub = self.ring.submission_shared();
             match sub.push(entry) {
@@ -228,11 +265,11 @@
         }
     }
-    fn wait_for_completion(&mut self) -> Result<()> {
+    fn submit_and_wait(&mut self) -> Result<()> {
         if self.pending_ops == 0 {
             return Ok(());
         }
-        let wants = std::cmp::min(self.pending_ops, 8);
+        let wants = std::cmp::min(self.pending_ops, MAX_WAIT);
         tracing::trace!("submit_and_wait for {wants} pending operations to complete");
         self.ring.submit_and_wait(wants)?;
         Ok(())
     }
@@ -242,52 +279,111 @@
         self.pending_ops == 0
     }

-    /// Submit a writev operation for the given key. WritevState MUST exist in the map
-    /// of `writev_states`
-    fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc<Completion>) {
-        self.writev_states.insert(key, st);
-        // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half
-        let max = CKPT_BATCH_PAGES / 2;
-        let mut iov = Vec::with_capacity(max);
-        for (i, b) in st
+    /// Submit or resubmit a writev operation
+    fn submit_writev(&mut self, key: u64, mut st: WritevState) {
+        st.free_last_iov(&mut self.iov_pool);
+        let mut iov_allocation = match self.iov_pool.acquire() {
+            Some(alloc) => alloc,
+            None => {
+                // Fallback: allocate a new one if pool is exhausted
+                Box::new(
+                    [libc::iovec {
+                        iov_base: std::ptr::null_mut(),
+                        iov_len: 0,
+                    }; MAX_IOVEC_ENTRIES],
+                )
+            }
+        };
+        let mut iov_count = 0;
+        for (idx, buffer) in st
             .bufs
             .iter()
             .enumerate()
             .skip(st.current_buffer_idx)
-            .take(max)
+            .take(MAX_IOVEC_ENTRIES)
         {
-            let r = b.borrow();
-            let s = r.as_slice();
-            let slice = if i == st.current_buffer_idx {
-                &s[st.current_buffer_offset..]
+            let buf = buffer.borrow();
+            let buf_slice = buf.as_slice();
+            let slice = if idx == st.current_buffer_idx {
+                &buf_slice[st.current_buffer_offset..]
             } else {
-                s
+                buf_slice
             };
             if slice.is_empty() {
                 continue;
             }
-            iov.push(libc::iovec {
+            iov_allocation[iov_count] = libc::iovec {
                 iov_base: slice.as_ptr() as *mut _,
                 iov_len: slice.len(),
-            });
+            };
+            iov_count += 1;
         }
+        // Store the allocation and get the pointer
+        let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
+        st.last_iov_allocation = Some(iov_allocation);

-        // keep iov alive until CQE
-        let boxed = iov.into_boxed_slice();
-        let ptr = boxed.as_ptr() as *mut libc::iovec;
-        let len = boxed.len();
-        st.free_last_iov();
-        st.last_iov = ptr;
-        st.last_iov_len = len;
-        // leak the iovec array, will be freed when CQE processed
-        let _ = Box::into_raw(boxed);
         let entry = with_fd!(st.file_id, |fd| {
-            io_uring::opcode::Writev::new(fd, ptr, len as u32)
+            io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
                 .offset(st.file_pos as u64)
                 .build()
                 .user_data(key)
         });
-        self.submit_entry(&entry, c.clone());
+        self.writev_states.insert(key, st);
+        self.submit_entry(&entry);
+    }
+
+    // to circumvent borrowing rules, collect everything into preallocated array
+    // and return the number of completed operations
+    fn reap_cqes(&mut self) -> usize {
+        let mut count = 0;
+        {
+            for cqe in self.ring.completion() {
+                self.pending_ops -= 1;
+                self.cqes[count] = Cqe {
+                    user_data: cqe.user_data(),
+                    result: cqe.result(),
+                };
+                count += 1;
+                if count == ENTRIES as usize {
+                    break;
+                }
+            }
+        }
+        count
+    }
+
+    fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) {
+        if result < 0 {
+            tracing::error!(
+                "writev operation failed for user_data {}: {}",
+                user_data,
+                std::io::Error::from_raw_os_error(result)
+            );
+            // error: free iov allocation and call completion with error code
+            st.free_last_iov(&mut self.iov_pool);
+            completion_from_key(user_data).complete(result);
+        } else {
+            let written = result as usize;
+            st.advance(written);
+            if st.remaining() == 0 {
+                tracing::info!(
+                    "writev operation completed: wrote {} bytes",
+                    st.total_written
+                );
+                // write complete, return iovec to pool
+                st.free_last_iov(&mut self.iov_pool);
+                completion_from_key(user_data).complete(st.total_written as i32);
+            } else {
+                tracing::trace!(
+                    "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
+                    user_data,
+                    written,
+                    st.remaining()
+                );
+                // partial write, submit next
+                self.submit_writev(user_data, st);
+            }
+        }
     }
 }

@@ -345,55 +441,22 @@ impl IO for UringIO {
     }

     fn run_once(&self) -> Result<()> {
+        trace!("run_once()");
         let mut inner = self.inner.borrow_mut();
         let ring = &mut inner.ring;
-
         if ring.empty() {
             return Ok(());
         }
-        ring.wait_for_completion()?;
-        const MAX: usize = ENTRIES as usize;
-        // to circumvent borrowing rules, collect everything without the heap
-        let mut uds = [0u64; MAX];
-        let mut ress = [0i32; MAX];
-        let mut count = 0;
-        {
-            let cq = ring.ring.completion();
-            for cqe in cq {
-                ring.pending_ops -= 1;
-                uds[count] = cqe.user_data();
-                ress[count] = cqe.result();
-                count += 1;
-                if count == MAX {
-                    break;
-                }
-            }
-        }
-
+        ring.submit_and_wait()?;
+        let count = ring.reap_cqes();
         for i in 0..count {
-            ring.pending_ops -= 1;
-            let user_data = uds[i];
-            let result = ress[i];
+            let Cqe { user_data, result } = ring.cqes[i];
             turso_assert!(
                 user_data != 0,
                 "user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this"
             );
-            if let Some(mut st) = ring.writev_states.remove(&user_data) {
-                if result < 0 {
-                    st.free_last_iov();
-                    completion_from_key(ud).complete(result);
-                } else {
-                    let written = result as usize;
-                    st.free_last_iov();
-                    st.advance(written);
-                    if st.remaining() == 0 {
-                        // write complete
-                        c.complete(st.total_written as i32);
-                    } else {
-                        // partial write, submit next
-                        ring.submit_writev(user_data, st, c.clone());
-                    }
-                }
+            if let Some(state) = ring.writev_states.remove(&user_data) {
+                ring.handle_writev_completion(state, user_data, result);
                 continue;
             }
             completion_from_key(user_data).complete(result)
@@ -508,10 +571,10 @@
                 io_uring::opcode::Read::new(fd, buf, len as u32)
                     .offset(pos as u64)
                     .build()
-                    .user_data(io.ring.get_key())
+                    .user_data(get_key(c.clone()))
             })
         };
-        io.ring.submit_entry(&read_e, c.clone());
+        io.ring.submit_entry(&read_e);
         Ok(c)
     }

@@ -529,10 +592,10 @@
                 io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
                     .offset(pos as u64)
                     .build()
-                    .user_data(io.ring.get_key())
+                    .user_data(get_key(c.clone()))
             })
         };
-        io.ring.submit_entry(&write, c.clone());
+        io.ring.submit_entry(&write);
         Ok(c)
     }

@@ -544,7 +607,7 @@
                 .build()
                 .user_data(get_key(c.clone()))
         });
-        io.ring.submit_entry(&sync, c.clone());
+        io.ring.submit_entry(&sync);
         Ok(c)
     }

@@ -560,10 +623,9 @@
         }
         tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
         let mut io = self.io.borrow_mut();
-        let key = io.ring.get_key();
         // create state to track ongoing writev operation
         let state = WritevState::new(self, pos, bufs);
-        io.ring.submit_writev(key, state, c.clone());
+        io.ring.submit_writev(get_key(c.clone()), state);
         Ok(c)
     }

diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index d48d7dfad..f27fb9bab 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -399,6 +399,7 @@ pub enum CheckpointState {
     Done,
 }

+/// IOV_MAX is 1024 on most systems
 pub const CKPT_BATCH_PAGES: usize = 1024;

 #[derive(Clone)]
@@ -1129,8 +1130,8 @@ impl Wal for WalFile {
     #[instrument(skip_all, level = Level::DEBUG)]
     fn should_checkpoint(&self) -> bool {
         let shared = self.get_shared();
-        let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
         let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
+        let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
         frame_id > self.checkpoint_threshold + nbackfills
     }

diff --git a/sqlite3/tests/compat/mod.rs b/sqlite3/tests/compat/mod.rs
index d3aa58001..e504c2a38 100644
--- a/sqlite3/tests/compat/mod.rs
+++ b/sqlite3/tests/compat/mod.rs
@@ -207,12 +207,6 @@ mod tests {

 #[cfg(not(feature = "sqlite3"))]
 mod libsql_ext {
-<<<<<<< HEAD
-||||||| parent of 7f61fbb8 (Update test to match cacheflush behavior)
-=======
-    use limbo_sqlite3::sqlite3_close_v2;
->>>>>>> 7f61fbb8 (Update test to match cacheflush behavior)
-
     use super::*;

     #[test]
diff --git a/testing/cli_tests/vfs_bench.py b/testing/cli_tests/vfs_bench.py
index 88abb6e6a..d081b7526 100644
--- a/testing/cli_tests/vfs_bench.py
+++ b/testing/cli_tests/vfs_bench.py
@@ -13,7 +13,7 @@ from typing import Dict
 from cli_tests.console import error, info, test
 from cli_tests.test_turso_cli import TestTursoShell

-LIMBO_BIN = Path("./target/debug/tursodb")
+LIMBO_BIN = Path("./target/release/tursodb")
 DB_FILE = Path("testing/temp.db")
 vfs_list = ["syscall"]
 if platform.system() == "Linux":
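
For context on how the new pool is meant to be used, here is a minimal standalone sketch of the acquire/submit/release cycle that `IovecPool` and `WritevState::free_last_iov` implement in the patch above. The constants mirror the patch, but `fake_submit_writev` is a hypothetical stand-in for the real `io_uring` Writev submission, and the sketch only depends on the `libc` crate.

```rust
// Illustrative sketch, not part of the patch: shows the intended discipline of
// acquiring an iovec array before a writev submission, keeping it alive until
// the completion is reaped, then releasing it back to the pool for reuse.
const IOVEC_POOL_SIZE: usize = 64;
const MAX_IOVEC_ENTRIES: usize = 1024; // CKPT_BATCH_PAGES in the patch

struct IovecPool {
    pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}

impl IovecPool {
    fn new() -> Self {
        let empty = libc::iovec {
            iov_base: std::ptr::null_mut(),
            iov_len: 0,
        };
        Self {
            pool: (0..IOVEC_POOL_SIZE)
                .map(|_| Box::new([empty; MAX_IOVEC_ENTRIES]))
                .collect(),
        }
    }

    fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
        self.pool.pop()
    }

    fn release(&mut self, iov: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
        // cap the pool so a burst of writevs cannot grow it without bound
        if self.pool.len() < IOVEC_POOL_SIZE {
            self.pool.push(iov);
        }
    }
}

// hypothetical stand-in for the io_uring Writev submission used in the patch
fn fake_submit_writev(_iov: *const libc::iovec, _iov_count: u32) {}

fn main() {
    let mut pool = IovecPool::new();
    let data = vec![0u8; 4096];

    // acquire: fill only the entries needed and remember how many were used
    let mut iov = pool.acquire().expect("pool exhausted");
    iov[0] = libc::iovec {
        iov_base: data.as_ptr() as *mut _,
        iov_len: data.len(),
    };
    let iov_count = 1u32;

    // the boxed array must stay alive until the CQE for this submission arrives
    fake_submit_writev(iov.as_ptr(), iov_count);

    // on completion (or error, or before a resubmission) return it to the pool
    pool.release(iov);
}
```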