diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 93d017b..e2ef258 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -37,6 +37,8 @@ func main() { panic(err) } + config.Fetcher.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds + events := make(chan *nostr.Event, config.EventsCapacity) pubkeys := make(chan string, config.PubkeysCapacity) @@ -81,7 +83,6 @@ func main() { go printStats(ctx, events, pubkeys) } - pipe.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds var wg sync.WaitGroup wg.Add(3) diff --git a/pkg/pipe/intake.go b/pkg/pipe/intake.go index cca8b3b..a7f7782 100644 --- a/pkg/pipe/intake.go +++ b/pkg/pipe/intake.go @@ -13,11 +13,6 @@ import ( ) var ( - Kinds = []int{ - nostr.KindProfileMetadata, - nostr.KindFollowList, - } - defaultRelays = []string{ "wss://purplepag.es", "wss://njump.me", @@ -51,12 +46,14 @@ var ( ) type FirehoseConfig struct { + Kinds []int Relays []string Offset time.Duration } func NewFirehoseConfig() FirehoseConfig { return FirehoseConfig{ + Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList}, Relays: defaultRelays, Offset: time.Minute, } @@ -100,7 +97,7 @@ func (b *buffer) Contains(ID string) bool { return slices.Contains(b.IDs, ID) } -// Firehose connects to a list of relays and pulls [Kinds] events that are newer than [FirehoseConfig.Since]. +// Firehose connects to a list of relays and pulls config.Kinds events that are newer than [FirehoseConfig.Since]. // It discards events from unknown pubkeys as an anti-spam mechanism. func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) { defer log.Println("Firehose: shutting down...") @@ -109,7 +106,7 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s defer shutdown(pool) filter := nostr.Filter{ - Kinds: Kinds, + Kinds: config.Kinds, Since: config.Since(), } @@ -139,6 +136,7 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s } type FetcherConfig struct { + Kinds []int Relays []string Batch int Interval time.Duration @@ -146,6 +144,7 @@ type FetcherConfig struct { func NewFetcherConfig() FetcherConfig { return FetcherConfig{ + Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList}, Relays: defaultRelays, Batch: 100, Interval: time.Minute, @@ -168,9 +167,6 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s batch := make([]string, 0, config.Batch) timer := time.After(config.Interval) - pool := nostr.NewSimplePool(ctx) - defer shutdown(pool) - for { select { case <-ctx.Done(): @@ -186,7 +182,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s continue } - events, err := fetch(ctx, pool, config.Relays, batch) + events, err := fetch(ctx, config, batch) if err != nil && ctx.Err() == nil { log.Printf("Fetcher: %v", err) continue @@ -202,7 +198,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s timer = time.After(config.Interval) case <-timer: - events, err := fetch(ctx, pool, config.Relays, batch) + events, err := fetch(ctx, config, batch) if err != nil && ctx.Err() == nil { log.Printf("Fetcher: %v", err) continue @@ -220,8 +216,8 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s } } -// fetch queries the [Kinds] of the specified pubkeys. -func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string) ([]*nostr.Event, error) { +// fetch queries the config.Relays for the config.Kinds of the specified pubkeys. +func fetch(ctx context.Context, config FetcherConfig, pubkeys []string) ([]*nostr.Event, error) { if len(pubkeys) == 0 { return nil, nil } @@ -229,13 +225,17 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string ctx, cancel := context.WithTimeout(ctx, time.Second*15) defer cancel() + pool := nostr.NewSimplePool(ctx) + defer shutdown(pool) + filter := nostr.Filter{ - Kinds: Kinds, + Kinds: config.Kinds, Authors: pubkeys, + Limit: len(config.Kinds) * len(pubkeys), } - latest := make(map[string]*nostr.Event, len(pubkeys)*len(filter.Kinds)) - for event := range pool.FetchMany(ctx, relays, filter) { + latest := make(map[string]*nostr.Event, len(pubkeys)*len(config.Kinds)) + for event := range pool.FetchMany(ctx, config.Relays, filter) { key := fmt.Sprintf("%s:%d", event.PubKey, event.Kind) e, exists := latest[key] @@ -252,7 +252,7 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string return events, nil } -// Fetcher extracts pubkeys from the channel and queries the store for their events: +// FetcherDB extracts pubkeys from the channel and queries the store for their events: // - when the batch is bigger than config.Batch // - after config.Interval since the last query. func FetcherDB( @@ -283,9 +283,9 @@ func FetcherDB( } filter := nostr.Filter{ - Kinds: Kinds, + Kinds: config.Kinds, Authors: batch, - Limit: len(Kinds) * len(batch), + Limit: len(config.Kinds) * len(batch), } events, err := store.Query(ctx, filter) @@ -308,9 +308,9 @@ func FetcherDB( } filter := nostr.Filter{ - Kinds: Kinds, + Kinds: config.Kinds, Authors: batch, - Limit: len(Kinds) * len(batch), + Limit: len(config.Kinds) * len(batch), } events, err := store.Query(ctx, filter) diff --git a/pkg/pipe/intake_test.go b/pkg/pipe/intake_test.go index 51b4329..c5fa8c3 100644 --- a/pkg/pipe/intake_test.go +++ b/pkg/pipe/intake_test.go @@ -28,15 +28,15 @@ func TestFirehose(t *testing.T) { } func TestFetch(t *testing.T) { - pool := nostr.NewSimplePool(ctx) pubkeys := []string{odell, calle, pip} + config := NewFetcherConfig() - events, err := fetch(ctx, pool, defaultRelays, pubkeys) + events, err := fetch(ctx, config, pubkeys) if err != nil { t.Fatalf("expected error nil, got %v", err) } - expected := len(pubkeys) * len(Kinds) + expected := len(pubkeys) * 2 if len(events) != expected { t.Fatalf("expected %d events, got %d", expected, len(events)) }