diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 8f75e2f..e014bc5 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -83,6 +83,10 @@ func Archiver( return } + // if slices.Contains(userMetadataKinds, event.Kind) { + + // } + err := func() error { opctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() diff --git a/pkg/pipe/pipe_test.go b/pkg/pipe/pipe_test.go index ab4e4d3..4f1c327 100644 --- a/pkg/pipe/pipe_test.go +++ b/pkg/pipe/pipe_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/nbd-wtf/go-nostr" + "github.com/redis/go-redis/v9" + "github.com/vertex-lab/crawler_v2/pkg/redb" ) var ( @@ -56,3 +58,15 @@ func TestFetch(t *testing.T) { t.Fatalf("expected %d events, got %d", expected, len(events)) } } + +// Manually check on a test database +func TestFinalizeStats(t *testing.T) { + db := redb.New(&redis.Options{ + Addr: "localhost:6379", + }) + + err := finalizeStats(db, "2025-09-15") + if err != nil { + t.Fatalf("expected nil, got %v", err) + } +} diff --git a/pkg/pipe/recorder.go b/pkg/pipe/recorder.go index 154d62e..a9dd418 100644 --- a/pkg/pipe/recorder.go +++ b/pkg/pipe/recorder.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "log" + "slices" "strconv" + "strings" "time" "github.com/nbd-wtf/go-nostr" + "github.com/redis/go-redis/v9" "github.com/vertex-lab/crawler_v2/pkg/redb" ) @@ -19,6 +22,7 @@ func Recorder( ) { log.Println("Recorder: ready") defer log.Println("Recorder: shut down") + timer := midnightTimer() for { @@ -31,7 +35,8 @@ func Recorder( return } - if err := recordEvent(db, event); err != nil { + err := recordEvent(ctx, db, event) + if err != nil && ctx.Err() == nil { log.Printf("Recorder: %v", err) } @@ -44,75 +49,104 @@ func Recorder( // the statistics of the previous day yesterday := time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02") - if err := finalizeStats(db, yesterday); err != nil { + err := finalizeStats(db, yesterday) + if err != nil { log.Printf("Recorder: %v", err) + } else { + 
log.Printf("Recorder: finalized stats for %s", yesterday) } - log.Printf("Recorder: finalized stats for %s", yesterday) timer = midnightTimer() } } } const ( - separator = ":" - KeyStats = "stats" - KeyKind = "kind" - KeyActivePubkeys = "active_pubkeys" - KeyTotalPubkeys = "total_pubkeys" + separator = ":" + KeyStats = "stats" + KeyKind = "kind" + + KeyEvents = "events" + KeyActivePubkeys = "active_pubkeys" + KeyCreatorPubkeys = "creator_pubkeys" + KeyTotalPubkeys = "total_pubkeys" expiration = 30 * 24 * time.Hour ) -func stats(day string) string { return KeyStats + separator + day } -func kind(kind int) string { return KeyKind + separator + strconv.Itoa(kind) } -func activePubkeys(day string) string { return KeyActivePubkeys + separator + day } +func stats(day string) string { return KeyStats + separator + day } +func kind(k int) string { return KeyKind + separator + strconv.Itoa(k) } +func activePubkeys(day string) string { return KeyActivePubkeys + separator + day } +func creatorPubkeys(day string) string { return KeyCreatorPubkeys + separator + day } +func events(day string, k int) string { return KeyEvents + separator + day + separator + kind(k) } -// recordEvent in redis. In particular: -// - add pubkey to the active_pubkeys HLL (~1% error) -// - increment the count for event.Kind. -// -// The latter assumes that [Recorder] does not receives (many) duplicate events. -// We could have used an HLL for that as well, but it's unclear whether -// it would be more precise given the HLL is a probabilistic method with ~1% error. 
-func recordEvent(db redb.RedisDB, event *nostr.Event) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) +// recordEvent in redis, by updating the appropriate HLLs (~1% error): +// - add event.ID to the events:{day}:kind:{kind} +// - add pubkey to the active_pubkeys:{day} +// - add pubkey to the creator_pubkeys:{day} if event is in [contentKinds] +func recordEvent(ctx context.Context, db redb.RedisDB, event *nostr.Event) error { + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() today := time.Now().UTC().Format("2006-01-02") pipe := db.Client.TxPipeline() - pipe.HIncrBy(ctx, stats(today), kind(event.Kind), 1) + pipe.PFAdd(ctx, events(today, event.Kind), event.ID) + pipe.ExpireNX(ctx, events(today, event.Kind), expiration) + pipe.PFAdd(ctx, activePubkeys(today), event.PubKey) - pipe.Expire(ctx, activePubkeys(today), expiration) + pipe.ExpireNX(ctx, activePubkeys(today), expiration) + + if slices.Contains(contentKinds, event.Kind) { + pipe.PFAdd(ctx, creatorPubkeys(today), event.PubKey) + pipe.ExpireNX(ctx, creatorPubkeys(today), expiration) + } if _, err := pipe.Exec(ctx); err != nil { - return fmt.Errorf("failed to record event: pipeline failed: %w", err) + return fmt.Errorf("failed to record event with ID %s: pipeline failed: %w", event.ID, err) } return nil } // finalizeStats for a particular day, adding to stats the total and active pubkey counts. 
func finalizeStats(db redb.RedisDB, day string) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - totalPubkeys, err := db.NodeCount(ctx) + prefix := KeyEvents + separator + day + separator + eventKeys, err := db.Client.Keys(ctx, prefix+"*").Result() if err != nil { - return fmt.Errorf("finalizeStats: %v", err) + return fmt.Errorf("finalizeStats: failed to fetch all the events keys: %w", err) } - activePubkeys, err := db.Client.PFCount(ctx, activePubkeys(day)).Result() - if err != nil { - return fmt.Errorf("finalizeStats: failed to fetch the active pubkeys count: %w", err) + pipe := db.Client.Pipeline() + kinds := make([]string, len(eventKeys)) + kindCounts := make([]*redis.IntCmd, len(eventKeys)) + + for i, key := range eventKeys { + kinds[i] = strings.TrimPrefix(key, prefix) + kindCounts[i] = pipe.PFCount(ctx, key) } - err = db.Client.HSet(ctx, stats(day), - KeyActivePubkeys, activePubkeys, - KeyTotalPubkeys, totalPubkeys, - ).Err() + actives := pipe.PFCount(ctx, activePubkeys(day)) + creators := pipe.PFCount(ctx, creatorPubkeys(day)) + total := pipe.HLen(ctx, redb.KeyKeyIndex) - if err != nil { + if _, err := pipe.Exec(ctx); err != nil { + return fmt.Errorf("finalizeStats: pipeline failed: %w", err) + } + + statistics := []any{ + KeyActivePubkeys, actives.Val(), + KeyCreatorPubkeys, creators.Val(), + KeyTotalPubkeys, total.Val(), + } + + for i := range kindCounts { + statistics = append(statistics, kinds[i], kindCounts[i].Val()) + } + + if err = db.Client.HSet(ctx, stats(day), statistics...).Err(); err != nil { return fmt.Errorf("finalizeStats: failed to write: %w", err) } return nil