From 30ed5fb4362554e9558a8a59b3f0ec0154d2f076 Mon Sep 17 00:00:00 2001 From: Gigi Date: Wed, 22 Oct 2025 00:12:34 +0200 Subject: [PATCH] fix: batch event hydration with concurrency limit - Replace merge(...map(eventLoader)) with mergeMap concurrency: 5 - Prevents overwhelming relays with 96+ simultaneous requests - EventLoader now properly throttles to 5 concurrent requests at a time - Fixes issue where only ~7 out of 96 events were being fetched --- src/services/bookmarkController.ts | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/services/bookmarkController.ts b/src/services/bookmarkController.ts index 21127bb0..17a799b0 100644 --- a/src/services/bookmarkController.ts +++ b/src/services/bookmarkController.ts @@ -3,7 +3,8 @@ import { Helpers, EventStore } from 'applesauce-core' import { createEventLoader, createAddressLoader } from 'applesauce-loaders/loaders' import { NostrEvent } from 'nostr-tools' import { EventPointer } from 'nostr-tools/nip19' -import { merge } from 'rxjs' +import { merge, from } from 'rxjs' +import { mergeMap } from 'rxjs/operators' import { queryEvents } from './dataFetch' import { KINDS } from '../config/kinds' import { RELAYS } from '../config/relays' @@ -140,8 +141,11 @@ class BookmarkController { // Convert IDs to EventPointers const pointers: EventPointer[] = unique.map(id => ({ id })) - // Use EventLoader - it auto-batches and streams results - merge(...pointers.map(this.eventLoader)).subscribe({ + // Use mergeMap with concurrency limit instead of merge to properly batch requests + // This prevents overwhelming relays with 96+ simultaneous requests + from(pointers).pipe( + mergeMap(pointer => this.eventLoader!(pointer), { concurrency: 5 }) + ).subscribe({ next: (event) => { // Check if hydration was cancelled if (this.hydrationGeneration !== generation) return @@ -193,8 +197,10 @@ class BookmarkController { identifier: c.identifier })) - // Use AddressLoader - it auto-batches and streams results - merge(...pointers.map(this.addressLoader)).subscribe({ + // Use mergeMap with concurrency limit instead of merge to properly batch requests + from(pointers).pipe( + mergeMap(pointer => this.addressLoader!(pointer), { concurrency: 5 }) + ).subscribe({ next: (event) => { // Check if hydration was cancelled if (this.hydrationGeneration !== generation) return