From a0d4dd5ad8031cda1acd659deadc6062f058b9ac Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Tue, 10 Jun 2025 16:15:22 +0200 Subject: [PATCH] added pipeline fetcher db --- cmd/crawler/main.go | 4 ++- cmd/sync/main.go | 31 ++++++++------------ pkg/pipe/engine.go | 3 +- pkg/pipe/intake.go | 69 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 84 insertions(+), 23 deletions(-) diff --git a/cmd/crawler/main.go b/cmd/crawler/main.go index 466584b..ecb8902 100644 --- a/cmd/crawler/main.go +++ b/cmd/crawler/main.go @@ -74,6 +74,8 @@ func main() { log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys)) } + go printStats(ctx, events, pubkeys) + var producers sync.WaitGroup var consumers sync.WaitGroup @@ -91,7 +93,7 @@ func main() { go func() { defer producers.Done() pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys)) - close(pubkeys) // Arbiter is the only pubkey sender + close(pubkeys) // Arbiter is the only pubkey producer }() consumers.Add(1) diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 981bc8f..3f14d9b 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -85,36 +85,29 @@ func main() { nostr.KindFollowList, // no need to sync other event kinds } - var producers sync.WaitGroup - var consumers sync.WaitGroup + go printStats(ctx, events, pubkeys) + + var wg sync.WaitGroup + wg.Add(3) - producers.Add(3) go func() { - defer producers.Done() - pipe.Firehose(ctx, config.Firehose, db, enqueue(events)) + defer wg.Done() + pipe.FetcherDB(ctx, config.Fetcher, store, pubkeys, enqueue(events)) + close(events) // FetcherDB is the only event producer }() go func() { - defer producers.Done() - pipe.Fetcher(ctx, config.Fetcher, pubkeys, enqueue(events)) // TODO: fetch from the event store - }() - - go func() { - defer producers.Done() + defer wg.Done() pipe.Arbiter(ctx, config.Arbiter, db, enqueue(pubkeys)) - close(pubkeys) // Arbiter is the only pubkey sender + close(pubkeys) // Arbiter is the only pubkey producer }() - consumers.Add(1) go func() { - defer consumers.Done() - pipe.GraphBuilder(ctx, config.Engine, store, db, events) + defer wg.Done() + pipe.GraphBuilder(ctx, config.Engine, db, events) }() - producers.Wait() - close(events) - - consumers.Wait() + wg.Wait() } // handleSignals listens for OS signals and triggers context cancellation. diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index 8063ec5..3112f85 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -54,7 +54,7 @@ func Engine( graphEvents := make(chan *nostr.Event, config.BuilderCapacity) defer close(graphEvents) - go GraphBuilder(ctx, config, store, db, graphEvents) + go GraphBuilder(ctx, config, db, graphEvents) log.Println("Engine: ready to process events") Archiver(ctx, config, store, events, func(e *nostr.Event) error { @@ -144,7 +144,6 @@ func archive( func GraphBuilder( ctx context.Context, config EngineConfig, - store *eventstore.Store, db redb.RedisDB, events chan *nostr.Event) { diff --git a/pkg/pipe/intake.go b/pkg/pipe/intake.go index 483bddb..b973a83 100644 --- a/pkg/pipe/intake.go +++ b/pkg/pipe/intake.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nbd-wtf/go-nostr" + "github.com/vertex-lab/relay/pkg/eventstore" ) var ( @@ -157,7 +158,7 @@ func (c FetcherConfig) Print() { fmt.Printf(" Interval: %v\n", c.Interval) } -// Fetcher extracts pubkeys from the channel and queries for their events: +// Fetcher extracts pubkeys from the channel and queries relays for their events: // - when the batch is bigger than config.Batch // - after config.Interval since the last query. func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, send func(*nostr.Event) error) { @@ -250,6 +251,72 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string return events, nil } +// Fetcher extracts pubkeys from the channel and queries the store for their events: +// - when the batch is bigger than config.Batch +// - after config.Interval since the last query. +func FetcherDB( + ctx context.Context, + config FetcherConfig, + store *eventstore.Store, + pubkeys <-chan string, + send func(*nostr.Event) error) { + + defer log.Println("FetcherDB: shutting down...") + + batch := make([]string, 0, config.Batch) + timer := time.After(config.Interval) + + for { + select { + case <-ctx.Done(): + return + + case pubkey, ok := <-pubkeys: + if !ok { + return + } + + batch = append(batch, pubkey) + if len(batch) < config.Batch { + continue + } + + events, err := store.Query(ctx, &nostr.Filter{Kinds: Kinds, Authors: batch}) + if err != nil { + log.Printf("FetcherDB: %v", err) + } + + for _, event := range events { + if err := send(&event); err != nil { + log.Printf("FetcherDB: %v", err) + } + } + + batch = make([]string, 0, config.Batch) + timer = time.After(config.Interval) + + case <-timer: + if len(batch) == 0 { + continue + } + + events, err := store.Query(ctx, &nostr.Filter{Kinds: Kinds, Authors: batch}) + if err != nil { + log.Printf("FetcherDB: %v", err) + } + + for _, event := range events { + if err := send(&event); err != nil { + log.Printf("FetcherDB: %v", err) + } + } + + batch = make([]string, 0, config.Batch) + timer = time.After(config.Interval) + } + } +} + // Shutdown iterates over the relays in the pool and closes all connections. func shutdown(pool *nostr.SimplePool) { pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {