Merge 'Fix page locked panic' from Pedro Muniz

Clear locked pages when read completions callback fail. Also, we need to
abort I/O Completions in `stmt.run_once()` so that we do not raise
errors in `rollback` when clearing the page cache.
Fixes #2658
Fixes #2675
Fixes #2680
Fixes #2682

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2683
This commit is contained in:
Pekka Enberg
2025-08-20 09:31:26 +03:00
committed by GitHub
6 changed files with 67 additions and 370 deletions

View File

@@ -3,171 +3,21 @@ use crate::error::LimboError;
use crate::io::clock::{Clock, Instant};
use crate::io::common;
use crate::Result;
use polling::{Event, Events, Poller};
use parking_lot::Mutex;
use rustix::{
fd::{AsFd, AsRawFd},
fs::{self, FlockOperation, OFlags, OpenOptionsExt},
io::Errno,
};
use std::os::fd::RawFd;
use std::{cell::UnsafeCell, mem::MaybeUninit, sync::Mutex};
use std::{io::ErrorKind, sync::Arc};
#[cfg(feature = "fs")]
use tracing::debug;
use tracing::{instrument, trace, Level};
struct OwnedCallbacks(UnsafeCell<Callbacks>);
// We assume we locking on IO level is done by user.
unsafe impl Send for OwnedCallbacks {}
unsafe impl Sync for OwnedCallbacks {}
struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>);
impl OwnedCallbacks {
#[allow(dead_code)]
fn new() -> Self {
Self(UnsafeCell::new(Callbacks::new()))
}
fn as_mut<'io>(&self) -> &'io mut Callbacks {
unsafe { &mut *self.0.get() }
}
fn is_empty(&self) -> bool {
self.as_mut().inline_count == 0
}
fn remove(&self, fd: usize) -> Option<CompletionCallback> {
let callbacks = unsafe { &mut *self.0.get() };
callbacks.remove(fd)
}
}
impl BorrowedCallbacks<'_> {
fn insert(&self, fd: usize, callback: CompletionCallback) {
let callbacks = unsafe { &mut *self.0.get() };
callbacks.insert(fd, callback);
}
}
struct EventsHandler(UnsafeCell<Events>);
impl EventsHandler {
#[allow(dead_code)]
fn new() -> Self {
Self(UnsafeCell::new(Events::new()))
}
fn clear(&self) {
let events = unsafe { &mut *self.0.get() };
events.clear();
}
fn iter(&self) -> impl Iterator<Item = Event> {
let events = unsafe { &*self.0.get() };
events.iter()
}
fn as_mut<'io>(&self) -> &'io mut Events {
unsafe { &mut *self.0.get() }
}
}
struct PollHandler(UnsafeCell<Poller>);
struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>);
impl BorrowedPollHandler<'_> {
fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> {
let poller = unsafe { &mut *self.0.get() };
unsafe { poller.add(fd, event)? }
Ok(())
}
}
impl PollHandler {
#[allow(dead_code)]
fn new() -> Self {
Self(UnsafeCell::new(Poller::new().unwrap()))
}
fn wait(&self, events: &mut Events, timeout: Option<std::time::Duration>) -> Result<()> {
let poller = unsafe { &mut *self.0.get() };
poller.wait(events, timeout)?;
Ok(())
}
fn as_mut<'io>(&self) -> &'io mut Poller {
unsafe { &mut *self.0.get() }
}
}
type CallbackEntry = (usize, CompletionCallback);
const FD_INLINE_SIZE: usize = 32;
struct Callbacks {
inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE],
heap_entries: Vec<CallbackEntry>,
inline_count: usize,
}
impl Callbacks {
fn new() -> Self {
Self {
inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE],
heap_entries: Vec::new(),
inline_count: 0,
}
}
fn insert(&mut self, fd: usize, callback: CompletionCallback) {
if self.inline_count < FD_INLINE_SIZE {
self.inline_entries[self.inline_count].write((fd, callback));
self.inline_count += 1;
} else {
self.heap_entries.push((fd, callback));
}
}
fn remove(&mut self, fd: usize) -> Option<CompletionCallback> {
if let Some(pos) = self.find_inline(fd) {
let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() };
// if not the last element, move the last valid entry into this position
if pos < self.inline_count - 1 {
let last_valid =
unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() };
self.inline_entries[pos].write(last_valid);
}
self.inline_count -= 1;
return Some(callback);
}
if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) {
return Some(self.heap_entries.swap_remove(pos).1);
}
None
}
fn find_inline(&self, fd: usize) -> Option<usize> {
(0..self.inline_count)
.find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd })
}
}
impl Drop for Callbacks {
fn drop(&mut self) {
for i in 0..self.inline_count {
unsafe { self.inline_entries[i].assume_init_drop() };
}
}
}
/// UnixIO lives longer than any of the files it creates, so it is
/// safe to store references to it's internals in the UnixFiles
pub struct UnixIO {
poller: PollHandler,
events: EventsHandler,
callbacks: OwnedCallbacks,
}
pub struct UnixIO {}
unsafe impl Send for UnixIO {}
unsafe impl Sync for UnixIO {}
@@ -176,11 +26,7 @@ impl UnixIO {
#[cfg(feature = "fs")]
pub fn new() -> Result<Self> {
debug!("Using IO backend 'syscall'");
Ok(Self {
poller: PollHandler::new(),
events: EventsHandler::new(),
callbacks: OwnedCallbacks::new(),
})
Ok(Self {})
}
}
@@ -263,8 +109,6 @@ impl IO for UnixIO {
#[allow(clippy::arc_with_non_send_sync)]
let unix_file = Arc::new(UnixFile {
file: Arc::new(Mutex::new(file)),
poller: BorrowedPollHandler(self.poller.as_mut().into()),
callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
@@ -274,162 +118,37 @@ impl IO for UnixIO {
#[instrument(err, skip_all, level = Level::TRACE)]
fn run_once(&self) -> Result<()> {
if self.callbacks.is_empty() {
return Ok(());
}
self.events.clear();
trace!("run_once() waits for events");
self.poller.wait(self.events.as_mut(), None)?;
for event in self.events.iter() {
let key = event.key;
let cb = match self.callbacks.remove(key) {
Some(cb) => cb,
None => continue, // could have been completed/removed already
};
match cb {
CompletionCallback::Read(ref file, c, pos) => {
let f = file
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
let r = c.as_read();
let buf = r.buf();
match rustix::io::pread(f.as_fd(), buf.as_mut_slice(), pos as u64) {
Ok(n) => c.complete(n as i32),
Err(Errno::AGAIN) => {
// re-arm
unsafe { self.poller.as_mut().add(&f.as_fd(), Event::readable(key))? };
self.callbacks.as_mut().insert(
key,
CompletionCallback::Read(file.clone(), c.clone(), pos),
);
}
Err(e) => return Err(e.into()),
}
}
CompletionCallback::Write(ref file, c, buf, pos) => {
let f = file
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
match rustix::io::pwrite(f.as_fd(), buf.as_slice(), pos as u64) {
Ok(n) => c.complete(n as i32),
Err(Errno::AGAIN) => {
unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? };
self.callbacks.as_mut().insert(
key,
CompletionCallback::Write(file.clone(), c, buf.clone(), pos),
);
}
Err(e) => return Err(e.into()),
}
}
CompletionCallback::Writev(file, c, bufs, mut pos, mut idx, mut off) => {
let f = file
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
// keep trying until WouldBlock or we're done with this event
match try_pwritev_raw(f.as_raw_fd(), pos as u64, &bufs, idx, off) {
Ok(written) => {
// advance through buffers
let mut rem = written;
while rem > 0 {
let len = bufs[idx].len();
let left = len - off;
if rem < left {
off += rem;
rem = 0;
} else {
rem -= left;
idx += 1;
off = 0;
if idx == bufs.len() {
break;
}
}
}
pos += written;
if idx == bufs.len() {
c.complete(pos as i32);
} else {
// Not finished; re-arm and store updated state
unsafe {
self.poller.as_mut().add(&f.as_fd(), Event::writable(key))?
};
self.callbacks.as_mut().insert(
key,
CompletionCallback::Writev(
file.clone(),
c.clone(),
bufs,
pos,
idx,
off,
),
);
}
break;
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// re-arm with same state
unsafe { self.poller.as_mut().add(&f.as_fd(), Event::writable(key))? };
self.callbacks.as_mut().insert(
key,
CompletionCallback::Writev(
file.clone(),
c.clone(),
bufs,
pos,
idx,
off,
),
);
break;
}
Err(e) => return Err(e.into()),
}
}
}
}
Ok(())
}
}
enum CompletionCallback {
Read(Arc<Mutex<std::fs::File>>, Completion, usize),
Write(
Arc<Mutex<std::fs::File>>,
Completion,
Arc<crate::Buffer>,
usize,
),
Writev(
Arc<Mutex<std::fs::File>>,
Completion,
Vec<Arc<crate::Buffer>>,
usize, // absolute file offset
usize, // buf index
usize, // intra-buf offset
),
}
// enum CompletionCallback {
// Read(Arc<Mutex<std::fs::File>>, Completion, usize),
// Write(
// Arc<Mutex<std::fs::File>>,
// Completion,
// Arc<crate::Buffer>,
// usize,
// ),
// Writev(
// Arc<Mutex<std::fs::File>>,
// Completion,
// Vec<Arc<crate::Buffer>>,
// usize, // absolute file offset
// usize, // buf index
// usize, // intra-buf offset
// ),
// }
pub struct UnixFile<'io> {
#[allow(clippy::arc_with_non_send_sync)]
pub struct UnixFile {
file: Arc<Mutex<std::fs::File>>,
poller: BorrowedPollHandler<'io>,
callbacks: BorrowedCallbacks<'io>,
}
unsafe impl Send for UnixFile<'_> {}
unsafe impl Sync for UnixFile<'_> {}
unsafe impl Send for UnixFile {}
unsafe impl Sync for UnixFile {}
impl File for UnixFile<'_> {
impl File for UnixFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.lock().unwrap();
let fd = self.file.lock();
let fd = fd.as_fd();
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
// or the process exits or after an explicit unlock.
@@ -456,7 +175,7 @@ impl File for UnixFile<'_> {
}
fn unlock_file(&self) -> Result<()> {
let fd = self.file.lock().unwrap();
let fd = self.file.lock();
let fd = fd.as_fd();
fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
LimboError::LockingError(format!(
@@ -469,7 +188,10 @@ impl File for UnixFile<'_> {
#[instrument(err, skip_all, level = Level::TRACE)]
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
let file = self.file.lock().unwrap();
let file = self
.file
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
let result = {
let r = c.as_read();
let buf = r.buf();
@@ -482,27 +204,16 @@ impl File for UnixFile<'_> {
c.complete(n as i32);
Ok(c)
}
Err(Errno::AGAIN) => {
trace!("pread blocks");
// Would block, set up polling
let fd = file.as_raw_fd();
self.poller
.add(&file.as_fd(), Event::readable(fd as usize))?;
{
self.callbacks.insert(
fd as usize,
CompletionCallback::Read(self.file.clone(), c.clone(), pos),
);
}
Ok(c)
}
Err(e) => Err(e.into()),
}
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let file = self.file.lock().unwrap();
let file = self
.file
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
let result = { rustix::io::pwrite(file.as_fd(), buffer.as_slice(), pos as u64) };
match result {
Ok(n) => {
@@ -511,18 +222,6 @@ impl File for UnixFile<'_> {
c.complete(n as i32);
Ok(c)
}
Err(Errno::AGAIN) => {
trace!("pwrite blocks");
// Would block, set up polling
let fd = file.as_raw_fd();
self.poller
.add(&file.as_fd(), Event::writable(fd as usize))?;
self.callbacks.insert(
fd as usize,
CompletionCallback::Write(self.file.clone(), c.clone(), buffer.clone(), pos),
);
Ok(c)
}
Err(e) => Err(e.into()),
}
}
@@ -540,8 +239,8 @@ impl File for UnixFile<'_> {
}
let file = self
.file
.lock()
.map_err(|e| LimboError::LockingError(e.to_string()))?;
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
match try_pwritev_raw(file.as_raw_fd(), pos as u64, &buffers, 0, 0) {
Ok(written) => {
@@ -550,36 +249,17 @@ impl File for UnixFile<'_> {
Ok(c)
}
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
trace!("pwritev blocks");
} else {
return Err(e.into());
}
// Set up state so we can resume later
let fd = file.as_raw_fd();
self.poller
.add(&file.as_fd(), Event::writable(fd as usize))?;
let buf_idx = 0;
let buf_offset = 0;
self.callbacks.insert(
fd as usize,
CompletionCallback::Writev(
self.file.clone(),
c.clone(),
buffers,
pos,
buf_idx,
buf_offset,
),
);
Ok(c)
return Err(e.into());
}
}
}
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Completion) -> Result<Completion> {
let file = self.file.lock().unwrap();
let file = self
.file
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
let result = fs::fsync(file.as_fd());
match result {
Ok(()) => {
@@ -593,15 +273,19 @@ impl File for UnixFile<'_> {
#[instrument(err, skip_all, level = Level::TRACE)]
fn size(&self) -> Result<u64> {
let file = self.file.lock().unwrap();
let file = self
.file
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
Ok(file.metadata()?.len())
}
#[instrument(err, skip_all, level = Level::INFO)]
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
let file = self.file.lock().map_err(|e| {
LimboError::LockingError(format!("Failed to lock file for truncation: {e}"))
})?;
let file = self
.file
.try_lock()
.ok_or_else(|| LimboError::LockingError("Failed locking file".to_string()))?;
let result = file.set_len(len as u64);
match result {
Ok(()) => {
@@ -614,7 +298,7 @@ impl File for UnixFile<'_> {
}
}
impl Drop for UnixFile<'_> {
impl Drop for UnixFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}

View File

@@ -2048,6 +2048,9 @@ impl Statement {
return res;
}
if res.is_err() {
if let Some(io) = &self.state.io_completions {
io.abort();
}
let state = self.program.connection.transaction_state.get();
if let TransactionState::Write { .. } = state {
let end_tx_res = self.pager.end_tx(true, &self.program.connection, true)?;

View File

@@ -876,6 +876,7 @@ pub fn begin_read_page(
let buf = Arc::new(buf);
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((mut buf, bytes_read)) = res else {
page.clear_locked();
return;
};
let buf_len = buf.len();

View File

@@ -915,6 +915,7 @@ impl Wal for WalFile {
let frame = page.clone();
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
page.clear_locked();
return;
};
let buf_len = buf.len();

View File

@@ -2488,10 +2488,18 @@ impl IOCompletions {
}
}
pub fn completed(&self) -> bool {
pub fn finished(&self) -> bool {
match self {
IOCompletions::Single(c) => c.is_completed(),
IOCompletions::Many(completions) => completions.iter().all(|c| c.is_completed()),
IOCompletions::Single(c) => c.finished(),
IOCompletions::Many(completions) => completions.iter().all(|c| c.finished()),
}
}
/// Send abort signal to completions
pub fn abort(&self) {
match self {
IOCompletions::Single(c) => c.abort(),
IOCompletions::Many(completions) => completions.iter().for_each(|c| c.abort()),
}
}
}

View File

@@ -442,7 +442,7 @@ impl Program {
return Ok(StepResult::Interrupt);
}
if let Some(io) = &state.io_completions {
if !io.completed() {
if !io.finished() {
return Ok(StepResult::IO);
}
state.io_completions = None;