diff --git a/core/io/unix.rs b/core/io/unix.rs index b228ba68a..e63456205 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -3,171 +3,21 @@ use crate::error::LimboError; use crate::io::clock::{Clock, Instant}; use crate::io::common; use crate::Result; -use polling::{Event, Events, Poller}; +use parking_lot::Mutex; use rustix::{ fd::{AsFd, AsRawFd}, fs::{self, FlockOperation, OFlags, OpenOptionsExt}, - io::Errno, }; use std::os::fd::RawFd; -use std::{cell::UnsafeCell, mem::MaybeUninit, sync::Mutex}; use std::{io::ErrorKind, sync::Arc}; #[cfg(feature = "fs")] use tracing::debug; use tracing::{instrument, trace, Level}; -struct OwnedCallbacks(UnsafeCell); -// We assume we locking on IO level is done by user. -unsafe impl Send for OwnedCallbacks {} -unsafe impl Sync for OwnedCallbacks {} -struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>); - -impl OwnedCallbacks { - #[allow(dead_code)] - fn new() -> Self { - Self(UnsafeCell::new(Callbacks::new())) - } - fn as_mut<'io>(&self) -> &'io mut Callbacks { - unsafe { &mut *self.0.get() } - } - - fn is_empty(&self) -> bool { - self.as_mut().inline_count == 0 - } - - fn remove(&self, fd: usize) -> Option { - let callbacks = unsafe { &mut *self.0.get() }; - callbacks.remove(fd) - } -} - -impl BorrowedCallbacks<'_> { - fn insert(&self, fd: usize, callback: CompletionCallback) { - let callbacks = unsafe { &mut *self.0.get() }; - callbacks.insert(fd, callback); - } -} - -struct EventsHandler(UnsafeCell); - -impl EventsHandler { - #[allow(dead_code)] - fn new() -> Self { - Self(UnsafeCell::new(Events::new())) - } - - fn clear(&self) { - let events = unsafe { &mut *self.0.get() }; - events.clear(); - } - - fn iter(&self) -> impl Iterator { - let events = unsafe { &*self.0.get() }; - events.iter() - } - - fn as_mut<'io>(&self) -> &'io mut Events { - unsafe { &mut *self.0.get() } - } -} -struct PollHandler(UnsafeCell); -struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>); - -impl BorrowedPollHandler<'_> { - fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> { - let poller = unsafe { &mut *self.0.get() }; - unsafe { poller.add(fd, event)? } - Ok(()) - } -} - -impl PollHandler { - #[allow(dead_code)] - fn new() -> Self { - Self(UnsafeCell::new(Poller::new().unwrap())) - } - fn wait(&self, events: &mut Events, timeout: Option) -> Result<()> { - let poller = unsafe { &mut *self.0.get() }; - poller.wait(events, timeout)?; - Ok(()) - } - - fn as_mut<'io>(&self) -> &'io mut Poller { - unsafe { &mut *self.0.get() } - } -} - -type CallbackEntry = (usize, CompletionCallback); - -const FD_INLINE_SIZE: usize = 32; - -struct Callbacks { - inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE], - heap_entries: Vec, - inline_count: usize, -} - -impl Callbacks { - fn new() -> Self { - Self { - inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE], - heap_entries: Vec::new(), - inline_count: 0, - } - } - - fn insert(&mut self, fd: usize, callback: CompletionCallback) { - if self.inline_count < FD_INLINE_SIZE { - self.inline_entries[self.inline_count].write((fd, callback)); - self.inline_count += 1; - } else { - self.heap_entries.push((fd, callback)); - } - } - - fn remove(&mut self, fd: usize) -> Option { - if let Some(pos) = self.find_inline(fd) { - let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() }; - - // if not the last element, move the last valid entry into this position - if pos < self.inline_count - 1 { - let last_valid = - unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() }; - self.inline_entries[pos].write(last_valid); - } - - self.inline_count -= 1; - return Some(callback); - } - - if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) { - return Some(self.heap_entries.swap_remove(pos).1); - } - None - } - - fn find_inline(&self, fd: usize) -> Option { - (0..self.inline_count) - .find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd }) - } -} - -impl Drop for Callbacks { - fn drop(&mut self) { - for i in 0..self.inline_count { - unsafe { self.inline_entries[i].assume_init_drop() }; - } - } -} - /// UnixIO lives longer than any of the files it creates, so it is /// safe to store references to it's internals in the UnixFiles -pub struct UnixIO { - poller: PollHandler, - events: EventsHandler, - callbacks: OwnedCallbacks, -} +pub struct UnixIO {} unsafe impl Send for UnixIO {} unsafe impl Sync for UnixIO {} @@ -176,11 +26,7 @@ impl UnixIO { #[cfg(feature = "fs")] pub fn new() -> Result { debug!("Using IO backend 'syscall'"); - Ok(Self { - poller: PollHandler::new(), - events: EventsHandler::new(), - callbacks: OwnedCallbacks::new(), - }) + Ok(Self {}) } } @@ -263,8 +109,6 @@ impl IO for UnixIO { #[allow(clippy::arc_with_non_send_sync)] let unix_file = Arc::new(UnixFile { file: Arc::new(Mutex::new(file)), - poller: BorrowedPollHandler(self.poller.as_mut().into()), - callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?; @@ -274,162 +118,37 @@ impl IO for UnixIO { #[instrument(err, skip_all, level = Level::TRACE)] fn run_once(&self) -> Result<()> { - if self.callbacks.is_empty() { - return Ok(()); - } - - self.events.clear(); - trace!("run_once() waits for events"); - self.poller.wait(self.events.as_mut(), None)?; - - for event in self.events.iter() { - let key = event.key; - let cb = match self.callbacks.remove(key) { - Some(cb) => cb, - None => continue, // could have been completed/removed already - }; - - match cb { - CompletionCallback::Read(ref file, c, pos) => { - let f = file - .lock() - .map_err(|e| LimboError::LockingError(e.to_string()))?; - let r = c.as_read(); - let buf = r.buf(); - match rustix::io::pread(f.as_fd(), buf.as_mut_slice(), pos as u64) { - Ok(n) => c.complete(n as i32), - Err(Errno::AGAIN) => { - // re-arm - unsafe { self.poller.as_mut().add(&f.as_fd(), Event::readable(key))? }; - self.callbacks.as_mut().insert( - key, - CompletionCallback::Read(file.clone(), c.clone(), pos), - ); - } - Err(e) => return Err(e.into()), - } - } - - CompletionCallback::Write(ref file, c, buf, pos) => { - let f = file - .lock() - .map_err(|e| LimboError::LockingError(e.to_string()))?; - match rustix::io::pwrite(f.as_fd(), buf.as_slice(), pos as u64) { - Ok(n) => c.complete(n as i32), - Err(Errno::AGAIN) => { - unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? }; - self.callbacks.as_mut().insert( - key, - CompletionCallback::Write(file.clone(), c, buf.clone(), pos), - ); - } - Err(e) => return Err(e.into()), - } - } - - CompletionCallback::Writev(file, c, bufs, mut pos, mut idx, mut off) => { - let f = file - .lock() - .map_err(|e| LimboError::LockingError(e.to_string()))?; - // keep trying until WouldBlock or we're done with this event - match try_pwritev_raw(f.as_raw_fd(), pos as u64, &bufs, idx, off) { - Ok(written) => { - // advance through buffers - let mut rem = written; - while rem > 0 { - let len = bufs[idx].len(); - let left = len - off; - if rem < left { - off += rem; - rem = 0; - } else { - rem -= left; - idx += 1; - off = 0; - if idx == bufs.len() { - break; - } - } - } - pos += written; - - if idx == bufs.len() { - c.complete(pos as i32); - } else { - // Not finished; re-arm and store updated state - unsafe { - self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? - }; - self.callbacks.as_mut().insert( - key, - CompletionCallback::Writev( - file.clone(), - c.clone(), - bufs, - pos, - idx, - off, - ), - ); - } - break; - } - Err(e) if e.kind() == ErrorKind::WouldBlock => { - // re-arm with same state - unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? }; - self.callbacks.as_mut().insert( - key, - CompletionCallback::Writev( - file.clone(), - c.clone(), - bufs, - pos, - idx, - off, - ), - ); - break; - } - Err(e) => return Err(e.into()), - } - } - } - } - Ok(()) } } -enum CompletionCallback { - Read(Arc>, Completion, usize), - Write( - Arc>, - Completion, - Arc, - usize, - ), - Writev( - Arc>, - Completion, - Vec>, - usize, // absolute file offset - usize, // buf index - usize, // intra-buf offset - ), -} +// enum CompletionCallback { +// Read(Arc>, Completion, usize), +// Write( +// Arc>, +// Completion, +// Arc, +// usize, +// ), +// Writev( +// Arc>, +// Completion, +// Vec>, +// usize, // absolute file offset +// usize, // buf index +// usize, // intra-buf offset +// ), +// } -pub struct UnixFile<'io> { - #[allow(clippy::arc_with_non_send_sync)] +pub struct UnixFile { file: Arc>, - poller: BorrowedPollHandler<'io>, - callbacks: BorrowedCallbacks<'io>, } -unsafe impl Send for UnixFile<'_> {} -unsafe impl Sync for UnixFile<'_> {} +unsafe impl Send for UnixFile {} +unsafe impl Sync for UnixFile {} -impl File for UnixFile<'_> { +impl File for UnixFile { fn lock_file(&self, exclusive: bool) -> Result<()> { - let fd = self.file.lock().unwrap(); + let fd = self.file.lock(); let fd = fd.as_fd(); // F_SETLK is a non-blocking lock. The lock will be released when the file is closed // or the process exits or after an explicit unlock. @@ -456,7 +175,7 @@ impl File for UnixFile<'_> { } fn unlock_file(&self) -> Result<()> { - let fd = self.file.lock().unwrap(); + let fd = self.file.lock(); let fd = fd.as_fd(); fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| { LimboError::LockingError(format!( @@ -469,7 +188,10 @@ impl File for UnixFile<'_> { #[instrument(err, skip_all, level = Level::TRACE)] fn pread(&self, pos: usize, c: Completion) -> Result { - let file = self.file.lock().unwrap(); + let file = self + .file + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; let result = { let r = c.as_read(); let buf = r.buf(); @@ -482,27 +204,16 @@ impl File for UnixFile<'_> { c.complete(n as i32); Ok(c) } - Err(Errno::AGAIN) => { - trace!("pread blocks"); - // Would block, set up polling - let fd = file.as_raw_fd(); - self.poller - .add(&file.as_fd(), Event::readable(fd as usize))?; - { - self.callbacks.insert( - fd as usize, - CompletionCallback::Read(self.file.clone(), c.clone(), pos), - ); - } - Ok(c) - } Err(e) => Err(e.into()), } } #[instrument(err, skip_all, level = Level::TRACE)] fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { - let file = self.file.lock().unwrap(); + let file = self + .file + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; let result = { rustix::io::pwrite(file.as_fd(), buffer.as_slice(), pos as u64) }; match result { Ok(n) => { @@ -511,18 +222,6 @@ impl File for UnixFile<'_> { c.complete(n as i32); Ok(c) } - Err(Errno::AGAIN) => { - trace!("pwrite blocks"); - // Would block, set up polling - let fd = file.as_raw_fd(); - self.poller - .add(&file.as_fd(), Event::writable(fd as usize))?; - self.callbacks.insert( - fd as usize, - CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos), - ); - Ok(c) - } Err(e) => Err(e.into()), } } @@ -540,8 +239,8 @@ impl File for UnixFile<'_> { } let file = self .file - .lock() - .map_err(|e| LimboError::LockingError(e.to_string()))?; + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) { Ok(written) => { @@ -550,36 +249,17 @@ impl File for UnixFile<'_> { Ok(c) } Err(e) => { - if e.kind() == ErrorKind::WouldBlock { - trace!("pwritev blocks"); - } else { - return Err(e.into()); - } - // Set up state so we can resume later - let fd = file.as_raw_fd(); - self.poller - .add(&file.as_fd(), Event::writable(fd as usize))?; - let buf_idx = 0; - let buf_offset = 0; - self.callbacks.insert( - fd as usize, - CompletionCallback::Writev( - self.file.clone(), - c.clone(), - buffers, - pos, - buf_idx, - buf_offset, - ), - ); - Ok(c) + return Err(e.into()); } } } #[instrument(err, skip_all, level = Level::TRACE)] fn sync(&self, c: Completion) -> Result { - let file = self.file.lock().unwrap(); + let file = self + .file + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; let result = fs::fsync(file.as_fd()); match result { Ok(()) => { @@ -593,15 +273,19 @@ impl File for UnixFile<'_> { #[instrument(err, skip_all, level = Level::TRACE)] fn size(&self) -> Result { - let file = self.file.lock().unwrap(); + let file = self + .file + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; Ok(file.metadata()?.len()) } #[instrument(err, skip_all, level = Level::INFO)] fn truncate(&self, len: usize, c: Completion) -> Result { - let file = self.file.lock().map_err(|e| { - LimboError::LockingError(format!("Failed to lock file for truncation: {e}")) - })?; + let file = self + .file + .try_lock() + .ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?; let result = file.set_len(len as u64); match result { Ok(()) => { @@ -614,7 +298,7 @@ impl File for UnixFile<'_> { } } -impl Drop for UnixFile<'_> { +impl Drop for UnixFile { fn drop(&mut self) { self.unlock_file().expect("Failed to unlock file"); } diff --git a/core/lib.rs b/core/lib.rs index e1e5c6ae4..c7c4dfa04 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2048,6 +2048,9 @@ impl Statement { return res; } if res.is_err() { + if let Some(io) = &self.state.io_completions { + io.abort(); + } let state = self.program.connection.transaction_state.get(); if let TransactionState::Write { .. } = state { let end_tx_res = self.pager.end_tx(true, &self.program.connection, true)?; diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 381498d5b..fff5abd5c 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -876,6 +876,7 @@ pub fn begin_read_page( let buf = Arc::new(buf); let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { let Ok((mut buf, bytes_read)) = res else { + page.clear_locked(); return; }; let buf_len = buf.len(); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b4ba856df..1e48132fc 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -915,6 +915,7 @@ impl Wal for WalFile { let frame = page.clone(); let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { let Ok((buf, bytes_read)) = res else { + page.clear_locked(); return; }; let buf_len = buf.len(); diff --git a/core/types.rs b/core/types.rs index 251d6d7f0..00fdbe741 100644 --- a/core/types.rs +++ b/core/types.rs @@ -2488,10 +2488,18 @@ impl IOCompletions { } } - pub fn completed(&self) -> bool { + pub fn finished(&self) -> bool { match self { - IOCompletions::Single(c) => c.is_completed(), - IOCompletions::Many(completions) => completions.iter().all(|c| c.is_completed()), + IOCompletions::Single(c) => c.finished(), + IOCompletions::Many(completions) => completions.iter().all(|c| c.finished()), + } + } + + /// Send abort signal to completions + pub fn abort(&self) { + match self { + IOCompletions::Single(c) => c.abort(), + IOCompletions::Many(completions) => completions.iter().for_each(|c| c.abort()), } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index bc62960c9..bb00a9f83 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -442,7 +442,7 @@ impl Program { return Ok(StepResult::Interrupt); } if let Some(io) = &state.io_completions { - if !io.completed() { + if !io.finished() { return Ok(StepResult::IO); } state.io_completions = None;