diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index ccf213e12..0c88fc4c7 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -31,6 +31,8 @@ struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, writev_states: HashMap, + pending: [Option>; ENTRIES as usize + 1], + key: u64, } struct InnerUringIO { @@ -56,6 +58,8 @@ impl UringIO { ring, pending_ops: 0, writev_states: HashMap::new(), + pending: [const { None }; ENTRIES as usize + 1], + key: 0, }, free_files: (0..FILES).collect(), }; @@ -111,6 +115,7 @@ struct WritevState { // intra-buffer offset current_buffer_offset: usize, total_written: usize, + total_len: usize, bufs: Vec>>, // we keep the last iovec allocation alive until CQE: // raw ptr to Box<[iovec]> @@ -133,37 +138,31 @@ impl WritevState { bufs, last_iov: core::ptr::null_mut(), last_iov_len: 0, + total_len: bufs.iter().map(|b| b.borrow().len()).sum(), } } + + #[inline(always)] fn remaining(&self) -> usize { - let mut total = 0; - for (i, b) in self.bufs.iter().enumerate().skip(self.current_buffer_idx) { - let r = b.borrow(); - let len = r.len(); - total += if i == self.current_buffer_idx { - len - self.current_buffer_offset - } else { - len - }; - } - total + self.total_len - self.total_written } /// Advance (idx, off, pos) after written bytes + #[inline(always)] fn advance(&mut self, written: usize) { - let mut rem = written; - while rem > 0 { - let len = { + let mut remaining = written; + while remaining > 0 { + let current_buf_len = { let r = self.bufs[self.current_buffer_idx].borrow(); r.len() }; - let left = len - self.current_buffer_offset; - if rem < left { - self.current_buffer_offset += rem; - self.file_pos += rem; - rem = 0; + let left = current_buf_len - self.current_buffer_offset; + if remaining < left { + self.current_buffer_offset += remaining; + self.file_pos += remaining; + remaining = 0; } else { - rem -= left; + remaining -= left; self.file_pos += left; self.current_buffer_idx += 1; self.current_buffer_offset = 0; @@ -172,6 +171,8 @@ impl WritevState { self.total_written += written; } + /// Free the allocation that keeps the iovec array alive while writev is ongoing + #[inline(always)] fn free_last_iov(&mut self) { if !self.last_iov.is_null() { unsafe { @@ -210,8 +211,9 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc) { trace!("submit_entry({:?})", entry); + self.pending[entry.get_user_data() as usize] = Some(c); unsafe { let mut sub = self.ring.submission_shared(); match sub.push(entry) { @@ -240,8 +242,10 @@ impl WrappedIOUring { self.pending_ops == 0 } - fn submit_writev(&mut self, key: u64) { - let st = self.writev_states.get_mut(&key).expect("state must exist"); + /// Submit a writev operation for the given key. WritevState MUST exist in the map + /// of `writev_states` + fn submit_writev(&mut self, key: u64, mut st: WritevState, c: Arc) { + self.writev_states.insert(key, st); // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half let max = CKPT_BATCH_PAGES / 2; let mut iov = Vec::with_capacity(max); @@ -275,7 +279,7 @@ impl WrappedIOUring { st.free_last_iov(); st.last_iov = ptr; st.last_iov_len = len; - // leak; freed when CQE processed + // leak the iovec array, will be freed when CQE processed let _ = Box::into_raw(boxed); let entry = with_fd!(st.file_id, |fd| { io_uring::opcode::Writev::new(fd, ptr, len as u32) @@ -283,10 +287,23 @@ impl WrappedIOUring { .build() .user_data(key) }); - self.submit_entry(&entry); + self.submit_entry(&entry, c.clone()); } } +#[inline(always)] +/// use the callback pointer as the user_data for the operation as is +/// common practice for io_uring to prevent more indirection +fn get_key(c: Arc) -> u64 { + Arc::into_raw(c) as u64 +} + +#[inline(always)] +/// convert the user_data back to an Arc pointer +fn completion_from_key(key: u64) -> Arc { + unsafe { Arc::from_raw(key as *const Completion) } +} + impl IO for UringIO { fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); @@ -364,21 +381,22 @@ impl IO for UringIO { if let Some(mut st) = ring.writev_states.remove(&user_data) { if result < 0 { st.free_last_iov(); - completion_from_key(user_data).complete(result); + completion_from_key(ud).complete(result); } else { let written = result as usize; st.free_last_iov(); st.advance(written); if st.remaining() == 0 { - completion_from_key(user_data).complete(st.total_written as i32); + // write complete + c.complete(st.total_written as i32); } else { - ring.writev_states.insert(user_data, st); - ring.submit_writev(user_data); + // partial write, submit next + ring.submit_writev(user_data, st, c.clone()); } } - } else { - completion_from_key(user_data).complete(result); + continue; } + completion_from_key(user_data).complete(result) } Ok(()) } @@ -490,10 +508,10 @@ impl File for UringFile { io_uring::opcode::Read::new(fd, buf, len as u32) .offset(pos as u64) .build() - .user_data(get_key(c.clone())) + .user_data(io.ring.get_key()) }) }; - io.ring.submit_entry(&read_e); + io.ring.submit_entry(&read_e, c.clone()); Ok(c) } @@ -511,10 +529,10 @@ impl File for UringFile { io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32) .offset(pos as u64) .build() - .user_data(get_key(c.clone())) + .user_data(io.ring.get_key()) }) }; - io.ring.submit_entry(&write); + io.ring.submit_entry(&write, c.clone()); Ok(c) } @@ -526,7 +544,7 @@ impl File for UringFile { .build() .user_data(get_key(c.clone())) }); - io.ring.submit_entry(&sync); + io.ring.submit_entry(&sync, c.clone()); Ok(c) } @@ -541,14 +559,12 @@ impl File for UringFile { return self.pwrite(pos, bufs[0].clone(), c.clone()); } tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); - // create state - let key = get_key(c.clone()); let mut io = self.io.borrow_mut(); - + let key = io.ring.get_key(); + // create state to track ongoing writev operation let state = WritevState::new(self, pos, bufs); - io.ring.writev_states.insert(key, state); - io.ring.submit_writev(key); - Ok(c.clone()) + io.ring.submit_writev(key, state, c.clone()); + Ok(c) } fn size(&self) -> Result {