Merge 'coalesce any adjacent buffers from writev calls into fewer iovecs' from Preston Thorpe

In `io_uring` and `unix` IO backends, we can check if our buffers are
sequential in memory and reduce the number of iovecs per call. Although
this is highly unlikely to actually happen at the moment due to our
buffer pool implementation.
Later on, when #2419 is merged, we will be able to specifically request
runs of contiguous buffers, so that our `writev` calls will (in the
ideal case) be coalesced into a single `pwrite` or preferably
`WriteFixed` operation on the io_uring backend.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2436
This commit is contained in:
Jussi Saurio
2025-08-04 23:54:57 +03:00
committed by GitHub
2 changed files with 65 additions and 24 deletions

View File

@@ -312,29 +312,45 @@ impl WrappedIOUring {
)
});
let mut iov_count = 0;
for (idx, buffer) in st
.bufs
.iter()
.enumerate()
.skip(st.current_buffer_idx)
.take(MAX_IOVEC_ENTRIES)
{
let mut last_end: Option<(*const u8, usize)> = None;
for buffer in st.bufs.iter().skip(st.current_buffer_idx) {
let buf = buffer.borrow();
let buf_slice = buf.as_slice();
// ensure we are providing a pointer to the proper offset in the buffer
let slice = if idx == st.current_buffer_idx {
&buf_slice[st.current_buffer_offset..]
} else {
buf_slice
};
if slice.is_empty() {
continue;
let ptr = buf.as_ptr();
let len = buf.len();
if let Some((last_ptr, last_len)) = last_end {
// Check if this buffer is adjacent to the last
if unsafe { last_ptr.add(last_len) } == ptr {
// Extend the last iovec instead of adding new
iov_allocation[iov_count - 1].iov_len += len;
last_end = Some((last_ptr, last_len + len));
continue;
}
}
// Add new iovec
iov_allocation[iov_count] = libc::iovec {
iov_base: slice.as_ptr() as *mut _,
iov_len: slice.len(),
iov_base: ptr as *mut _,
iov_len: len,
};
last_end = Some((ptr, len));
iov_count += 1;
if iov_count >= MAX_IOVEC_ENTRIES {
break;
}
}
if iov_count == 1 {
// If we have coalesced everything into a single iovec, submit as a single `pwrite`
let entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Write::new(
fd,
iov_allocation[0].iov_base as *const u8,
iov_allocation[0].iov_len as u32,
)
.offset(st.file_pos as u64)
.build()
.user_data(key)
});
self.submit_entry(&entry);
return;
}
// Store the pointers and get the pointer to the iovec array that we pass
// to the writev operation, and keep the array itself alive

View File

@@ -207,19 +207,44 @@ fn try_pwritev_raw(
) -> std::io::Result<usize> {
const MAX_IOV: usize = 1024;
let iov_len = std::cmp::min(bufs.len() - start_idx, MAX_IOV);
let mut iov = Vec::with_capacity(iov_len);
let mut iov: Vec<libc::iovec> = Vec::with_capacity(iov_len);
let mut last_end: Option<(*const u8, usize)> = None;
let mut iov_count = 0;
for (i, b) in bufs.iter().enumerate().skip(start_idx).take(iov_len) {
let r = b.borrow(); // borrow just to get pointer/len
let s = r.as_slice();
let s = if i == start_idx { &s[start_off..] } else { s };
let ptr = if i == start_idx { &s[start_off..] } else { s }.as_ptr();
let len = r.len();
if let Some((last_ptr, last_len)) = last_end {
// Check if this buffer is adjacent to the last
if unsafe { last_ptr.add(last_len) } == ptr {
// Extend the last iovec instead of adding new
iov[iov_count - 1].iov_len += len;
last_end = Some((last_ptr, last_len + len));
continue;
}
}
last_end = Some((ptr, len));
iov_count += 1;
iov.push(libc::iovec {
iov_base: s.as_ptr() as *mut _,
iov_len: s.len(),
iov_base: ptr as *mut libc::c_void,
iov_len: len,
});
}
let n = unsafe { libc::pwritev(fd, iov.as_ptr(), iov.len() as i32, off as i64) };
let n = if iov.len().eq(&1) {
unsafe {
libc::pwrite(
fd,
iov[0].iov_base as *const libc::c_void,
iov[0].iov_len,
off as i64,
)
}
} else {
unsafe { libc::pwritev(fd, iov.as_ptr(), iov.len() as i32, off as i64) }
};
if n < 0 {
Err(std::io::Error::last_os_error())
} else {