diff --git a/cmd/crawler.go b/cmd/crawler.go index ea31610..11b1687 100644 --- a/cmd/crawler.go +++ b/cmd/crawler.go @@ -42,7 +42,7 @@ func main() { } if count == 0 { - log.Println("initializing crawler from empty database") + log.Println("initialize from empty database") nodes := make([]graph.ID, len(config.InitPubkeys)) for i, pk := range config.InitPubkeys { @@ -67,46 +67,29 @@ func main() { } var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) go func() { defer wg.Done() - pipe.Firehose(ctx, config.Firehose, db, func(event *nostr.Event) error { - select { - case events <- event: - default: - log.Printf("Firehose: channel is full, dropping event ID %s by %s", event.ID, event.PubKey) - } - return nil - }) + pipe.Firehose(ctx, config.Firehose, db, enqueue(events)) }() go func() { defer wg.Done() - pipe.Fetcher(ctx, config.Fetcher, pubkeys, func(event *nostr.Event) error { - select { - case events <- event: - default: - log.Printf("Fetcher: channel is full, dropping event ID %s by %s", event.ID, event.PubKey) - } - return nil - }) + pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events)) }() go func() { defer wg.Done() - pipe.Arbiter(ctx, config.Arbiter, db, func(pubkey string) error { - select { - case pubkeys <- pubkey: - default: - log.Printf("Arbiter: channel is full, dropping pubkey %s", pubkey) - } - return nil - }) + pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys)) + }() + + go func() { + defer wg.Done() + pipe.Processor(ctx, config.Processor, db, events) }() go printStats(ctx) - pipe.Processor(ctx, config.Processor, db, events) wg.Wait() } @@ -120,6 +103,18 @@ func handleSignals(cancel context.CancelFunc) { cancel() } +// enqueue things into channels +func enqueue[T any](queue chan T) func(t T) error { + return func(t T) error { + select { + case queue <- t: + default: + log.Printf("channel is full, dropping %v", t) + } + return nil + } +} + func printStats(ctx context.Context) { filename := "stats.log" file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)