From be2e093422ed7bd341fb6c4b04ecf899c88eb0e3 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Fri, 6 Jun 2025 17:58:17 +0200 Subject: [PATCH] added sync cmd --- cmd/{crawler.go => crawler/main.go} | 9 +- cmd/sync/main.go | 174 ++++++++++++++++++++++++++++ {cmd => pkg/config}/config.go | 12 +- pkg/pipe/intake.go | 10 +- pkg/pipe/intake_test.go | 2 +- 5 files changed, 193 insertions(+), 14 deletions(-) rename cmd/{crawler.go => crawler/main.go} (96%) create mode 100644 cmd/sync/main.go rename {cmd => pkg/config}/config.go (95%) diff --git a/cmd/crawler.go b/cmd/crawler/main.go similarity index 96% rename from cmd/crawler.go rename to cmd/crawler/main.go index 52bb7af..858ed63 100644 --- a/cmd/crawler.go +++ b/cmd/crawler/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github/pippellia-btc/crawler/pkg/config" "github/pippellia-btc/crawler/pkg/graph" "github/pippellia-btc/crawler/pkg/pipe" "github/pippellia-btc/crawler/pkg/redb" @@ -25,7 +26,7 @@ func main() { defer cancel() go handleSignals(cancel) - config, err := LoadConfig() + config, err := config.Load() if err != nil { panic(err) } @@ -45,9 +46,13 @@ func main() { } if count == 0 { - log.Println("initialize from empty database...") + if len(config.InitPubkeys) == 0 { + panic("init pubkeys are empty") + } + log.Println("initialize from empty database...") nodes := make([]graph.ID, len(config.InitPubkeys)) + for i, pk := range config.InitPubkeys { nodes[i], err = db.AddNode(ctx, pk) if err != nil { diff --git a/cmd/sync/main.go b/cmd/sync/main.go new file mode 100644 index 0000000..932bb2d --- /dev/null +++ b/cmd/sync/main.go @@ -0,0 +1,174 @@ +package main + +import ( + "context" + "fmt" + "github/pippellia-btc/crawler/pkg/config" + "github/pippellia-btc/crawler/pkg/graph" + "github/pippellia-btc/crawler/pkg/pipe" + "github/pippellia-btc/crawler/pkg/redb" + "github/pippellia-btc/crawler/pkg/walks" + "log" + "os" + "os/signal" + "runtime" + "sync" + "syscall" + "time" + + "github.com/nbd-wtf/go-nostr" + "github.com/redis/go-redis/v9" + "github.com/vertex-lab/relay/pkg/eventstore" +) + +/* +This program syncronize the Redis database using the events already stored in the EventStore. +If Redis and the eventstore are already in sync, run the executable at /cmd/crawler/. +*/ + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go handleSignals(cancel) + + config, err := config.Load() + if err != nil { + panic(err) + } + + events := make(chan *nostr.Event, config.EventsCapacity) + pubkeys := make(chan string, config.PubkeysCapacity) + + store, err := eventstore.New(config.SQLiteURL) + if err != nil { + panic(err) + } + + db := redb.New(&redis.Options{Addr: config.RedisAddress}) + count, err := db.NodeCount(ctx) + if err != nil { + panic(err) + } + + if count != 0 { + panic("refusing to run sync when redis is not empty") + } + + if len(config.InitPubkeys) == 0 { + panic("init pubkeys are empty") + } + + log.Println("initialize from empty database...") + nodes := make([]graph.ID, len(config.InitPubkeys)) + + for i, pk := range config.InitPubkeys { + nodes[i], err = db.AddNode(ctx, pk) + if err != nil { + panic(err) + } + + pubkeys <- pk // add to queue + } + + walks, err := walks.Generate(ctx, db, nodes...) + if err != nil { + panic(err) + } + + if err := db.AddWalks(ctx, walks...); err != nil { + panic(err) + } + + log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys)) + + pipe.Kinds = []int{ + nostr.KindFollowList, // no need to sync other event kinds + } + + var producers sync.WaitGroup + var consumers sync.WaitGroup + + producers.Add(3) + go func() { + defer producers.Done() + pipe.Firehose(ctx, config.Firehose, db, enqueue(events)) + }() + + go func() { + defer producers.Done() + pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events)) // TODO: fetch from the event store + }() + + go func() { + defer producers.Done() + pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys)) + close(pubkeys) // Arbiter is the only pubkey sender + }() + + consumers.Add(1) + go func() { + defer consumers.Done() + pipe.GraphUpdater(ctx, config.Engine, store, db, events) + }() + + producers.Wait() + close(events) + + consumers.Wait() +} + +// handleSignals listens for OS signals and triggers context cancellation. +func handleSignals(cancel context.CancelFunc) { + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + <-signals + + log.Println(" Signal received. Shutting down...") + cancel() +} + +// enqueue things into the specified channel or return an error if full. +func enqueue[T any](queue chan T) func(t T) error { + return func(t T) error { + select { + case queue <- t: + return nil + default: + return fmt.Errorf("channel is full, dropping %v", t) + } + } +} + +func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan string) { + filename := "stats.log" + file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + panic(fmt.Errorf("failed to open log file %s: %w", filename, err)) + } + + defer file.Close() + log := log.New(file, "stats: ", log.LstdFlags) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + goroutines := runtime.NumGoroutine() + memStats := new(runtime.MemStats) + runtime.ReadMemStats(memStats) + + log.Println("---------------------------------------") + log.Printf("events queue: %d/%d\n", len(events), cap(events)) + log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys)) + log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load()) + log.Printf("goroutines: %d\n", goroutines) + log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024)) + log.Println("---------------------------------------") + } + } +} diff --git a/cmd/config.go b/pkg/config/config.go similarity index 95% rename from cmd/config.go rename to pkg/config/config.go index 8a601f9..e5cb856 100644 --- a/cmd/config.go +++ b/pkg/config/config.go @@ -1,4 +1,4 @@ -package main +package config import ( "fmt" @@ -50,8 +50,8 @@ type Config struct { Engine pipe.EngineConfig } -// NewConfig returns a config with default parameters -func NewConfig() *Config { +// New returns a config with default parameters +func New() *Config { return &Config{ SystemConfig: NewSystemConfig(), Firehose: pipe.NewFirehoseConfig(), @@ -69,9 +69,9 @@ func (c *Config) Print() { c.Engine.Print() } -// LoadConfig reads the enviroment variables and parses them into a [Config] struct -func LoadConfig() (*Config, error) { - var config = NewConfig() +// Load reads the enviroment variables and parses them into a [Config] struct +func Load() (*Config, error) { + var config = New() var err error for _, item := range os.Environ() { diff --git a/pkg/pipe/intake.go b/pkg/pipe/intake.go index 3ab997d..9a86a41 100644 --- a/pkg/pipe/intake.go +++ b/pkg/pipe/intake.go @@ -11,7 +11,7 @@ import ( ) var ( - relevantKinds = []int{ + Kinds = []int{ //nostr.KindProfileMetadata, nostr.KindFollowList, } @@ -98,7 +98,7 @@ func (b *buffer) Contains(ID string) bool { return slices.Contains(b.IDs, ID) } -// Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since]. +// Firehose connects to a list of relays and pulls [Kinds] events that are newer than [FirehoseConfig.Since]. // It discards events from unknown pubkeys as an anti-spam mechanism. func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) { defer log.Println("Firehose: shutting down...") @@ -107,7 +107,7 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s defer shutdown(pool) filter := nostr.Filter{ - Kinds: relevantKinds, + Kinds: Kinds, Since: config.Since(), } @@ -218,7 +218,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s } } -// fetch queries the [relevantKinds] of the specified pubkeys. +// fetch queries the [Kinds] of the specified pubkeys. func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string) ([]*nostr.Event, error) { if len(pubkeys) == 0 { return nil, nil @@ -228,7 +228,7 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string defer cancel() filter := nostr.Filter{ - Kinds: relevantKinds, + Kinds: Kinds, Authors: pubkeys, } diff --git a/pkg/pipe/intake_test.go b/pkg/pipe/intake_test.go index b2623e1..007f3bc 100644 --- a/pkg/pipe/intake_test.go +++ b/pkg/pipe/intake_test.go @@ -37,7 +37,7 @@ func TestFetch(t *testing.T) { t.Fatalf("expected error nil, got %v", err) } - expected := len(pubkeys) * len(relevantKinds) + expected := len(pubkeys) * len(Kinds) if len(events) != expected { t.Fatalf("expected %d events, got %d", expected, len(events)) }