feat(fetch): add unified queryEvents helper and stream contacts partials with extended timeout

This commit is contained in:
Gigi
2025-10-15 09:17:51 +02:00
parent 765ce0ac5e
commit 02ec8dd936
2 changed files with 94 additions and 19 deletions

View File

@@ -1,6 +1,8 @@
import { RelayPool, completeOnEose } from 'applesauce-relay'
import { lastValueFrom, merge, Observable, takeUntil, timer, toArray } from 'rxjs'
import { RelayPool } from 'applesauce-relay'
import { Observable } from 'rxjs'
import { prioritizeLocalRelays } from '../utils/helpers'
import { queryEvents } from './dataFetch'
import { CONTACTS_REMOTE_TIMEOUT_MS } from '../config/network'
/**
* Fetches the contact list (follows) for a specific user
@@ -15,24 +17,27 @@ export const fetchContacts = async (
): Promise<Set<string>> => {
try {
const relayUrls = prioritizeLocalRelays(Array.from(relayPool.relays.values()).map(relay => relay.url))
console.log('🔍 Fetching contacts (kind 3) for user:', pubkey)
// Local-first quick attempt
const localRelays = relayUrls.filter(url => url.includes('localhost') || url.includes('127.0.0.1'))
const remoteRelays = relayUrls.filter(url => !url.includes('localhost') && !url.includes('127.0.0.1'))
const local$ = localRelays.length > 0
? relayPool
.req(localRelays, { kinds: [3], authors: [pubkey] })
.pipe(completeOnEose(), takeUntil(timer(1200)))
: new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete())
const remote$ = remoteRelays.length > 0
? relayPool
.req(remoteRelays, { kinds: [3], authors: [pubkey] })
.pipe(completeOnEose(), takeUntil(timer(6000)))
: new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete())
const events = await lastValueFrom(
merge(local$, remote$).pipe(toArray())
const partialFollowed = new Set<string>()
const events = await queryEvents(
relayPool,
{ kinds: [3], authors: [pubkey] },
{
relayUrls,
remoteTimeoutMs: CONTACTS_REMOTE_TIMEOUT_MS,
onEvent: (event: { created_at: number; tags: string[][] }) => {
// Stream partials as we see any contact list
for (const tag of event.tags) {
if (tag[0] === 'p' && tag[1]) {
partialFollowed.add(tag[1])
}
}
if (onPartial && partialFollowed.size > 0) {
onPartial(new Set(partialFollowed))
}
}
}
)
const followed = new Set<string>()
if (events.length > 0) {

70
src/services/dataFetch.ts Normal file
View File

@@ -0,0 +1,70 @@
import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
import { Observable, merge, takeUntil, timer, toArray, tap, lastValueFrom } from 'rxjs'
import { NostrEvent } from 'nostr-tools'
import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers'
import { LOCAL_TIMEOUT_MS, REMOTE_TIMEOUT_MS } from '../config/network'
export interface QueryOptions {
relayUrls?: string[]
localTimeoutMs?: number
remoteTimeoutMs?: number
onEvent?: (event: NostrEvent) => void
}
/**
* Unified local-first query helper with optional streaming callback.
* Returns all collected events (deduped by id) after both streams complete or time out.
*/
export async function queryEvents(
relayPool: RelayPool,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
filter: any,
options: QueryOptions = {}
): Promise<NostrEvent[]> {
const {
relayUrls,
localTimeoutMs = LOCAL_TIMEOUT_MS,
remoteTimeoutMs = REMOTE_TIMEOUT_MS,
onEvent
} = options
const urls = relayUrls && relayUrls.length > 0
? relayUrls
: Array.from(relayPool.relays.values()).map(r => r.url)
const ordered = prioritizeLocalRelays(urls)
const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered)
const local$: Observable<NostrEvent> = localRelays.length > 0
? relayPool
.req(localRelays, filter)
.pipe(
onlyEvents(),
onEvent ? tap((e: NostrEvent) => onEvent(e)) : tap(() => {}),
completeOnEose(),
takeUntil(timer(localTimeoutMs))
) as unknown as Observable<NostrEvent>
: new Observable<NostrEvent>((sub) => sub.complete())
const remote$: Observable<NostrEvent> = remoteRelays.length > 0
? relayPool
.req(remoteRelays, filter)
.pipe(
onlyEvents(),
onEvent ? tap((e: NostrEvent) => onEvent(e)) : tap(() => {}),
completeOnEose(),
takeUntil(timer(remoteTimeoutMs))
) as unknown as Observable<NostrEvent>
: new Observable<NostrEvent>((sub) => sub.complete())
const events = await lastValueFrom(merge(local$, remote$).pipe(toArray()))
// Deduplicate by id (callers can perform higher-level replaceable grouping if needed)
const byId = new Map<string, NostrEvent>()
for (const ev of events) {
if (!byId.has(ev.id)) byId.set(ev.id, ev)
}
return Array.from(byId.values())
}