mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
using HLL for all counts to avoid duplicates
This commit is contained in:
@@ -83,6 +83,10 @@ func Archiver(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if slices.Contains(userMetadataKinds, event.Kind) {
|
||||||
|
|
||||||
|
// }
|
||||||
|
|
||||||
err := func() error {
|
err := func() error {
|
||||||
opctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
opctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
"github.com/vertex-lab/crawler_v2/pkg/redb"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -56,3 +58,15 @@ func TestFetch(t *testing.T) {
|
|||||||
t.Fatalf("expected %d events, got %d", expected, len(events))
|
t.Fatalf("expected %d events, got %d", expected, len(events))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Manual check: run only against a test Redis database reachable at localhost:6379.
|
||||||
|
func TestFinalizeStats(t *testing.T) {
|
||||||
|
db := redb.New(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
})
|
||||||
|
|
||||||
|
err := finalizeStats(db, "2025-09-15")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected nil, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,10 +4,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"github.com/vertex-lab/crawler_v2/pkg/redb"
|
"github.com/vertex-lab/crawler_v2/pkg/redb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,6 +22,7 @@ func Recorder(
|
|||||||
) {
|
) {
|
||||||
log.Println("Recorder: ready")
|
log.Println("Recorder: ready")
|
||||||
defer log.Println("Recorder: shut down")
|
defer log.Println("Recorder: shut down")
|
||||||
|
|
||||||
timer := midnightTimer()
|
timer := midnightTimer()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -31,7 +35,8 @@ func Recorder(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := recordEvent(db, event); err != nil {
|
err := recordEvent(ctx, db, event)
|
||||||
|
if err != nil && ctx.Err() == nil {
|
||||||
log.Printf("Recorder: %v", err)
|
log.Printf("Recorder: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -44,11 +49,13 @@ func Recorder(
|
|||||||
// the statistics of the previous day
|
// the statistics of the previous day
|
||||||
yesterday := time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02")
|
yesterday := time.Now().UTC().AddDate(0, 0, -1).Format("2006-01-02")
|
||||||
|
|
||||||
if err := finalizeStats(db, yesterday); err != nil {
|
err := finalizeStats(db, yesterday)
|
||||||
|
if err != nil {
|
||||||
log.Printf("Recorder: %v", err)
|
log.Printf("Recorder: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Recorder: finalized stats for %s", yesterday)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Recorder: finalized stats for %s", yesterday)
|
|
||||||
timer = midnightTimer()
|
timer = midnightTimer()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -58,61 +65,88 @@ const (
|
|||||||
separator = ":"
|
separator = ":"
|
||||||
KeyStats = "stats"
|
KeyStats = "stats"
|
||||||
KeyKind = "kind"
|
KeyKind = "kind"
|
||||||
|
|
||||||
|
KeyEvents = "events"
|
||||||
KeyActivePubkeys = "active_pubkeys"
|
KeyActivePubkeys = "active_pubkeys"
|
||||||
|
KeyCreatorPubkeys = "creator_pubkeys"
|
||||||
KeyTotalPubkeys = "total_pubkeys"
|
KeyTotalPubkeys = "total_pubkeys"
|
||||||
|
|
||||||
expiration = 30 * 24 * time.Hour
|
expiration = 30 * 24 * time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
func stats(day string) string { return KeyStats + separator + day }
|
func stats(day string) string { return KeyStats + separator + day }
|
||||||
func kind(kind int) string { return KeyKind + separator + strconv.Itoa(kind) }
|
func kind(k int) string { return KeyKind + separator + strconv.Itoa(k) }
|
||||||
func activePubkeys(day string) string { return KeyActivePubkeys + separator + day }
|
func activePubkeys(day string) string { return KeyActivePubkeys + separator + day }
|
||||||
|
func creatorPubkeys(day string) string { return KeyCreatorPubkeys + separator + day }
|
||||||
|
func events(day string, k int) string { return KeyEvents + separator + day + separator + kind(k) }
|
||||||
|
|
||||||
// recordEvent in redis. In particular:
|
// recordEvent in redis, by updating the appropriate HLLs (~1% error):
|
||||||
// - add pubkey to the active_pubkeys HLL (~1% error)
|
// - add event.ID to the events:<today>:kind:<event.Kind>
|
||||||
// - increment the count for event.Kind.
|
// - add pubkey to the active_pubkeys:<today>
|
||||||
//
|
// - add pubkey to the creator_pubkeys:<today> if event is in [contentKinds]
|
||||||
// The latter assumes that [Recorder] does not receive (many) duplicate events.
|
func recordEvent(ctx context.Context, db redb.RedisDB, event *nostr.Event) error {
|
||||||
// We could have used an HLL for that as well, but it's unclear whether
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
// it would be more precise given the HLL is a probabilistic method with ~1% error.
|
|
||||||
func recordEvent(db redb.RedisDB, event *nostr.Event) error {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
today := time.Now().UTC().Format("2006-01-02")
|
today := time.Now().UTC().Format("2006-01-02")
|
||||||
|
|
||||||
pipe := db.Client.TxPipeline()
|
pipe := db.Client.TxPipeline()
|
||||||
pipe.HIncrBy(ctx, stats(today), kind(event.Kind), 1)
|
pipe.PFAdd(ctx, events(today, event.Kind), event.ID)
|
||||||
|
pipe.ExpireNX(ctx, events(today, event.Kind), expiration)
|
||||||
|
|
||||||
pipe.PFAdd(ctx, activePubkeys(today), event.PubKey)
|
pipe.PFAdd(ctx, activePubkeys(today), event.PubKey)
|
||||||
pipe.Expire(ctx, activePubkeys(today), expiration)
|
pipe.ExpireNX(ctx, activePubkeys(today), expiration)
|
||||||
|
|
||||||
|
if slices.Contains(contentKinds, event.Kind) {
|
||||||
|
pipe.PFAdd(ctx, creatorPubkeys(today), event.PubKey)
|
||||||
|
pipe.ExpireNX(ctx, creatorPubkeys(today), expiration)
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := pipe.Exec(ctx); err != nil {
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
return fmt.Errorf("failed to record event: pipeline failed: %w", err)
|
return fmt.Errorf("failed to record event with ID %s: pipeline failed: %w", event.ID, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalizeStats for a particular day, adding to stats the total and active pubkey counts.
|
// finalizeStats for a particular day, adding to stats the total, active and creator pubkey counts, plus the event count of each kind recorded that day.
|
||||||
func finalizeStats(db redb.RedisDB, day string) error {
|
func finalizeStats(db redb.RedisDB, day string) error {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
totalPubkeys, err := db.NodeCount(ctx)
|
prefix := KeyEvents + separator + day + separator
|
||||||
|
eventKeys, err := db.Client.Keys(ctx, prefix+"*").Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("finalizeStats: %v", err)
|
return fmt.Errorf("finalizeStats: failed to fetch all the events keys: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
activePubkeys, err := db.Client.PFCount(ctx, activePubkeys(day)).Result()
|
pipe := db.Client.Pipeline()
|
||||||
if err != nil {
|
kinds := make([]string, len(eventKeys))
|
||||||
return fmt.Errorf("finalizeStats: failed to fetch the active pubkeys count: %w", err)
|
kindCounts := make([]*redis.IntCmd, len(eventKeys))
|
||||||
|
|
||||||
|
for i, key := range eventKeys {
|
||||||
|
kinds[i] = strings.TrimPrefix(key, prefix)
|
||||||
|
kindCounts[i] = pipe.PFCount(ctx, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = db.Client.HSet(ctx, stats(day),
|
actives := pipe.PFCount(ctx, activePubkeys(day))
|
||||||
KeyActivePubkeys, activePubkeys,
|
creators := pipe.PFCount(ctx, creatorPubkeys(day))
|
||||||
KeyTotalPubkeys, totalPubkeys,
|
total := pipe.HLen(ctx, redb.KeyKeyIndex)
|
||||||
).Err()
|
|
||||||
|
|
||||||
if err != nil {
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
|
return fmt.Errorf("finalizeStats: pipeline failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
statistics := []any{
|
||||||
|
KeyActivePubkeys, actives.Val(),
|
||||||
|
KeyCreatorPubkeys, creators.Val(),
|
||||||
|
KeyTotalPubkeys, total.Val(),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range kindCounts {
|
||||||
|
statistics = append(statistics, kinds[i], kindCounts[i].Val())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = db.Client.HSet(ctx, stats(day), statistics...).Err(); err != nil {
|
||||||
return fmt.Errorf("finalizeStats: failed to write: %w", err)
|
return fmt.Errorf("finalizeStats: failed to write: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user