From 376d55cc81b4ecfaa104043cbc430e73c16871d5 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Mon, 9 Jun 2025 16:36:27 +0200 Subject: [PATCH] refactored archiver and graph builder --- cmd/crawler/main.go | 2 +- cmd/sync/main.go | 8 +- pkg/config/config.go | 8 +- pkg/pipe/engine.go | 196 ++++++++++++++++++++++--------------------- 4 files changed, 109 insertions(+), 105 deletions(-) diff --git a/cmd/crawler/main.go b/cmd/crawler/main.go index 858ed63..466584b 100644 --- a/cmd/crawler/main.go +++ b/cmd/crawler/main.go @@ -47,7 +47,7 @@ func main() { if count == 0 { if len(config.InitPubkeys) == 0 { - panic("init pubkeys are empty") + panic("init pubkeys are empty: impossible to initialize") } log.Println("initialize from empty database...") diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 932bb2d..981bc8f 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -22,7 +22,7 @@ import ( ) /* -This program syncronize the Redis database using the events already stored in the EventStore. +This program synchronizes the Redis database with the events already stored in the event store. If Redis and the eventstore are already in sync, run the executable at /cmd/crawler/. 
*/ @@ -51,11 +51,11 @@ func main() { } if count != 0 { - panic("refusing to run sync when redis is not empty") + panic("refuse to run sync when redis is not empty") } if len(config.InitPubkeys) == 0 { - panic("init pubkeys are empty") + panic("init pubkeys are empty: impossible to initialize") } log.Println("initialize from empty database...") @@ -108,7 +108,7 @@ func main() { consumers.Add(1) go func() { defer consumers.Done() - pipe.GraphUpdater(ctx, config.Engine, store, db, events) + pipe.GraphBuilder(ctx, config.Engine, store, db, events) }() producers.Wait() diff --git a/pkg/config/config.go b/pkg/config/config.go index e5cb856..1270d12 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -180,8 +180,8 @@ func Load() (*Config, error) { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } - case "ENGINE_UPDATER_CAPACITY": - config.Engine.UpdaterCapacity, err = strconv.Atoi(val) + case "ENGINE_BUILDER_CAPACITY": + config.Engine.BuilderCapacity, err = strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } @@ -192,8 +192,8 @@ func Load() (*Config, error) { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } - case "ENGINE_ARCHIVE_CAPACITY": - config.Engine.ArchiverCapacity, err = strconv.Atoi(val) + case "ENGINE_ARCHIVERS": + config.Engine.Archivers, err = strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 1fb0d77..8063ec5 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -10,47 +10,40 @@ import ( "github/pippellia-btc/crawler/pkg/walks" "log" "slices" - "sync/atomic" "time" "github.com/nbd-wtf/go-nostr" "github.com/vertex-lab/relay/pkg/eventstore" ) -// EventTracker tracks the number of events processed -var EventTracker atomic.Int32 - -var ErrUnsupportedKind = errors.New("unsupported event kind") - type EngineConfig struct { PrintEvery int - // for the GraphUpdater - 
UpdaterCapacity int + // GraphBuilder params + BuilderCapacity int CacheCapacity int - // for the archiveEngine - ArchiverCapacity int + // Archiver params + Archivers int } func NewEngineConfig() EngineConfig { return EngineConfig{ - PrintEvery: 5000, - UpdaterCapacity: 1000, - CacheCapacity: 100_000, - ArchiverCapacity: 1000, + PrintEvery: 5000, + BuilderCapacity: 1000, + CacheCapacity: 100_000, + Archivers: 4, } } func (c EngineConfig) Print() { fmt.Printf("Engine\n") fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) - fmt.Printf(" UpdaterCapacity: %d\n", c.UpdaterCapacity) + fmt.Printf(" BuilderCapacity: %d\n", c.BuilderCapacity) fmt.Printf(" CacheCapacity: %d\n", c.CacheCapacity) - fmt.Printf(" ArchiveCapacity: %d\n", c.ArchiverCapacity) } -// Engine is responsible for dispacting the correct events to the [Archiver] or [GraphUpdater]. +// Engine is responsible for coordinating the [Archiver] with the [GraphBuilder]. func Engine( ctx context.Context, config EngineConfig, @@ -58,48 +51,36 @@ func Engine( db redb.RedisDB, events chan *nostr.Event) { - defer log.Println("Engine: shutting down...") - - graphEvents := make(chan *nostr.Event, config.UpdaterCapacity) - archiveEvents := make(chan *nostr.Event, config.ArchiverCapacity) + graphEvents := make(chan *nostr.Event, config.BuilderCapacity) defer close(graphEvents) - defer close(archiveEvents) - - go GraphUpdater(ctx, config, store, db, graphEvents) - go Archiver(ctx, config, store, archiveEvents) + go GraphBuilder(ctx, config, store, db, graphEvents) log.Println("Engine: ready to process events") - for { - select { - case <-ctx.Done(): - return - - case event, ok := <-events: - if !ok { - return - } - - switch event.Kind { - case nostr.KindFollowList: - graphEvents <- event - - case nostr.KindProfileMetadata: - archiveEvents <- event - + Archiver(ctx, config, store, events, func(e *nostr.Event) error { + if e.Kind == nostr.KindFollowList { + select { + case graphEvents <- e: default: - logEvent(event, 
ErrUnsupportedKind) + return errors.New("channel is full") } } - } + return nil + }) + + log.Println("Engine: shutting down...") } -// Archiver consumes events that are not graph-related and stores them. +// Archiver stores events in the event store. func Archiver( ctx context.Context, config EngineConfig, store *eventstore.Store, - events chan *nostr.Event) { + events chan *nostr.Event, + onReplace func(*nostr.Event) error) { + + sem := make(chan struct{}, config.Archivers) + var processed int for { select { @@ -111,37 +92,56 @@ func Archiver( return } - err := func() error { - opctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() + sem <- struct{}{} + go func() { + err := archive(ctx, store, event, onReplace) + <-sem - switch { - case nostr.IsRegularKind(event.Kind): - return store.Save(opctx, event) - - case nostr.IsReplaceableKind(event.Kind): - _, err := store.Replace(opctx, event) - return err - - default: - return nil + if err != nil { + log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) } }() - if err != nil { - logEvent(event, err) - } - - processed := int(EventTracker.Add(1)) + processed++ if processed%config.PrintEvery == 0 { - log.Printf("Engine: processed %d events", processed) + log.Printf("Archiver: processed %d events", processed) } } } } -// GraphUpdater consumes events to update the graph and random walks. -func GraphUpdater( +// Archive an event based on its kind. 
+func archive( + ctx context.Context, + store *eventstore.Store, + event *nostr.Event, + onReplace func(*nostr.Event) error) error { + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + switch { + case nostr.IsRegularKind(event.Kind): + return store.Save(ctx, event) + + case nostr.IsReplaceableKind(event.Kind): + replaced, err := store.Replace(ctx, event) + if err != nil { + return err + } + + if replaced { + return onReplace(event) + } + return nil + + default: + return nil + } +} + +// GraphBuilder consumes events to update the graph and random walks. +func GraphBuilder( ctx context.Context, config EngineConfig, store *eventstore.Store, @@ -154,6 +154,8 @@ func GraphUpdater( walks.WithLogFile("cache.log"), ) + var processed int + for { select { case <-ctx.Done(): @@ -165,44 +167,47 @@ func GraphUpdater( } err := func() error { - opctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + opctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - replaced, err := store.Replace(opctx, event) + delta, err := computeDelta(opctx, db, cache, event) if err != nil { return err } - if replaced { - return processFollowList(opctx, db, cache, event) + if err := updateWalks(opctx, db, cache, delta); err != nil { + return err } - return nil + + if err := db.Update(opctx, delta); err != nil { + return err + } + + return cache.Update(opctx, delta) }() if err != nil { - logEvent(event, err) + log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) } - processed := int(EventTracker.Add(1)) + processed++ if processed%config.PrintEvery == 0 { - log.Printf("Engine: processed %d events", processed) + log.Printf("GraphBuilder: processed %d events", processed) } } } } -// processFollowList parses the pubkeys listed in the event, and uses them to: -// - update the follows of the author (db and cache) -// - update the author's random walks and signal the number to the 
[WalksTracker] -func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) error { +// Compute the delta from the "p" tags in the follow list. +func computeDelta(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) (graph.Delta, error) { author, err := db.NodeByKey(ctx, event.PubKey) if err != nil { - return err + return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err) } oldFollows, err := cache.Follows(ctx, author.ID) if err != nil { - return err + return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err) } pubkeys := parsePubkeys(event) @@ -214,35 +219,35 @@ func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.Cached newFollows, err := db.Resolve(ctx, pubkeys, onMissing) if err != nil { - return err + return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err) } - delta := graph.NewDelta(event.Kind, author.ID, oldFollows, newFollows) + return graph.NewDelta(event.Kind, author.ID, oldFollows, newFollows), nil +} + +// updateWalks uses the delta to update the random walks. 
+func updateWalks(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, delta graph.Delta) error { if delta.Size() == 0 { - // old and new follows are the same, stop + // nothing to change, stop return nil } - visiting, err := db.WalksVisiting(ctx, author.ID, -1) + visiting, err := db.WalksVisiting(ctx, delta.Node, -1) if err != nil { - return err + return fmt.Errorf("failed to update walks: %w", err) } old, new, err := walks.ToUpdate(ctx, cache, delta, visiting) if err != nil { - return err + return fmt.Errorf("failed to update walks: %w", err) } if err := db.ReplaceWalks(ctx, old, new); err != nil { - return err - } - - if err := db.Update(ctx, delta); err != nil { - return err + return fmt.Errorf("failed to update walks: %w", err) } WalksTracker.Add(int32(len(new))) - return cache.Update(ctx, delta) + return nil } const ( @@ -279,9 +284,8 @@ func parsePubkeys(event *nostr.Event) []string { return unique(pubkeys) } -func logEvent(e *nostr.Event, extra any) { - msg := fmt.Sprintf("Engine: event ID %s, kind %d by %s: ", e.ID, e.Kind, e.PubKey) - log.Printf(msg+"%v", extra) +func logEvent(prefix string, e *nostr.Event, extra any) { + log.Printf("%s: event ID %s, kind %d by %s: %v", prefix, e.ID, e.Kind, e.PubKey, extra) } // Unique returns a slice of unique elements of the input slice.