From 60f33dd4fe5279f84917b57299527438e423540c Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Thu, 18 Sep 2025 16:09:20 +0200 Subject: [PATCH] refactored archiver: - now it stores events whose kind is approved - archiver and builder have separate configs --- cmd/sync/main.go | 2 +- pkg/config/config.go | 9 ++- pkg/pipe/engine.go | 131 +++++++++++++++++++++++++++++-------------- 3 files changed, 96 insertions(+), 46 deletions(-) diff --git a/cmd/sync/main.go b/cmd/sync/main.go index bae47a1..612984e 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -92,7 +92,7 @@ func main() { go func() { defer wg.Done() - pipe.GraphBuilder(ctx, config.Engine, db, builderQueue) + pipe.GraphBuilder(ctx, config.Engine.Builder, db, builderQueue) }() wg.Wait() diff --git a/pkg/config/config.go b/pkg/config/config.go index c013c36..e6833de 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -184,19 +184,22 @@ func Load() (*Config, error) { config.Arbiter.PromotionWait = time.Duration(wait) * time.Second case "ENGINE_PRINT_EVERY": - config.Engine.PrintEvery, err = strconv.Atoi(val) + printEvery, err := strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } + config.Engine.Archiver.PrintEvery = printEvery + config.Engine.Builder.PrintEvery = printEvery + case "ENGINE_BUILDER_CAPACITY": - config.Engine.BuilderCapacity, err = strconv.Atoi(val) + config.Engine.ChannelCapacity, err = strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } case "ENGINE_CACHE_CAPACITY": - config.Engine.CacheCapacity, err = strconv.Atoi(val) + config.Engine.Builder.CacheCapacity, err = strconv.Atoi(val) if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 8f75e2f..d6a4233 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "log" + "slices" "time" "github.com/pippellia-btc/nastro" @@ -17,26 +18,23 @@ import ( ) type EngineConfig struct { - PrintEvery int - - // GraphBuilder params - BuilderCapacity int - CacheCapacity int + Archiver ArchiverConfig + Builder GraphBuilderConfig + ChannelCapacity int } func NewEngineConfig() EngineConfig { return EngineConfig{ - PrintEvery: 5000, - BuilderCapacity: 1000, - CacheCapacity: 100_000, + Archiver: NewArchiverConfig(), + Builder: NewGraphBuilderConfig(), } } func (c EngineConfig) Print() { fmt.Printf("Engine\n") - fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) - fmt.Printf(" BuilderCapacity: %d\n", c.BuilderCapacity) - fmt.Printf(" CacheCapacity: %d\n", c.CacheCapacity) + fmt.Printf(" ChannelCapacity: %d\n", c.ChannelCapacity) + c.Archiver.Print() + c.Builder.Print() } // Engine is responsible for cohordinating the [Archiver] with the [GraphBuilder]. @@ -47,26 +45,45 @@ func Engine( db redb.RedisDB, events chan *nostr.Event, ) { - graphEvents := make(chan *nostr.Event, config.BuilderCapacity) + graphEvents := make(chan *nostr.Event, config.ChannelCapacity) defer close(graphEvents) - go GraphBuilder(ctx, config, db, graphEvents) - - Archiver(ctx, config, store, events, func(e *nostr.Event) error { + sendFollowList := func(e *nostr.Event) error { if e.Kind == nostr.KindFollowList { return Send(graphEvents)(e) } return nil - }) + } + + go GraphBuilder(ctx, config.Builder, db, graphEvents) + Archiver(ctx, config.Archiver, store, events, sendFollowList) +} + +type ArchiverConfig struct { + Kinds []int + PrintEvery int +} + +func NewArchiverConfig() ArchiverConfig { + return ArchiverConfig{ + Kinds: profileKinds, + PrintEvery: 10_000, + } +} + +func (c ArchiverConfig) Print() { + fmt.Printf("Archiver\n") + fmt.Printf(" Kinds: %v\n", c.Kinds) + fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } // Archiver stores events in the event store. func Archiver( ctx context.Context, - config EngineConfig, + config ArchiverConfig, store nastro.Store, events chan *nostr.Event, - onReplace func(*nostr.Event) error, + onReplace Forward[*nostr.Event], ) { log.Println("Archiver: ready") defer log.Println("Archiver: shut down") @@ -83,30 +100,11 @@ func Archiver( return } - err := func() error { - opctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - switch { - case nostr.IsRegularKind(event.Kind): - return nil // do nothing - - case nostr.IsReplaceableKind(event.Kind): - replaced, err := store.Replace(opctx, event) - if err != nil { - return err - } - - if replaced { - return onReplace(event) - } - return nil - - default: - return nil - } - }() + if !slices.Contains(config.Kinds, event.Kind) { + continue + } + err := archive(ctx, store, event, onReplace) if err != nil && ctx.Err() == nil { log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) } @@ -119,10 +117,59 @@ func Archiver( } } +// Archive an event based on its kind. +// If a replacement happened, it calls the provided onReplace +func archive( + ctx context.Context, + store nastro.Store, + event *nostr.Event, + onReplace Forward[*nostr.Event], +) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + switch { + case nostr.IsRegularKind(event.Kind): + return store.Save(ctx, event) + + case nostr.IsReplaceableKind(event.Kind) || nostr.IsAddressableKind(event.Kind): + replaced, err := store.Replace(ctx, event) + if err != nil { + return err + } + + if replaced && onReplace != nil { + return onReplace(event) + } + return nil + + default: + return nil + } +} + +type GraphBuilderConfig struct { + CacheCapacity int + PrintEvery int +} + +func NewGraphBuilderConfig() GraphBuilderConfig { + return GraphBuilderConfig{ + CacheCapacity: 100_000, + PrintEvery: 10_000, + } +} + +func (c GraphBuilderConfig) Print() { + fmt.Printf("GraphBuilder\n") + fmt.Printf(" CacheCapacity: %v\n", c.CacheCapacity) + fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) +} + // GraphBuilder consumes events to update the graph and random walks. func GraphBuilder( ctx context.Context, - config EngineConfig, + config GraphBuilderConfig, db redb.RedisDB, events chan *nostr.Event, ) {