simplified enqueing

This commit is contained in:
pippellia-btc
2025-06-05 18:14:18 +02:00
parent 7ed167ae84
commit 54db3ef429

View File

@@ -42,7 +42,7 @@ func main() {
}
if count == 0 {
log.Println("initializing crawler from empty database")
log.Println("initialize from empty database")
nodes := make([]graph.ID, len(config.InitPubkeys))
for i, pk := range config.InitPubkeys {
@@ -67,46 +67,29 @@ func main() {
}
var wg sync.WaitGroup
wg.Add(3)
wg.Add(4)
go func() {
defer wg.Done()
pipe.Firehose(ctx, config.Firehose, db, func(event *nostr.Event) error {
select {
case events <- event:
default:
log.Printf("Firehose: channel is full, dropping event ID %s by %s", event.ID, event.PubKey)
}
return nil
})
pipe.Firehose(ctx, config.Firehose, db, enqueue(events))
}()
go func() {
defer wg.Done()
pipe.Fetcher(ctx, config.Fetcher, pubkeys, func(event *nostr.Event) error {
select {
case events <- event:
default:
log.Printf("Fetcher: channel is full, dropping event ID %s by %s", event.ID, event.PubKey)
}
return nil
})
pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events))
}()
go func() {
defer wg.Done()
pipe.Arbiter(ctx, config.Arbiter, db, func(pubkey string) error {
select {
case pubkeys <- pubkey:
default:
log.Printf("Arbiter: channel is full, dropping pubkey %s", pubkey)
}
return nil
})
pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys))
}()
go func() {
defer wg.Done()
pipe.Processor(ctx, config.Processor, db, events)
}()
go printStats(ctx)
pipe.Processor(ctx, config.Processor, db, events)
wg.Wait()
}
@@ -120,6 +103,18 @@ func handleSignals(cancel context.CancelFunc) {
cancel()
}
// enqueue things into channels
func enqueue[T any](queue chan T) func(t T) error {
return func(t T) error {
select {
case queue <- t:
default:
log.Printf("channel is full, dropping %v", t)
}
return nil
}
}
func printStats(ctx context.Context) {
filename := "stats.log"
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)