Fix missed events race when creating subscriptions (#1023)

Previously, we fetched the initial state *before* registering the new
subscription. Any events emitted after the DB read but before the
subscription was installed were dropped—most visible under
low-resource conditions (e.g., CI).

Change:
- Register the subscription first, then asynchronously fetch and send
  the initial state (spawned task). This eliminates the window where
  events could be missed.
- Require `F: Send + Sync` and store `on_new_subscription` as `Arc<F>`
  so it can be safely used from the spawned task.

Result:
- No gap between “subscribe” and “start receiving,” avoiding lost events.
- Initial state still delivered, now via a background task.
This commit is contained in:
C
2025-09-01 16:24:50 -03:00
committed by GitHub
parent 6067242793
commit 62f085b350

View File

@@ -41,10 +41,10 @@ pub struct Manager<T, I, F>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
F: OnNewSubscription<Index = I, Event = T> + 'static,
F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
{
indexes: IndexTree<T, I>,
on_new_subscription: Option<F>,
on_new_subscription: Option<Arc<F>>,
unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
active_subscriptions: Arc<AtomicUsize>,
background_subscription_remover: Option<JoinHandle<()>>,
@@ -54,7 +54,7 @@ impl<T, I, F> Default for Manager<T, I, F>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
F: OnNewSubscription<Index = I, Event = T> + 'static,
F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
{
fn default() -> Self {
let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
@@ -79,11 +79,11 @@ impl<T, I, F> From<F> for Manager<T, I, F>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
F: OnNewSubscription<Index = I, Event = T> + 'static,
F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
{
fn from(value: F) -> Self {
let mut manager: Self = Default::default();
manager.on_new_subscription = Some(value);
manager.on_new_subscription = Some(Arc::new(value));
manager
}
}
@@ -92,7 +92,7 @@ impl<T, I, F> Manager<T, I, F>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
F: OnNewSubscription<Index = I, Event = T> + 'static,
F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
{
#[inline]
/// Broadcast an event to all listeners
@@ -143,32 +143,45 @@ where
indexes: Vec<Index<I>>,
) -> ActiveSubscription<T, I> {
let (sender, receiver) = mpsc::channel(10);
if let Some(on_new_subscription) = self.on_new_subscription.as_ref() {
match on_new_subscription
.on_new_subscription(&indexes.iter().map(|x| x.deref()).collect::<Vec<_>>())
.await
{
Ok(events) => {
for event in events {
let _ = sender.try_send((sub_id.clone(), event));
}
}
Err(err) => {
tracing::info!(
"Failed to get initial state for subscription: {:?}, {}",
sub_id,
err
);
}
}
}
let mut index_storage = self.indexes.write().await;
// Subscribe to events as soon as possible
for index in indexes.clone() {
index_storage.insert(index, sender.clone());
}
drop(index_storage);
if let Some(on_new_subscription) = self.on_new_subscription.clone() {
// After we're subscribed already, fetch the current status of matching events. It is
// down in another thread to return right away
let indexes_for_worker = indexes.clone();
let sub_id_for_worker = sub_id.clone();
tokio::spawn(async move {
match on_new_subscription
.on_new_subscription(
&indexes_for_worker
.iter()
.map(|x| x.deref())
.collect::<Vec<_>>(),
)
.await
{
Ok(events) => {
for event in events {
let _ = sender.try_send((sub_id_for_worker.clone(), event));
}
}
Err(err) => {
tracing::info!(
"Failed to get initial state for subscription: {:?}, {}",
sub_id_for_worker,
err
);
}
}
});
}
self.active_subscriptions
.fetch_add(1, atomic::Ordering::Relaxed);
@@ -232,7 +245,7 @@ impl<T, I, F> Drop for Manager<T, I, F>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
F: OnNewSubscription<Index = I, Event = T> + 'static,
F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
{
fn drop(&mut self) {
if let Some(handler) = self.background_subscription_remover.take() {