refactor: replace manual batching with applesauce EventLoader and AddressLoader

Replaced manual queryEvents batching with applesauce built-in loaders.

Key Changes:
- EventLoader for regular events (by ID) - auto-batches and streams
- AddressLoader for addressable events (coordinates) - handles kind batching
- Added EventStore instance to BookmarkController
- Initialize loaders in start() method
- hydrateByIds and hydrateByCoordinates now synchronous
- Removed manual chunk, IDS_BATCH_SIZE, etc.

Benefits:
- Follows applesauce best practices from examples
- More reliable (no manual timeout logic)
- Better performance (intelligent batching)
- Streaming results (progressive updates)
- Built-in deduplication via EventStore

Pattern: merge pointers through loader, subscribe to stream results.
This commit is contained in:
Gigi
2025-10-18 00:54:18 +02:00
parent b0fcb0e897
commit 61ae31c6a2

View File

@@ -1,6 +1,9 @@
import { RelayPool } from 'applesauce-relay'
import { Helpers } from 'applesauce-core'
import { Helpers, EventStore } from 'applesauce-core'
import { createEventLoader, createAddressLoader } from 'applesauce-loaders/loaders'
import { NostrEvent } from 'nostr-tools'
import { EventPointer } from 'nostr-tools/nip19'
import { merge } from 'rxjs'
import { queryEvents } from './dataFetch'
import { KINDS } from '../config/kinds'
import { collectBookmarksFromEvents } from './bookmarkProcessing'
@@ -12,22 +15,6 @@ import {
extractUrlsFromContent
} from './bookmarkHelpers'
// Batching constants
const IDS_BATCH_SIZE = 100
const D_TAG_BATCH_SIZE = 50
const AUTHORS_BATCH_SIZE = 50
/**
* Chunk array into smaller batches
*/
function chunk<T>(arr: T[], size: number): T[][] {
const result: T[][] = []
for (let i = 0; i < arr.length; i += size) {
result.push(arr.slice(i, i + size))
}
return result
}
/**
* Get unique key for event deduplication (from Debug)
*/
@@ -76,6 +63,11 @@ class BookmarkController {
}> = new Map()
private isLoading = false
private hydrationGeneration = 0
// Event loaders for efficient batching
private eventStore = new EventStore()
private eventLoader: ReturnType<typeof createEventLoader> | null = null
private addressLoader: ReturnType<typeof createAddressLoader> | null = null
onRawEvent(cb: RawEventCallback): () => void {
this.rawEventListeners.push(cb)
@@ -124,15 +116,19 @@ class BookmarkController {
}
/**
* Hydrate events by IDs in batches
* Hydrate events by IDs using EventLoader (auto-batching, streaming)
*/
private async hydrateByIds(
relayPool: RelayPool,
private hydrateByIds(
ids: string[],
idToEvent: Map<string, NostrEvent>,
onProgress: () => void,
generation: number
): Promise<void> {
): void {
if (!this.eventLoader) {
console.warn('[bookmark] ⚠️ EventLoader not initialized')
return
}
// Filter to unique IDs not already hydrated
const unique = Array.from(new Set(ids)).filter(id => !idToEvent.has(id))
if (unique.length === 0) {
@@ -140,120 +136,87 @@ class BookmarkController {
return
}
console.log('[bookmark] 🔧 Hydrating', unique.length, 'IDs in batches of', IDS_BATCH_SIZE)
console.log('[bookmark] 🔧 Hydrating', unique.length, 'IDs using EventLoader')
const batches = chunk(unique, IDS_BATCH_SIZE)
for (let i = 0; i < batches.length; i++) {
// Check if hydration was cancelled
if (this.hydrationGeneration !== generation) {
console.log('[bookmark] ⏹️ Hydration cancelled (generation changed)')
return
// Convert IDs to EventPointers
const pointers: EventPointer[] = unique.map(id => ({ id }))
// Use EventLoader - it auto-batches and streams results
merge(...pointers.map(this.eventLoader)).subscribe({
next: (event) => {
// Check if hydration was cancelled
if (this.hydrationGeneration !== generation) return
console.log('[bookmark] 📨 Received event:', event.id.slice(0, 8), 'kind:', event.kind)
idToEvent.set(event.id, event)
// Also index by coordinate for addressable events
if (event.kind && event.kind >= 30000 && event.kind < 40000) {
const dTag = event.tags?.find((t: string[]) => t[0] === 'd')?.[1] || ''
const coordinate = `${event.kind}:${event.pubkey}:${dTag}`
idToEvent.set(coordinate, event)
}
onProgress()
},
error: (error) => {
console.error('[bookmark] ❌ EventLoader error:', error)
},
complete: () => {
console.log('[bookmark] ✅ EventLoader completed')
}
const batch = batches[i]
console.log('[bookmark] 🔧 Fetching batch', i + 1, '/', batches.length, '(', batch.length, 'IDs )')
console.log('[bookmark] 🔧 First few IDs in batch:', batch.slice(0, 3))
try {
console.log('[bookmark] 🔧 Calling queryEvents...')
const events = await queryEvents(
relayPool,
{ ids: batch },
{
onEvent: (e: NostrEvent) => {
console.log('[bookmark] 📨 Received event:', e.id.slice(0, 8), 'kind:', e.kind)
idToEvent.set(e.id, e)
// Also index by coordinate for addressable events
if (e.kind && e.kind >= 30000 && e.kind < 40000) {
const dTag = e.tags?.find((t: string[]) => t[0] === 'd')?.[1] || ''
const coordinate = `${e.kind}:${e.pubkey}:${dTag}`
idToEvent.set(coordinate, e)
}
onProgress()
}
}
)
console.log('[bookmark] ✅ Batch', i + 1, 'completed with', events.length, 'events')
} catch (error) {
console.error('[bookmark] ❌ Batch', i + 1, 'failed:', error)
}
}
})
}
/**
* Hydrate addressable events by coordinates in batches
* Hydrate addressable events by coordinates using AddressLoader (auto-batching, streaming)
*/
private async hydrateByCoordinates(
relayPool: RelayPool,
private hydrateByCoordinates(
coords: Array<{ kind: number; pubkey: string; identifier: string }>,
idToEvent: Map<string, NostrEvent>,
onProgress: () => void,
generation: number
): Promise<void> {
): void {
if (!this.addressLoader) {
console.warn('[bookmark] ⚠️ AddressLoader not initialized')
return
}
if (coords.length === 0) return
console.log('[bookmark] 🔧 Hydrating', coords.length, 'coordinates by kind')
console.log('[bookmark] 🔧 Hydrating', coords.length, 'coordinates using AddressLoader')
// Group by kind for efficient queries
const byKind = new Map<number, Array<{ pubkey: string; identifier: string }>>()
coords.forEach(c => {
if (!byKind.has(c.kind)) byKind.set(c.kind, [])
byKind.get(c.kind)!.push({ pubkey: c.pubkey, identifier: c.identifier })
// Convert coordinates to AddressPointers
const pointers = coords.map(c => ({
kind: c.kind,
pubkey: c.pubkey,
identifier: c.identifier
}))
// Use AddressLoader - it auto-batches and streams results
merge(...pointers.map(this.addressLoader)).subscribe({
next: (event) => {
// Check if hydration was cancelled
if (this.hydrationGeneration !== generation) return
console.log('[bookmark] 📨 Received addressable event:', event.id.slice(0, 8), 'kind:', event.kind)
const dTag = event.tags?.find((t: string[]) => t[0] === 'd')?.[1] || ''
const coordinate = `${event.kind}:${event.pubkey}:${dTag}`
idToEvent.set(coordinate, event)
idToEvent.set(event.id, event)
onProgress()
},
error: (error) => {
console.error('[bookmark] ❌ AddressLoader error:', error)
},
complete: () => {
console.log('[bookmark] ✅ AddressLoader completed')
}
})
for (const [kind, items] of byKind.entries()) {
// Check if hydration was cancelled
if (this.hydrationGeneration !== generation) {
console.log('[bookmark] ⏹️ Hydration cancelled (generation changed)')
return
}
const authors = Array.from(new Set(items.map(i => i.pubkey)))
const identifiers = Array.from(new Set(items.map(i => i.identifier)))
console.log('[bookmark] 🔧 Kind', kind, ':', authors.length, 'authors ×', identifiers.length, 'identifiers')
// Batch authors and identifiers
const authorBatches = chunk(authors, AUTHORS_BATCH_SIZE)
const idBatches = chunk(identifiers, D_TAG_BATCH_SIZE)
for (const authorBatch of authorBatches) {
for (const idBatch of idBatches) {
// Check if hydration was cancelled
if (this.hydrationGeneration !== generation) {
console.log('[bookmark] ⏹️ Hydration cancelled (generation changed)')
return
}
console.log('[bookmark] 🔧 Fetching kind', kind, ':', authorBatch.length, 'authors ×', idBatch.length, 'identifiers')
try {
console.log('[bookmark] 🔧 Calling queryEvents for coordinates...')
const events = await queryEvents(
relayPool,
{ kinds: [kind], authors: authorBatch, '#d': idBatch },
{
onEvent: (e: NostrEvent) => {
console.log('[bookmark] 📨 Received coordinate event:', e.id.slice(0, 8), 'kind:', e.kind)
const dTag = e.tags?.find((t: string[]) => t[0] === 'd')?.[1] || ''
const coordinate = `${e.kind}:${e.pubkey}:${dTag}`
idToEvent.set(coordinate, e)
idToEvent.set(e.id, e)
onProgress()
}
}
)
console.log('[bookmark] ✅ Kind', kind, 'batch completed with', events.length, 'events')
} catch (error) {
console.error('[bookmark] ❌ Kind', kind, 'batch failed:', error)
}
}
}
}
}
private async buildAndEmitBookmarks(
relayPool: RelayPool,
activeAccount: AccountWithExtension,
signerCandidate: unknown
): Promise<void> {
@@ -380,12 +343,10 @@ class BookmarkController {
}
})
// Kick off batched hydration (sequential, fire-and-forget)
this.hydrateByIds(relayPool, noteIds, idToEvent, onProgress, generation)
.then(() => this.hydrateByCoordinates(relayPool, coordObjs, idToEvent, onProgress, generation))
.catch(error => {
console.warn('[bookmark] ⚠️ Hydration failed:', error)
})
// Kick off batched hydration (streaming, non-blocking)
// EventLoader and AddressLoader handle batching and streaming automatically
this.hydrateByIds(noteIds, idToEvent, onProgress, generation)
this.hydrateByCoordinates(coordObjs, idToEvent, onProgress, generation)
} catch (error) {
console.error('[bookmark] ❌ Failed to build bookmarks:', error)
console.error('[bookmark] ❌ Error details:', error instanceof Error ? error.message : String(error))
@@ -411,6 +372,11 @@ class BookmarkController {
// Increment generation to cancel any in-flight hydration
this.hydrationGeneration++
// Initialize loaders for this session
console.log('[bookmark] 🔧 Initializing EventLoader and AddressLoader')
this.eventLoader = createEventLoader(relayPool, { eventStore: this.eventStore })
this.addressLoader = createAddressLoader(relayPool, { eventStore: this.eventStore })
this.setLoading(true)
console.log('[bookmark] 🔍 Starting bookmark load for', account.pubkey.slice(0, 8))
@@ -449,7 +415,7 @@ class BookmarkController {
const isEncrypted = hasEncryptedContent(evt)
if (!isEncrypted) {
// For unencrypted events, build bookmarks immediately (progressive update)
this.buildAndEmitBookmarks(relayPool, maybeAccount, signerCandidate)
this.buildAndEmitBookmarks(maybeAccount, signerCandidate)
.catch(err => console.error('[bookmark] ❌ Failed to update after event:', err))
}
@@ -479,7 +445,7 @@ class BookmarkController {
)
// Rebuild bookmarks with newly decrypted content (progressive update)
this.buildAndEmitBookmarks(relayPool, maybeAccount, signerCandidate)
this.buildAndEmitBookmarks(maybeAccount, signerCandidate)
.catch(err => console.error('[bookmark] ❌ Failed to update after decrypt:', err))
})
.catch((error) => {
@@ -491,7 +457,7 @@ class BookmarkController {
)
// Final update after EOSE
await this.buildAndEmitBookmarks(relayPool, maybeAccount, signerCandidate)
await this.buildAndEmitBookmarks(maybeAccount, signerCandidate)
console.log('[bookmark] ✅ Bookmark load complete')
} catch (error) {
console.error('[bookmark] ❌ Failed to load bookmarks:', error)