diff --git a/pkg/config/config.go b/pkg/config/config.go index 6c8c745..c013c36 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -200,12 +200,6 @@ func Load() (*Config, error) { if err != nil { return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) } - - case "ENGINE_ARCHIVERS": - config.Engine.Archivers, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } } } diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 9413e3a..eb09cb8 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -24,9 +24,6 @@ type EngineConfig struct { // GraphBuilder params BuilderCapacity int CacheCapacity int - - // Archiver params - Archivers int } func NewEngineConfig() EngineConfig { @@ -34,7 +31,6 @@ func NewEngineConfig() EngineConfig { PrintEvery: 5000, BuilderCapacity: 1000, CacheCapacity: 100_000, - Archivers: 4, } } @@ -56,8 +52,10 @@ func Engine( graphEvents := make(chan *nostr.Event, config.BuilderCapacity) defer close(graphEvents) - go GraphBuilder(ctx, config, db, graphEvents) log.Println("Engine: ready to process events") + defer log.Println("Engine: shutting down...") + + go GraphBuilder(ctx, config, db, graphEvents) Archiver(ctx, config, store, events, func(e *nostr.Event) error { if e.Kind == nostr.KindFollowList { @@ -69,8 +67,6 @@ func Engine( } return nil }) - - log.Println("Engine: shutting down...") } // Archiver stores events in the event store. @@ -81,7 +77,6 @@ func Archiver( events chan *nostr.Event, onReplace func(*nostr.Event) error) { - sem := make(chan struct{}, config.Archivers) var processed int for { @@ -94,16 +89,34 @@ func Archiver( return } - sem <- struct{}{} - go func() { - err := archive(ctx, store, event, onReplace) - <-sem + err := func() error { + opctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() - if err != nil { - log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) + switch { + case nostr.IsRegularKind(event.Kind): + return store.Save(opctx, event) + + case nostr.IsReplaceableKind(event.Kind): + replaced, err := store.Replace(opctx, event) + if err != nil { + return err + } + + if replaced { + return onReplace(event) + } + return nil + + default: + return nil } }() + if err != nil { + log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) + } + processed++ if processed%config.PrintEvery == 0 { log.Printf("Archiver: processed %d events", processed) @@ -112,36 +125,6 @@ func Archiver( } } -// Archive an event based on its kind. -func archive( - ctx context.Context, - store *eventstore.Store, - event *nostr.Event, - onReplace func(*nostr.Event) error) error { - - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - - switch { - case nostr.IsRegularKind(event.Kind): - return store.Save(ctx, event) - - case nostr.IsReplaceableKind(event.Kind): - replaced, err := store.Replace(ctx, event) - if err != nil { - return err - } - - if replaced { - return onReplace(event) - } - return nil - - default: - return nil - } -} - // GraphBuilder consumes events to update the graph and random walks. func GraphBuilder( ctx context.Context,