diff --git a/src/services/contactService.ts b/src/services/contactService.ts index 00dabd78..468777b3 100644 --- a/src/services/contactService.ts +++ b/src/services/contactService.ts @@ -1,6 +1,8 @@ -import { RelayPool, completeOnEose } from 'applesauce-relay' -import { lastValueFrom, merge, Observable, takeUntil, timer, toArray } from 'rxjs' +import { RelayPool } from 'applesauce-relay' +import { Observable } from 'rxjs' import { prioritizeLocalRelays } from '../utils/helpers' +import { queryEvents } from './dataFetch' +import { CONTACTS_REMOTE_TIMEOUT_MS } from '../config/network' /** * Fetches the contact list (follows) for a specific user @@ -15,24 +17,27 @@ export const fetchContacts = async ( ): Promise> => { try { const relayUrls = prioritizeLocalRelays(Array.from(relayPool.relays.values()).map(relay => relay.url)) - console.log('🔍 Fetching contacts (kind 3) for user:', pubkey) - - // Local-first quick attempt - const localRelays = relayUrls.filter(url => url.includes('localhost') || url.includes('127.0.0.1')) - const remoteRelays = relayUrls.filter(url => !url.includes('localhost') && !url.includes('127.0.0.1')) - const local$ = localRelays.length > 0 - ? relayPool - .req(localRelays, { kinds: [3], authors: [pubkey] }) - .pipe(completeOnEose(), takeUntil(timer(1200))) - : new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete()) - const remote$ = remoteRelays.length > 0 - ? relayPool - .req(remoteRelays, { kinds: [3], authors: [pubkey] }) - .pipe(completeOnEose(), takeUntil(timer(6000))) - : new Observable<{ created_at: number; tags: string[][] }>((sub) => sub.complete()) - const events = await lastValueFrom( - merge(local$, remote$).pipe(toArray()) + + const partialFollowed = new Set() + const events = await queryEvents( + relayPool, + { kinds: [3], authors: [pubkey] }, + { + relayUrls, + remoteTimeoutMs: CONTACTS_REMOTE_TIMEOUT_MS, + onEvent: (event: { created_at: number; tags: string[][] }) => { + // Stream partials as we see any contact list + for (const tag of event.tags) { + if (tag[0] === 'p' && tag[1]) { + partialFollowed.add(tag[1]) + } + } + if (onPartial && partialFollowed.size > 0) { + onPartial(new Set(partialFollowed)) + } + } + } ) const followed = new Set() if (events.length > 0) { diff --git a/src/services/dataFetch.ts b/src/services/dataFetch.ts new file mode 100644 index 00000000..f4f31bb4 --- /dev/null +++ b/src/services/dataFetch.ts @@ -0,0 +1,70 @@ +import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay' +import { Observable, merge, takeUntil, timer, toArray, tap, lastValueFrom } from 'rxjs' +import { NostrEvent } from 'nostr-tools' +import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers' +import { LOCAL_TIMEOUT_MS, REMOTE_TIMEOUT_MS } from '../config/network' + +export interface QueryOptions { + relayUrls?: string[] + localTimeoutMs?: number + remoteTimeoutMs?: number + onEvent?: (event: NostrEvent) => void +} + +/** + * Unified local-first query helper with optional streaming callback. + * Returns all collected events (deduped by id) after both streams complete or time out. + */ +export async function queryEvents( + relayPool: RelayPool, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + filter: any, + options: QueryOptions = {} +): Promise { + const { + relayUrls, + localTimeoutMs = LOCAL_TIMEOUT_MS, + remoteTimeoutMs = REMOTE_TIMEOUT_MS, + onEvent + } = options + + const urls = relayUrls && relayUrls.length > 0 + ? relayUrls + : Array.from(relayPool.relays.values()).map(r => r.url) + + const ordered = prioritizeLocalRelays(urls) + const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered) + + const local$: Observable = localRelays.length > 0 + ? relayPool + .req(localRelays, filter) + .pipe( + onlyEvents(), + onEvent ? tap((e: NostrEvent) => onEvent(e)) : tap(() => {}), + completeOnEose(), + takeUntil(timer(localTimeoutMs)) + ) as unknown as Observable + : new Observable((sub) => sub.complete()) + + const remote$: Observable = remoteRelays.length > 0 + ? relayPool + .req(remoteRelays, filter) + .pipe( + onlyEvents(), + onEvent ? tap((e: NostrEvent) => onEvent(e)) : tap(() => {}), + completeOnEose(), + takeUntil(timer(remoteTimeoutMs)) + ) as unknown as Observable + : new Observable((sub) => sub.complete()) + + const events = await lastValueFrom(merge(local$, remote$).pipe(toArray())) + + // Deduplicate by id (callers can perform higher-level replaceable grouping if needed) + const byId = new Map() + for (const ev of events) { + if (!byId.has(ev.id)) byId.set(ev.id, ev) + } + return Array.from(byId.values()) +} + +