mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-15 04:54:20 +01:00
Merge 'Maintain pending io_uring ops count for early return on run_once' from Arpit Saxena
Add a `pending_ops` field to `InnerLinuxIO` struct which is incremented for each operation submitted to the ring and decremented when they are taken off the completion queue. With this, we can exit from run_once if there are no pending operations. Otherwise, in that case, it would hang indefinitely due to call of `ring.submit_and_wait(1)` Closes #349
This commit is contained in:
@@ -33,8 +33,13 @@ pub struct LinuxIO {
|
||||
inner: Rc<RefCell<InnerLinuxIO>>,
|
||||
}
|
||||
|
||||
pub struct InnerLinuxIO {
|
||||
struct WrappedIOUring {
|
||||
ring: io_uring::IoUring,
|
||||
pending_ops: usize,
|
||||
}
|
||||
|
||||
struct InnerLinuxIO {
|
||||
ring: WrappedIOUring,
|
||||
iovecs: [iovec; MAX_IOVECS],
|
||||
next_iovec: usize,
|
||||
}
|
||||
@@ -43,7 +48,7 @@ impl LinuxIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?;
|
||||
let inner = InnerLinuxIO {
|
||||
ring: ring,
|
||||
ring: WrappedIOUring{ring, pending_ops: 0},
|
||||
iovecs: [iovec {
|
||||
iov_base: std::ptr::null_mut(),
|
||||
iov_len: 0,
|
||||
@@ -66,6 +71,36 @@ impl InnerLinuxIO {
|
||||
}
|
||||
}
|
||||
|
||||
impl WrappedIOUring {
|
||||
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
|
||||
unsafe {
|
||||
self.ring.submission()
|
||||
.push(entry)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
self.pending_ops += 1;
|
||||
}
|
||||
|
||||
fn wait_for_completion(&mut self) -> Result<()> {
|
||||
self.ring.submit_and_wait(1)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_completion(&mut self) -> Option<io_uring::cqueue::Entry> {
|
||||
// NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators
|
||||
let entry = self.ring.completion().next();
|
||||
if entry.is_some() {
|
||||
// consumed an entry from completion queue, update pending_ops
|
||||
self.pending_ops -= 1;
|
||||
}
|
||||
entry
|
||||
}
|
||||
|
||||
fn empty(&self) -> bool {
|
||||
self.pending_ops == 0
|
||||
}
|
||||
}
|
||||
|
||||
impl IO for LinuxIO {
|
||||
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
|
||||
trace!("open_file(path = {})", path);
|
||||
@@ -91,8 +126,13 @@ impl IO for LinuxIO {
|
||||
trace!("run_once()");
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let ring = &mut inner.ring;
|
||||
ring.submit_and_wait(1)?;
|
||||
while let Some(cqe) = ring.completion().next() {
|
||||
|
||||
if ring.empty() {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
ring.wait_for_completion()?;
|
||||
while let Some(cqe) = ring.get_completion() {
|
||||
let result = cqe.result();
|
||||
if result < 0 {
|
||||
return Err(LimboError::LinuxIOError(format!(
|
||||
@@ -192,12 +232,7 @@ impl File for LinuxFile {
|
||||
.build()
|
||||
.user_data(ptr as u64)
|
||||
};
|
||||
let ring = &mut io.ring;
|
||||
unsafe {
|
||||
ring.submission()
|
||||
.push(&read_e)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
io.ring.submit_entry(&read_e);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -218,12 +253,7 @@ impl File for LinuxFile {
|
||||
.build()
|
||||
.user_data(ptr as u64)
|
||||
};
|
||||
let ring = &mut io.ring;
|
||||
unsafe {
|
||||
ring.submission()
|
||||
.push(&write)
|
||||
.expect("submission queue is full");
|
||||
}
|
||||
io.ring.submit_entry(&write);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user