diff --git a/crates/cdk/src/pub_sub.rs b/crates/cdk/src/pub_sub.rs index ceec2ed3..c5f98a33 100644 --- a/crates/cdk/src/pub_sub.rs +++ b/crates/cdk/src/pub_sub.rs @@ -41,10 +41,10 @@ pub struct Manager where T: Indexable + Clone + Send + Sync + 'static, I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, - F: OnNewSubscription + 'static, + F: OnNewSubscription + Send + Sync + 'static, { indexes: IndexTree, - on_new_subscription: Option, + on_new_subscription: Option>, unsubscription_sender: mpsc::Sender<(SubId, Vec>)>, active_subscriptions: Arc, background_subscription_remover: Option>, @@ -54,7 +54,7 @@ impl Default for Manager where T: Indexable + Clone + Send + Sync + 'static, I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, - F: OnNewSubscription + 'static, + F: OnNewSubscription + Send + Sync + 'static, { fn default() -> Self { let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE); @@ -79,11 +79,11 @@ impl From for Manager where T: Indexable + Clone + Send + Sync + 'static, I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, - F: OnNewSubscription + 'static, + F: OnNewSubscription + Send + Sync + 'static, { fn from(value: F) -> Self { let mut manager: Self = Default::default(); - manager.on_new_subscription = Some(value); + manager.on_new_subscription = Some(Arc::new(value)); manager } } @@ -92,7 +92,7 @@ impl Manager where T: Indexable + Clone + Send + Sync + 'static, I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, - F: OnNewSubscription + 'static, + F: OnNewSubscription + Send + Sync + 'static, { #[inline] /// Broadcast an event to all listeners @@ -143,32 +143,45 @@ where indexes: Vec>, ) -> ActiveSubscription { let (sender, receiver) = mpsc::channel(10); - if let Some(on_new_subscription) = self.on_new_subscription.as_ref() { - match on_new_subscription - .on_new_subscription(&indexes.iter().map(|x| x.deref()).collect::>()) - .await - { - Ok(events) => { - for event in events { - let _ = sender.try_send((sub_id.clone(), event)); - } - } - Err(err) => { - tracing::info!( - "Failed to get initial state for subscription: {:?}, {}", - sub_id, - err - ); - } - } - } let mut index_storage = self.indexes.write().await; + // Subscribe to events as soon as possible for index in indexes.clone() { index_storage.insert(index, sender.clone()); } drop(index_storage); + if let Some(on_new_subscription) = self.on_new_subscription.clone() { + // After we're subscribed already, fetch the current status of matching events. It is + // down in another thread to return right away + let indexes_for_worker = indexes.clone(); + let sub_id_for_worker = sub_id.clone(); + tokio::spawn(async move { + match on_new_subscription + .on_new_subscription( + &indexes_for_worker + .iter() + .map(|x| x.deref()) + .collect::>(), + ) + .await + { + Ok(events) => { + for event in events { + let _ = sender.try_send((sub_id_for_worker.clone(), event)); + } + } + Err(err) => { + tracing::info!( + "Failed to get initial state for subscription: {:?}, {}", + sub_id_for_worker, + err + ); + } + } + }); + } + self.active_subscriptions .fetch_add(1, atomic::Ordering::Relaxed); @@ -232,7 +245,7 @@ impl Drop for Manager where T: Indexable + Clone + Send + Sync + 'static, I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, - F: OnNewSubscription + 'static, + F: OnNewSubscription + Send + Sync + 'static, { fn drop(&mut self) { if let Some(handler) = self.background_subscription_remover.take() {