Remove unrelated io_uring changes

This commit is contained in:
PThorpe92
2025-07-26 16:52:47 -04:00
parent c0800ecc29
commit 689007cb74

View File

@@ -31,6 +31,8 @@ struct WrappedIOUring {
ring: io_uring::IoUring,
pending_ops: usize,
writev_states: HashMap<u64, WritevState>,
pending: [Option<Arc<Completion>>; ENTRIES as usize + 1],
key: u64,
}
struct InnerUringIO {
@@ -56,6 +58,8 @@ impl UringIO {
ring,
pending_ops: 0,
writev_states: HashMap::new(),
pending: [const { None }; ENTRIES as usize + 1],
key: 0,
},
free_files: (0..FILES).collect(),
};
@@ -111,6 +115,7 @@ struct WritevState {
// intra-buffer offset
current_buffer_offset: usize,
total_written: usize,
total_len: usize,
bufs: Vec<Arc<RefCell<crate::Buffer>>>,
// we keep the last iovec allocation alive until CQE:
// raw ptr to Box<[iovec]>
@@ -133,37 +138,31 @@ impl WritevState {
bufs,
last_iov: core::ptr::null_mut(),
last_iov_len: 0,
total_len: bufs.iter().map(|b| b.borrow().len()).sum(),
}
}
#[inline(always)]
fn remaining(&self) -> usize {
let mut total = 0;
for (i, b) in self.bufs.iter().enumerate().skip(self.current_buffer_idx) {
let r = b.borrow();
let len = r.len();
total += if i == self.current_buffer_idx {
len - self.current_buffer_offset
} else {
len
};
}
total
self.total_len - self.total_written
}
/// Advance (idx, off, pos) after written bytes
#[inline(always)]
fn advance(&mut self, written: usize) {
let mut rem = written;
while rem > 0 {
let len = {
let mut remaining = written;
while remaining > 0 {
let current_buf_len = {
let r = self.bufs[self.current_buffer_idx].borrow();
r.len()
};
let left = len - self.current_buffer_offset;
if rem < left {
self.current_buffer_offset += rem;
self.file_pos += rem;
rem = 0;
let left = current_buf_len - self.current_buffer_offset;
if remaining < left {
self.current_buffer_offset += remaining;
self.file_pos += remaining;
remaining = 0;
} else {
rem -= left;
remaining -= left;
self.file_pos += left;
self.current_buffer_idx += 1;
self.current_buffer_offset = 0;
@@ -172,6 +171,8 @@ impl WritevState {
self.total_written += written;
}
/// Free the allocation that keeps the iovec array alive while writev is ongoing
#[inline(always)]
fn free_last_iov(&mut self) {
if !self.last_iov.is_null() {
unsafe {
@@ -210,8 +211,9 @@ impl InnerUringIO {
}
impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc<Completion>) {
trace!("submit_entry({:?})", entry);
self.pending[entry.get_user_data() as usize] = Some(c);
unsafe {
let mut sub = self.ring.submission_shared();
match sub.push(entry) {
@@ -240,8 +242,10 @@ impl WrappedIOUring {
self.pending_ops == 0
}
fn submit_writev(&mut self, key: u64) {
let st = self.writev_states.get_mut(&key).expect("state must exist");
/// Submit a writev operation for the given key. WritevState MUST exist in the map
/// of `writev_states`
fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc<Completion>) {
self.writev_states.insert(key, st);
// the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half
let max = CKPT_BATCH_PAGES / 2;
let mut iov = Vec::with_capacity(max);
@@ -275,7 +279,7 @@ impl WrappedIOUring {
st.free_last_iov();
st.last_iov = ptr;
st.last_iov_len = len;
// leak; freed when CQE processed
// leak the iovec array, will be freed when CQE processed
let _ = Box::into_raw(boxed);
let entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Writev::new(fd, ptr, len as u32)
@@ -283,10 +287,23 @@ impl WrappedIOUring {
.build()
.user_data(key)
});
self.submit_entry(&entry);
self.submit_entry(&entry, c.clone());
}
}
#[inline(always)]
/// use the callback pointer as the user_data for the operation as is
/// common practice for io_uring to prevent more indirection
fn get_key(c: Arc<Completion>) -> u64 {
Arc::into_raw(c) as u64
}
#[inline(always)]
/// convert the user_data back to an Arc<Completion> pointer
fn completion_from_key(key: u64) -> Arc<Completion> {
unsafe { Arc::from_raw(key as *const Completion) }
}
impl IO for UringIO {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
@@ -364,21 +381,22 @@ impl IO for UringIO {
if let Some(mut st) = ring.writev_states.remove(&user_data) {
if result < 0 {
st.free_last_iov();
completion_from_key(user_data).complete(result);
completion_from_key(ud).complete(result);
} else {
let written = result as usize;
st.free_last_iov();
st.advance(written);
if st.remaining() == 0 {
completion_from_key(user_data).complete(st.total_written as i32);
// write complete
c.complete(st.total_written as i32);
} else {
ring.writev_states.insert(user_data, st);
ring.submit_writev(user_data);
// partial write, submit next
ring.submit_writev(user_data, st, c.clone());
}
}
} else {
completion_from_key(user_data).complete(result);
continue;
}
completion_from_key(user_data).complete(result)
}
Ok(())
}
@@ -490,10 +508,10 @@ impl File for UringFile {
io_uring::opcode::Read::new(fd, buf, len as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
.user_data(io.ring.get_key())
})
};
io.ring.submit_entry(&read_e);
io.ring.submit_entry(&read_e, c.clone());
Ok(c)
}
@@ -511,10 +529,10 @@ impl File for UringFile {
io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
.user_data(io.ring.get_key())
})
};
io.ring.submit_entry(&write);
io.ring.submit_entry(&write, c.clone());
Ok(c)
}
@@ -526,7 +544,7 @@ impl File for UringFile {
.build()
.user_data(get_key(c.clone()))
});
io.ring.submit_entry(&sync);
io.ring.submit_entry(&sync, c.clone());
Ok(c)
}
@@ -541,14 +559,12 @@ impl File for UringFile {
return self.pwrite(pos, bufs[0].clone(), c.clone());
}
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
// create state
let key = get_key(c.clone());
let mut io = self.io.borrow_mut();
let key = io.ring.get_key();
// create state to track ongoing writev operation
let state = WritevState::new(self, pos, bufs);
io.ring.writev_states.insert(key, state);
io.ring.submit_writev(key);
Ok(c.clone())
io.ring.submit_writev(key, state, c.clone());
Ok(c)
}
fn size(&self) -> Result<u64> {