mirror of
https://github.com/dergigi/boris.git
synced 2026-01-17 05:44:24 +01:00
feat(highlights): refactor fetchers to use EOSE-based queryEvents
- Replace ad-hoc Rx timeout-based queries with centralized queryEvents helper - Remove artificial timeouts (1200ms/6000ms) in favor of EOSE signals - Use KINDS.Highlights consistently instead of hardcoded 9802 - Maintain streaming callbacks for instant UI updates - Parallel queries for article #a and #e tags - Local-first relay prioritization via queryEvents
This commit is contained in:
@@ -1,12 +1,11 @@
|
||||
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
|
||||
import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
|
||||
import { RelayPool } from 'applesauce-relay'
|
||||
import { NostrEvent } from 'nostr-tools'
|
||||
import { Highlight } from '../../types/highlights'
|
||||
import { prioritizeLocalRelays, partitionRelays } from '../../utils/helpers'
|
||||
import { eventToHighlight, dedupeHighlights, sortHighlights } from '../highlightEventProcessor'
|
||||
import { UserSettings } from '../settingsService'
|
||||
import { rebroadcastEvents } from '../rebroadcastService'
|
||||
import { KINDS } from '../../config/kinds'
|
||||
import { queryEvents } from '../dataFetch'
|
||||
|
||||
export const fetchHighlights = async (
|
||||
relayPool: RelayPool,
|
||||
@@ -15,44 +14,27 @@ export const fetchHighlights = async (
|
||||
settings?: UserSettings
|
||||
): Promise<Highlight[]> => {
|
||||
try {
|
||||
const relayUrls = Array.from(relayPool.relays.values()).map(relay => relay.url)
|
||||
const ordered = prioritizeLocalRelays(relayUrls)
|
||||
const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered)
|
||||
|
||||
const seenIds = new Set<string>()
|
||||
const local$ = localRelays.length > 0
|
||||
? relayPool
|
||||
.req(localRelays, { kinds: [KINDS.Highlights], authors: [pubkey] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
if (!seenIds.has(event.id)) {
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(1200))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const remote$ = remoteRelays.length > 0
|
||||
? relayPool
|
||||
.req(remoteRelays, { kinds: [KINDS.Highlights], authors: [pubkey] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
if (!seenIds.has(event.id)) {
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(6000))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const rawEvents: NostrEvent[] = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
|
||||
const rawEvents: NostrEvent[] = await queryEvents(
|
||||
relayPool,
|
||||
{ kinds: [KINDS.Highlights], authors: [pubkey] },
|
||||
{
|
||||
onEvent: (event: NostrEvent) => {
|
||||
if (seenIds.has(event.id)) return
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
console.log(`📌 Fetched ${rawEvents.length} highlight events for author:`, pubkey.slice(0, 8))
|
||||
|
||||
try {
|
||||
await rebroadcastEvents(rawEvents, relayPool, settings)
|
||||
} catch (err) {
|
||||
console.warn('Failed to rebroadcast highlight events:', err)
|
||||
}
|
||||
|
||||
await rebroadcastEvents(rawEvents, relayPool, settings)
|
||||
const uniqueEvents = dedupeHighlights(rawEvents)
|
||||
const highlights = uniqueEvents.map(eventToHighlight)
|
||||
return sortHighlights(highlights)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
|
||||
import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
|
||||
import { RelayPool } from 'applesauce-relay'
|
||||
import { NostrEvent } from 'nostr-tools'
|
||||
import { Highlight } from '../../types/highlights'
|
||||
import { RELAYS } from '../../config/relays'
|
||||
import { prioritizeLocalRelays, partitionRelays } from '../../utils/helpers'
|
||||
import { KINDS } from '../../config/kinds'
|
||||
import { eventToHighlight, dedupeHighlights, sortHighlights } from '../highlightEventProcessor'
|
||||
import { UserSettings } from '../settingsService'
|
||||
import { rebroadcastEvents } from '../rebroadcastService'
|
||||
import { queryEvents } from '../dataFetch'
|
||||
|
||||
export const fetchHighlightsForArticle = async (
|
||||
relayPool: RelayPool,
|
||||
@@ -17,76 +16,29 @@ export const fetchHighlightsForArticle = async (
|
||||
): Promise<Highlight[]> => {
|
||||
try {
|
||||
const seenIds = new Set<string>()
|
||||
const processEvent = (event: NostrEvent): Highlight | null => {
|
||||
if (seenIds.has(event.id)) return null
|
||||
const onEvent = (event: NostrEvent) => {
|
||||
if (seenIds.has(event.id)) return
|
||||
seenIds.add(event.id)
|
||||
return eventToHighlight(event)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}
|
||||
|
||||
const orderedRelays = prioritizeLocalRelays(RELAYS)
|
||||
const { local: localRelays, remote: remoteRelays } = partitionRelays(orderedRelays)
|
||||
|
||||
const aLocal$ = localRelays.length > 0
|
||||
? relayPool
|
||||
.req(localRelays, { kinds: [9802], '#a': [articleCoordinate] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
const highlight = processEvent(event)
|
||||
if (highlight && onHighlight) onHighlight(highlight)
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(1200))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const aRemote$ = remoteRelays.length > 0
|
||||
? relayPool
|
||||
.req(remoteRelays, { kinds: [9802], '#a': [articleCoordinate] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
const highlight = processEvent(event)
|
||||
if (highlight && onHighlight) onHighlight(highlight)
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(6000))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const aTagEvents: NostrEvent[] = await lastValueFrom(merge(aLocal$, aRemote$).pipe(toArray()))
|
||||
|
||||
let eTagEvents: NostrEvent[] = []
|
||||
if (eventId) {
|
||||
const eLocal$ = localRelays.length > 0
|
||||
? relayPool
|
||||
.req(localRelays, { kinds: [9802], '#e': [eventId] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
const highlight = processEvent(event)
|
||||
if (highlight && onHighlight) onHighlight(highlight)
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(1200))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const eRemote$ = remoteRelays.length > 0
|
||||
? relayPool
|
||||
.req(remoteRelays, { kinds: [9802], '#e': [eventId] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
const highlight = processEvent(event)
|
||||
if (highlight && onHighlight) onHighlight(highlight)
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(6000))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
eTagEvents = await lastValueFrom(merge(eLocal$, eRemote$).pipe(toArray()))
|
||||
}
|
||||
// Query for both #a and #e tags in parallel
|
||||
const [aTagEvents, eTagEvents] = await Promise.all([
|
||||
queryEvents(relayPool, { kinds: [KINDS.Highlights], '#a': [articleCoordinate] }, { onEvent }),
|
||||
eventId
|
||||
? queryEvents(relayPool, { kinds: [KINDS.Highlights], '#e': [eventId] }, { onEvent })
|
||||
: Promise.resolve([] as NostrEvent[])
|
||||
])
|
||||
|
||||
const rawEvents = [...aTagEvents, ...eTagEvents]
|
||||
await rebroadcastEvents(rawEvents, relayPool, settings)
|
||||
console.log(`📌 Fetched ${rawEvents.length} highlight events for article:`, articleCoordinate)
|
||||
|
||||
try {
|
||||
await rebroadcastEvents(rawEvents, relayPool, settings)
|
||||
} catch (err) {
|
||||
console.warn('Failed to rebroadcast highlight events:', err)
|
||||
}
|
||||
|
||||
const uniqueEvents = dedupeHighlights(rawEvents)
|
||||
const highlights: Highlight[] = uniqueEvents.map(eventToHighlight)
|
||||
return sortHighlights(highlights)
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
|
||||
import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
|
||||
import { RelayPool } from 'applesauce-relay'
|
||||
import { NostrEvent } from 'nostr-tools'
|
||||
import { Highlight } from '../../types/highlights'
|
||||
import { RELAYS } from '../../config/relays'
|
||||
import { prioritizeLocalRelays, partitionRelays } from '../../utils/helpers'
|
||||
import { KINDS } from '../../config/kinds'
|
||||
import { eventToHighlight, dedupeHighlights, sortHighlights } from '../highlightEventProcessor'
|
||||
import { UserSettings } from '../settingsService'
|
||||
import { rebroadcastEvents } from '../rebroadcastService'
|
||||
import { queryEvents } from '../dataFetch'
|
||||
|
||||
export const fetchHighlightsForUrl = async (
|
||||
relayPool: RelayPool,
|
||||
@@ -14,55 +13,34 @@ export const fetchHighlightsForUrl = async (
|
||||
onHighlight?: (highlight: Highlight) => void,
|
||||
settings?: UserSettings
|
||||
): Promise<Highlight[]> => {
|
||||
const seenIds = new Set<string>()
|
||||
const orderedRelaysUrl = prioritizeLocalRelays(RELAYS)
|
||||
const { local: localRelaysUrl, remote: remoteRelaysUrl } = partitionRelays(orderedRelaysUrl)
|
||||
|
||||
try {
|
||||
const local$ = localRelaysUrl.length > 0
|
||||
? relayPool
|
||||
.req(localRelaysUrl, { kinds: [9802], '#r': [url] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(1200))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const remote$ = remoteRelaysUrl.length > 0
|
||||
? relayPool
|
||||
.req(remoteRelaysUrl, { kinds: [9802], '#r': [url] })
|
||||
.pipe(
|
||||
onlyEvents(),
|
||||
tap((event: NostrEvent) => {
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}),
|
||||
completeOnEose(),
|
||||
takeUntil(timer(6000))
|
||||
)
|
||||
: new Observable<NostrEvent>((sub) => sub.complete())
|
||||
const rawEvents: NostrEvent[] = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
|
||||
|
||||
const seenIds = new Set<string>()
|
||||
const rawEvents: NostrEvent[] = await queryEvents(
|
||||
relayPool,
|
||||
{ kinds: [KINDS.Highlights], '#r': [url] },
|
||||
{
|
||||
onEvent: (event: NostrEvent) => {
|
||||
if (seenIds.has(event.id)) return
|
||||
seenIds.add(event.id)
|
||||
if (onHighlight) onHighlight(eventToHighlight(event))
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
console.log(`📌 Fetched ${rawEvents.length} highlight events for URL:`, url)
|
||||
|
||||
|
||||
// Rebroadcast events - but don't let errors here break the highlight display
|
||||
try {
|
||||
await rebroadcastEvents(rawEvents, relayPool, settings)
|
||||
} catch (err) {
|
||||
console.warn('Failed to rebroadcast highlight events:', err)
|
||||
}
|
||||
|
||||
|
||||
const uniqueEvents = dedupeHighlights(rawEvents)
|
||||
const highlights: Highlight[] = uniqueEvents.map(eventToHighlight)
|
||||
return sortHighlights(highlights)
|
||||
} catch (err) {
|
||||
console.error('Error fetching highlights for URL:', err)
|
||||
// Return highlights that were already streamed via callback
|
||||
// Don't return empty array as that would clear already-displayed highlights
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user