From 815e79282f1a2b668657c3c8e293c76ef548f1a9 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Fri, 19 Sep 2025 16:06:31 +0200 Subject: [PATCH] renamed GraphBuilder to Grapher for simplicity --- cmd/sync/main.go | 14 ++++++------ pkg/pipe/arbiter.go | 2 +- pkg/pipe/engine.go | 53 +++++++++++++++++++++++++-------------------- 3 files changed, 37 insertions(+), 32 deletions(-) diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 685711f..17d9765 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -47,7 +47,7 @@ func main() { panic(err) } - builderQueue := make(chan *nostr.Event, config.ChannelCapacity) + grapherQueue := make(chan *nostr.Event, config.ChannelCapacity) fetcherQueue := make(chan string, config.ChannelCapacity) nodes, err := db.NodeCount(ctx) @@ -72,7 +72,7 @@ func main() { log.Printf("correctly added %d pubkeys", len(config.InitPubkeys)) if config.PrintStats { - go printStats(ctx, builderQueue, fetcherQueue) + go printStats(ctx, grapherQueue, fetcherQueue) } var wg sync.WaitGroup @@ -80,8 +80,8 @@ func main() { go func() { defer wg.Done() - pipe.FetcherDB(ctx, config.Fetcher, store, fetcherQueue, pipe.Send(builderQueue)) - close(builderQueue) // FetcherDB is the only event producer + pipe.FetcherDB(ctx, config.Fetcher, store, fetcherQueue, pipe.Send(grapherQueue)) + close(grapherQueue) // FetcherDB is the only event producer }() go func() { @@ -92,7 +92,7 @@ func main() { go func() { defer wg.Done() - pipe.GraphBuilder(ctx, config.Engine.Builder, db, builderQueue) + pipe.Grapher(ctx, config.Engine.Grapher, db, grapherQueue) }() wg.Wait() @@ -100,7 +100,7 @@ func main() { func printStats( ctx context.Context, - builderQueue chan *nostr.Event, + grapherQueue chan *nostr.Event, fetcherQueue chan string, ) { filename := "stats.log" @@ -126,7 +126,7 @@ func printStats( runtime.ReadMemStats(memStats) log.Println("---------------------------------------") - log.Printf("GraphBuilder queue: %d/%d\n", len(builderQueue), cap(builderQueue)) + log.Printf("Grapher queue: %d/%d\n", len(grapherQueue), cap(grapherQueue)) log.Printf("FetcherDB queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue)) log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load()) log.Printf("goroutines: %d\n", goroutines) diff --git a/pkg/pipe/arbiter.go b/pkg/pipe/arbiter.go index a4f5072..a456cd8 100644 --- a/pkg/pipe/arbiter.go +++ b/pkg/pipe/arbiter.go @@ -14,7 +14,7 @@ import ( "github.com/vertex-lab/crawler_v2/pkg/walks" ) -// WalksTracker tracks the number of walks that have been updated by the [GraphBuilder]. +// WalksTracker tracks the number of walks that have been updated by the [Grapher]. // It's used to wake-up the [Arbiter], which performs work and then resets it to 0. var WalksTracker atomic.Int32 diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 6e4d6fe..a26a78c 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -19,25 +19,29 @@ import ( ) type EngineConfig struct { - Archiver ArchiverConfig - Builder GraphBuilderConfig ChannelCapacity int `envconfig:"CHANNEL_CAPACITY"` + Archiver ArchiverConfig + Grapher GrapherConfig } func NewEngineConfig() EngineConfig { return EngineConfig{ Archiver: NewArchiverConfig(), - Builder: NewGraphBuilderConfig(), + Grapher: NewGrapherConfig(), } } func (c EngineConfig) Validate() error { + if c.ChannelCapacity < 0 { + return errors.New("channel capacity cannot be negative") + } + if err := c.Archiver.Validate(); err != nil { return fmt.Errorf("Archiver: %w", err) } - if err := c.Builder.Validate(); err != nil { - return fmt.Errorf("GraphBuilder: %w", err) + if err := c.Grapher.Validate(); err != nil { + return fmt.Errorf("Grapher: %w", err) } return nil @@ -47,10 +51,10 @@ func (c EngineConfig) Print() { fmt.Printf("Engine:\n") fmt.Printf(" ChannelCapacity: %d\n", c.ChannelCapacity) c.Archiver.Print() - c.Builder.Print() + c.Grapher.Print() } -// Engine is responsible for cohordinating the [Archiver] with the [GraphBuilder]. +// Engine is responsible for cohordinating the [Archiver] with the [Grapher]. func Engine( ctx context.Context, config EngineConfig, @@ -68,13 +72,13 @@ func Engine( return nil } - go GraphBuilder(ctx, config.Builder, db, graphEvents) + go Grapher(ctx, config.Grapher, db, graphEvents) Archiver(ctx, config.Archiver, store, events, sendFollowList) } type ArchiverConfig struct { Kinds []int `envconfig:"ARCHIVER_KINDS"` - PrintEvery int `envconfig:"ENGINE_PRINT_EVERY"` + PrintEvery int `envconfig:"ARCHIVER_PRINT_EVERY"` } func NewArchiverConfig() ArchiverConfig { @@ -101,7 +105,8 @@ func (c ArchiverConfig) Print() { fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } -// Archiver stores events in the event store. +// Archiver stores events in the event store according to their kind. +// If a replacement happened, it calls the provided onReplace function. func Archiver( ctx context.Context, config ArchiverConfig, @@ -172,19 +177,19 @@ func archive( } } -type GraphBuilderConfig struct { +type GrapherConfig struct { CacheCapacity int `envconfig:"ENGINE_CACHE_CAPACITY"` PrintEvery int `envconfig:"ENGINE_PRINT_EVERY"` } -func NewGraphBuilderConfig() GraphBuilderConfig { - return GraphBuilderConfig{ +func NewGrapherConfig() GrapherConfig { + return GrapherConfig{ CacheCapacity: 100_000, PrintEvery: 10_000, } } -func (c GraphBuilderConfig) Validate() error { +func (c GrapherConfig) Validate() error { if c.CacheCapacity < 0 { return errors.New("cache capacity cannot be negative") } @@ -195,21 +200,21 @@ func (c GraphBuilderConfig) Validate() error { return nil } -func (c GraphBuilderConfig) Print() { - fmt.Printf("GraphBuilder:\n") +func (c GrapherConfig) Print() { + fmt.Printf("Grapher:\n") fmt.Printf(" CacheCapacity: %v\n", c.CacheCapacity) fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } -// GraphBuilder consumes events to update the graph and random walks. -func GraphBuilder( +// Grapher consumes events to update the graph and random walks. +func Grapher( ctx context.Context, - config GraphBuilderConfig, + config GrapherConfig, db redb.RedisDB, events chan *nostr.Event, ) { - log.Println("GraphBuilder: ready") - defer log.Println("GraphBuilder: shut down") + log.Println("Grapher: ready") + defer log.Println("Grapher: shut down") cache := walks.NewWalker( walks.WithCapacity(config.CacheCapacity), @@ -229,7 +234,7 @@ func GraphBuilder( } if event.Kind != nostr.KindFollowList { - log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, "unexpected kind") + log.Printf("Grapher: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, "unexpected kind") continue } @@ -254,12 +259,12 @@ func GraphBuilder( }() if err != nil && ctx.Err() == nil { - log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) + log.Printf("Grapher: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) } processed++ if processed%config.PrintEvery == 0 { - log.Printf("GraphBuilder: processed %d events", processed) + log.Printf("Grapher: processed %d events", processed) } } }