From f81bfa3daf364465b18e1d7cab58bcaf6dace48f Mon Sep 17 00:00:00 2001 From: Ihor Andrianov Date: Tue, 8 Jul 2025 23:52:53 +0300 Subject: [PATCH] use pread and pwrite for unix io --- core/io/unix.rs | 47 +++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index 235df10d0..ee182ec74 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -14,10 +14,8 @@ use std::{ cell::{RefCell, UnsafeCell}, mem::MaybeUninit, }; -use std::{ - io::{ErrorKind, Read, Seek, Write}, - sync::Arc, -}; + +use std::{io::ErrorKind, sync::Arc}; use tracing::{debug, instrument, trace, Level}; struct OwnedCallbacks(UnsafeCell); @@ -38,6 +36,11 @@ impl OwnedCallbacks { self.as_mut().inline_count == 0 } + fn get(&self, fd: usize) -> Option<&CompletionCallback> { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.get(fd) + } + fn remove(&self, fd: usize) -> Option { let callbacks = unsafe { &mut *self.0.get() }; callbacks.remove(fd) @@ -126,6 +129,16 @@ impl Callbacks { } } + fn get(&self, fd: usize) -> Option<&CompletionCallback> { + if let Some(pos) = self.find_inline(fd) { + let (_, callback) = unsafe { self.inline_entries[pos].assume_init_ref() }; + return Some(&callback); + } else if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { + return Some(&self.heap_entries[pos].1); + } + None + } + fn remove(&mut self, fd: usize) -> Option { if let Some(pos) = self.find_inline(fd) { let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; @@ -229,27 +242,29 @@ impl IO for UnixIO { self.poller.wait(self.events.as_mut(), None)?; for event in self.events.iter() { - if let Some(cf) = self.callbacks.remove(event.key) { + if let Some(cf) = self.callbacks.get(event.key) { let result = match cf { CompletionCallback::Read(ref file, ref c, pos) => { - let mut file = file.borrow_mut(); + let file = file.borrow_mut(); let r = c.as_read(); let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.read(buf.as_mut_slice()) + rustix::io::pread(file.as_fd(), buf.as_mut_slice(), *pos as u64) } CompletionCallback::Write(ref file, _, ref buf, pos) => { - let mut file = file.borrow_mut(); + let file = file.borrow_mut(); let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.write(buf.as_slice()) + rustix::io::pwrite(file.as_fd(), buf.as_slice(), *pos as u64) } }; match result { - Ok(n) => match &cf { - CompletionCallback::Read(_, ref c, _) => c.complete(0), - CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), - }, + Ok(n) => { + self.callbacks.remove(event.key); + match &cf { + CompletionCallback::Read(_, ref c, _) => c.complete(0), + CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), + } + } + Err(Errno::AGAIN) => {} Err(e) => return Err(e.into()), } } @@ -393,7 +408,7 @@ impl File for UnixFile<'_> { // Would block, set up polling let fd = file.as_raw_fd(); self.poller - .add(&file.as_fd(), Event::readable(fd as usize))?; + .add(&file.as_fd(), Event::writable(fd as usize))?; self.callbacks.insert( fd as usize, CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos),