diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 84e65a7..e3572eb 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -39,15 +39,18 @@ func main() { config.Fetcher.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds - events := make(chan *nostr.Event, config.EventsCapacity) - pubkeys := make(chan string, config.PubkeysCapacity) + db := redb.New(&redis.Options{ + Addr: config.RedisAddress, + }) store, err := store.New(config.SQLiteURL) if err != nil { panic(err) } - db := redb.New(&redis.Options{Addr: config.RedisAddress}) + builderQueue := make(chan *nostr.Event, config.EventsCapacity) + fetcherQueue := make(chan string, config.PubkeysCapacity) + nodes, err := db.NodeCount(ctx) if err != nil { panic(err) @@ -58,7 +61,7 @@ func main() { } if len(config.InitPubkeys) == 0 { - panic("init pubkeys are empty: impossible to initialize") + panic("init fetcherQueue are empty: impossible to initialize") } log.Println("initialize from empty database...") @@ -69,7 +72,7 @@ func main() { panic(err) } - pubkeys <- pk // add to queue + fetcherQueue <- pk // add to queue } for _, node := range initNodes { @@ -77,10 +80,10 @@ func main() { panic(err) } } - log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys)) + log.Printf("correctly added %d init fetcherQueue", len(config.InitPubkeys)) if config.PrintStats { - go printStats(ctx, events, pubkeys) + go printStats(ctx, builderQueue, fetcherQueue) } var wg sync.WaitGroup @@ -88,37 +91,29 @@ func main() { go func() { defer wg.Done() - pipe.FetcherDB(ctx, config.Fetcher, store, pubkeys, enqueue(events)) - close(events) // FetcherDB is the only event producer + pipe.FetcherDB(ctx, config.Fetcher, store, fetcherQueue, pipe.Send(builderQueue)) + close(builderQueue) // FetcherDB is the only event producer }() go func() { defer wg.Done() - pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys)) - close(pubkeys) // Arbiter is the only pubkey producer + pipe.Arbiter(ctx, config.Arbiter, db, pipe.Send(fetcherQueue)) + close(fetcherQueue) // Arbiter is the only pubkey producer }() go func() { defer wg.Done() - pipe.GraphBuilder(ctx, config.Engine, db, events) + pipe.GraphBuilder(ctx, config.Engine, db, builderQueue) }() wg.Wait() } -// enqueue things into the specified channel or return an error if full. -func enqueue[T any](queue chan T) func(t T) error { - return func(t T) error { - select { - case queue <- t: - return nil - default: - return fmt.Errorf("channel is full, dropping %v", t) - } - } -} - -func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan string) { +func printStats( + ctx context.Context, + builderQueue chan *nostr.Event, + fetcherQueue chan string, +) { filename := "stats.log" file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { @@ -142,8 +137,8 @@ func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan stri runtime.ReadMemStats(memStats) log.Println("---------------------------------------") - log.Printf("events queue: %d/%d\n", len(events), cap(events)) - log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys)) + log.Printf("GraphBuilder queue: %d/%d\n", len(builderQueue), cap(builderQueue)) + log.Printf("FetcherDB queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue)) log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load()) log.Printf("goroutines: %d\n", goroutines) log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024)) diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 7544daf..a8b33b5 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -3,7 +3,6 @@ package pipe import ( "context" - "errors" "fmt" "log" "time" @@ -48,22 +47,14 @@ func Engine( db redb.RedisDB, events chan *nostr.Event, ) { - graphEvents := make(chan *nostr.Event, config.BuilderCapacity) defer close(graphEvents) - log.Println("Engine: ready to process events") - defer log.Println("Engine: shut down") - go GraphBuilder(ctx, config, db, graphEvents) Archiver(ctx, config, store, events, func(e *nostr.Event) error { if e.Kind == nostr.KindFollowList { - select { - case graphEvents <- e: - default: - return errors.New("channel is full") - } + return Send(graphEvents)(e) } return nil }) @@ -77,6 +68,8 @@ func Archiver( events chan *nostr.Event, onReplace func(*nostr.Event) error, ) { + log.Println("Archiver: ready") + defer log.Println("Archiver: shut down") var processed int @@ -133,6 +126,8 @@ func GraphBuilder( db redb.RedisDB, events chan *nostr.Event, ) { + log.Println("GraphBuilder: ready") + defer log.Println("GraphBuilder: shut down") cache := walks.NewWalker( walks.WithCapacity(config.CacheCapacity), @@ -151,6 +146,11 @@ func GraphBuilder( return } + if event.Kind != nostr.KindFollowList { + log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, "unexpected kind") + continue + } + err := func() error { opctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/pkg/pipe/firehose.go b/pkg/pipe/firehose.go index 3efad8d..bf8d479 100644 --- a/pkg/pipe/firehose.go +++ b/pkg/pipe/firehose.go @@ -77,6 +77,19 @@ type PubkeyChecker interface { type Forward[T any] func(T) error +// Send returns a [Forward] function that will attempt to send values into the given channel. +// It returns an error if the channel is full. +func Send[T any](ch chan T) Forward[T] { + return func(t T) error { + select { + case ch <- t: + return nil + default: + return fmt.Errorf("channel is full, dropping %v", t) + } + } +} + // Firehose connects to a list of relays and pulls config.Kinds events that are newer than config.Since. // It deduplicate events using a simple ring-buffer. // It discards events from unknown pubkeys as an anti-spam mechanism. @@ -87,6 +100,7 @@ func Firehose( check PubkeyChecker, forward Forward[*nostr.Event], ) { + log.Println("Firehose: ready") defer log.Println("Firehose: shut down") pool := nostr.NewSimplePool(ctx)