import { RelayPool, completeOnEose, onlyEvents } from 'applesauce-relay'
import { EMPTY, Observable, merge, toArray, tap, lastValueFrom } from 'rxjs'
import { NostrEvent } from 'nostr-tools'
import { Filter } from 'nostr-tools/filter'
import { prioritizeLocalRelays, partitionRelays } from '../utils/helpers'

/** Options accepted by {@link queryEvents}. */
export interface QueryOptions {
  /** Relays to query. When omitted or empty, every relay currently in the pool is used. */
  relayUrls?: string[]
  /**
   * Streaming callback invoked for each event as it arrives. It runs before
   * deduplication, so the same event may be delivered more than once when
   * several relays return it.
   */
  onEvent?: (event: NostrEvent) => void
}

/**
 * Builds the event stream for one group of relays.
 *
 * Returns an already-completed stream when `relays` is empty, so the caller
 * can merge the result unconditionally.
 */
function eventsUntilEose(
  relayPool: RelayPool,
  relays: string[],
  filter: Filter,
  onEvent?: (event: NostrEvent) => void
): Observable<NostrEvent> {
  if (relays.length === 0) return EMPTY

  // completeOnEose() has to run BEFORE onlyEvents(): onlyEvents() strips the
  // "EOSE" marker from the stream, so placing it first would hide the marker
  // and the stream would never complete on EOSE.
  // NOTE(review): relies on applesauce-relay operator semantics - confirm
  // against the installed version.
  const events$: Observable<NostrEvent> = relayPool
    .req(relays, filter)
    .pipe(completeOnEose(), onlyEvents())

  return onEvent ? events$.pipe(tap((event: NostrEvent) => onEvent(event))) : events$
}

/**
 * Unified local-first query helper with optional streaming callback.
 *
 * Relay URLs are ordered with `prioritizeLocalRelays` and split into local and
 * remote groups with `partitionRelays`; both groups are queried concurrently.
 * Trusts relay EOSE signals - no artificial timeouts.
 *
 * @param relayPool - Pool used to issue the requests.
 * @param filter - Nostr filter sent to every relay.
 * @param options - Optional relay list override and streaming callback.
 * @returns All collected events, deduplicated by id (first occurrence wins),
 *   once both the local and the remote stream have completed.
 */
export async function queryEvents(
  relayPool: RelayPool,
  filter: Filter,
  options: QueryOptions = {}
): Promise<NostrEvent[]> {
  const { relayUrls, onEvent } = options

  const urls =
    relayUrls && relayUrls.length > 0
      ? relayUrls
      : Array.from(relayPool.relays.values()).map((relay) => relay.url)

  const ordered = prioritizeLocalRelays(urls)
  const { local: localRelays, remote: remoteRelays } = partitionRelays(ordered)

  const local$ = eventsUntilEose(relayPool, localRelays, filter, onEvent)
  const remote$ = eventsUntilEose(relayPool, remoteRelays, filter, onEvent)

  // toArray() always emits exactly once (possibly an empty array), so
  // lastValueFrom cannot reject with EmptyError here.
  const events = await lastValueFrom(merge(local$, remote$).pipe(toArray()))

  // Deduplicate by id (callers can perform higher-level replaceable grouping if needed)
  const byId = new Map<string, NostrEvent>()
  for (const event of events) {
    if (!byId.has(event.id)) byId.set(event.id, event)
  }

  return Array.from(byId.values())
}