Merge 'core: Unsafe Send and Sync pushdown' from Pekka Enberg

This patch pushes unsafe Send and Sync to individual components instead
of doing it at Database level. This makes it easier for us to
incrementally fix thread-safety, but avoid developers adding more thread
unsafe code.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3745
This commit is contained in:
Preston Thorpe
2025-10-16 08:19:45 -04:00
committed by GitHub
7 changed files with 55 additions and 1 deletions

View File

@@ -329,6 +329,11 @@ pub struct DbspNode {
pub executable: Box<dyn IncrementalOperator>,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for DbspNode {}
unsafe impl Sync for DbspNode {}
impl std::fmt::Debug for DbspNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DbspNode")
@@ -395,6 +400,11 @@ pub struct DbspCircuit {
pub(super) internal_state_index_root: i64,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for DbspCircuit {}
unsafe impl Sync for DbspCircuit {}
impl DbspCircuit {
/// Create a new empty circuit with initial empty schema
/// The actual output schema will be set when the root node is established

View File

@@ -218,7 +218,9 @@ pub enum QueryOperator {
/// Operator DAG (Directed Acyclic Graph)
/// Base trait for incremental operators
pub trait IncrementalOperator: Debug {
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
pub trait IncrementalOperator: Debug + Send {
/// Evaluate the operator with a state, without modifying internal state
/// This is used during query execution to compute results
/// May need to read from storage to get current state (e.g., for aggregates)

View File

@@ -40,6 +40,11 @@ pub enum PopulateState {
Done,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for PopulateState {}
unsafe impl Sync for PopulateState {}
/// State machine for merge_delta to handle I/O operations
impl fmt::Debug for PopulateState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -130,6 +135,11 @@ pub struct AllViewsTxState {
states: Rc<RefCell<HashMap<String, Arc<ViewTransactionState>>>>,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for AllViewsTxState {}
unsafe impl Sync for AllViewsTxState {}
impl AllViewsTxState {
/// Create a new container for view transaction states
pub fn new() -> Self {
@@ -210,6 +220,11 @@ pub struct IncrementalView {
root_page: i64,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for IncrementalView {}
unsafe impl Sync for IncrementalView {}
impl IncrementalView {
/// Try to compile the SELECT statement into a DBSP circuit
fn try_compile_circuit(

View File

@@ -67,6 +67,11 @@ pub trait File: Send + Sync {
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct OpenFlags(i32);
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for OpenFlags {}
unsafe impl Sync for OpenFlags {}
bitflags! {
impl OpenFlags: i32 {
const None = 0b00000000;

View File

@@ -225,6 +225,8 @@ pub struct Database {
n_connections: AtomicUsize,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
@@ -1107,6 +1109,11 @@ pub struct Connection {
fk_deferred_violations: AtomicIsize,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for Connection {}
unsafe impl Sync for Connection {}
impl Drop for Connection {
fn drop(&mut self) {
if !self.is_closed() {

View File

@@ -548,6 +548,11 @@ pub struct Pager {
enable_encryption: AtomicBool,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for Pager {}
unsafe impl Sync for Pager {}
#[cfg(not(feature = "omit_autovacuum"))]
pub struct VacuumState {
/// State machine for [Pager::ptrmap_get]

View File

@@ -266,6 +266,11 @@ pub struct Row {
count: usize,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TxnCleanup {
None,
@@ -317,6 +322,11 @@ pub struct ProgramState {
fk_scope_counter: isize,
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for ProgramState {}
unsafe impl Sync for ProgramState {}
impl ProgramState {
pub fn new(max_registers: usize, max_cursors: usize) -> Self {
let cursors: Vec<Option<Cursor>> = (0..max_cursors).map(|_| None).collect();