perf(parallel): run local+remote fetches concurrently across services; stream+dedupe

This commit is contained in:
Gigi
2025-10-12 23:13:26 +02:00
parent ea1046fe13
commit 496d1df404
9 changed files with 204 additions and 373 deletions

View File

@@ -1,11 +1,12 @@
import { RelayPool, onlyEvents } from 'applesauce-relay' import { RelayPool } from 'applesauce-relay'
import { lastValueFrom, take, takeUntil, timer, toArray } from 'rxjs' import { lastValueFrom, take } from 'rxjs'
import { nip19 } from 'nostr-tools' import { nip19 } from 'nostr-tools'
import { AddressPointer } from 'nostr-tools/nip19' import { AddressPointer } from 'nostr-tools/nip19'
import { NostrEvent } from 'nostr-tools' import { NostrEvent } from 'nostr-tools'
import { Helpers } from 'applesauce-core' import { Helpers } from 'applesauce-core'
import { RELAYS } from '../config/relays' import { RELAYS } from '../config/relays'
import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers' import { prioritizeLocalRelays, partitionRelays, createParallelReqStreams } from '../utils/helpers'
import { merge, toArray as rxToArray } from 'rxjs'
import { UserSettings } from './settingsService' import { UserSettings } from './settingsService'
import { rebroadcastEvents } from './rebroadcastService' import { rebroadcastEvents } from './rebroadcastService'
@@ -112,43 +113,10 @@ export async function fetchArticleByNaddr(
'#d': [pointer.identifier] '#d': [pointer.identifier]
} }
// Local-first: try local relays quickly, then ALWAYS query remote to merge // Parallel local+remote, stream immediate, collect up to first from each
let events = [] as NostrEvent[] const { local$, remote$ } = createParallelReqStreams(relayPool, localRelays, remoteRelays, filter, 1200, 6000)
if (localRelays.length > 0) { const collected = await lastValueFrom(merge(local$.pipe(take(1)), remote$.pipe(take(1))).pipe(rxToArray()))
try { const events = collected as NostrEvent[]
events = await lastValueFrom(
relayPool
.req(localRelays, filter)
.pipe(
onlyEvents(),
take(1),
takeUntil(timer(1200)),
toArray()
)
)
} catch {
events = []
}
}
// Always query remote to ensure we have the latest from the wider network
if (remoteRelays.length > 0) {
try {
const remoteEvents = await lastValueFrom(
relayPool
.req(remoteRelays, filter)
.pipe(
onlyEvents(),
take(1),
takeUntil(timer(6000)),
toArray()
)
)
events = events.concat(remoteEvents)
} catch {
// ignore
}
}
if (events.length === 0) { if (events.length === 0) {
throw new Error('Article not found') throw new Error('Article not found')

View File

@@ -1,10 +1,11 @@
import { RelayPool, onlyEvents } from 'applesauce-relay' import { RelayPool } from 'applesauce-relay'
import { lastValueFrom, take, takeUntil, timer, toArray } from 'rxjs' import { lastValueFrom, take } from 'rxjs'
import { nip19 } from 'nostr-tools' import { nip19 } from 'nostr-tools'
import { AddressPointer } from 'nostr-tools/nip19' import { AddressPointer } from 'nostr-tools/nip19'
import { Helpers } from 'applesauce-core' import { Helpers } from 'applesauce-core'
import { RELAYS } from '../config/relays' import { RELAYS } from '../config/relays'
import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers' import { prioritizeLocalRelays, partitionRelays, createParallelReqStreams } from '../utils/helpers'
import { merge, toArray as rxToArray } from 'rxjs'
const { getArticleTitle } = Helpers const { getArticleTitle } = Helpers
@@ -39,28 +40,11 @@ export async function fetchArticleTitle(
'#d': [pointer.identifier] '#d': [pointer.identifier]
} }
// Try to get the first event quickly from local relays // Parallel local+remote: collect up to one event from each
let events: { created_at: number }[] = [] const { local$, remote$ } = createParallelReqStreams(relayPool, localRelays, remoteRelays, filter, 1200, 5000)
if (localRelays.length > 0) { const events = await lastValueFrom(
try { merge(local$.pipe(take(1)), remote$.pipe(take(1))).pipe(rxToArray())
events = await lastValueFrom( ) as unknown as { created_at: number }[]
relayPool
.req(localRelays, filter)
.pipe(onlyEvents(), take(1), takeUntil(timer(1200)), toArray())
)
} catch {
events = []
}
}
// Always follow up with remote relays to ensure we have latest network data
if (remoteRelays.length > 0) {
const remoteEvents = await lastValueFrom(
relayPool
.req(remoteRelays, filter)
.pipe(onlyEvents(), take(1), takeUntil(timer(5000)), toArray())
)
events = events.concat(remoteEvents as unknown as { created_at: number }[])
}
if (events.length === 0) { if (events.length === 0) {
return null return null

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose } from 'applesauce-relay' import { RelayPool, completeOnEose } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, toArray } from 'rxjs'
import { import {
AccountWithExtension, AccountWithExtension,
NostrEvent, NostrEvent,
@@ -37,30 +37,17 @@ export const fetchBookmarks = async (
// Fetch bookmark events - NIP-51 standards, legacy formats, and web bookmarks (NIP-B0) // Fetch bookmark events - NIP-51 standards, legacy formats, and web bookmarks (NIP-B0)
console.log('🔍 Fetching bookmark events from relays:', relayUrls) console.log('🔍 Fetching bookmark events from relays:', relayUrls)
// Try local-first quickly, then full set fallback // Try local-first quickly, then full set fallback
let rawEvents = [] as NostrEvent[] const local$ = localRelays.length > 0
if (localRelays.length > 0) { ? relayPool
try { .req(localRelays, { kinds: [10003, 30003, 30001, 39701], authors: [activeAccount.pubkey] })
rawEvents = await lastValueFrom( .pipe(completeOnEose(), takeUntil(timer(1200)))
relayPool : new Observable<NostrEvent>((sub) => sub.complete())
.req(localRelays, { kinds: [10003, 30003, 30001, 39701], authors: [activeAccount.pubkey] }) const remote$ = remoteRelays.length > 0
.pipe(completeOnEose(), takeUntil(timer(1200)), toArray()) ? relayPool
) .req(remoteRelays, { kinds: [10003, 30003, 30001, 39701], authors: [activeAccount.pubkey] })
} catch { .pipe(completeOnEose(), takeUntil(timer(6000)))
rawEvents = [] : new Observable<NostrEvent>((sub) => sub.complete())
} const rawEvents = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
}
if (remoteRelays.length > 0) {
try {
const remoteEvents = await lastValueFrom(
relayPool
.req(remoteRelays, { kinds: [10003, 30003, 30001, 39701], authors: [activeAccount.pubkey] })
.pipe(completeOnEose(), takeUntil(timer(6000)), toArray())
)
rawEvents = rawEvents.concat(remoteEvents)
} catch {
// ignore
}
}
console.log('📊 Raw events fetched:', rawEvents.length, 'events') console.log('📊 Raw events fetched:', rawEvents.length, 'events')
// Rebroadcast bookmark events to local/all relays based on settings // Rebroadcast bookmark events to local/all relays based on settings
@@ -125,30 +112,13 @@ export const fetchBookmarks = async (
if (noteIds.length > 0) { if (noteIds.length > 0) {
try { try {
const { local: localHydrate, remote: remoteHydrate } = partitionRelays(relayUrls) const { local: localHydrate, remote: remoteHydrate } = partitionRelays(relayUrls)
let events: NostrEvent[] = [] const localHydrate$ = localHydrate.length > 0
if (localHydrate.length > 0) { ? relayPool.req(localHydrate, { ids: noteIds }).pipe(completeOnEose(), takeUntil(timer(800)))
try { : new Observable<NostrEvent>((sub) => sub.complete())
events = await lastValueFrom( const remoteHydrate$ = remoteHydrate.length > 0
relayPool ? relayPool.req(remoteHydrate, { ids: noteIds }).pipe(completeOnEose(), takeUntil(timer(2500)))
.req(localHydrate, { ids: noteIds }) : new Observable<NostrEvent>((sub) => sub.complete())
.pipe(completeOnEose(), takeUntil(timer(800)), toArray()) const events: NostrEvent[] = await lastValueFrom(merge(localHydrate$, remoteHydrate$).pipe(toArray()))
)
} catch {
events = []
}
}
if (remoteHydrate.length > 0) {
try {
const remote = await lastValueFrom(
relayPool
.req(remoteHydrate, { ids: noteIds })
.pipe(completeOnEose(), takeUntil(timer(2500)), toArray())
)
events = events.concat(remote)
} catch {
// ignore
}
}
idToEvent = new Map(events.map((e: NostrEvent) => [e.id, e])) idToEvent = new Map(events.map((e: NostrEvent) => [e.id, e]))
} catch (error) { } catch (error) {
console.warn('Failed to fetch events for hydration:', error) console.warn('Failed to fetch events for hydration:', error)

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose } from 'applesauce-relay' import { RelayPool, completeOnEose } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, toArray } from 'rxjs'
import { prioritizeLocalRelays } from '../utils/helpers' import { prioritizeLocalRelays } from '../utils/helpers'
/** /**
@@ -20,19 +20,20 @@ export const fetchContacts = async (
// Local-first quick attempt // Local-first quick attempt
const localRelays = relayUrls.filter(url => url.includes('localhost') || url.includes('127.0.0.1')) const localRelays = relayUrls.filter(url => url.includes('localhost') || url.includes('127.0.0.1'))
let events: Array<{ created_at: number; tags: string[][] }> = [] const remoteRelays = relayUrls.filter(url => !url.includes('localhost') && !url.includes('127.0.0.1'))
if (localRelays.length > 0) { const local$ = localRelays.length > 0
try { ? relayPool
const localEvents = await lastValueFrom( .req(localRelays, { kinds: [3], authors: [pubkey] })
relayPool .pipe(completeOnEose(), takeUntil(timer(1200)))
.req(localRelays, { kinds: [3], authors: [pubkey] }) : new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete())
.pipe(completeOnEose(), takeUntil(timer(1200)), toArray()) const remote$ = remoteRelays.length > 0
) ? relayPool
events = localEvents as Array<{ created_at: number; tags: string[][] }> .req(remoteRelays, { kinds: [3], authors: [pubkey] })
} catch { .pipe(completeOnEose(), takeUntil(timer(6000)))
events = [] : new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete())
} const events = await lastValueFrom(
} merge(local$, remote$).pipe(toArray())
)
const followed = new Set<string>() const followed = new Set<string>()
if (events.length > 0) { if (events.length > 0) {
// Get the most recent contact list // Get the most recent contact list
@@ -46,29 +47,7 @@ export const fetchContacts = async (
} }
if (onPartial) onPartial(new Set(followed)) if (onPartial) onPartial(new Set(followed))
} }
// Always fetch remote to merge more contacts // merged already via streams
const remoteRelays = relayUrls.filter(url => !url.includes('localhost') && !url.includes('127.0.0.1'))
if (remoteRelays.length > 0) {
try {
const remoteEvents = await lastValueFrom(
relayPool
.req(remoteRelays, { kinds: [3], authors: [pubkey] })
.pipe(completeOnEose(), takeUntil(timer(6000)), toArray())
)
if (remoteEvents.length > 0) {
const sortedRemote = (remoteEvents as Array<{ created_at: number; tags: string[][] }>).
sort((a, b) => b.created_at - a.created_at)
const contactList = sortedRemote[0]
for (const tag of contactList.tags) {
if (tag[0] === 'p' && tag[1]) {
followed.add(tag[1])
}
}
}
} catch {
// ignore
}
}
console.log('📊 Contact events fetched:', events.length) console.log('📊 Contact events fetched:', events.length)

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose } from 'applesauce-relay' import { RelayPool, completeOnEose } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, toArray } from 'rxjs'
import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers' import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers'
import { NostrEvent } from 'nostr-tools' import { NostrEvent } from 'nostr-tools'
import { Helpers } from 'applesauce-core' import { Helpers } from 'applesauce-core'
@@ -66,49 +66,18 @@ export const fetchBlogPostsFromAuthors = async (
} }
} }
// Phase 1: local relays fast path const local$ = localRelays.length > 0
if (localRelays.length > 0) { ? relayPool
try { .req(localRelays, { kinds: [30023], authors: pubkeys, limit: 100 })
const localEvents = await lastValueFrom( .pipe(completeOnEose(), takeUntil(timer(1200)))
relayPool : new Observable<NostrEvent>((sub) => sub.complete())
.req(localRelays, { const remote$ = remoteRelays.length > 0
kinds: [30023], ? relayPool
authors: pubkeys, .req(remoteRelays, { kinds: [30023], authors: pubkeys, limit: 100 })
limit: 100 .pipe(completeOnEose(), takeUntil(timer(6000)))
}) : new Observable<NostrEvent>((sub) => sub.complete())
.pipe( const events = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
completeOnEose(), processEvents(events)
takeUntil(timer(1200)),
toArray()
)
)
processEvents(localEvents)
} catch {
// ignore
}
}
// Phase 2: always query remote relays to fill in missing content
if (remoteRelays.length > 0) {
try {
const remoteEvents = await lastValueFrom(
relayPool
.req(remoteRelays, {
kinds: [30023],
authors: pubkeys,
limit: 100
})
.pipe(
completeOnEose(),
takeUntil(timer(6000)),
toArray()
)
)
processEvents(remoteEvents)
} catch {
// ignore
}
}
console.log('📊 Blog post events fetched (unique):', uniqueEvents.size) console.log('📊 Blog post events fetched (unique):', uniqueEvents.size)

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay' import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, tap, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
import { NostrEvent } from 'nostr-tools' import { NostrEvent } from 'nostr-tools'
import { Highlight } from '../../types/highlights' import { Highlight } from '../../types/highlights'
import { prioritizeLocalRelays, partitionRelays } from '../../utils/helpers' import { prioritizeLocalRelays, partitionRelays } from '../../utils/helpers'
@@ -19,52 +19,37 @@ export const fetchHighlights = async (
const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered) const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered)
const seenIds = new Set<string>() const seenIds = new Set<string>()
let rawEvents: NostrEvent[] = [] const local$ = localRelays.length > 0
if (localRelays.length > 0) { ? relayPool
try { .req(localRelays, { kinds: [9802], authors: [pubkey] })
rawEvents = await lastValueFrom( .pipe(
relayPool onlyEvents(),
.req(localRelays, { kinds: [9802], authors: [pubkey] }) tap((event: NostrEvent) => {
.pipe( if (!seenIds.has(event.id)) {
onlyEvents(), seenIds.add(event.id)
tap((event: NostrEvent) => { if (onHighlight) onHighlight(eventToHighlight(event))
if (!seenIds.has(event.id)) { }
seenIds.add(event.id) }),
if (onHighlight) onHighlight(eventToHighlight(event)) completeOnEose(),
} takeUntil(timer(1200))
}), )
completeOnEose(), : new Observable<NostrEvent>((sub) => sub.complete())
takeUntil(timer(1200)), const remote$ = remoteRelays.length > 0
toArray() ? relayPool
) .req(remoteRelays, { kinds: [9802], authors: [pubkey] })
) .pipe(
} catch { onlyEvents(),
rawEvents = [] tap((event: NostrEvent) => {
} if (!seenIds.has(event.id)) {
} seenIds.add(event.id)
if (remoteRelays.length > 0) { if (onHighlight) onHighlight(eventToHighlight(event))
try { }
const remoteEvents = await lastValueFrom( }),
relayPool completeOnEose(),
.req(remoteRelays, { kinds: [9802], authors: [pubkey] }) takeUntil(timer(6000))
.pipe( )
onlyEvents(), : new Observable<NostrEvent>((sub) => sub.complete())
tap((event: NostrEvent) => { const rawEvents: NostrEvent[] = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
if (!seenIds.has(event.id)) {
seenIds.add(event.id)
if (onHighlight) onHighlight(eventToHighlight(event))
}
}),
completeOnEose(),
takeUntil(timer(6000)),
toArray()
)
)
rawEvents = rawEvents.concat(remoteEvents)
} catch {
// ignore
}
}
await rebroadcastEvents(rawEvents, relayPool, settings) await rebroadcastEvents(rawEvents, relayPool, settings)
const uniqueEvents = dedupeHighlights(rawEvents) const uniqueEvents = dedupeHighlights(rawEvents)

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay' import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, tap, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
import { NostrEvent } from 'nostr-tools' import { NostrEvent } from 'nostr-tools'
import { Highlight } from '../../types/highlights' import { Highlight } from '../../types/highlights'
import { RELAYS } from '../../config/relays' import { RELAYS } from '../../config/relays'
@@ -26,96 +26,63 @@ export const fetchHighlightsForArticle = async (
const orderedRelays = prioritizeLocalRelays(RELAYS) const orderedRelays = prioritizeLocalRelays(RELAYS)
const { local: localRelays, remote: remoteRelays } = partitionRelays(orderedRelays) const { local: localRelays, remote: remoteRelays } = partitionRelays(orderedRelays)
let aTagEvents: NostrEvent[] = [] const aLocal$ = localRelays.length > 0
if (localRelays.length > 0) { ? relayPool
try { .req(localRelays, { kinds: [9802], '#a': [articleCoordinate] })
aTagEvents = await lastValueFrom( .pipe(
relayPool onlyEvents(),
.req(localRelays, { kinds: [9802], '#a': [articleCoordinate] }) tap((event: NostrEvent) => {
.pipe( const highlight = processEvent(event)
onlyEvents(), if (highlight && onHighlight) onHighlight(highlight)
tap((event: NostrEvent) => { }),
const highlight = processEvent(event) completeOnEose(),
if (highlight && onHighlight) onHighlight(highlight) takeUntil(timer(1200))
}), )
completeOnEose(), : new Observable<NostrEvent>((sub) => sub.complete())
takeUntil(timer(1200)), const aRemote$ = remoteRelays.length > 0
toArray() ? relayPool
) .req(remoteRelays, { kinds: [9802], '#a': [articleCoordinate] })
) .pipe(
} catch { onlyEvents(),
aTagEvents = [] tap((event: NostrEvent) => {
} const highlight = processEvent(event)
} if (highlight && onHighlight) onHighlight(highlight)
}),
// Always query remote relays to merge additional highlights completeOnEose(),
if (remoteRelays.length > 0) { takeUntil(timer(6000))
try { )
const aRemote = await lastValueFrom( : new Observable<NostrEvent>((sub) => sub.complete())
relayPool const aTagEvents: NostrEvent[] = await lastValueFrom(merge(aLocal$, aRemote$).pipe(toArray()))
.req(remoteRelays, { kinds: [9802], '#a': [articleCoordinate] })
.pipe(
onlyEvents(),
tap((event: NostrEvent) => {
const highlight = processEvent(event)
if (highlight && onHighlight) onHighlight(highlight)
}),
completeOnEose(),
takeUntil(timer(6000)),
toArray()
)
)
aTagEvents = aTagEvents.concat(aRemote)
} catch {
// ignore
}
}
let eTagEvents: NostrEvent[] = [] let eTagEvents: NostrEvent[] = []
if (eventId) { if (eventId) {
if (localRelays.length > 0) { const eLocal$ = localRelays.length > 0
try { ? relayPool
eTagEvents = await lastValueFrom( .req(localRelays, { kinds: [9802], '#e': [eventId] })
relayPool .pipe(
.req(localRelays, { kinds: [9802], '#e': [eventId] }) onlyEvents(),
.pipe( tap((event: NostrEvent) => {
onlyEvents(), const highlight = processEvent(event)
tap((event: NostrEvent) => { if (highlight && onHighlight) onHighlight(highlight)
const highlight = processEvent(event) }),
if (highlight && onHighlight) onHighlight(highlight) completeOnEose(),
}), takeUntil(timer(1200))
completeOnEose(), )
takeUntil(timer(1200)), : new Observable<NostrEvent>((sub) => sub.complete())
toArray() const eRemote$ = remoteRelays.length > 0
) ? relayPool
) .req(remoteRelays, { kinds: [9802], '#e': [eventId] })
} catch { .pipe(
eTagEvents = [] onlyEvents(),
} tap((event: NostrEvent) => {
} const highlight = processEvent(event)
if (highlight && onHighlight) onHighlight(highlight)
// Always query remote for e-tag too }),
if (remoteRelays.length > 0) { completeOnEose(),
try { takeUntil(timer(6000))
const eRemote = await lastValueFrom( )
relayPool : new Observable<NostrEvent>((sub) => sub.complete())
.req(remoteRelays, { kinds: [9802], '#e': [eventId] }) eTagEvents = await lastValueFrom(merge(eLocal$, eRemote$).pipe(toArray()))
.pipe(
onlyEvents(),
tap((event: NostrEvent) => {
const highlight = processEvent(event)
if (highlight && onHighlight) onHighlight(highlight)
}),
completeOnEose(),
takeUntil(timer(6000)),
toArray()
)
)
eTagEvents = eTagEvents.concat(eRemote)
} catch {
// ignore
}
}
} }
const rawEvents = [...aTagEvents, ...eTagEvents] const rawEvents = [...aTagEvents, ...eTagEvents]

View File

@@ -1,5 +1,5 @@
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay' import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
import { lastValueFrom, takeUntil, timer, tap, toArray } from 'rxjs' import { lastValueFrom, merge, Observable, takeUntil, timer, tap, toArray } from 'rxjs'
import { NostrEvent } from 'nostr-tools' import { NostrEvent } from 'nostr-tools'
import { Highlight } from '../../types/highlights' import { Highlight } from '../../types/highlights'
import { RELAYS } from '../../config/relays' import { RELAYS } from '../../config/relays'
@@ -18,48 +18,33 @@ export const fetchHighlightsForUrl = async (
const seenIds = new Set<string>() const seenIds = new Set<string>()
const orderedRelaysUrl = prioritizeLocalRelays(RELAYS) const orderedRelaysUrl = prioritizeLocalRelays(RELAYS)
const { local: localRelaysUrl, remote: remoteRelaysUrl } = partitionRelays(orderedRelaysUrl) const { local: localRelaysUrl, remote: remoteRelaysUrl } = partitionRelays(orderedRelaysUrl)
let rawEvents: NostrEvent[] = [] const local$ = localRelaysUrl.length > 0
if (localRelaysUrl.length > 0) { ? relayPool
try { .req(localRelaysUrl, { kinds: [9802], '#r': [url] })
rawEvents = await lastValueFrom( .pipe(
relayPool onlyEvents(),
.req(localRelaysUrl, { kinds: [9802], '#r': [url] }) tap((event: NostrEvent) => {
.pipe( seenIds.add(event.id)
onlyEvents(), if (onHighlight) onHighlight(eventToHighlight(event))
tap((event: NostrEvent) => { }),
seenIds.add(event.id) completeOnEose(),
if (onHighlight) onHighlight(eventToHighlight(event)) takeUntil(timer(1200))
}), )
completeOnEose(), : new Observable<NostrEvent>((sub) => sub.complete())
takeUntil(timer(1200)), const remote$ = remoteRelaysUrl.length > 0
toArray() ? relayPool
) .req(remoteRelaysUrl, { kinds: [9802], '#r': [url] })
) .pipe(
} catch { onlyEvents(),
rawEvents = [] tap((event: NostrEvent) => {
} seenIds.add(event.id)
} if (onHighlight) onHighlight(eventToHighlight(event))
if (remoteRelaysUrl.length > 0) { }),
try { completeOnEose(),
const remote = await lastValueFrom( takeUntil(timer(6000))
relayPool )
.req(remoteRelaysUrl, { kinds: [9802], '#r': [url] }) : new Observable<NostrEvent>((sub) => sub.complete())
.pipe( const rawEvents: NostrEvent[] = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
onlyEvents(),
tap((event: NostrEvent) => {
seenIds.add(event.id)
if (onHighlight) onHighlight(eventToHighlight(event))
}),
completeOnEose(),
takeUntil(timer(6000)),
toArray()
)
)
rawEvents = rawEvents.concat(remote)
} catch {
// ignore
}
}
await rebroadcastEvents(rawEvents, relayPool, settings) await rebroadcastEvents(rawEvents, relayPool, settings)
const uniqueEvents = dedupeHighlights(rawEvents) const uniqueEvents = dedupeHighlights(rawEvents)
const highlights: Highlight[] = uniqueEvents.map(eventToHighlight) const highlights: Highlight[] = uniqueEvents.map(eventToHighlight)

View File

@@ -94,3 +94,27 @@ export const prioritizeLocalRelays = (relayUrls: string[]): string[] => {
return out return out
} }
// Parallel request helper
import { completeOnEose, onlyEvents, RelayPool } from 'applesauce-relay'
import { Observable, takeUntil, timer } from 'rxjs'
export function createParallelReqStreams(
relayPool: RelayPool,
localRelays: string[],
remoteRelays: string[],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
filter: any,
localTimeoutMs = 1200,
remoteTimeoutMs = 6000
): { local$: Observable<unknown>; remote$: Observable<unknown> } {
const local$ = (localRelays.length > 0)
? relayPool.req(localRelays, filter).pipe(onlyEvents(), completeOnEose(), takeUntil(timer(localTimeoutMs)))
: new Observable<unknown>((sub) => { sub.complete() })
const remote$ = (remoteRelays.length > 0)
? relayPool.req(remoteRelays, filter).pipe(onlyEvents(), completeOnEose(), takeUntil(timer(remoteTimeoutMs)))
: new Observable<unknown>((sub) => { sub.complete() })
return { local$, remote$ }
}