mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
305 lines
6.2 KiB
Go
305 lines
6.2 KiB
Go
package pipe
|
|
|
|
import (
	"cmp"
	"context"
	"errors"
	"fmt"
	"log"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/nbd-wtf/go-nostr"
	"github.com/vertex-lab/relay/pkg/eventstore"

	"github/pippellia-btc/crawler/pkg/graph"
	"github/pippellia-btc/crawler/pkg/redb"
	"github/pippellia-btc/crawler/pkg/walks"
)
|
|
|
|
var (
	// EventTracker counts the events processed so far; it is incremented once
	// per consumed event by both the Archiver and the GraphUpdater.
	EventTracker atomic.Int32

	// ErrUnsupportedKind is reported for events whose kind the Engine does not route.
	ErrUnsupportedKind = errors.New("unsupported event kind")
)
|
|
|
|
// EngineConfig holds the tunables of the Engine and of the goroutines it starts.
type EngineConfig struct {
	// PrintEvery is the number of processed events between progress log lines.
	PrintEvery int

	// for the GraphUpdater
	UpdaterCapacity int // buffer size of the channel feeding the GraphUpdater
	CacheCapacity   int // capacity given to the walks cache (walks.WithCapacity)

	// for the Archiver
	ArchiverCapacity int // buffer size of the channel feeding the Archiver
}

// NewEngineConfig returns an EngineConfig populated with the default values.
func NewEngineConfig() EngineConfig {
	return EngineConfig{
		PrintEvery:       5000,
		UpdaterCapacity:  1000,
		CacheCapacity:    100_000,
		ArchiverCapacity: 1000,
	}
}

// Print writes a human-readable summary of the configuration to standard output.
// Each label matches the name of the field it reports.
func (c EngineConfig) Print() {
	fmt.Printf("Engine\n")
	fmt.Printf(" PrintEvery: %d\n", c.PrintEvery)
	fmt.Printf(" UpdaterCapacity: %d\n", c.UpdaterCapacity)
	fmt.Printf(" CacheCapacity: %d\n", c.CacheCapacity)
	fmt.Printf(" ArchiverCapacity: %d\n", c.ArchiverCapacity)
}
|
|
|
|
// Engine is responsible for dispacting the correct events to the [Archiver] or [GraphUpdater].
|
|
func Engine(
|
|
ctx context.Context,
|
|
config EngineConfig,
|
|
store *eventstore.Store,
|
|
db redb.RedisDB,
|
|
events chan *nostr.Event) {
|
|
|
|
defer log.Println("Engine: shutting down...")
|
|
|
|
graphEvents := make(chan *nostr.Event, config.UpdaterCapacity)
|
|
archiveEvents := make(chan *nostr.Event, config.ArchiverCapacity)
|
|
defer close(graphEvents)
|
|
defer close(archiveEvents)
|
|
|
|
go GraphUpdater(ctx, config, store, db, graphEvents)
|
|
go Archiver(ctx, config, store, archiveEvents)
|
|
|
|
log.Println("Engine: ready to process events")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case event, ok := <-events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
switch event.Kind {
|
|
case nostr.KindFollowList:
|
|
graphEvents <- event
|
|
|
|
case nostr.KindProfileMetadata:
|
|
archiveEvents <- event
|
|
|
|
default:
|
|
logEvent(event, ErrUnsupportedKind)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Archiver consumes events that are not graph-related and stores them.
|
|
func Archiver(
|
|
ctx context.Context,
|
|
config EngineConfig,
|
|
store *eventstore.Store,
|
|
events chan *nostr.Event) {
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case event, ok := <-events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := func() error {
|
|
opctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
switch {
|
|
case nostr.IsRegularKind(event.Kind):
|
|
return store.Save(opctx, event)
|
|
|
|
case nostr.IsReplaceableKind(event.Kind):
|
|
_, err := store.Replace(opctx, event)
|
|
return err
|
|
|
|
default:
|
|
return nil
|
|
}
|
|
}()
|
|
|
|
if err != nil {
|
|
logEvent(event, err)
|
|
}
|
|
|
|
processed := int(EventTracker.Add(1))
|
|
if processed%config.PrintEvery == 0 {
|
|
log.Printf("Engine: processed %d events", processed)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// GraphUpdater consumes events to update the graph and random walks.
|
|
func GraphUpdater(
|
|
ctx context.Context,
|
|
config EngineConfig,
|
|
store *eventstore.Store,
|
|
db redb.RedisDB,
|
|
events chan *nostr.Event) {
|
|
|
|
cache := walks.NewWalker(
|
|
walks.WithCapacity(config.CacheCapacity),
|
|
walks.WithFallback(db),
|
|
walks.WithLogFile("cache.log"),
|
|
)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case event, ok := <-events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := func() error {
|
|
opctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
|
|
replaced, err := store.Replace(opctx, event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if replaced {
|
|
return processFollowList(opctx, db, cache, event)
|
|
}
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
logEvent(event, err)
|
|
}
|
|
|
|
processed := int(EventTracker.Add(1))
|
|
if processed%config.PrintEvery == 0 {
|
|
log.Printf("Engine: processed %d events", processed)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processFollowList parses the pubkeys listed in the event, and uses them to:
|
|
// - update the follows of the author (db and cache)
|
|
// - update the author's random walks and signal the number to the [WalksTracker]
|
|
func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) error {
|
|
author, err := db.NodeByKey(ctx, event.PubKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
oldFollows, err := cache.Follows(ctx, author.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pubkeys := parsePubkeys(event)
|
|
onMissing := redb.Ignore
|
|
if author.Status == graph.StatusActive {
|
|
// active nodes are the only ones that can add new pubkeys to the database
|
|
onMissing = redb.AddValid
|
|
}
|
|
|
|
newFollows, err := db.Resolve(ctx, pubkeys, onMissing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
delta := graph.NewDelta(event.Kind, author.ID, oldFollows, newFollows)
|
|
if delta.Size() == 0 {
|
|
// old and new follows are the same, stop
|
|
return nil
|
|
}
|
|
|
|
visiting, err := db.WalksVisiting(ctx, author.ID, -1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
old, new, err := walks.ToUpdate(ctx, cache, delta, visiting)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := db.ReplaceWalks(ctx, old, new); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := db.Update(ctx, delta); err != nil {
|
|
return err
|
|
}
|
|
|
|
WalksTracker.Add(int32(len(new)))
|
|
return cache.Update(ctx, delta)
|
|
}
|
|
|
|
// followPrefix is the tag name whose entries parsePubkeys reads as followed pubkeys.
const followPrefix = "p"

// maxFollows bounds how many pubkeys parsePubkeys collects from a single event.
const maxFollows = 50_000
|
|
|
|
// parse unique pubkeys (excluding author) from the "p" tags in the event.
|
|
func parsePubkeys(event *nostr.Event) []string {
|
|
pubkeys := make([]string, 0, min(len(event.Tags), maxFollows))
|
|
for _, tag := range event.Tags {
|
|
if len(pubkeys) > maxFollows {
|
|
// stop processing, list is too big
|
|
break
|
|
}
|
|
|
|
if len(tag) < 2 {
|
|
continue
|
|
}
|
|
|
|
prefix, pubkey := tag[0], tag[1]
|
|
if prefix != followPrefix {
|
|
continue
|
|
}
|
|
|
|
if pubkey == event.PubKey {
|
|
// remove self-follows
|
|
continue
|
|
}
|
|
|
|
pubkeys = append(pubkeys, pubkey)
|
|
}
|
|
|
|
return unique(pubkeys)
|
|
}
|
|
|
|
func logEvent(e *nostr.Event, extra any) {
|
|
msg := fmt.Sprintf("Engine: event ID %s, kind %d by %s: ", e.ID, e.Kind, e.PubKey)
|
|
log.Printf(msg+"%v", extra)
|
|
}
|
|
|
|
// unique returns the distinct elements of slice, in ascending order.
// It returns nil for an empty or nil input. The input slice is not modified.
func unique[E cmp.Ordered](slice []E) []E {
	if len(slice) == 0 {
		return nil
	}

	// Sort a copy so the caller's slice is neither reordered nor overwritten.
	sorted := slices.Clone(slice)
	slices.Sort(sorted)
	return slices.Compact(sorted)
}
|