diff --git a/core/io/mod.rs b/core/io/mod.rs index 4f92ad3e1..089add355 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -56,6 +56,32 @@ impl Completion { Self::Sync(s) => s.complete(result), // fix } } + + /// only call this method if you are sure that the completion is + /// a ReadCompletion, panics otherwise + pub fn read(&self) -> &ReadCompletion { + match self { + Self::Read(ref r) => r, + _ => unreachable!(), + } + } + /// only call this method if you are sure that the completion is + /// a WriteCompletion, panics otherwise + pub fn write(&self) -> &WriteCompletion { + match self { + Self::Write(ref w) => w, + _ => unreachable!(), + } + } + /// + /// only call this method if you are sure that the completion is + /// a SyncCompletion, panics otherwise + pub fn sync(&self) -> &SyncCompletion { + match self { + Self::Sync(ref s) => s, + _ => unreachable!(), + } + } } pub struct WriteCompletion { diff --git a/core/io/unix.rs b/core/io/unix.rs index f28a867f3..68e17a9e7 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -13,12 +13,92 @@ use rustix::{ use std::cell::{RefCell, UnsafeCell}; use std::io::{ErrorKind, Read, Seek, Write}; use std::rc::Rc; -const MAX_FD: usize = 1024; +struct OwnedCallbacks(UnsafeCell); +struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>); + +impl OwnedCallbacks { + fn new() -> Self { + Self(UnsafeCell::new(Callbacks::new())) + } + fn as_mut<'io>(&self) -> &'io mut Callbacks { + unsafe { &mut *self.0.get() } + } +} + +impl<'io> BorrowedCallbacks<'io> { + fn as_mut(&self) -> &'io mut Callbacks { + unsafe { *self.0.get() } + } +} + +struct EventsHandler(UnsafeCell); + +impl EventsHandler { + fn new() -> Self { + Self(UnsafeCell::new(Events::new())) + } + + fn as_mut<'io>(&self) -> &'io mut Events { + unsafe { &mut *self.0.get() } + } +} +struct PollHandler(UnsafeCell); +struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>); + +impl<'io> BorrowedPollHandler<'io> { + fn get(&self) -> &'io mut Poller { + unsafe { *self.0.get() } + } +} + +impl PollHandler { + fn new() -> Self { + Self(UnsafeCell::new(Poller::new().unwrap())) + } + + fn as_mut<'io>(&self) -> &'io mut Poller { + unsafe { &mut *self.0.get() } + } +} + +type CallbackEntry = (usize, CompletionCallback); + +struct Callbacks { + entries: Vec, +} + +impl Callbacks { + fn new() -> Self { + Self { + entries: Vec::with_capacity(32), + } + } + + fn insert(&mut self, fd: usize, callback: CompletionCallback) { + if let Some(entry) = self.entries.iter_mut().find(|(key, _)| *key == fd) { + // replace existing + let _ = std::mem::replace(&mut entry.1, callback); + return; + } + self.entries.push((fd, callback)); + } + + fn remove(&mut self, fd: usize) -> Option { + if let Some(pos) = self.entries.iter().position(|&(k, _)| k == fd) { + Some(self.entries.swap_remove(pos).1) + } else { + None + } + } +} + +/// UnixIO lives longer than any of the files it creates, so it's +/// safe to store references to it's internals in the UnixFiles pub struct UnixIO { - poller: UnsafeCell, - events: UnsafeCell, - callbacks: UnsafeCell<[Option; MAX_FD]>, + poller: PollHandler, + events: EventsHandler, + callbacks: OwnedCallbacks, } impl UnixIO { @@ -26,9 +106,9 @@ impl UnixIO { pub fn new() -> Result { debug!("Using IO backend 'syscall'"); Ok(Self { - poller: Poller::new()?.into(), - events: Events::new().into(), - callbacks: [const { None }; MAX_FD].into(), + poller: PollHandler::new(), + events: EventsHandler::new(), + callbacks: OwnedCallbacks::new(), }) } } @@ -45,8 +125,8 @@ impl IO for UnixIO { let unix_file = Rc::new(UnixFile { file: Rc::new(RefCell::new(file)), - poller: UnsafeCell::new(unsafe { &mut *self.poller.get() }), - callbacks: UnsafeCell::new(unsafe { &mut *self.callbacks.get() }), + poller: BorrowedPollHandler(self.poller.as_mut().into()), + callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { unix_file.lock_file(true)?; @@ -56,38 +136,33 @@ impl IO for UnixIO { fn run_once(&self) -> Result<()> { { - let callbacks = unsafe { &mut *self.callbacks.get() }; - if callbacks.iter().all(|c| c.is_none()) { + if self.callbacks.as_mut().entries.is_empty() { return Ok(()); } } - let events = unsafe { &mut *self.events.get() }; - events.clear(); - + { + self.events.as_mut().clear(); + } trace!("run_once() waits for events"); - let poller = unsafe { &mut *self.poller.get() }; - poller.wait(events, None)?; - - for event in events.iter() { - let callbacks = unsafe { &mut *self.callbacks.get() }; - if let Some(cf) = callbacks[event.key].as_ref() { + { + self.poller.as_mut().wait(self.events.as_mut(), None)?; + } + for event in self.events.as_mut().iter() { + let callbacks = self.callbacks.as_mut(); + if let Some(cf) = callbacks.remove(event.key) { let result = { match cf { CompletionCallback::Read(ref file, ref c, pos) => { let mut file = file.borrow_mut(); - let c: &Completion = c; - let r = match c { - Completion::Read(r) => r, - _ => unreachable!(), - }; + let r = c.read(); let mut buf = r.buf_mut(); - file.seek(std::io::SeekFrom::Start(*pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos as u64))?; file.read(buf.as_mut_slice()) } CompletionCallback::Write(ref file, _, ref buf, pos) => { let mut file = file.borrow_mut(); let buf = buf.borrow(); - file.seek(std::io::SeekFrom::Start(*pos as u64))?; + file.seek(std::io::SeekFrom::Start(pos as u64))?; file.write(buf.as_slice()) } } @@ -107,7 +182,6 @@ impl IO for UnixIO { Err(e) => Err(e.into()), }; } - callbacks[event.key] = None; } Ok(()) } @@ -135,8 +209,8 @@ enum CompletionCallback { pub struct UnixFile<'io> { file: Rc>, - poller: UnsafeCell<&'io mut Poller>, - callbacks: UnsafeCell<&'io mut [Option; MAX_FD]>, + poller: BorrowedPollHandler<'io>, + callbacks: BorrowedCallbacks<'io>, } impl File for UnixFile<'_> { @@ -182,10 +256,7 @@ impl File for UnixFile<'_> { fn pread(&self, pos: usize, c: Completion) -> Result<()> { let file = self.file.borrow(); let result = { - let r = match c { - Completion::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.read(); let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; @@ -205,9 +276,10 @@ impl File for UnixFile<'_> { poller.add(&file.as_fd(), Event::readable(fd as usize))?; } { - let callbacks = unsafe { &mut *self.callbacks.get() }; - callbacks[fd as usize] = - Some(CompletionCallback::Read(self.file.clone(), c, pos)); + self.callbacks.as_mut().insert( + fd as usize, + CompletionCallback::Read(self.file.clone(), c, pos), + ); } Ok(()) } @@ -234,18 +306,15 @@ impl File for UnixFile<'_> { let fd = file.as_raw_fd(); { unsafe { - let poller = &mut *self.poller.get(); + let poller = self.poller.get(); poller.add(&file.as_fd(), Event::readable(fd as usize))?; } } { - let callbacks = unsafe { &mut *self.callbacks.get() }; - callbacks[fd as usize] = Some(CompletionCallback::Write( - self.file.clone(), - c, - buffer.clone(), - pos, - )); + self.callbacks.as_mut().insert( + fd as usize, + CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos), + ); } Ok(()) }