diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go index c8c0813..ec4a116 100644 --- a/pkg/pipe/pipe.go +++ b/pkg/pipe/pipe.go @@ -100,7 +100,7 @@ func (b *buffer) Contains(ID string) bool { // Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since]. // It discards events from unknown pubkeys as an anti-spam mechanism. -func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(e *nostr.Event) error) { +func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) { pool := nostr.NewSimplePool(ctx) defer close(pool) @@ -111,7 +111,6 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s seen := newBuffer(2048) for event := range pool.SubscribeMany(ctx, config.Relays, filter) { - if seen.Contains(event.ID) { // event already seen, skip continue @@ -135,6 +134,114 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s } } +type FetcherConfig struct { + Relays []string + Batch int + Interval time.Duration +} + +func NewFetcherConfig() FetcherConfig { + return FetcherConfig{ + Relays: defaultRelays, + Batch: 100, + Interval: time.Minute, + } +} + +func (c FetcherConfig) Print() { + fmt.Printf("Fetcher\n") + fmt.Printf(" Relays: %v\n", c.Relays) + fmt.Printf(" Batch: %d\n", c.Batch) + fmt.Printf(" Interval: %v\n", c.Interval) +} + +// Fetcher extracts pubkeys from the channel and queries for their events when either: +// - the batch is bigger than config.Batch +// - after config.Interval since the last query. +func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, send func(*nostr.Event) error) { + batch := make([]string, 0, config.Batch) + timer := time.After(config.Interval) + + pool := nostr.NewSimplePool(ctx) + defer close(pool) + + for { + select { + case <-ctx.Done(): + return + + case pubkey := <-pubkeys: + batch = append(batch, pubkey) + if len(batch) < config.Batch { + continue + } + + events, err := fetch(ctx, pool, config.Relays, batch) + if err != nil { + log.Printf("Fetcher: %v", err) + continue + } + + for _, event := range events { + if err := send(event); err != nil { + log.Printf("Fetcher: %v", err) + } + } + + batch = make([]string, 0, config.Batch) + timer = time.After(config.Interval) + + case <-timer: + events, err := fetch(ctx, pool, config.Relays, batch) + if err != nil { + log.Printf("Fetcher: %v", err) + continue + } + + for _, event := range events { + if err := send(event); err != nil { + log.Printf("Fetcher: %v", err) + } + } + + batch = make([]string, 0, config.Batch) + timer = time.After(config.Interval) + } + } +} + +// fetch queries the [relevantKinds] of the specified pubkeys. +func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string) ([]*nostr.Event, error) { + if len(pubkeys) == 0 { + return nil, nil + } + + ctx, cancel := context.WithTimeout(ctx, time.Second*15) + defer cancel() + + filter := nostr.Filter{ + Kinds: relevantKinds, + Authors: pubkeys, + } + + latest := make(map[string]*nostr.Event, len(pubkeys)*len(filter.Kinds)) + for event := range pool.FetchMany(ctx, relays, filter) { + + key := fmt.Sprintf("%s:%d", event.PubKey, event.Kind) + e, exists := latest[key] + if !exists || event.CreatedAt > e.CreatedAt { + latest[key] = event.Event + } + } + + events := make([]*nostr.Event, 0, len(latest)) + for _, event := range latest { + events = append(events, event) + } + + return events, nil +} + // Close iterates over the relays in the pool and closes all connections. func close(pool *nostr.SimplePool) { pool.Relays.Range(func(_ string, relay *nostr.Relay) bool { diff --git a/pkg/pipe/pipe_test.go b/pkg/pipe/pipe_test.go index 6d4c16e..57a41f7 100644 --- a/pkg/pipe/pipe_test.go +++ b/pkg/pipe/pipe_test.go @@ -12,7 +12,9 @@ import ( var ( ctx = context.Background() - pip = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2" + odell string = "04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9" + calle string = "50d94fc2d8580c682b071a542f8b1e31a200b0508bab95a33bef0855df281d63" + pip string = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2" ) // Manually change pip's follow list and see if the events gets printed. Works only with `go test` @@ -22,9 +24,25 @@ func TestFirehose(t *testing.T) { checker := mockChecker{pubkey: pip} config := FirehoseConfig{Relays: defaultRelays} + Firehose(ctx, config, checker, print) } +func TestFetch(t *testing.T) { + pool := nostr.NewSimplePool(ctx) + pubkeys := []string{odell, calle, pip} + + events, err := fetch(ctx, pool, defaultRelays, pubkeys) + if err != nil { + t.Fatalf("expected error nil, got %v", err) + } + + expected := len(pubkeys) * len(relevantKinds) + if len(events) != expected { + t.Fatalf("expected %d events, got %d", expected, len(events)) + } +} + type mockChecker struct { pubkey string }