refactored archiver and graph builder

pippellia-btc
2025-06-09 16:36:27 +02:00
parent a4bcc71dac
commit 376d55cc81
4 changed files with 109 additions and 105 deletions

View File

@@ -47,7 +47,7 @@ func main() {
	if count == 0 {
		if len(config.InitPubkeys) == 0 {
-			panic("init pubkeys are empty")
+			panic("init pubkeys are empty: impossible to initialize")
		}

		log.Println("initialize from empty database...")

View File

@@ -22,7 +22,7 @@ import (
)

/*
-This program syncronize the Redis database using the events already stored in the EventStore.
+This program synchronizes the Redis database to the events already stored in the event store.
If Redis and the eventstore are already in sync, run the executable at /cmd/crawler/.
*/

@@ -51,11 +51,11 @@ func main() {
	}

	if count != 0 {
-		panic("refusing to run sync when redis is not empty")
+		panic("refuse to run sync when redis is not empty")
	}

	if len(config.InitPubkeys) == 0 {
-		panic("init pubkeys are empty")
+		panic("init pubkeys are empty: impossible to initialize")
	}

	log.Println("initialize from empty database...")

@@ -108,7 +108,7 @@ func main() {
	consumers.Add(1)
	go func() {
		defer consumers.Done()
-		pipe.GraphUpdater(ctx, config.Engine, store, db, events)
+		pipe.GraphBuilder(ctx, config.Engine, store, db, events)
	}()

	producers.Wait()

View File

@@ -180,8 +180,8 @@ func Load() (*Config, error) {
			return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
		}

-	case "ENGINE_UPDATER_CAPACITY":
-		config.Engine.UpdaterCapacity, err = strconv.Atoi(val)
+	case "ENGINE_BUILDER_CAPACITY":
+		config.Engine.BuilderCapacity, err = strconv.Atoi(val)
		if err != nil {
			return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
		}

@@ -192,8 +192,8 @@ func Load() (*Config, error) {
			return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
		}

-	case "ENGINE_ARCHIVE_CAPACITY":
-		config.Engine.ArchiverCapacity, err = strconv.Atoi(val)
+	case "ENGINE_ARCHIVERS":
+		config.Engine.Archivers, err = strconv.Atoi(val)
		if err != nil {
			return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
		}
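
Deployment note: this hunk renames two environment keys. Assuming a dotenv-style config file (the key names come from this diff; the values shown are just the defaults from NewEngineConfig), entries would change from

ENGINE_UPDATER_CAPACITY=1000
ENGINE_ARCHIVE_CAPACITY=1000

to

ENGINE_BUILDER_CAPACITY=1000
ENGINE_ARCHIVERS=4

ENGINE_ARCHIVERS now counts concurrent archive goroutines rather than a channel capacity, so the old value should not be carried over as-is.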

View File

@@ -10,47 +10,40 @@ import (
	"github/pippellia-btc/crawler/pkg/walks"
	"log"
	"slices"
-	"sync/atomic"
	"time"

	"github.com/nbd-wtf/go-nostr"
	"github.com/vertex-lab/relay/pkg/eventstore"
)

-// EventTracker tracks the number of events processed
-var EventTracker atomic.Int32
-
var ErrUnsupportedKind = errors.New("unsupported event kind")

type EngineConfig struct {
	PrintEvery int

-	// for the GraphUpdater
-	UpdaterCapacity int
+	// GraphBuilder params
+	BuilderCapacity int
	CacheCapacity   int

-	// for the archiveEngine
-	ArchiverCapacity int
+	// Archiver params
+	Archivers int
}

func NewEngineConfig() EngineConfig {
	return EngineConfig{
-		PrintEvery:       5000,
-		UpdaterCapacity:  1000,
-		CacheCapacity:    100_000,
-		ArchiverCapacity: 1000,
+		PrintEvery:      5000,
+		BuilderCapacity: 1000,
+		CacheCapacity:   100_000,
+		Archivers:       4,
	}
}

func (c EngineConfig) Print() {
	fmt.Printf("Engine\n")
	fmt.Printf(" PrintEvery: %d\n", c.PrintEvery)
-	fmt.Printf(" UpdaterCapacity: %d\n", c.UpdaterCapacity)
+	fmt.Printf(" BuilderCapacity: %d\n", c.BuilderCapacity)
	fmt.Printf(" CacheCapacity: %d\n", c.CacheCapacity)
-	fmt.Printf(" ArchiveCapacity: %d\n", c.ArchiverCapacity)
}

-// Engine is responsible for dispacting the correct events to the [Archiver] or [GraphUpdater].
+// Engine is responsible for coordinating the [Archiver] with the [GraphBuilder].
func Engine(
	ctx context.Context,
	config EngineConfig,

@@ -58,48 +51,36 @@ func Engine(
	db redb.RedisDB,
	events chan *nostr.Event) {

-	defer log.Println("Engine: shutting down...")
-
-	graphEvents := make(chan *nostr.Event, config.UpdaterCapacity)
-	archiveEvents := make(chan *nostr.Event, config.ArchiverCapacity)
+	graphEvents := make(chan *nostr.Event, config.BuilderCapacity)
	defer close(graphEvents)
-	defer close(archiveEvents)

-	go GraphUpdater(ctx, config, store, db, graphEvents)
-	go Archiver(ctx, config, store, archiveEvents)
+	go GraphBuilder(ctx, config, store, db, graphEvents)

	log.Println("Engine: ready to process events")

-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case event, ok := <-events:
-			if !ok {
-				return
-			}
-
-			switch event.Kind {
-			case nostr.KindFollowList:
-				graphEvents <- event
-			case nostr.KindProfileMetadata:
-				archiveEvents <- event
-			default:
-				logEvent(event, ErrUnsupportedKind)
-			}
-		}
-	}
-}
+	Archiver(ctx, config, store, events, func(e *nostr.Event) error {
+		if e.Kind == nostr.KindFollowList {
+			select {
+			case graphEvents <- e:
+			default:
+				return errors.New("channel is full")
+			}
+		}
+		return nil
+	})
+
+	log.Println("Engine: shutting down...")
+}

-// Archiver consumes events that are not graph-related and stores them.
+// Archiver stores events in the event store.
func Archiver(
	ctx context.Context,
	config EngineConfig,
	store *eventstore.Store,
-	events chan *nostr.Event) {
+	events chan *nostr.Event,
+	onReplace func(*nostr.Event) error) {

+	sem := make(chan struct{}, config.Archivers)
+	var processed int
+
	for {
		select {
@@ -111,37 +92,56 @@ func Archiver(
				return
			}

-			err := func() error {
-				opctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-				defer cancel()
+			sem <- struct{}{}
+			go func() {
+				err := archive(ctx, store, event, onReplace)
+				<-sem

-				switch {
-				case nostr.IsRegularKind(event.Kind):
-					return store.Save(opctx, event)
-				case nostr.IsReplaceableKind(event.Kind):
-					_, err := store.Replace(opctx, event)
-					return err
-				default:
-					return nil
+				if err != nil {
+					log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
				}
			}()

-			if err != nil {
-				logEvent(event, err)
-			}
-
-			processed := int(EventTracker.Add(1))
+			processed++
			if processed%config.PrintEvery == 0 {
-				log.Printf("Engine: processed %d events", processed)
+				log.Printf("Archiver: processed %d events", processed)
			}
		}
	}
}

-// GraphUpdater consumes events to update the graph and random walks.
-func GraphUpdater(
+// Archive an event based on its kind.
+func archive(
	ctx context.Context,
+	store *eventstore.Store,
+	event *nostr.Event,
+	onReplace func(*nostr.Event) error) error {
+
+	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+	defer cancel()
+
+	switch {
+	case nostr.IsRegularKind(event.Kind):
+		return store.Save(ctx, event)
+	case nostr.IsReplaceableKind(event.Kind):
+		replaced, err := store.Replace(ctx, event)
+		if err != nil {
+			return err
+		}
+
+		if replaced {
+			return onReplace(event)
+		}
+		return nil
+	default:
+		return nil
+	}
+}
+
+// GraphBuilder consumes events to update the graph and random walks.
+func GraphBuilder(
+	ctx context.Context,
	config EngineConfig,
	store *eventstore.Store,

@@ -154,6 +154,8 @@ func GraphUpdater(
		walks.WithLogFile("cache.log"),
	)

+	var processed int
+
	for {
		select {
		case <-ctx.Done():

@@ -165,44 +167,47 @@ func GraphUpdater(
			}

			err := func() error {
-				opctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+				opctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
				defer cancel()

-				replaced, err := store.Replace(opctx, event)
+				delta, err := computeDelta(opctx, db, cache, event)
				if err != nil {
					return err
				}

-				if replaced {
-					return processFollowList(opctx, db, cache, event)
+				if err := updateWalks(opctx, db, cache, delta); err != nil {
+					return err
				}

-				return nil
+				if err := db.Update(opctx, delta); err != nil {
+					return err
+				}
+
+				return cache.Update(opctx, delta)
			}()

			if err != nil {
-				logEvent(event, err)
+				log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
			}

-			processed := int(EventTracker.Add(1))
+			processed++
			if processed%config.PrintEvery == 0 {
-				log.Printf("Engine: processed %d events", processed)
+				log.Printf("GraphBuilder: processed %d events", processed)
			}
		}
	}
}

-// processFollowList parses the pubkeys listed in the event, and uses them to:
-// - update the follows of the author (db and cache)
-// - update the author's random walks and signal the number to the [WalksTracker]
-func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) error {
+// Compute the delta from the "p" tags in the follow list.
+func computeDelta(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) (graph.Delta, error) {
	author, err := db.NodeByKey(ctx, event.PubKey)
	if err != nil {
-		return err
+		return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err)
	}

	oldFollows, err := cache.Follows(ctx, author.ID)
	if err != nil {
-		return err
+		return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err)
	}

	pubkeys := parsePubkeys(event)

@@ -214,35 +219,35 @@ func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.Cached
	newFollows, err := db.Resolve(ctx, pubkeys, onMissing)
	if err != nil {
-		return err
+		return graph.Delta{}, fmt.Errorf("failed to compute delta: %w", err)
	}

-	delta := graph.NewDelta(event.Kind, author.ID, oldFollows, newFollows)
+	return graph.NewDelta(event.Kind, author.ID, oldFollows, newFollows), nil
+}
+
+// updateWalks uses the delta to update the random walks.
+func updateWalks(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, delta graph.Delta) error {
	if delta.Size() == 0 {
-		// old and new follows are the same, stop
+		// nothing to change, stop
		return nil
	}

-	visiting, err := db.WalksVisiting(ctx, author.ID, -1)
+	visiting, err := db.WalksVisiting(ctx, delta.Node, -1)
	if err != nil {
-		return err
+		return fmt.Errorf("failed to update walks: %w", err)
	}

	old, new, err := walks.ToUpdate(ctx, cache, delta, visiting)
	if err != nil {
-		return err
+		return fmt.Errorf("failed to update walks: %w", err)
	}

	if err := db.ReplaceWalks(ctx, old, new); err != nil {
-		return err
-	}
-
-	if err := db.Update(ctx, delta); err != nil {
-		return err
+		return fmt.Errorf("failed to update walks: %w", err)
	}

-	WalksTracker.Add(int32(len(new)))
-	return cache.Update(ctx, delta)
+	return nil
}

const (

@@ -279,9 +284,8 @@ func parsePubkeys(event *nostr.Event) []string {
	return unique(pubkeys)
}

-func logEvent(e *nostr.Event, extra any) {
-	msg := fmt.Sprintf("Engine: event ID %s, kind %d by %s: ", e.ID, e.Kind, e.PubKey)
-	log.Printf(msg+"%v", extra)
+func logEvent(prefix string, e *nostr.Event, extra any) {
+	log.Printf("%s: event ID %s, kind %d by %s: %v", prefix, e.ID, e.Kind, e.PubKey, extra)
}

// Unique returns a slice of unique elements of the input slice.
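
Review note: the new Archiver replaces the Engine-side dispatch with a buffered channel used as a counting semaphore, so up to config.Archivers events are archived concurrently while the receive loop keeps draining the events channel. A minimal, self-contained sketch of that pattern, not code from this repository (the worker count, item, and process are illustrative placeholders):

package main

import (
	"fmt"
	"sync"
	"time"
)

// process stands in for the archive call (store.Save / store.Replace).
func process(i int) error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("archived", i)
	return nil
}

func main() {
	sem := make(chan struct{}, 4) // the capacity plays the role of config.Archivers
	var wg sync.WaitGroup

	for item := 0; item < 20; item++ {
		sem <- struct{}{} // acquire a slot; blocks while all 4 workers are busy
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the work is done
			if err := process(i); err != nil {
				fmt.Println("error:", err)
			}
		}(item)
	}
	wg.Wait()
}

One difference worth noting: the sketch waits for in-flight workers before returning, while Archiver as written in this diff does not appear to, so a few archive goroutines may still be running when Engine logs its shutdown.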