Clean up io_uring writev implementation, add iovec and cqe cache

This commit is contained in:
PThorpe92
2025-07-26 18:37:40 -04:00
parent 689007cb74
commit efcffd380d
4 changed files with 164 additions and 107 deletions

View File

@@ -19,6 +19,9 @@ use tracing::{debug, trace};
const ENTRIES: u32 = 512;
const SQPOLL_IDLE: u32 = 1000;
const FILES: u32 = 8;
const IOVEC_POOL_SIZE: usize = 64;
const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
const MAX_WAIT: usize = 8;
pub struct UringIO {
inner: Rc<RefCell<InnerUringIO>>,
@@ -31,8 +34,8 @@ struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
writev_states: HashMap<u64, WritevState>,
pending: [Option<Arc<Completion>>; ENTRIES as usize + 1],
key: u64,
iov_pool: IovecPool,
cqes: [Cqe; ENTRIES as usize + 1],
}
struct InnerUringIO {
@@ -40,6 +43,38 @@ struct InnerUringIO {
free_files: VecDeque<u32>,
}
/// Pool of preallocated, fixed-size iovec arrays, kept around to avoid allocations
/// during writev operations.
///
/// Each element is a boxed array of `MAX_IOVEC_ENTRIES` entries.
struct IovecPool {
// Arrays currently available for reuse.
pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}
impl IovecPool {
    /// Build a pool holding `IOVEC_POOL_SIZE` boxed arrays, every entry initialised
    /// to a null base pointer and zero length.
    fn new() -> Self {
        let blank = libc::iovec {
            iov_base: std::ptr::null_mut(),
            iov_len: 0,
        };
        let mut pool = Vec::with_capacity(IOVEC_POOL_SIZE);
        pool.extend((0..IOVEC_POOL_SIZE).map(|_| Box::new([blank; MAX_IOVEC_ENTRIES])));
        Self { pool }
    }

    /// Take an array out of the pool, or `None` when the pool is empty.
    ///
    /// The returned array is handed out as-is: entries written by a previous user
    /// are not cleared.
    #[inline(always)]
    fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
        self.pool.pop()
    }

    /// Return an array to the pool.
    ///
    /// If the pool already holds `IOVEC_POOL_SIZE` arrays the box is simply dropped,
    /// so the pool never grows past its initial size.
    #[inline(always)]
    fn release(&mut self, iovec: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
        if self.pool.len() >= IOVEC_POOL_SIZE {
            return;
        }
        self.pool.push(iovec);
    }
}
impl UringIO {
pub fn new() -> Result<Self> {
let ring = match io_uring::IoUring::builder()
@@ -58,8 +93,11 @@ impl UringIO {
ring,
pending_ops: 0,
writev_states: HashMap::new(),
pending: [const { None }; ENTRIES as usize + 1],
key: 0,
iov_pool: IovecPool::new(),
cqes: [Cqe {
user_data: 0,
result: 0,
}; ENTRIES as usize + 1],
},
free_files: (0..FILES).collect(),
};
@@ -114,13 +152,14 @@ struct WritevState {
current_buffer_idx: usize,
// intra-buffer offset
current_buffer_offset: usize,
// total bytes written so far
total_written: usize,
// cache the sum of all buffer lengths
total_len: usize,
bufs: Vec<Arc<RefCell<crate::Buffer>>>,
// we keep the last iovec allocation alive until CQE:
// raw ptr to Box<[iovec]>
last_iov: *mut libc::iovec,
last_iov_len: usize,
// we keep the last iovec allocation alive until CQE.
// pointer to the beginning of the iovec array
last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}
impl WritevState {
@@ -129,6 +168,7 @@ impl WritevState {
Some(id) => Fd::Fixed(id),
None => Fd::RawFd(file.as_raw_fd()),
};
let total_len = bufs.iter().map(|b| b.borrow().len()).sum();
Self {
file_id,
file_pos: pos,
@@ -136,9 +176,8 @@ impl WritevState {
current_buffer_offset: 0,
total_written: 0,
bufs,
last_iov: core::ptr::null_mut(),
last_iov_len: 0,
total_len: bufs.iter().map(|b| b.borrow().len()).sum(),
last_iov_allocation: None,
total_len,
}
}
@@ -171,22 +210,21 @@ impl WritevState {
self.total_written += written;
}
/// Free the allocation that keeps the iovec array alive while writev is ongoing
#[inline(always)]
fn free_last_iov(&mut self) {
if !self.last_iov.is_null() {
unsafe {
drop(Box::from_raw(core::slice::from_raw_parts_mut(
self.last_iov,
self.last_iov_len,
)))
};
self.last_iov = core::ptr::null_mut();
self.last_iov_len = 0;
/// Release the iovec array that was kept alive for the in-flight writev.
///
/// The array, if one is held, is taken out of `last_iov_allocation` (leaving `None`)
/// and handed back to `pool`. Calling this when nothing is held is a no-op.
fn free_last_iov(&mut self, pool: &mut IovecPool) {
    let Some(held) = self.last_iov_allocation.take() else {
        return;
    };
    pool.release(held);
}
}
/// Copy of the two fields read from a completion queue entry.
///
/// Being a small `Copy` value, it can be stored in a fixed array and processed
/// after the completion queue itself is no longer borrowed.
#[derive(Debug, Clone, Copy)]
struct Cqe {
    /// The `user_data` value reported by the completion entry.
    user_data: u64,
    /// The raw result reported by the completion entry; negative values are
    /// treated as failures by the code that consumes it.
    result: i32,
}
impl InnerUringIO {
fn register_file(&mut self, fd: i32) -> Result<u32> {
if let Some(slot) = self.free_files.pop_front() {
@@ -211,9 +249,8 @@ impl InnerUringIO {
}
impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc<Completion>) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
trace!("submit_entry({:?})", entry);
self.pending[entry.get_user_data() as usize] = Some(c);
unsafe {
let mut sub = self.ring.submission_shared();
match sub.push(entry) {
@@ -228,11 +265,11 @@ impl WrappedIOUring {
}
}
fn wait_for_completion(&mut self) -> Result<()> {
fn submit_and_wait(&mut self) -> Result<()> {
if self.pending_ops == 0 {
return Ok(());
}
let wants = std::cmp::min(self.pending_ops, 8);
let wants = std::cmp::min(self.pending_ops, MAX_WAIT);
tracing::trace!("submit_and_wait for {wants} pending operations to complete");
self.ring.submit_and_wait(wants)?;
Ok(())
@@ -242,52 +279,111 @@ impl WrappedIOUring {
self.pending_ops == 0
}
/// Submit a writev operation for the given key. WritevState MUST exist in the map
/// of `writev_states`
fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc<Completion>) {
self.writev_states.insert(key, st);
// the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half
let max = CKPT_BATCH_PAGES / 2;
let mut iov = Vec::with_capacity(max);
for (i, b) in st
/// Submit or resubmit a writev operation
fn submit_writev(&mut self, key: u64, mut st: WritevState) {
st.free_last_iov(&mut self.iov_pool);
let mut iov_allocation = match self.iov_pool.acquire() {
Some(alloc) => alloc,
None => {
// Fallback: allocate a new one if pool is exhausted
Box::new(
[libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 0,
}; MAX_IOVEC_ENTRIES],
)
}
};
let mut iov_count = 0;
for (idx, buffer) in st
.bufs
.iter()
.enumerate()
.skip(st.current_buffer_idx)
.take(max)
.take(MAX_IOVEC_ENTRIES)
{
let r = b.borrow();
let s = r.as_slice();
let slice = if i == st.current_buffer_idx {
&s[st.current_buffer_offset..]
let buf = buffer.borrow();
let buf_slice = buf.as_slice();
let slice = if idx == st.current_buffer_idx {
&buf_slice[st.current_buffer_offset..]
} else {
s
buf_slice
};
if slice.is_empty() {
continue;
}
iov.push(libc::iovec {
iov_allocation[iov_count] = libc::iovec {
iov_base: slice.as_ptr() as *mut _,
iov_len: slice.len(),
});
};
iov_count += 1;
}
// Store the allocation and get the pointer
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
st.last_iov_allocation = Some(iov_allocation);
// keep iov alive until CQE
let boxed = iov.into_boxed_slice();
let ptr = boxed.as_ptr() as *mut libc::iovec;
let len = boxed.len();
st.free_last_iov();
st.last_iov = ptr;
st.last_iov_len = len;
// leak the iovec array, will be freed when CQE processed
let _ = Box::into_raw(boxed);
let entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Writev::new(fd, ptr, len as u32)
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
.offset(st.file_pos as u64)
.build()
.user_data(key)
});
self.submit_entry(&entry, c.clone());
self.writev_states.insert(key, st);
self.submit_entry(&entry);
}
/// Drain ready completions from the ring into the preallocated `cqes` array.
///
/// Collecting the entries first (to circumvent borrowing rules) lets the caller
/// process them afterwards without the completion queue being borrowed. At most
/// `ENTRIES` completions are taken per call, and `pending_ops` is decremented once
/// for every completion taken.
///
/// Returns the number of completed operations copied into `cqes`.
fn reap_cqes(&mut self) -> usize {
    let limit = ENTRIES as usize;
    let mut reaped = 0;
    // `take(limit)` stops pulling from the queue once `limit` entries were copied,
    // leaving any further completions for the next call.
    for (slot, entry) in self.cqes.iter_mut().zip(self.ring.completion().take(limit)) {
        self.pending_ops -= 1;
        *slot = Cqe {
            user_data: entry.user_data(),
            result: entry.result(),
        };
        reaped += 1;
    }
    reaped
}
/// Handle the completion of a writev operation.
///
/// * `st` - the state tracked for this operation (already removed from `writev_states`
///   by the caller).
/// * `user_data` - the key the operation was submitted with; it is also used to look
///   up the completion via `completion_from_key`.
/// * `result` - the raw CQE result: negative on failure, otherwise the number of
///   bytes written by this submission.
///
/// On failure, or once no bytes remain, the iovec array is returned to the pool and
/// the completion is invoked (with the raw error code, or the total bytes written).
/// On a partial write the operation is resubmitted for the remaining data.
fn handle_writev_completion(&mut self, mut st: WritevState, user_data: u64, result: i32) {
    if result < 0 {
        tracing::error!(
            "writev operation failed for user_data {}: {}",
            user_data,
            // The CQE carries the error as a negated errno, while `from_raw_os_error`
            // expects the positive errno value; negate it so the log shows the real
            // OS error text instead of "Unknown error -N".
            std::io::Error::from_raw_os_error(result.saturating_neg())
        );
        // error: free iov allocation and call completion with error code
        st.free_last_iov(&mut self.iov_pool);
        completion_from_key(user_data).complete(result);
        return;
    }

    let written = result as usize;
    st.advance(written);
    if st.remaining() == 0 {
        tracing::info!(
            "writev operation completed: wrote {} bytes",
            st.total_written
        );
        // write complete, return iovec to pool
        st.free_last_iov(&mut self.iov_pool);
        completion_from_key(user_data).complete(st.total_written as i32);
        return;
    }

    // NOTE(review): a result of 0 with bytes still remaining is resubmitted as-is;
    // confirm this cannot resubmit forever.
    tracing::trace!(
        "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
        user_data,
        written,
        st.remaining()
    );
    // partial write, submit next
    self.submit_writev(user_data, st);
}
}
@@ -345,55 +441,22 @@ impl IO for UringIO {
}
fn run_once(&self) -> Result<()> {
trace!("run_once()");
let mut inner = self.inner.borrow_mut();
let ring = &mut inner.ring;
if ring.empty() {
return Ok(());
}
ring.wait_for_completion()?;
const MAX: usize = ENTRIES as usize;
// to circumvent borrowing rules, collect everything without the heap
let mut uds = [0u64; MAX];
let mut ress = [0i32; MAX];
let mut count = 0;
{
let cq = ring.ring.completion();
for cqe in cq {
ring.pending_ops -= 1;
uds[count] = cqe.user_data();
ress[count] = cqe.result();
count += 1;
if count == MAX {
break;
}
}
}
ring.submit_and_wait()?;
let count = ring.reap_cqes();
for i in 0..count {
ring.pending_ops -= 1;
let user_data = uds[i];
let result = ress[i];
let Cqe { user_data, result } = ring.cqes[i];
turso_assert!(
user_data != 0,
"user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this"
);
if let Some(mut st) = ring.writev_states.remove(&user_data) {
if result < 0 {
st.free_last_iov();
completion_from_key(ud).complete(result);
} else {
let written = result as usize;
st.free_last_iov();
st.advance(written);
if st.remaining() == 0 {
// write complete
c.complete(st.total_written as i32);
} else {
// partial write, submit next
ring.submit_writev(user_data, st, c.clone());
}
}
if let Some(state) = ring.writev_states.remove(&user_data) {
ring.handle_writev_completion(state, user_data, result);
continue;
}
completion_from_key(user_data).complete(result)
@@ -508,10 +571,10 @@ impl File for UringFile {
io_uring::opcode::Read::new(fd, buf, len as u32)
.offset(pos as u64)
.build()
.user_data(io.ring.get_key())
.user_data(get_key(c.clone()))
})
};
io.ring.submit_entry(&read_e, c.clone());
io.ring.submit_entry(&read_e);
Ok(c)
}
@@ -529,10 +592,10 @@ impl File for UringFile {
io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
.offset(pos as u64)
.build()
.user_data(io.ring.get_key())
.user_data(get_key(c.clone()))
})
};
io.ring.submit_entry(&write, c.clone());
io.ring.submit_entry(&write);
Ok(c)
}
@@ -544,7 +607,7 @@ impl File for UringFile {
.build()
.user_data(get_key(c.clone()))
});
io.ring.submit_entry(&sync, c.clone());
io.ring.submit_entry(&sync);
Ok(c)
}
@@ -560,10 +623,9 @@ impl File for UringFile {
}
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
let mut io = self.io.borrow_mut();
let key = io.ring.get_key();
// create state to track ongoing writev operation
let state = WritevState::new(self, pos, bufs);
io.ring.submit_writev(key, state, c.clone());
io.ring.submit_writev(get_key(c.clone()), state);
Ok(c)
}

View File

@@ -399,6 +399,7 @@ pub enum CheckpointState {
Done,
}
/// IOV_MAX is 1024 on most systems
pub const CKPT_BATCH_PAGES: usize = 1024;
#[derive(Clone)]
@@ -1129,8 +1130,8 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
frame_id > self.checkpoint_threshold + nbackfills
}

View File

@@ -207,12 +207,6 @@ mod tests {
#[cfg(not(feature = "sqlite3"))]
mod libsql_ext {
<<<<<<< HEAD
||||||| parent of 7f61fbb8 (Update test to match cacheflush behavior)
=======
use limbo_sqlite3::sqlite3_close_v2;
>>>>>>> 7f61fbb8 (Update test to match cacheflush behavior)
use super::*;
#[test]

View File

@@ -13,7 +13,7 @@ from typing import Dict
from cli_tests.console import error, info, test
from cli_tests.test_turso_cli import TestTursoShell
LIMBO_BIN = Path("./target/debug/tursodb")
LIMBO_BIN = Path("./target/release/tursodb")
DB_FILE = Path("testing/temp.db")
vfs_list = ["syscall"]
if platform.system() == "Linux":