From 1f9c5c6c3bda996ff81db8c1dc45671688f05d08 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Sun, 14 Sep 2025 17:21:41 +0200 Subject: [PATCH] added recorder pipeline function --- pkg/pipe/recorder.go | 109 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 pkg/pipe/recorder.go diff --git a/pkg/pipe/recorder.go b/pkg/pipe/recorder.go new file mode 100644 index 0000000..f956df9 --- /dev/null +++ b/pkg/pipe/recorder.go @@ -0,0 +1,109 @@ +package pipe + +import ( + "context" + "fmt" + "log" + "strconv" + "time" + + "github.com/nbd-wtf/go-nostr" + "github.com/vertex-lab/crawler_v2/pkg/redb" +) + +func Recorder( + ctx context.Context, + db redb.RedisDB, + events <-chan *nostr.Event, + forward Forward[*nostr.Event], +) { + defer log.Println("Recorder: shut down") + timer := midnightTimer() + + for { + select { + case <-ctx.Done(): + return + + case event, ok := <-events: + if !ok { + return + } + + if err := recordEvent(db, event); err != nil { + log.Printf("Recorder: %v", err) + } + + if err := forward(event); err != nil { + log.Printf("Recorder: %v", err) + } + + case <-timer: + yesterday := time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02") + + if err := finalizeStats(db, yesterday); err != nil { + log.Printf("Recorder: %v", err) + } + + timer = midnightTimer() + } + } +} + +const ( + KeyStatsPrefix = "stats:" + KeyKindPrefix = "kind:" + KeyActivePubkeysPrefix = "active_pubkeys:" +) + +func stats(day string) string { return KeyStatsPrefix + day } +func kind(kind int) string { return KeyKindPrefix + strconv.Itoa(kind) } +func activePubkeys(day string) string { return KeyActivePubkeysPrefix + day } + +func recordEvent(db redb.RedisDB, event *nostr.Event) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + today := time.Now().UTC().Format("2006-01-02") + + pipe := db.Client.TxPipeline() + pipe.HIncrBy(ctx, stats(today), kind(event.Kind), 1) + pipe.PFAdd(ctx, activePubkeys(today), event.PubKey) + + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("failed to record event: pipeline failed: %w", err) + } + return nil +} + +func finalizeStats(db redb.RedisDB, day string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + totalPubkeys, err := db.NodeCount(ctx) + if err != nil { + return fmt.Errorf("finalizeStats: %v", err) + } + + activePubkeys, err := db.Client.PFCount(ctx, activePubkeys(day)).Result() + if err != nil { + return fmt.Errorf("finalizeStats: failed to fetch the active pubkeys count: %w", err) + } + + err = db.Client.HSet(ctx, stats(day), + "active_pubkeys", activePubkeys, + "total_pubkeys", totalPubkeys, + ).Err() + + if err != nil { + return fmt.Errorf("finalizeStats: failed to write: %w", err) + } + return nil +} + +func midnightTimer() <-chan time.Time { + now := time.Now().UTC() + midnight := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC) + wait := time.Until(midnight) + return time.After(wait) +}