using helper function pipe.Send

pippellia-btc
2025-09-14 19:01:07 +02:00
parent f70fcb2404
commit 44f235adfb
3 changed files with 46 additions and 37 deletions
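In short: the ad-hoc enqueue closure that used to live in the first file and the inline select/default inside Engine are both replaced by a single generic helper, pipe.Send, which wraps a channel in a Forward callback that never blocks. It either enqueues the value or reports that the channel is full so the caller can drop it. A minimal, self-contained sketch of that pattern (Forward and Send match the diff below; the demo main and its queue are illustrative only):

package main

import (
    "fmt"
    "log"
)

// Forward and Send mirror the helper added by this commit in the pipe package.
type Forward[T any] func(T) error

// Send returns a Forward that performs a non-blocking send into ch and
// returns an error instead of blocking when the channel is full.
func Send[T any](ch chan T) Forward[T] {
    return func(t T) error {
        select {
        case ch <- t:
            return nil
        default:
            return fmt.Errorf("channel is full, dropping %v", t)
        }
    }
}

func main() {
    queue := make(chan string, 1) // deliberately tiny buffer
    forward := Send(queue)

    if err := forward("first"); err != nil { // buffer has room: nil error
        log.Println(err)
    }
    if err := forward("second"); err != nil { // buffer full: value is dropped
        log.Println(err) // "channel is full, dropping second"
    }
}

Dropping instead of blocking is the trade-off Send makes; the queue length/capacity lines printed by printStats are what make it visible when drops become likely.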

View File

@@ -39,15 +39,18 @@ func main() {
 	config.Fetcher.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds
-	events := make(chan *nostr.Event, config.EventsCapacity)
-	pubkeys := make(chan string, config.PubkeysCapacity)
+	db := redb.New(&redis.Options{
+		Addr: config.RedisAddress,
+	})
 	store, err := store.New(config.SQLiteURL)
 	if err != nil {
 		panic(err)
 	}
-	db := redb.New(&redis.Options{Addr: config.RedisAddress})
+	builderQueue := make(chan *nostr.Event, config.EventsCapacity)
+	fetcherQueue := make(chan string, config.PubkeysCapacity)
 	nodes, err := db.NodeCount(ctx)
 	if err != nil {
 		panic(err)
@@ -58,7 +61,7 @@ func main() {
 	}
 	if len(config.InitPubkeys) == 0 {
-		panic("init pubkeys are empty: impossible to initialize")
+		panic("init fetcherQueue are empty: impossible to initialize")
 	}
 	log.Println("initialize from empty database...")
@@ -69,7 +72,7 @@ func main() {
 			panic(err)
 		}
-		pubkeys <- pk // add to queue
+		fetcherQueue <- pk // add to queue
 	}
 	for _, node := range initNodes {
@@ -77,10 +80,10 @@ func main() {
 			panic(err)
 		}
 	}
-	log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys))
+	log.Printf("correctly added %d init fetcherQueue", len(config.InitPubkeys))
 	if config.PrintStats {
-		go printStats(ctx, events, pubkeys)
+		go printStats(ctx, builderQueue, fetcherQueue)
 	}
 	var wg sync.WaitGroup
@@ -88,37 +91,29 @@ func main() {
 	go func() {
 		defer wg.Done()
-		pipe.FetcherDB(ctx, config.Fetcher, store, pubkeys, enqueue(events))
-		close(events) // FetcherDB is the only event producer
+		pipe.FetcherDB(ctx, config.Fetcher, store, fetcherQueue, pipe.Send(builderQueue))
+		close(builderQueue) // FetcherDB is the only event producer
 	}()
 	go func() {
 		defer wg.Done()
-		pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys))
-		close(pubkeys) // Arbiter is the only pubkey producer
+		pipe.Arbiter(ctx, config.Arbiter, db, pipe.Send(fetcherQueue))
+		close(fetcherQueue) // Arbiter is the only pubkey producer
 	}()
 	go func() {
 		defer wg.Done()
-		pipe.GraphBuilder(ctx, config.Engine, db, events)
+		pipe.GraphBuilder(ctx, config.Engine, db, builderQueue)
 	}()
 	wg.Wait()
 }
-// enqueue things into the specified channel or return an error if full.
-func enqueue[T any](queue chan T) func(t T) error {
-	return func(t T) error {
-		select {
-		case queue <- t:
-			return nil
-		default:
-			return fmt.Errorf("channel is full, dropping %v", t)
-		}
-	}
-}
-func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan string) {
+func printStats(
+	ctx context.Context,
+	builderQueue chan *nostr.Event,
+	fetcherQueue chan string,
+) {
 	filename := "stats.log"
 	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
 	if err != nil {
@@ -142,8 +137,8 @@ func printStats(ctx context.Context, events chan *nostr.Event, pubkeys chan stri
 		runtime.ReadMemStats(memStats)
 		log.Println("---------------------------------------")
-		log.Printf("events queue: %d/%d\n", len(events), cap(events))
-		log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys))
+		log.Printf("GraphBuilder queue: %d/%d\n", len(builderQueue), cap(builderQueue))
+		log.Printf("FetcherDB queue: %d/%d\n", len(fetcherQueue), cap(fetcherQueue))
 		log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load())
 		log.Printf("goroutines: %d\n", goroutines)
 		log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024))

View File

@@ -3,7 +3,6 @@ package pipe
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log"
 	"time"
@@ -48,22 +47,14 @@ func Engine(
 	db redb.RedisDB,
 	events chan *nostr.Event,
 ) {
 	graphEvents := make(chan *nostr.Event, config.BuilderCapacity)
 	defer close(graphEvents)
-	log.Println("Engine: ready to process events")
-	defer log.Println("Engine: shut down")
 	go GraphBuilder(ctx, config, db, graphEvents)
 	Archiver(ctx, config, store, events, func(e *nostr.Event) error {
 		if e.Kind == nostr.KindFollowList {
-			select {
-			case graphEvents <- e:
-			default:
-				return errors.New("channel is full")
-			}
+			return Send(graphEvents)(e)
 		}
 		return nil
 	})
@@ -77,6 +68,8 @@ func Archiver(
 	events chan *nostr.Event,
 	onReplace func(*nostr.Event) error,
 ) {
+	log.Println("Archiver: ready")
+	defer log.Println("Archiver: shut down")
 	var processed int
@@ -133,6 +126,8 @@ func GraphBuilder(
 	db redb.RedisDB,
 	events chan *nostr.Event,
 ) {
+	log.Println("GraphBuilder: ready")
+	defer log.Println("GraphBuilder: shut down")
 	cache := walks.NewWalker(
 		walks.WithCapacity(config.CacheCapacity),
@@ -151,6 +146,11 @@ func GraphBuilder(
 			return
 		}
+		if event.Kind != nostr.KindFollowList {
+			log.Printf("GraphBuilder: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, "unexpected kind")
+			continue
+		}
 		err := func() error {
 			opctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 			defer cancel()

View File

@@ -77,6 +77,19 @@ type PubkeyChecker interface {
 type Forward[T any] func(T) error
+// Send returns a [Forward] function that will attempt to send values into the given channel.
+// It returns an error if the channel is full.
+func Send[T any](ch chan T) Forward[T] {
+	return func(t T) error {
+		select {
+		case ch <- t:
+			return nil
+		default:
+			return fmt.Errorf("channel is full, dropping %v", t)
+		}
+	}
+}
 // Firehose connects to a list of relays and pulls config.Kinds events that are newer than config.Since.
 // It deduplicate events using a simple ring-buffer.
 // It discards events from unknown pubkeys as an anti-spam mechanism.
@@ -87,6 +100,7 @@ func Firehose(
 	check PubkeyChecker,
 	forward Forward[*nostr.Event],
 ) {
+	log.Println("Firehose: ready")
 	defer log.Println("Firehose: shut down")
 	pool := nostr.NewSimplePool(ctx)
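Beyond removing the duplicated select logic, handing stages a Forward keeps them unaware of channel mechanics: Firehose and FetcherDB only ever call the callback they are given, and main decides that the callback is pipe.Send of the appropriate queue. A small hedged sketch of why that seam is convenient, for example in tests (feed and capture are hypothetical, not part of the repository):

package main

import "fmt"

// Forward matches the callback type from the pipe package.
type Forward[T any] func(T) error

// feed stands in for a pipeline stage: it only knows how to hand
// values to the Forward it was given, never to a channel directly.
func feed(values []string, forward Forward[string]) error {
    for _, v := range values {
        if err := forward(v); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    // In production the Forward would be Send(queue); in a test it can
    // simply capture the values, no channels or goroutines required.
    var got []string
    capture := func(v string) error {
        got = append(got, v)
        return nil
    }
    if err := feed([]string{"a", "b"}, capture); err != nil {
        fmt.Println(err)
    }
    fmt.Println(got) // [a b]
}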