mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
added recorder to /crawl
This commit is contained in:
@@ -39,8 +39,9 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
events := make(chan *nostr.Event, config.EventsCapacity)
|
||||
pubkeys := make(chan string, config.PubkeysCapacity)
|
||||
db := redb.New(&redis.Options{
|
||||
Addr: config.RedisAddress,
|
||||
})
|
||||
|
||||
store, err := store.New(
|
||||
config.SQLiteURL,
|
||||
@@ -50,86 +51,96 @@ func main() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
db := redb.New(&redis.Options{Addr: config.RedisAddress})
|
||||
recorderQueue := make(chan *nostr.Event, config.EventsCapacity)
|
||||
engineQueue := make(chan *nostr.Event, config.EventsCapacity)
|
||||
fetcherQueue := make(chan string, config.PubkeysCapacity)
|
||||
|
||||
nodes, err := db.NodeCount(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if nodes == 0 {
|
||||
if len(config.InitPubkeys) == 0 {
|
||||
panic("init pubkeys are empty: impossible to initialize")
|
||||
if err := initGraph(ctx, db, config.InitPubkeys); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Println("initialize from empty database...")
|
||||
|
||||
initNodes := make([]graph.ID, len(config.InitPubkeys))
|
||||
for i, pk := range config.InitPubkeys {
|
||||
initNodes[i], err = db.AddNode(ctx, pk)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
pubkeys <- pk // add to queue
|
||||
for _, pk := range config.InitPubkeys {
|
||||
fetcherQueue <- pk
|
||||
}
|
||||
|
||||
for _, node := range initNodes {
|
||||
if err := pipe.Promote(db, node); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys))
|
||||
}
|
||||
|
||||
if config.PrintStats {
|
||||
go printStats(ctx, events, pubkeys)
|
||||
go printStats(ctx, recorderQueue, engineQueue, fetcherQueue)
|
||||
}
|
||||
|
||||
var producers sync.WaitGroup
|
||||
var consumers sync.WaitGroup
|
||||
|
||||
producers.Add(3)
|
||||
producers.Add(4)
|
||||
go func() {
|
||||
defer producers.Done()
|
||||
pipe.Firehose(ctx, config.Firehose, db, enqueue(events))
|
||||
pipe.Firehose(ctx, config.Firehose, db, pipe.Send(recorderQueue))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer producers.Done()
|
||||
pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events))
|
||||
pipe.Recorder(ctx, db, recorderQueue, pipe.Send(engineQueue))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer producers.Done()
|
||||
pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys))
|
||||
close(pubkeys) // Arbiter is the only pubkey producer
|
||||
pipe.Fetcher(ctx, config.Fetcher, fetcherQueue, pipe.Send(engineQueue))
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer producers.Done()
|
||||
pipe.Arbiter(ctx, config.Arbiter, db, pipe.Send(fetcherQueue))
|
||||
close(fetcherQueue) // Arbiter is the only pubkey producer
|
||||
}()
|
||||
|
||||
consumers.Add(1)
|
||||
go func() {
|
||||
defer consumers.Done()
|
||||
pipe.Engine(ctx, config.Engine, store, db, events)
|
||||
pipe.Engine(ctx, config.Engine, store, db, engineQueue)
|
||||
}()
|
||||
|
||||
producers.Wait()
|
||||
close(events)
|
||||
|
||||
close(engineQueue)
|
||||
consumers.Wait()
|
||||
}
|
||||
|
||||
// enqueue things into the specified channel or return an error if full.
|
||||
func enqueue[T any](queue chan T) func(t T) error {
|
||||
return func(t T) error {
|
||||
select {
|
||||
case queue <- t:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("channel is full, dropping %v", t)
|
||||
func initGraph(ctx context.Context, db redb.RedisDB, pubkeys []string) error {
|
||||
if len(pubkeys) == 0 {
|
||||
panic("init pubkeys are empty: impossible to initialize")
|
||||
}
|
||||
log.Println("initialize from empty database...")
|
||||
|
||||
var initNodes = make([]graph.ID, len(pubkeys))
|
||||
var err error
|
||||
|
||||
for i, pk := range pubkeys {
|
||||
initNodes[i], err = db.AddNode(ctx, pk)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, node := range initNodes {
|
||||
if err := pipe.Promote(db, node); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("correctly added %d init pubkeys", len(pubkeys))
|
||||
return nil
|
||||
}
|
||||
|
||||
func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan string) {
|
||||
func printStats(
|
||||
ctx context.Context,
|
||||
recorderQueue, engineQueue chan *nostr.Event,
|
||||
fetcherQueue chan string,
|
||||
) {
|
||||
filename := "stats.log"
|
||||
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
@@ -153,8 +164,9 @@ func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan stri
|
||||
runtime.ReadMemStats(memStats)
|
||||
|
||||
log.Println("---------------------------------------")
|
||||
log.Printf("events queue: %d/%d\n", len(events), cap(events))
|
||||
log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys))
|
||||
log.Printf("Recorder queue: %d/%d\n", len(recorderQueue), cap(recorderQueue))
|
||||
log.Printf("Engine queue: %d/%d\n", len(engineQueue), cap(engineQueue))
|
||||
log.Printf("Fetcher queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue))
|
||||
log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load())
|
||||
log.Printf("goroutines: %d\n", goroutines)
|
||||
log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024))
|
||||
|
||||
Reference in New Issue
Block a user