crawler_v2/cmd/crawl/main.go

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/pippellia-btc/nastro/sqlite"
	"github.com/vertex-lab/crawler_v2/pkg/config"
	"github.com/vertex-lab/crawler_v2/pkg/pipe"
	"github.com/vertex-lab/crawler_v2/pkg/redb"
	"github.com/vertex-lab/crawler_v2/pkg/store"

	"github.com/nbd-wtf/go-nostr"
	"github.com/redis/go-redis/v9"
)

/*
This program assumes synchronization between Redis and the event store, meaning
that the graph in Redis reflects the events in the store.
If that is not the case, use go run ./cmd/sync to synchronize Redis with the event store.
*/
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go pipe.HandleSignals(cancel)

	log.Printf("--------- starting up the crawler --------")
	defer log.Printf("-----------------------------------------")

	config, err := config.Load()
	if err != nil {
		panic(err)
	}

	db := redb.New(&redis.Options{
		Addr: config.RedisAddress,
	})

	store, err := store.New(
		config.SQLiteURL,
		sqlite.WithEventPolicy(pipe.EventTooBig),
	)
	if err != nil {
		panic(err)
	}
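
	// Channels wiring the pipeline stages together: the Firehose feeds
	// recorderQueue, the Recorder forwards events to engineQueue, the Arbiter
	// emits pubkeys on fetcherQueue, and the Fetcher pushes the events it
	// fetches onto engineQueue as well (see the goroutines below).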
	recorderQueue := make(chan *nostr.Event, config.EventsCapacity)
	engineQueue := make(chan *nostr.Event, config.EventsCapacity)
	fetcherQueue := make(chan string, config.PubkeysCapacity)

	nodes, err := db.NodeCount(ctx)
	if err != nil {
		panic(err)
	}
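
	// An empty graph in Redis is bootstrapped from the configured init pubkeys,
	// which are then queued for the Fetcher so their events enter the pipeline.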
	if nodes == 0 {
		log.Println("initializing from empty database...")
		if err := pipe.InitGraph(ctx, db, config.InitPubkeys); err != nil {
			panic(err)
		}

		for _, pk := range config.InitPubkeys {
			fetcherQueue <- pk
		}
		log.Printf("correctly added %d pubkeys", len(config.InitPubkeys))
	}

	if config.PrintStats {
		go printStats(ctx, recorderQueue, engineQueue, fetcherQueue)
	}
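
	// Producers (Firehose, Recorder, Fetcher, Arbiter) and the single consumer
	// (Engine) are tracked with separate WaitGroups so that engineQueue is
	// closed only after every producer has returned.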
	var producers sync.WaitGroup
	var consumers sync.WaitGroup

	producers.Add(4)
	go func() {
		defer producers.Done()
		gate := pipe.NewExistenceGate(db)
		pipe.Firehose(ctx, config.Firehose, gate, pipe.Send(recorderQueue))
	}()

	go func() {
		defer producers.Done()
		pipe.Recorder(ctx, db, recorderQueue, pipe.Send(engineQueue))
	}()

	go func() {
		defer producers.Done()
		pipe.Fetcher(ctx, config.Fetcher, fetcherQueue, pipe.Send(engineQueue))
	}()

	go func() {
		defer producers.Done()
		pipe.Arbiter(ctx, config.Arbiter, db, pipe.Send(fetcherQueue))
		close(fetcherQueue) // Arbiter is the only pubkey producer
	}()

	consumers.Add(1)
	go func() {
		defer consumers.Done()
		pipe.Engine(ctx, config.Engine, store, db, engineQueue)
	}()
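
	// Shutdown: once every producer has returned, engineQueue is closed so the
	// Engine can drain it and exit before the process terminates.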
	producers.Wait()
	close(engineQueue)
	consumers.Wait()
}
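
// printStats periodically logs queue occupancy, the walks tracker, goroutine
// count, and memory usage to stats.log until the context is cancelled.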
func printStats(
	ctx context.Context,
	recorderQueue, engineQueue chan *nostr.Event,
	fetcherQueue chan string,
) {
	filename := "stats.log"
	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		panic(fmt.Errorf("failed to open log file %s: %w", filename, err))
	}
	defer file.Close()

	log := log.New(file, "stats: ", log.LstdFlags)
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			goroutines := runtime.NumGoroutine()
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)

			log.Println("---------------------------------------")
			log.Printf("Recorder queue: %d/%d\n", len(recorderQueue), cap(recorderQueue))
			log.Printf("Engine queue: %d/%d\n", len(engineQueue), cap(engineQueue))
			log.Printf("Fetcher queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue))
			log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load())
			log.Printf("goroutines: %d\n", goroutines)
			log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024))
			log.Println("---------------------------------------")
		}
	}
}