From c5d25aed141f8fd832a46f3064a500e70fb16acb Mon Sep 17 00:00:00 2001 From: Shusui MOYATANI Date: Fri, 9 Feb 2024 11:10:44 +0900 Subject: [PATCH] refactor: useSubscription --- src/nostr/useSubscription.ts | 128 +++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 51 deletions(-) diff --git a/src/nostr/useSubscription.ts b/src/nostr/useSubscription.ts index 62985fa..6325118 100644 --- a/src/nostr/useSubscription.ts +++ b/src/nostr/useSubscription.ts @@ -1,13 +1,11 @@ import { createSignal, createEffect, createMemo, onMount, onCleanup, on } from 'solid-js'; -import uniqBy from 'lodash/uniqBy'; import { type Filter } from 'nostr-tools/filter'; import { type SubscribeManyParams } from 'nostr-tools/pool'; import { type Event as NostrEvent } from 'nostr-tools/pure'; import { insertEventIntoDescendingList } from 'nostr-tools/utils'; import useConfig from '@/core/useConfig'; -import { sortEvents } from '@/nostr/event/comparator'; import usePool from '@/nostr/usePool'; import useStats from '@/nostr/useStats'; import epoch from '@/utils/epoch'; @@ -44,16 +42,91 @@ setInterval(() => { setActiveSubscriptions(count); }, 1000); +// avoid updating an array rapidly while this is fetching stored events +const useThrottledEvents = ({ + eose, + limit, + eoseLimit, +}: { + eose: () => boolean; + limit: () => number; + eoseLimit: () => number; +}) => { + const SecondsToIgnore = 300; // 5 min + const [events, setEvents] = createSignal([]); + + const delayedEvents: NostrEvent[] = []; + let timeoutId: ReturnType | undefined; + + const reflectDelayedEvents = () => { + setEvents((currentEvents) => { + const newEvents = [...currentEvents]; + delayedEvents.forEach((event) => { + insertEventIntoDescendingList(newEvents, event); + }); + return newEvents.slice(0, eoseLimit()); + }); + // clear delayed events + delayedEvents.splice(0, delayedEvents.length); + }; + + const startTimerIfNotStarted = () => { + if (timeoutId != null) return; + timeoutId = setTimeout(() => { + timeoutId = undefined; + reflectDelayedEvents(); + }, 100); + }; + + const addEvent = (event: NostrEvent) => { + const diffSec = event.created_at - epoch(); + if (diffSec > SecondsToIgnore) return; + if (diffSec > 0) { + setTimeout(() => addEvent(event), diffSec * 1000); + return; + } + + if (!eose()) { + delayedEvents.push(event); + startTimerIfNotStarted(); + } else { + // SimplePool de-duplicates events but sometimes onEvent is called for duplicated events. + // insertEventIntoDescendingList de-duplicates events. + // https://github.com/syusui-s/rabbit/issues/5 + setEvents((currentEvents) => { + const newEvents = [...currentEvents]; + insertEventIntoDescendingList(newEvents, event); + return newEvents.slice(0, limit()); + }); + } + }; + + createEffect(() => { + if (eose()) { + reflectDelayedEvents(); + } + }); + + onCleanup(() => { + if (timeoutId != null) { + clearTimeout(timeoutId); + } + }); + + return { events, setEvents, addEvent }; +}; + const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { const { config, shouldMuteEvent } = useConfig(); const pool = usePool(); - const [events, setEvents] = createSignal([]); const [eose, setEose] = createSignal(false); const props = createMemo(propsProvider); const eoseLimit = () => propsProvider()?.eoseLimit ?? 25; const limit = () => propsProvider()?.limit ?? 50; + const { events, setEvents, addEvent } = useThrottledEvents({ eose, eoseLimit, limit }); + createEffect( on( () => [config().mutedPubkeys, config().mutedKeywords], @@ -71,28 +144,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { }); }); - const addEvent = (event: NostrEvent) => { - const SecondsToIgnore = 300; // 5 min - - const diffSec = event.created_at - epoch(); - if (diffSec > SecondsToIgnore) return; - if (diffSec > 0) { - setTimeout(() => addEvent(event), diffSec * 1000); - return; - } - - setEvents((current) => { - const sorted = insertEventIntoDescendingList(current, event).slice(0, limit()); - // FIXME なぜか重複して取得される問題があるが一旦uniqByで対処 - // https://github.com/syusui-s/rabbit/issues/5 - const deduped = uniqBy(sorted, (e) => e.id); - if (deduped.length !== sorted.length) { - console.warn('duplicated event', event); - } - return deduped; - }); - }; - const startSubscription = () => { console.debug('startSubscription: start'); @@ -110,11 +161,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { let subscribing = true; count += 1; - let pushed = false; setEose(false); - const storedEvents: NostrEvent[] = []; - - const updateEvents = () => setEvents(sortEvents(storedEvents).slice(0, eoseLimit())); const sub = pool().subscribeMany( relayUrls, @@ -129,12 +176,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { return; } - if (!eose()) { - pushed = true; - storedEvents.push(event); - } else { - addEvent(event); - } + addEvent(event); }, oneose: () => { // sometimes `oneose` called twice @@ -143,10 +185,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { if (onEOSE != null) { onEOSE(); } - setEose(true); - updateEvents(); - storedEvents.splice(0, storedEvents.length); if (!continuous) { sub.close(); @@ -159,18 +198,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { }, ); - // avoid updating an array too rapidly while this is fetching stored events - const intervalId = setInterval(() => { - if (eose()) { - clearInterval(intervalId); - return; - } - if (pushed) { - pushed = false; - updateEvents(); - } - }, 100); - onCleanup(() => { console.debug('startSubscription: end'); sub.close(); @@ -178,7 +205,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { subscribing = false; count -= 1; } - clearInterval(intervalId); }); };