diff --git a/cmd/crawl/main.go b/cmd/crawl/main.go
index 9762ff8..83177e7 100644
--- a/cmd/crawl/main.go
+++ b/cmd/crawl/main.go
@@ -39,8 +39,9 @@ func main() {
 		panic(err)
 	}
 
-	events := make(chan *nostr.Event, config.EventsCapacity)
-	pubkeys := make(chan string, config.PubkeysCapacity)
+	db := redb.New(&redis.Options{
+		Addr: config.RedisAddress,
+	})
 
 	store, err := store.New(
 		config.SQLiteURL,
@@ -50,86 +51,99 @@ func main() {
 		panic(err)
 	}
 
-	db := redb.New(&redis.Options{Addr: config.RedisAddress})
+	recorderQueue := make(chan *nostr.Event, config.EventsCapacity)
+	engineQueue := make(chan *nostr.Event, config.EventsCapacity)
+	fetcherQueue := make(chan string, config.PubkeysCapacity)
+
 	nodes, err := db.NodeCount(ctx)
 	if err != nil {
 		panic(err)
 	}
 
 	if nodes == 0 {
-		if len(config.InitPubkeys) == 0 {
-			panic("init pubkeys are empty: impossible to initialize")
+		if err := initGraph(ctx, db, config.InitPubkeys); err != nil {
+			panic(err)
 		}
 
-		log.Println("initialize from empty database...")
-
-		initNodes := make([]graph.ID, len(config.InitPubkeys))
-		for i, pk := range config.InitPubkeys {
-			initNodes[i], err = db.AddNode(ctx, pk)
-			if err != nil {
-				panic(err)
-			}
-
-			pubkeys <- pk // add to queue
+		for _, pk := range config.InitPubkeys {
+			fetcherQueue <- pk
 		}
-
-		for _, node := range initNodes {
-			if err := pipe.Promote(db, node); err != nil {
-				panic(err)
-			}
-		}
-		log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys))
 	}
 
 	if config.PrintStats {
-		go printStats(ctx, events, pubkeys)
+		go printStats(ctx, recorderQueue, engineQueue, fetcherQueue)
 	}
 
 	var producers sync.WaitGroup
 	var consumers sync.WaitGroup
 
-	producers.Add(3)
+	producers.Add(4)
 	go func() {
 		defer producers.Done()
-		pipe.Firehose(ctx, config.Firehose, db, enqueue(events))
+		pipe.Firehose(ctx, config.Firehose, db, pipe.Send(recorderQueue))
 	}()
 
 	go func() {
 		defer producers.Done()
-		pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events))
+		pipe.Recorder(ctx, db, recorderQueue, pipe.Send(engineQueue))
 	}()
 
 	go func() {
 		defer producers.Done()
-		pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys))
-		close(pubkeys) // Arbiter is the only pubkey producer
+		pipe.Fetcher(ctx, config.Fetcher, fetcherQueue, pipe.Send(engineQueue))
+	}()
+
+	go func() {
+		defer producers.Done()
+		pipe.Arbiter(ctx, config.Arbiter, db, pipe.Send(fetcherQueue))
+		close(fetcherQueue) // Arbiter is the only pubkey producer
 	}()
 
 	consumers.Add(1)
 	go func() {
 		defer consumers.Done()
-		pipe.Engine(ctx, config.Engine, store, db, events)
+		pipe.Engine(ctx, config.Engine, store, db, engineQueue)
 	}()
 
 	producers.Wait()
-	close(events)
-
+	close(engineQueue)
 	consumers.Wait()
 }
 
-// enqueue things into the specified channel or return an error if full.
-func enqueue[T any](queue chan T) func(t T) error {
-	return func(t T) error {
-		select {
-		case queue <- t:
-			return nil
-		default:
-			return fmt.Errorf("channel is full, dropping %v", t)
+// initGraph initializes the graph database: it adds one node per pubkey with
+// db.AddNode and then calls pipe.Promote on each added node.
+// It returns an error if pubkeys is empty or if any of those calls fails.
+func initGraph(ctx context.Context, db redb.RedisDB, pubkeys []string) error {
+	if len(pubkeys) == 0 {
+		return fmt.Errorf("init pubkeys are empty: impossible to initialize")
+	}
+	log.Println("initialize from empty database...")
+
+	var initNodes = make([]graph.ID, len(pubkeys))
+	var err error
+
+	for i, pk := range pubkeys {
+		initNodes[i], err = db.AddNode(ctx, pk)
+		if err != nil {
+			return fmt.Errorf("adding init pubkey %q: %w", pk, err)
 		}
 	}
+
+	for _, node := range initNodes {
+		if err := pipe.Promote(db, node); err != nil {
+			return fmt.Errorf("promoting init node %v: %w", node, err)
+		}
+	}
+
+	log.Printf("correctly added %d init pubkeys", len(pubkeys))
+	return nil
 }
 
-func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan string) {
+func printStats(
+	ctx context.Context,
+	recorderQueue, engineQueue chan *nostr.Event,
+	fetcherQueue chan string,
+) {
 	filename := "stats.log"
 	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
 	if err != nil {
@@ -153,8 +167,9 @@ func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan stri
 			runtime.ReadMemStats(memStats)
 
 			log.Println("---------------------------------------")
-			log.Printf("events queue: %d/%d\n", len(events), cap(events))
-			log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys))
+			log.Printf("Recorder queue: %d/%d\n", len(recorderQueue), cap(recorderQueue))
+			log.Printf("Engine queue: %d/%d\n", len(engineQueue), cap(engineQueue))
+			log.Printf("Fetcher queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue))
 			log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load())
 			log.Printf("goroutines: %d\n", goroutines)
 			log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024))