From f81bfa3daf364465b18e1d7cab58bcaf6dace48f Mon Sep 17 00:00:00 2001 From: Ihor Andrianov Date: Tue, 8 Jul 2025 23:52:53 +0300 Subject: [PATCH 1/3] use pread and pwrite for unix io --- core/io/unix.rs | 47 +++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index 235df10d0..ee182ec74 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -14,10 +14,8 @@ use std::{ cell::{RefCell, UnsafeCell}, mem::MaybeUninit, }; -use std::{ - io::{ErrorKind, Read, Seek, Write}, - sync::Arc, -}; + +use std::{io::ErrorKind, sync::Arc}; use tracing::{debug, instrument, trace, Level}; struct OwnedCallbacks(UnsafeCell); @@ -38,6 +36,11 @@ impl OwnedCallbacks { self.as_mut().inline_count == 0 } + fn get(&self, fd: usize) -> Option<&CompletionCallback> { + let callbacks = unsafe { &mut *self.0.get() }; + callbacks.get(fd) + } + fn remove(&self, fd: usize) -> Option { let callbacks = unsafe { &mut *self.0.get() }; callbacks.remove(fd) @@ -126,6 +129,16 @@ impl Callbacks { } } + fn get(&self, fd: usize) -> Option<&CompletionCallback> { + if let Some(pos) = self.find_inline(fd) { + let (_, callback) = unsafe { self.inline_entries[pos].assume_init_ref() }; + return Some(&callback); + } else if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { + return Some(&self.heap_entries[pos].1); + } + None + } + fn remove(&mut self, fd: usize) -> Option { if let Some(pos) = self.find_inline(fd) { let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; @@ -229,27 +242,29 @@ impl IO for UnixIO { self.poller.wait(self.events.as_mut(), None)?; for event in self.events.iter() { - if let Some(cf) = self.callbacks.remove(event.key) { + if let Some(cf) = self.callbacks.get(event.key) { let result = match cf { CompletionCallback::Read(ref file, ref c, pos) => { - let mut file = file.borrow_mut(); + let file = file.borrow_mut(); let r = c.as_read(); let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.read(buf.as_mut_slice()) + rustix::io::pread(file.as_fd(), buf.as_mut_slice(), *pos as u64) } CompletionCallback::Write(ref file, _, ref buf, pos) => { - let mut file = file.borrow_mut(); + let file = file.borrow_mut(); let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(pos as u64))?; - file.write(buf.as_slice()) + rustix::io::pwrite(file.as_fd(), buf.as_slice(), *pos as u64) } }; match result { - Ok(n) => match &cf { - CompletionCallback::Read(_, ref c, _) => c.complete(0), - CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), - }, + Ok(n) => { + self.callbacks.remove(event.key); + match &cf { + CompletionCallback::Read(_, ref c, _) => c.complete(0), + CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), + } + } + Err(Errno::AGAIN) => {} Err(e) => return Err(e.into()), } } @@ -393,7 +408,7 @@ impl File for UnixFile<'_> { // Would block, set up polling let fd = file.as_raw_fd(); self.poller - .add(&file.as_fd(), Event::readable(fd as usize))?; + .add(&file.as_fd(), Event::writable(fd as usize))?; self.callbacks.insert( fd as usize, CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos), From 2d41791f3dbc1fb18f3978ce46ce971f309ff0bb Mon Sep 17 00:00:00 2001 From: Ihor Andrianov Date: Wed, 9 Jul 2025 00:02:05 +0300 Subject: [PATCH 2/3] clippy --- core/io/unix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index ee182ec74..28cba01ba 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -132,7 +132,7 @@ impl Callbacks { fn get(&self, fd: usize) -> Option<&CompletionCallback> { if let Some(pos) = self.find_inline(fd) { let (_, callback) = unsafe { self.inline_entries[pos].assume_init_ref() }; - return Some(&callback); + return Some(callback); } else if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { return Some(&self.heap_entries[pos].1); } From 11003123545c8a8e2ca0cd256a184866b68c8d76 Mon Sep 17 00:00:00 2001 From: Ihor Andrianov Date: Wed, 9 Jul 2025 17:16:22 +0300 Subject: [PATCH 3/3] use removed cb to complete copletion --- core/io/unix.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/core/io/unix.rs b/core/io/unix.rs index 28cba01ba..4066f5cf2 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -258,14 +258,22 @@ impl IO for UnixIO { }; match result { Ok(n) => { - self.callbacks.remove(event.key); - match &cf { - CompletionCallback::Read(_, ref c, _) => c.complete(0), - CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32), + let cf = self + .callbacks + .remove(event.key) + .expect("callback should exist"); + match cf { + CompletionCallback::Read(_, c, _) => c.complete(0), + CompletionCallback::Write(_, c, _, _) => c.complete(n as i32), } } - Err(Errno::AGAIN) => {} - Err(e) => return Err(e.into()), + Err(Errno::AGAIN) => (), + Err(e) => { + self.callbacks.remove(event.key); + + trace!("run_once() error: {}", e); + return Err(e.into()); + } } } }