Merge 'Completion: make it Send + Sync' from Nikita Sivukhin

This PR makes Completion to be `Send` and also force internal callbacks
to be `Send`.
The reasons for that is following:
1. `io_uring` right now can execute completion at any moment potentially
on arbitrary thread, so we already implicitly rely on that property of
`Completion` and its callbacks
2. In case of partial sync
(https://github.com/tursodatabase/turso/pull/3931), there will be an
additional requirement for Completion to be Send as it will be put in
the separate queue associated with `DatabaseStorage` (which is Send +
Sync) processed in parallel with main IO
3. Generally, it sounds pretty natural in the context of async io to
have `Send` Completion so it can be safely transferred between threads
The approach in the PR is hacky as `Completion` made `Send` in a pretty
unsafe way. The main reason why Rust can't derive `Send` automatically
is following:
1. Many completions holds `Arc<Buffer>` internally which needs to be
marked with unsafe traits explicitly as it holds `ptr: NonNull<u8>`
2. `Completion` holds `CompletionInner` as `Arc` which internally holds
completion callback as `Box<XXXComplete>`, but because it's guarded by
`Arc` - Rust forces completion callback to also be Sync (not only Send)
and as we usually move Completion in the callback - we get a cycle here
and with current code Send for Completion implies Sync for Completion.
So, in order to fix this, PR marks `ArenaBuffer` as Send + Sync and
forces completion callbacks to be Send + Sync too. It's seems like
`Sync` requirement is theoretically unnecessary and `Send` should be
enough - but with current code organization Send + Sync looks like the
simplest approach.
Making `ArenaBuffer` Sync sounds almost correct, although I am worried
about read/write access to it as internally `ArenaBuffer` do not
introduce any synchronization of its reads/writes - so potentially we
already can hit some multi-threading bugs with io_uring do to
`ArenaBuffer` used from different threads (or maybe there are some
implicit memory barriers in another parts of the code which can
guarantee us that we will properly use `ArenaBuffer` - but this sounds
like a pure luck)

Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #3935
This commit is contained in:
Preston Thorpe
2025-11-11 20:10:52 -05:00
committed by GitHub
3 changed files with 37 additions and 25 deletions

View File

@@ -11,10 +11,10 @@ use parking_lot::Mutex;
use crate::{Buffer, CompletionError};
pub type ReadComplete = dyn Fn(Result<(Arc<Buffer>, i32), CompletionError>);
pub type WriteComplete = dyn Fn(Result<i32, CompletionError>);
pub type SyncComplete = dyn Fn(Result<i32, CompletionError>);
pub type TruncateComplete = dyn Fn(Result<i32, CompletionError>);
pub type ReadComplete = dyn Fn(Result<(Arc<Buffer>, i32), CompletionError>) + Send + Sync;
pub type WriteComplete = dyn Fn(Result<i32, CompletionError>) + Send + Sync;
pub type SyncComplete = dyn Fn(Result<i32, CompletionError>) + Send + Sync;
pub type TruncateComplete = dyn Fn(Result<i32, CompletionError>) + Send + Sync;
#[must_use]
#[derive(Debug, Clone)]
@@ -275,7 +275,7 @@ impl Completion {
pub fn new_write_linked<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
F: Fn(Result<i32, CompletionError>) + Send + Sync + 'static,
{
Self::new_linked(CompletionType::Write(WriteCompletion::new(Box::new(
complete,
@@ -284,7 +284,7 @@ impl Completion {
pub fn new_write<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
F: Fn(Result<i32, CompletionError>) + Send + Sync + 'static,
{
Self::new(CompletionType::Write(WriteCompletion::new(Box::new(
complete,
@@ -293,7 +293,7 @@ impl Completion {
pub fn new_read<F>(buf: Arc<Buffer>, complete: F) -> Self
where
F: Fn(Result<(Arc<Buffer>, i32), CompletionError>) + 'static,
F: Fn(Result<(Arc<Buffer>, i32), CompletionError>) + Send + Sync + 'static,
{
Self::new(CompletionType::Read(ReadCompletion::new(
buf,
@@ -302,7 +302,7 @@ impl Completion {
}
pub fn new_sync<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
F: Fn(Result<i32, CompletionError>) + Send + Sync + 'static,
{
Self::new(CompletionType::Sync(SyncCompletion::new(Box::new(
complete,
@@ -311,7 +311,7 @@ impl Completion {
pub fn new_trunc<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
F: Fn(Result<i32, CompletionError>) + Send + Sync + 'static,
{
Self::new(CompletionType::Truncate(TruncateCompletion::new(Box::new(
complete,

View File

@@ -26,6 +26,10 @@ pub struct ArenaBuffer {
len: usize,
}
// Unsound: write and read from different threads can be dangerous with current ArenaBuffer implementation without some additional explicit synchronization
unsafe impl Sync for ArenaBuffer {}
unsafe impl Send for ArenaBuffer {}
impl ArenaBuffer {
const fn new(
arena: Weak<Arena>,

View File

@@ -4,13 +4,14 @@ use rustc_hash::{FxHashMap, FxHashSet};
use std::array;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Mutex;
use strum::EnumString;
use tracing::{instrument, Level};
use parking_lot::RwLock;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::{cell::Cell, fmt, sync::Arc};
use std::{fmt, sync::Arc};
use super::buffer_pool::BufferPool;
use super::pager::{PageRef, Pager};
@@ -1138,10 +1139,15 @@ impl Wal for WalFile {
}
#[instrument(skip_all, level = Level::DEBUG)]
// todo(sivukhin): change API to accept Buffer or some other owned type
// this method involves IO and cross "async" boundary - so juggling with references is bad and dangerous
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Completion> {
tracing::debug!("read_frame_raw({})", frame_id);
let offset = self.frame_offset(frame_id);
let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len());
// HACK: *mut u8 can't be Sent between threads safely, cast it to usize then
// for the time of writing this comment - this is *safe* as all callers immediately call synchronous method wait_for_completion and hold necessary references
let (frame_ptr, frame_len) = (frame.as_mut_ptr() as usize, frame.len());
let encryption_ctx = {
let io_ctx = self.io_ctx.read();
@@ -1157,6 +1163,7 @@ impl Wal for WalFile {
"read({bytes_read}) != expected({buf_len})"
);
let buf_ptr = buf.as_ptr();
let frame_ptr = frame_ptr as *mut u8;
let frame_ref: &mut [u8] =
unsafe { std::slice::from_raw_parts_mut(frame_ptr, frame_len) };
@@ -1194,6 +1201,8 @@ impl Wal for WalFile {
}
#[instrument(skip_all, level = Level::DEBUG)]
// todo(sivukhin): change API to accept Buffer or some other owned type
// this method involves IO and cross "async" boundary - so juggling with references is bad and dangerous
fn write_frame_raw(
&mut self,
buffer_pool: Arc<BufferPool>,
@@ -1226,8 +1235,12 @@ impl Wal for WalFile {
if frame_id <= self.max_frame.load(Ordering::Acquire) {
// just validate if page content from the frame matches frame in the WAL
let offset = self.frame_offset(frame_id);
let conflict = Arc::new(Cell::new(false));
let (page_ptr, page_len) = (page.as_ptr(), page.len());
let conflict = Arc::new(Mutex::new(false));
// HACK: *mut u8 can't be shared between threads safely, cast it to usize then
// for the time of writing this comment - this is *safe* as the function immediately call synchronous method wait_for_completion and hold necessary references
let (page_ptr, page_len) = (page.as_ptr() as usize, page.len());
let complete = Box::new({
let conflict = conflict.clone();
move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
@@ -1239,9 +1252,9 @@ impl Wal for WalFile {
bytes_read == buf_len as i32,
"read({bytes_read}) != expected({buf_len})"
);
let page = unsafe { std::slice::from_raw_parts(page_ptr, page_len) };
let page = unsafe { std::slice::from_raw_parts(page_ptr as *mut u8, page_len) };
if buf.as_slice() != page {
conflict.set(true);
*conflict.lock().unwrap() = true;
}
}
});
@@ -1258,7 +1271,7 @@ impl Wal for WalFile {
&self.io_ctx.read(),
)?;
self.io.wait_for_completion(c)?;
return if conflict.get() {
return if *conflict.lock().unwrap() {
Err(LimboError::Conflict(format!(
"frame content differs from the WAL: frame_id={frame_id}"
)))
@@ -2506,11 +2519,7 @@ pub mod test {
use parking_lot::RwLock;
#[cfg(unix)]
use std::os::unix::fs::MetadataExt;
use std::{
cell::Cell,
rc::Rc,
sync::{atomic::Ordering, Arc},
};
use std::sync::{atomic::Ordering, Arc, Mutex};
#[allow(clippy::arc_with_non_send_sync)]
pub(crate) fn get_database() -> (Arc<Database>, std::path::PathBuf) {
let mut path = tempfile::tempdir().unwrap().keep();
@@ -2536,17 +2545,16 @@ pub mod test {
let _ = conn.execute("insert into test (value) values ('test1'), ('test2'), ('test3')");
let wal = db.shared_wal.write();
let wal_file = wal.file.as_ref().unwrap().clone();
let done = Rc::new(Cell::new(false));
let done = Arc::new(Mutex::new(false));
let _done = done.clone();
let _ = wal_file.truncate(
WAL_HEADER_SIZE as u64,
Completion::new_trunc(move |_| {
let done = _done.clone();
done.set(true);
*_done.lock().unwrap() = true;
}),
);
assert!(wal_file.size().unwrap() == WAL_HEADER_SIZE as u64);
assert!(done.get());
assert!(*done.lock().unwrap());
}
#[test]