diff --git a/cmd/crawl/main.go b/cmd/crawl/main.go index 86ccc4c..7e2704e 100644 --- a/cmd/crawl/main.go +++ b/cmd/crawl/main.go @@ -89,7 +89,7 @@ func main() { go func() { defer producers.Done() - pipe.Recorder(ctx, db, recorderQueue, pipe.Send(engineQueue)) + pipe.Recorder(ctx, recorderQueue, db, pipe.Send(engineQueue)) }() go func() { @@ -106,7 +106,7 @@ func main() { consumers.Add(1) go func() { defer consumers.Done() - pipe.Engine(ctx, config.Engine, store, db, engineQueue) + pipe.Engine(ctx, config.Engine, engineQueue, store, db) }() producers.Wait() diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 17d9765..ca66251 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -80,7 +80,7 @@ func main() { go func() { defer wg.Done() - pipe.FetcherDB(ctx, config.Fetcher, store, fetcherQueue, pipe.Send(grapherQueue)) + pipe.FetcherDB(ctx, config.Fetcher, fetcherQueue, store, pipe.Send(grapherQueue)) close(grapherQueue) // FetcherDB is the only event producer }() @@ -92,7 +92,7 @@ func main() { go func() { defer wg.Done() - pipe.Grapher(ctx, config.Engine.Grapher, db, grapherQueue) + pipe.Grapher(ctx, config.Engine.Grapher, grapherQueue, db) }() wg.Wait() diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index a26a78c..49146d6 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -58,22 +58,22 @@ func (c EngineConfig) Print() { func Engine( ctx context.Context, config EngineConfig, + events chan *nostr.Event, store nastro.Store, db redb.RedisDB, - events chan *nostr.Event, ) { graphEvents := make(chan *nostr.Event, config.ChannelCapacity) defer close(graphEvents) - sendFollowList := func(e *nostr.Event) error { + sendGraphEvents := func(e *nostr.Event) error { if e.Kind == nostr.KindFollowList { return Send(graphEvents)(e) } return nil } - go Grapher(ctx, config.Grapher, db, graphEvents) - Archiver(ctx, config.Archiver, store, events, sendFollowList) + go Grapher(ctx, config.Grapher, graphEvents, db) + Archiver(ctx, config.Archiver, events, store, sendGraphEvents) } type ArchiverConfig struct { @@ -110,8 +110,8 @@ func (c ArchiverConfig) Print() { func Archiver( ctx context.Context, config ArchiverConfig, - store nastro.Store, events chan *nostr.Event, + store nastro.Store, onReplace Forward[*nostr.Event], ) { log.Println("Archiver: ready") @@ -133,7 +133,7 @@ func Archiver( continue } - err := archive(ctx, store, event, onReplace) + err := archive(ctx, event, store, onReplace) if err != nil && ctx.Err() == nil { log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) } @@ -150,8 +150,8 @@ func Archiver( // If a replacement happened, it calls the provided onReplace func archive( ctx context.Context, - store nastro.Store, event *nostr.Event, + store nastro.Store, onReplace Forward[*nostr.Event], ) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -210,8 +210,8 @@ func (c GrapherConfig) Print() { func Grapher( ctx context.Context, config GrapherConfig, - db redb.RedisDB, events chan *nostr.Event, + db redb.RedisDB, ) { log.Println("Grapher: ready") defer log.Println("Grapher: shut down") diff --git a/pkg/pipe/fetcher.go b/pkg/pipe/fetcher.go index 4c7a73f..f46bfd7 100644 --- a/pkg/pipe/fetcher.go +++ b/pkg/pipe/fetcher.go @@ -158,8 +158,8 @@ func fetch(ctx context.Context, config FetcherConfig, pubkeys []string) ([]*nost func FetcherDB( ctx context.Context, config FetcherConfig, - store nastro.Store, pubkeys <-chan string, + store nastro.Store, forward Forward[*nostr.Event], ) { log.Println("FetcherDB: ready") diff --git a/pkg/pipe/recorder.go b/pkg/pipe/recorder.go index a9dd418..8b14bcb 100644 --- a/pkg/pipe/recorder.go +++ b/pkg/pipe/recorder.go @@ -16,8 +16,8 @@ import ( func Recorder( ctx context.Context, - db redb.RedisDB, events <-chan *nostr.Event, + db redb.RedisDB, forward Forward[*nostr.Event], ) { log.Println("Recorder: ready") @@ -35,7 +35,7 @@ func Recorder( return } - err := recordEvent(ctx, db, event) + err := recordEvent(ctx, event, db) if err != nil && ctx.Err() == nil { log.Printf("Recorder: %v", err) } @@ -84,7 +84,7 @@ func events(day string, k int) string { return KeyEvents + separator + day + se // - add event.ID to the events::kind: // - add pubkey to the active_pubkeys: // - add pubkey to the creator_pubkeys: if event is in [contentKinds] -func recordEvent(ctx context.Context, db redb.RedisDB, event *nostr.Event) error { +func recordEvent(ctx context.Context, event *nostr.Event, db redb.RedisDB) error { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel()