Reformat unix io with wrappers for unsafecell

This commit is contained in:
PThorpe92
2025-02-08 08:03:42 -05:00
parent 159e2fbd71
commit 975cf95c37
2 changed files with 140 additions and 45 deletions

View File

@@ -56,6 +56,32 @@ impl Completion {
Self::Sync(s) => s.complete(result), // fix
}
}
/// only call this method if you are sure that the completion is
/// a ReadCompletion, panics otherwise
pub fn read(&self) -> &ReadCompletion {
match self {
Self::Read(ref r) => r,
_ => unreachable!(),
}
}
/// only call this method if you are sure that the completion is
/// a WriteCompletion, panics otherwise
pub fn write(&self) -> &WriteCompletion {
match self {
Self::Write(ref w) => w,
_ => unreachable!(),
}
}
///
/// only call this method if you are sure that the completion is
/// a SyncCompletion, panics otherwise
pub fn sync(&self) -> &SyncCompletion {
match self {
Self::Sync(ref s) => s,
_ => unreachable!(),
}
}
}
pub struct WriteCompletion {

View File

@@ -13,12 +13,92 @@ use rustix::{
use std::cell::{RefCell, UnsafeCell};
use std::io::{ErrorKind, Read, Seek, Write};
use std::rc::Rc;
const MAX_FD: usize = 1024;
struct OwnedCallbacks(UnsafeCell<Callbacks>);
struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>);
impl OwnedCallbacks {
fn new() -> Self {
Self(UnsafeCell::new(Callbacks::new()))
}
fn as_mut<'io>(&self) -> &'io mut Callbacks {
unsafe { &mut *self.0.get() }
}
}
impl<'io> BorrowedCallbacks<'io> {
fn as_mut(&self) -> &'io mut Callbacks {
unsafe { *self.0.get() }
}
}
struct EventsHandler(UnsafeCell<Events>);
impl EventsHandler {
fn new() -> Self {
Self(UnsafeCell::new(Events::new()))
}
fn as_mut<'io>(&self) -> &'io mut Events {
unsafe { &mut *self.0.get() }
}
}
struct PollHandler(UnsafeCell<Poller>);
struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>);
impl<'io> BorrowedPollHandler<'io> {
fn get(&self) -> &'io mut Poller {
unsafe { *self.0.get() }
}
}
impl PollHandler {
fn new() -> Self {
Self(UnsafeCell::new(Poller::new().unwrap()))
}
fn as_mut<'io>(&self) -> &'io mut Poller {
unsafe { &mut *self.0.get() }
}
}
type CallbackEntry = (usize, CompletionCallback);
struct Callbacks {
entries: Vec<CallbackEntry>,
}
impl Callbacks {
fn new() -> Self {
Self {
entries: Vec::with_capacity(32),
}
}
fn insert(&mut self, fd: usize, callback: CompletionCallback) {
if let Some(entry) = self.entries.iter_mut().find(|(key, _)| *key == fd) {
// replace existing
let _ = std::mem::replace(&mut entry.1, callback);
return;
}
self.entries.push((fd, callback));
}
fn remove(&mut self, fd: usize) -> Option<CompletionCallback> {
if let Some(pos) = self.entries.iter().position(|&(k, _)| k == fd) {
Some(self.entries.swap_remove(pos).1)
} else {
None
}
}
}
/// UnixIO lives longer than any of the files it creates, so it's
/// safe to store references to it's internals in the UnixFiles
pub struct UnixIO {
poller: UnsafeCell<Poller>,
events: UnsafeCell<Events>,
callbacks: UnsafeCell<[Option<CompletionCallback>; MAX_FD]>,
poller: PollHandler,
events: EventsHandler,
callbacks: OwnedCallbacks,
}
impl UnixIO {
@@ -26,9 +106,9 @@ impl UnixIO {
pub fn new() -> Result<Self> {
debug!("Using IO backend 'syscall'");
Ok(Self {
poller: Poller::new()?.into(),
events: Events::new().into(),
callbacks: [const { None }; MAX_FD].into(),
poller: PollHandler::new(),
events: EventsHandler::new(),
callbacks: OwnedCallbacks::new(),
})
}
}
@@ -45,8 +125,8 @@ impl IO for UnixIO {
let unix_file = Rc::new(UnixFile {
file: Rc::new(RefCell::new(file)),
poller: UnsafeCell::new(unsafe { &mut *self.poller.get() }),
callbacks: UnsafeCell::new(unsafe { &mut *self.callbacks.get() }),
poller: BorrowedPollHandler(self.poller.as_mut().into()),
callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
unix_file.lock_file(true)?;
@@ -56,38 +136,33 @@ impl IO for UnixIO {
fn run_once(&self) -> Result<()> {
{
let callbacks = unsafe { &mut *self.callbacks.get() };
if callbacks.iter().all(|c| c.is_none()) {
if self.callbacks.as_mut().entries.is_empty() {
return Ok(());
}
}
let events = unsafe { &mut *self.events.get() };
events.clear();
{
self.events.as_mut().clear();
}
trace!("run_once() waits for events");
let poller = unsafe { &mut *self.poller.get() };
poller.wait(events, None)?;
for event in events.iter() {
let callbacks = unsafe { &mut *self.callbacks.get() };
if let Some(cf) = callbacks[event.key].as_ref() {
{
self.poller.as_mut().wait(self.events.as_mut(), None)?;
}
for event in self.events.as_mut().iter() {
let callbacks = self.callbacks.as_mut();
if let Some(cf) = callbacks.remove(event.key) {
let result = {
match cf {
CompletionCallback::Read(ref file, ref c, pos) => {
let mut file = file.borrow_mut();
let c: &Completion = c;
let r = match c {
Completion::Read(r) => r,
_ => unreachable!(),
};
let r = c.read();
let mut buf = r.buf_mut();
file.seek(std::io::SeekFrom::Start(*pos as u64))?;
file.seek(std::io::SeekFrom::Start(pos as u64))?;
file.read(buf.as_mut_slice())
}
CompletionCallback::Write(ref file, _, ref buf, pos) => {
let mut file = file.borrow_mut();
let buf = buf.borrow();
file.seek(std::io::SeekFrom::Start(*pos as u64))?;
file.seek(std::io::SeekFrom::Start(pos as u64))?;
file.write(buf.as_slice())
}
}
@@ -107,7 +182,6 @@ impl IO for UnixIO {
Err(e) => Err(e.into()),
};
}
callbacks[event.key] = None;
}
Ok(())
}
@@ -135,8 +209,8 @@ enum CompletionCallback {
pub struct UnixFile<'io> {
file: Rc<RefCell<std::fs::File>>,
poller: UnsafeCell<&'io mut Poller>,
callbacks: UnsafeCell<&'io mut [Option<CompletionCallback>; MAX_FD]>,
poller: BorrowedPollHandler<'io>,
callbacks: BorrowedCallbacks<'io>,
}
impl File for UnixFile<'_> {
@@ -182,10 +256,7 @@ impl File for UnixFile<'_> {
fn pread(&self, pos: usize, c: Completion) -> Result<()> {
let file = self.file.borrow();
let result = {
let r = match c {
Completion::Read(ref r) => r,
_ => unreachable!(),
};
let r = c.read();
let mut buf = r.buf_mut();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
};
@@ -205,9 +276,10 @@ impl File for UnixFile<'_> {
poller.add(&file.as_fd(), Event::readable(fd as usize))?;
}
{
let callbacks = unsafe { &mut *self.callbacks.get() };
callbacks[fd as usize] =
Some(CompletionCallback::Read(self.file.clone(), c, pos));
self.callbacks.as_mut().insert(
fd as usize,
CompletionCallback::Read(self.file.clone(), c, pos),
);
}
Ok(())
}
@@ -234,18 +306,15 @@ impl File for UnixFile<'_> {
let fd = file.as_raw_fd();
{
unsafe {
let poller = &mut *self.poller.get();
let poller = self.poller.get();
poller.add(&file.as_fd(), Event::readable(fd as usize))?;
}
}
{
let callbacks = unsafe { &mut *self.callbacks.get() };
callbacks[fd as usize] = Some(CompletionCallback::Write(
self.file.clone(),
c,
buffer.clone(),
pos,
));
self.callbacks.as_mut().insert(
fd as usize,
CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos),
);
}
Ok(())
}