refactor: useSubscription

This commit is contained in:
Shusui MOYATANI
2024-02-09 11:10:44 +09:00
parent ee0be6b9eb
commit c5d25aed14

View File

@@ -1,13 +1,11 @@
import { createSignal, createEffect, createMemo, onMount, onCleanup, on } from 'solid-js'; import { createSignal, createEffect, createMemo, onMount, onCleanup, on } from 'solid-js';
import uniqBy from 'lodash/uniqBy';
import { type Filter } from 'nostr-tools/filter'; import { type Filter } from 'nostr-tools/filter';
import { type SubscribeManyParams } from 'nostr-tools/pool'; import { type SubscribeManyParams } from 'nostr-tools/pool';
import { type Event as NostrEvent } from 'nostr-tools/pure'; import { type Event as NostrEvent } from 'nostr-tools/pure';
import { insertEventIntoDescendingList } from 'nostr-tools/utils'; import { insertEventIntoDescendingList } from 'nostr-tools/utils';
import useConfig from '@/core/useConfig'; import useConfig from '@/core/useConfig';
import { sortEvents } from '@/nostr/event/comparator';
import usePool from '@/nostr/usePool'; import usePool from '@/nostr/usePool';
import useStats from '@/nostr/useStats'; import useStats from '@/nostr/useStats';
import epoch from '@/utils/epoch'; import epoch from '@/utils/epoch';
@@ -44,16 +42,91 @@ setInterval(() => {
setActiveSubscriptions(count); setActiveSubscriptions(count);
}, 1000); }, 1000);
// avoid updating an array rapidly while this is fetching stored events
const useThrottledEvents = ({
eose,
limit,
eoseLimit,
}: {
eose: () => boolean;
limit: () => number;
eoseLimit: () => number;
}) => {
const SecondsToIgnore = 300; // 5 min
const [events, setEvents] = createSignal<NostrEvent[]>([]);
const delayedEvents: NostrEvent[] = [];
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const reflectDelayedEvents = () => {
setEvents((currentEvents) => {
const newEvents = [...currentEvents];
delayedEvents.forEach((event) => {
insertEventIntoDescendingList(newEvents, event);
});
return newEvents.slice(0, eoseLimit());
});
// clear delayed events
delayedEvents.splice(0, delayedEvents.length);
};
const startTimerIfNotStarted = () => {
if (timeoutId != null) return;
timeoutId = setTimeout(() => {
timeoutId = undefined;
reflectDelayedEvents();
}, 100);
};
const addEvent = (event: NostrEvent) => {
const diffSec = event.created_at - epoch();
if (diffSec > SecondsToIgnore) return;
if (diffSec > 0) {
setTimeout(() => addEvent(event), diffSec * 1000);
return;
}
if (!eose()) {
delayedEvents.push(event);
startTimerIfNotStarted();
} else {
// SimplePool de-duplicates events but sometimes onEvent is called for duplicated events.
// insertEventIntoDescendingList de-duplicates events.
// https://github.com/syusui-s/rabbit/issues/5
setEvents((currentEvents) => {
const newEvents = [...currentEvents];
insertEventIntoDescendingList(newEvents, event);
return newEvents.slice(0, limit());
});
}
};
createEffect(() => {
if (eose()) {
reflectDelayedEvents();
}
});
onCleanup(() => {
if (timeoutId != null) {
clearTimeout(timeoutId);
}
});
return { events, setEvents, addEvent };
};
const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => { const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
const { config, shouldMuteEvent } = useConfig(); const { config, shouldMuteEvent } = useConfig();
const pool = usePool(); const pool = usePool();
const [events, setEvents] = createSignal<NostrEvent[]>([]);
const [eose, setEose] = createSignal<boolean>(false); const [eose, setEose] = createSignal<boolean>(false);
const props = createMemo(propsProvider); const props = createMemo(propsProvider);
const eoseLimit = () => propsProvider()?.eoseLimit ?? 25; const eoseLimit = () => propsProvider()?.eoseLimit ?? 25;
const limit = () => propsProvider()?.limit ?? 50; const limit = () => propsProvider()?.limit ?? 50;
const { events, setEvents, addEvent } = useThrottledEvents({ eose, eoseLimit, limit });
createEffect( createEffect(
on( on(
() => [config().mutedPubkeys, config().mutedKeywords], () => [config().mutedPubkeys, config().mutedKeywords],
@@ -71,28 +144,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
}); });
}); });
const addEvent = (event: NostrEvent) => {
const SecondsToIgnore = 300; // 5 min
const diffSec = event.created_at - epoch();
if (diffSec > SecondsToIgnore) return;
if (diffSec > 0) {
setTimeout(() => addEvent(event), diffSec * 1000);
return;
}
setEvents((current) => {
const sorted = insertEventIntoDescendingList(current, event).slice(0, limit());
// FIXME なぜか重複して取得される問題があるが一旦uniqByで対処
// https://github.com/syusui-s/rabbit/issues/5
const deduped = uniqBy(sorted, (e) => e.id);
if (deduped.length !== sorted.length) {
console.warn('duplicated event', event);
}
return deduped;
});
};
const startSubscription = () => { const startSubscription = () => {
console.debug('startSubscription: start'); console.debug('startSubscription: start');
@@ -110,11 +161,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
let subscribing = true; let subscribing = true;
count += 1; count += 1;
let pushed = false;
setEose(false); setEose(false);
const storedEvents: NostrEvent[] = [];
const updateEvents = () => setEvents(sortEvents(storedEvents).slice(0, eoseLimit()));
const sub = pool().subscribeMany( const sub = pool().subscribeMany(
relayUrls, relayUrls,
@@ -129,12 +176,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
return; return;
} }
if (!eose()) { addEvent(event);
pushed = true;
storedEvents.push(event);
} else {
addEvent(event);
}
}, },
oneose: () => { oneose: () => {
// sometimes `oneose` called twice // sometimes `oneose` called twice
@@ -143,10 +185,7 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
if (onEOSE != null) { if (onEOSE != null) {
onEOSE(); onEOSE();
} }
setEose(true); setEose(true);
updateEvents();
storedEvents.splice(0, storedEvents.length);
if (!continuous) { if (!continuous) {
sub.close(); sub.close();
@@ -159,18 +198,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
}, },
); );
// avoid updating an array too rapidly while this is fetching stored events
const intervalId = setInterval(() => {
if (eose()) {
clearInterval(intervalId);
return;
}
if (pushed) {
pushed = false;
updateEvents();
}
}, 100);
onCleanup(() => { onCleanup(() => {
console.debug('startSubscription: end'); console.debug('startSubscription: end');
sub.close(); sub.close();
@@ -178,7 +205,6 @@ const useSubscription = (propsProvider: () => UseSubscriptionProps | null) => {
subscribing = false; subscribing = false;
count -= 1; count -= 1;
} }
clearInterval(intervalId);
}); });
}; };