mirror of
https://github.com/dergigi/boris.git
synced 2025-12-18 15:14:20 +01:00
refactor(reading-position): add startReadingPositionStream and remove timeouts
This commit is contained in:
@@ -135,8 +135,53 @@ export async function saveReadingPosition(
|
||||
await publishEvent(relayPool, eventStore, signed)
|
||||
}
|
||||
|
||||
/**
|
||||
* Streaming reading position loader (non-blocking, EOSE-driven)
|
||||
* Seeds from local eventStore, streams relay updates to store in background
|
||||
* @returns Unsubscribe function to cancel both store watch and network stream
|
||||
*/
|
||||
export function startReadingPositionStream(
|
||||
relayPool: RelayPool,
|
||||
eventStore: IEventStore,
|
||||
pubkey: string,
|
||||
articleIdentifier: string,
|
||||
onPosition: (pos: ReadingPosition | null) => void
|
||||
): () => void {
|
||||
const dTag = generateDTag(articleIdentifier)
|
||||
|
||||
// 1) Seed from local replaceable immediately and watch for updates
|
||||
const storeSub = eventStore
|
||||
.replaceable(READING_PROGRESS_KIND, pubkey, dTag)
|
||||
.subscribe((event: NostrEvent | undefined) => {
|
||||
if (!event) {
|
||||
onPosition(null)
|
||||
return
|
||||
}
|
||||
const parsed = getReadingProgressContent(event)
|
||||
onPosition(parsed || null)
|
||||
})
|
||||
|
||||
// 2) Stream from relays in background; pipe into store; no timeout/unsubscribe timer
|
||||
const networkSub = relayPool
|
||||
.subscription(RELAYS, {
|
||||
kinds: [READING_PROGRESS_KIND],
|
||||
authors: [pubkey],
|
||||
'#d': [dTag]
|
||||
})
|
||||
.pipe(onlyEvents(), mapEventsToStore(eventStore))
|
||||
.subscribe()
|
||||
|
||||
// Caller manages lifecycle
|
||||
return () => {
|
||||
try { storeSub.unsubscribe() } catch {}
|
||||
try { networkSub.unsubscribe() } catch {}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load reading position from Nostr (kind 39802)
|
||||
* @deprecated Use startReadingPositionStream for non-blocking behavior
|
||||
* Returns current local position immediately (or null) and starts background sync
|
||||
*/
|
||||
export async function loadReadingPosition(
|
||||
relayPool: RelayPool,
|
||||
@@ -146,101 +191,29 @@ export async function loadReadingPosition(
|
||||
): Promise<ReadingPosition | null> {
|
||||
const dTag = generateDTag(articleIdentifier)
|
||||
|
||||
// Check local event store first
|
||||
let initial: ReadingPosition | null = null
|
||||
try {
|
||||
const localEvent = await firstValueFrom(
|
||||
eventStore.replaceable(READING_PROGRESS_KIND, pubkey, dTag)
|
||||
)
|
||||
if (localEvent) {
|
||||
const content = getReadingProgressContent(localEvent)
|
||||
if (content) {
|
||||
// Fetch from relays in background to get any updates
|
||||
relayPool
|
||||
.subscription(RELAYS, {
|
||||
kinds: [READING_PROGRESS_KIND],
|
||||
authors: [pubkey],
|
||||
'#d': [dTag]
|
||||
})
|
||||
.pipe(onlyEvents(), mapEventsToStore(eventStore))
|
||||
.subscribe()
|
||||
|
||||
return content
|
||||
}
|
||||
if (content) initial = content
|
||||
}
|
||||
} catch (err) {
|
||||
// Ignore errors and fetch from relays
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
// Fetch from relays
|
||||
const result = await fetchFromRelays(
|
||||
relayPool,
|
||||
eventStore,
|
||||
pubkey,
|
||||
READING_PROGRESS_KIND,
|
||||
dTag,
|
||||
getReadingProgressContent
|
||||
)
|
||||
|
||||
return result || null
|
||||
}
|
||||
|
||||
// Helper function to fetch from relays with timeout
|
||||
async function fetchFromRelays(
|
||||
relayPool: RelayPool,
|
||||
eventStore: IEventStore,
|
||||
pubkey: string,
|
||||
kind: number,
|
||||
dTag: string,
|
||||
parser: (event: NostrEvent) => ReadingPosition | undefined
|
||||
): Promise<ReadingPosition | null> {
|
||||
return new Promise((resolve) => {
|
||||
let hasResolved = false
|
||||
const timeout = setTimeout(() => {
|
||||
if (!hasResolved) {
|
||||
hasResolved = true
|
||||
resolve(null)
|
||||
}
|
||||
}, 3000)
|
||||
|
||||
const sub = relayPool
|
||||
.subscription(RELAYS, {
|
||||
kinds: [kind],
|
||||
authors: [pubkey],
|
||||
'#d': [dTag]
|
||||
})
|
||||
.pipe(onlyEvents(), mapEventsToStore(eventStore))
|
||||
.subscribe({
|
||||
complete: async () => {
|
||||
clearTimeout(timeout)
|
||||
if (!hasResolved) {
|
||||
hasResolved = true
|
||||
try {
|
||||
const event = await firstValueFrom(
|
||||
eventStore.replaceable(kind, pubkey, dTag)
|
||||
)
|
||||
if (event) {
|
||||
const content = parser(event)
|
||||
resolve(content || null)
|
||||
} else {
|
||||
resolve(null)
|
||||
}
|
||||
} catch (err) {
|
||||
resolve(null)
|
||||
}
|
||||
}
|
||||
},
|
||||
error: () => {
|
||||
clearTimeout(timeout)
|
||||
if (!hasResolved) {
|
||||
hasResolved = true
|
||||
resolve(null)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
setTimeout(() => {
|
||||
sub.unsubscribe()
|
||||
}, 3000)
|
||||
})
|
||||
// Start background sync (fire-and-forget; no timeout)
|
||||
relayPool
|
||||
.subscription(RELAYS, {
|
||||
kinds: [READING_PROGRESS_KIND],
|
||||
authors: [pubkey],
|
||||
'#d': [dTag]
|
||||
})
|
||||
.pipe(onlyEvents(), mapEventsToStore(eventStore))
|
||||
.subscribe()
|
||||
|
||||
return initial
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user