From 11041df1fbbf82aec2993e610b4d65491e20dd08 Mon Sep 17 00:00:00 2001 From: Gigi Date: Wed, 22 Oct 2025 08:55:18 +0200 Subject: [PATCH] refactor(reading-position): add startReadingPositionStream and remove timeouts --- src/services/readingPositionService.ts | 147 ++++++++++--------------- 1 file changed, 60 insertions(+), 87 deletions(-) diff --git a/src/services/readingPositionService.ts b/src/services/readingPositionService.ts index 9602237a..5b2d7e16 100644 --- a/src/services/readingPositionService.ts +++ b/src/services/readingPositionService.ts @@ -135,8 +135,53 @@ export async function saveReadingPosition( await publishEvent(relayPool, eventStore, signed) } +/** + * Streaming reading position loader (non-blocking, EOSE-driven) + * Seeds from local eventStore, streams relay updates to store in background + * @returns Unsubscribe function to cancel both store watch and network stream + */ +export function startReadingPositionStream( + relayPool: RelayPool, + eventStore: IEventStore, + pubkey: string, + articleIdentifier: string, + onPosition: (pos: ReadingPosition | null) => void +): () => void { + const dTag = generateDTag(articleIdentifier) + + // 1) Seed from local replaceable immediately and watch for updates + const storeSub = eventStore + .replaceable(READING_PROGRESS_KIND, pubkey, dTag) + .subscribe((event: NostrEvent | undefined) => { + if (!event) { + onPosition(null) + return + } + const parsed = getReadingProgressContent(event) + onPosition(parsed || null) + }) + + // 2) Stream from relays in background; pipe into store; no timeout/unsubscribe timer + const networkSub = relayPool + .subscription(RELAYS, { + kinds: [READING_PROGRESS_KIND], + authors: [pubkey], + '#d': [dTag] + }) + .pipe(onlyEvents(), mapEventsToStore(eventStore)) + .subscribe() + + // Caller manages lifecycle + return () => { + try { storeSub.unsubscribe() } catch {} + try { networkSub.unsubscribe() } catch {} + } +} + /** * Load reading position from Nostr (kind 39802) + * @deprecated Use startReadingPositionStream for non-blocking behavior + * Returns current local position immediately (or null) and starts background sync */ export async function loadReadingPosition( relayPool: RelayPool, @@ -146,101 +191,29 @@ export async function loadReadingPosition( ): Promise { const dTag = generateDTag(articleIdentifier) - // Check local event store first + let initial: ReadingPosition | null = null try { const localEvent = await firstValueFrom( eventStore.replaceable(READING_PROGRESS_KIND, pubkey, dTag) ) if (localEvent) { const content = getReadingProgressContent(localEvent) - if (content) { - // Fetch from relays in background to get any updates - relayPool - .subscription(RELAYS, { - kinds: [READING_PROGRESS_KIND], - authors: [pubkey], - '#d': [dTag] - }) - .pipe(onlyEvents(), mapEventsToStore(eventStore)) - .subscribe() - - return content - } + if (content) initial = content } - } catch (err) { - // Ignore errors and fetch from relays + } catch { + // ignore } - // Fetch from relays - const result = await fetchFromRelays( - relayPool, - eventStore, - pubkey, - READING_PROGRESS_KIND, - dTag, - getReadingProgressContent - ) - - return result || null -} - -// Helper function to fetch from relays with timeout -async function fetchFromRelays( - relayPool: RelayPool, - eventStore: IEventStore, - pubkey: string, - kind: number, - dTag: string, - parser: (event: NostrEvent) => ReadingPosition | undefined -): Promise { - return new Promise((resolve) => { - let hasResolved = false - const timeout = setTimeout(() => { - if (!hasResolved) { - hasResolved = true - resolve(null) - } - }, 3000) - - const sub = relayPool - .subscription(RELAYS, { - kinds: [kind], - authors: [pubkey], - '#d': [dTag] - }) - .pipe(onlyEvents(), mapEventsToStore(eventStore)) - .subscribe({ - complete: async () => { - clearTimeout(timeout) - if (!hasResolved) { - hasResolved = true - try { - const event = await firstValueFrom( - eventStore.replaceable(kind, pubkey, dTag) - ) - if (event) { - const content = parser(event) - resolve(content || null) - } else { - resolve(null) - } - } catch (err) { - resolve(null) - } - } - }, - error: () => { - clearTimeout(timeout) - if (!hasResolved) { - hasResolved = true - resolve(null) - } - } - }) - - setTimeout(() => { - sub.unsubscribe() - }, 3000) - }) + // Start background sync (fire-and-forget; no timeout) + relayPool + .subscription(RELAYS, { + kinds: [READING_PROGRESS_KIND], + authors: [pubkey], + '#d': [dTag] + }) + .pipe(onlyEvents(), mapEventsToStore(eventStore)) + .subscribe() + + return initial }