mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-28 12:24:23 +01:00
Merge 'Handle partial writes in unix IO for pwrite and pwritev' from Preston Thorpe
currently, `io_uring` is setup to handle partial writes for `pwritev` (will add `pwrite` in subsequent PR), but unix and other IO back-ends were not correctly setup for this. Closes #3073
This commit is contained in:
116
core/io/unix.rs
116
core/io/unix.rs
@@ -199,23 +199,47 @@ impl File for UnixFile {
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock();
|
||||
let result = unsafe {
|
||||
libc::pwrite(
|
||||
file.as_raw_fd(),
|
||||
buffer.as_slice().as_ptr() as *const libc::c_void,
|
||||
buffer.as_slice().len(),
|
||||
pos as libc::off_t,
|
||||
)
|
||||
};
|
||||
if result == -1 {
|
||||
let e = std::io::Error::last_os_error();
|
||||
Err(e.into())
|
||||
} else {
|
||||
trace!("pwrite n: {}", result);
|
||||
// Write succeeded immediately
|
||||
c.complete(result as i32);
|
||||
Ok(c)
|
||||
let buf_slice = buffer.as_slice();
|
||||
let total_size = buf_slice.len();
|
||||
|
||||
let mut total_written = 0usize;
|
||||
let mut current_pos = pos;
|
||||
|
||||
while total_written < total_size {
|
||||
let remaining_slice = &buf_slice[total_written..];
|
||||
let result = unsafe {
|
||||
libc::pwrite(
|
||||
file.as_raw_fd(),
|
||||
remaining_slice.as_ptr() as *const libc::c_void,
|
||||
remaining_slice.len(),
|
||||
current_pos as libc::off_t,
|
||||
)
|
||||
};
|
||||
if result == -1 {
|
||||
let e = std::io::Error::last_os_error();
|
||||
if e.kind() == ErrorKind::Interrupted {
|
||||
// EINTR, retry without advancing
|
||||
continue;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
let written = result as usize;
|
||||
if written == 0 {
|
||||
// Unexpected EOF for regular files
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
"pwrite returned 0 bytes written",
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
total_written += written;
|
||||
current_pos += written as u64;
|
||||
trace!("pwrite iteration: wrote {written}, total {total_written}/{total_size}");
|
||||
}
|
||||
trace!("pwrite complete: wrote {total_written} bytes");
|
||||
c.complete(total_written as i32);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
@@ -229,18 +253,60 @@ impl File for UnixFile {
|
||||
// use `pwrite` for single buffer
|
||||
return self.pwrite(pos, buffers[0].clone(), c);
|
||||
}
|
||||
let file = self.file.lock();
|
||||
|
||||
match try_pwritev_raw(file.as_raw_fd(), pos, &buffers, 0, 0) {
|
||||
Ok(written) => {
|
||||
trace!("pwritev wrote {written}");
|
||||
c.complete(written as i32);
|
||||
Ok(c)
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
let file = self.file.lock();
|
||||
let mut total_written = 0usize;
|
||||
let mut current_pos = pos;
|
||||
let mut buf_idx = 0;
|
||||
let mut buf_offset = 0;
|
||||
|
||||
let total_size: usize = buffers.iter().map(|b| b.len()).sum();
|
||||
while total_written < total_size {
|
||||
match try_pwritev_raw(file.as_raw_fd(), current_pos, &buffers, buf_idx, buf_offset) {
|
||||
Ok(written) => {
|
||||
if written == 0 {
|
||||
// Unexpected EOF
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
"pwritev returned 0 bytes written",
|
||||
)
|
||||
.into());
|
||||
}
|
||||
total_written += written;
|
||||
current_pos += written as u64;
|
||||
|
||||
let mut remaining = written;
|
||||
while remaining > 0 && buf_idx < buffers.len() {
|
||||
let buf_remaining = buffers[buf_idx].len() - buf_offset;
|
||||
|
||||
if remaining >= buf_remaining {
|
||||
// Consumed rest of current buffer
|
||||
remaining -= buf_remaining;
|
||||
buf_idx += 1;
|
||||
buf_offset = 0;
|
||||
} else {
|
||||
// Partial write within current buffer
|
||||
buf_offset += remaining;
|
||||
remaining = 0;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
"pwritev iteration: wrote {written}, total {total_written}/{total_size}"
|
||||
);
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::Interrupted => {
|
||||
// EINTR - retry without advancing
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("pwritev complete: wrote {total_written} bytes");
|
||||
c.complete(total_written as i32);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
|
||||
Reference in New Issue
Block a user