From 6098bca21134e38d05a3d0e31cb06f6a2cd61675 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 12 Sep 2025 18:13:02 -0400 Subject: [PATCH] Handle partial writes in unix IO for pwrite and pwritev --- core/io/unix.rs | 116 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 91 insertions(+), 25 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index 174a3941c..a3cfd6f2f 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -199,23 +199,47 @@ impl File for UnixFile { #[instrument(err, skip_all, level = Level::TRACE)] fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { let file = self.file.lock(); - let result = unsafe { - libc::pwrite( - file.as_raw_fd(), - buffer.as_slice().as_ptr() as *const libc::c_void, - buffer.as_slice().len(), - pos as libc::off_t, - ) - }; - if result == -1 { - let e = std::io::Error::last_os_error(); - Err(e.into()) - } else { - trace!("pwrite n: {}", result); - // Write succeeded immediately - c.complete(result as i32); - Ok(c) + let buf_slice = buffer.as_slice(); + let total_size = buf_slice.len(); + + let mut total_written = 0usize; + let mut current_pos = pos; + + while total_written < total_size { + let remaining_slice = &buf_slice[total_written..]; + let result = unsafe { + libc::pwrite( + file.as_raw_fd(), + remaining_slice.as_ptr() as *const libc::c_void, + remaining_slice.len(), + current_pos as libc::off_t, + ) + }; + if result == -1 { + let e = std::io::Error::last_os_error(); + if e.kind() == ErrorKind::Interrupted { + // EINTR, retry without advancing + continue; + } + return Err(e.into()); + } + let written = result as usize; + if written == 0 { + // Unexpected EOF for regular files + return Err(std::io::Error::new( + ErrorKind::UnexpectedEof, + "pwrite returned 0 bytes written", + ) + .into()); + } + + total_written += written; + current_pos += written as u64; + trace!("pwrite iteration: wrote {written}, total {total_written}/{total_size}"); } + trace!("pwrite complete: wrote {total_written} bytes"); + c.complete(total_written as i32); + Ok(c) } #[instrument(err, skip_all, level = Level::TRACE)] @@ -229,18 +253,60 @@ impl File for UnixFile { // use `pwrite` for single buffer return self.pwrite(pos, buffers[0].clone(), c); } - let file = self.file.lock(); - match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) { - Ok(written) => { - trace!("pwritev wrote {written}"); - c.complete(written as i32); - Ok(c) - } - Err(e) => { - return Err(e.into()); + let file = self.file.lock(); + let mut total_written = 0usize; + let mut current_pos = pos; + let mut buf_idx = 0; + let mut buf_offset = 0; + + let total_size: usize = buffers.iter().map(|b| b.len()).sum(); + while total_written < total_size { + match try_pwritev_raw(file.as_raw_fd(), current_pos, &buffers, buf_idx, buf_offset) { + Ok(written) => { + if written == 0 { + // Unexpected EOF + return Err(std::io::Error::new( + ErrorKind::UnexpectedEof, + "pwritev returned 0 bytes written", + ) + .into()); + } + total_written += written; + current_pos += written as u64; + + let mut remaining = written; + while remaining > 0 && buf_idx < buffers.len() { + let buf_remaining = buffers[buf_idx].len() - buf_offset; + + if remaining >= buf_remaining { + // Consumed rest of current buffer + remaining -= buf_remaining; + buf_idx += 1; + buf_offset = 0; + } else { + // Partial write within current buffer + buf_offset += remaining; + remaining = 0; + } + } + + trace!( + "pwritev iteration: wrote {written}, total {total_written}/{total_size}" + ); + } + Err(e) if e.kind() == ErrorKind::Interrupted => { + // EINTR - retry without advancing + continue; + } + Err(e) => { + return Err(e.into()); + } } } + trace!("pwritev complete: wrote {total_written} bytes"); + c.complete(total_written as i32); + Ok(c) } #[instrument(err, skip_all, level = Level::TRACE)]