From bf5de920f2567c2cb10993ebd97dae189380762d Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 16 Oct 2025 11:10:43 +0300 Subject: [PATCH] core: Unsafe Send and Sync pushdown This patch pushes unsafe Send and Sync to individual components instead of doing it at Database level. This makes it easier for us to incrementally fix thread-safety, but avoid developers adding more thread unsafe code. --- core/incremental/compiler.rs | 10 ++++++++++ core/incremental/operator.rs | 4 +++- core/incremental/view.rs | 15 +++++++++++++++ core/io/mod.rs | 5 +++++ core/lib.rs | 7 +++++++ core/storage/pager.rs | 5 +++++ core/vdbe/mod.rs | 10 ++++++++++ 7 files changed, 55 insertions(+), 1 deletion(-) diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index f067515cc..b62795fab 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -329,6 +329,11 @@ pub struct DbspNode { pub executable: Box, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for DbspNode {} +unsafe impl Sync for DbspNode {} + impl std::fmt::Debug for DbspNode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("DbspNode") @@ -395,6 +400,11 @@ pub struct DbspCircuit { pub(super) internal_state_index_root: i64, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for DbspCircuit {} +unsafe impl Sync for DbspCircuit {} + impl DbspCircuit { /// Create a new empty circuit with initial empty schema /// The actual output schema will be set when the root node is established diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 276249fb3..70ab72d74 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -218,7 +218,9 @@ pub enum QueryOperator { /// Operator DAG (Directed Acyclic Graph) /// Base trait for incremental operators -pub trait IncrementalOperator: Debug { +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +pub trait IncrementalOperator: Debug + Send { /// Evaluate the operator with a state, without modifying internal state /// This is used during query execution to compute results /// May need to read from storage to get current state (e.g., for aggregates) diff --git a/core/incremental/view.rs b/core/incremental/view.rs index 957605d17..a82a1188b 100644 --- a/core/incremental/view.rs +++ b/core/incremental/view.rs @@ -40,6 +40,11 @@ pub enum PopulateState { Done, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for PopulateState {} +unsafe impl Sync for PopulateState {} + /// State machine for merge_delta to handle I/O operations impl fmt::Debug for PopulateState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -130,6 +135,11 @@ pub struct AllViewsTxState { states: Rc>>>, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for AllViewsTxState {} +unsafe impl Sync for AllViewsTxState {} + impl AllViewsTxState { /// Create a new container for view transaction states pub fn new() -> Self { @@ -210,6 +220,11 @@ pub struct IncrementalView { root_page: i64, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for IncrementalView {} +unsafe impl Sync for IncrementalView {} + impl IncrementalView { /// Try to compile the SELECT statement into a DBSP circuit fn try_compile_circuit( diff --git a/core/io/mod.rs b/core/io/mod.rs index 35f7d4786..77ff1b807 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -67,6 +67,11 @@ pub trait File: Send + Sync { #[derive(Debug, Copy, Clone, PartialEq)] pub struct OpenFlags(i32); +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for OpenFlags {} +unsafe impl Sync for OpenFlags {} + bitflags! { impl OpenFlags: i32 { const None = 0b00000000; diff --git a/core/lib.rs b/core/lib.rs index 25ed3114c..637e91c92 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -225,6 +225,8 @@ pub struct Database { n_connections: AtomicUsize, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 unsafe impl Send for Database {} unsafe impl Sync for Database {} @@ -1107,6 +1109,11 @@ pub struct Connection { fk_deferred_violations: AtomicIsize, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for Connection {} +unsafe impl Sync for Connection {} + impl Drop for Connection { fn drop(&mut self) { if !self.is_closed() { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 021071134..2c8490a1d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -548,6 +548,11 @@ pub struct Pager { enable_encryption: AtomicBool, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for Pager {} +unsafe impl Sync for Pager {} + #[cfg(not(feature = "omit_autovacuum"))] pub struct VacuumState { /// State machine for [Pager::ptrmap_get] diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 584b62da4..ed8190092 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -266,6 +266,11 @@ pub struct Row { count: usize, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for Row {} +unsafe impl Sync for Row {} + #[derive(Debug, Clone, Copy, PartialEq)] pub enum TxnCleanup { None, @@ -317,6 +322,11 @@ pub struct ProgramState { fk_scope_counter: isize, } +// SAFETY: This needs to be audited for thread safety. +// See: https://github.com/tursodatabase/turso/issues/1552 +unsafe impl Send for ProgramState {} +unsafe impl Sync for ProgramState {} + impl ProgramState { pub fn new(max_registers: usize, max_cursors: usize) -> Self { let cursors: Vec> = (0..max_cursors).map(|_| None).collect();