From c489203932685d179ecf7490ab516f427e38bca9 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Mon, 15 Sep 2025 17:54:01 +0200 Subject: [PATCH] using cache for faster existence check in Firehose --- cmd/crawl/main.go | 3 ++- pkg/pipe/firehose.go | 54 ++++++++++++++++++++++++++++++++----------- pkg/pipe/pipe_test.go | 10 ++++---- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/cmd/crawl/main.go b/cmd/crawl/main.go index 9cd7de1..d8a31fd 100644 --- a/cmd/crawl/main.go +++ b/cmd/crawl/main.go @@ -83,7 +83,8 @@ func main() { producers.Add(4) go func() { defer producers.Done() - pipe.Firehose(ctx, config.Firehose, db, pipe.Send(recorderQueue)) + gate := pipe.NewExistenceGate(db) + pipe.Firehose(ctx, config.Firehose, gate, pipe.Send(recorderQueue)) }() go func() { diff --git a/pkg/pipe/firehose.go b/pkg/pipe/firehose.go index 64da286..fb0894e 100644 --- a/pkg/pipe/firehose.go +++ b/pkg/pipe/firehose.go @@ -8,6 +8,7 @@ import ( "time" "github.com/nbd-wtf/go-nostr" + "github.com/vertex-lab/crawler_v2/pkg/redb" ) var ( @@ -100,10 +101,6 @@ func (c FirehoseConfig) Print() { fmt.Printf(" Offset: %v\n", c.Offset) } -type PubkeyChecker interface { - Exists(ctx context.Context, pubkey string) (bool, error) -} - type Forward[T any] func(T) error // Send returns a [Forward] function that will attempt to send values into the given channel. @@ -119,14 +116,50 @@ func Send[T any](ch chan T) Forward[T] { } } +type PubkeyGate interface { + Allows(ctx context.Context, pubkey string) bool +} + +// ExistenceGate is a [PubkeyGate] that allows pubkeys if they exist in the database. +// The assumption is that keys can't be removed from the database. 
+type ExistenceGate struct { + exists map[string]struct{} + fallback redb.RedisDB +} + +func NewExistenceGate(fallback redb.RedisDB) *ExistenceGate { + return &ExistenceGate{ + exists: make(map[string]struct{}), + fallback: fallback, + } +} + +func (g *ExistenceGate) Allows(ctx context.Context, pubkey string) bool { + if _, ok := g.exists[pubkey]; ok { + return true + } + + exists, err := g.fallback.Exists(ctx, pubkey) + if err != nil { + log.Printf("ExistanceGate: %v", err) + return false + } + + if exists { + g.exists[pubkey] = struct{}{} + return true + } + return false +} + // Firehose connects to a list of relays and pulls config.Kinds events that are newer than config.Since. // It deduplicate events using a simple ring-buffer. -// It discards events from unknown pubkeys as an anti-spam mechanism. +// It applies the [PubkeyGate] to remove events from undesired pubkeys. // It forwards the rest using the provided [Forward] function. func Firehose( ctx context.Context, config FirehoseConfig, - check PubkeyChecker, + gate PubkeyGate, forward Forward[*nostr.Event], ) { log.Println("Firehose: ready") @@ -147,14 +180,7 @@ func Firehose( } seen.Add(event.ID) - exists, err := check.Exists(ctx, event.PubKey) - if err != nil { - log.Printf("Firehose: %v", err) - continue - } - - if !exists { - // event from unknown pubkey, skip + if !gate.Allows(ctx, event.PubKey) { continue } diff --git a/pkg/pipe/pipe_test.go b/pkg/pipe/pipe_test.go index 4f1c327..e8e2d86 100644 --- a/pkg/pipe/pipe_test.go +++ b/pkg/pipe/pipe_test.go @@ -19,12 +19,12 @@ var ( pip string = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2" ) -type mockChecker struct { +type mockGate struct { pubkey string } -func (c mockChecker) Exists(ctx context.Context, pubkey string) (bool, error) { - return pubkey == c.pubkey, nil +func (g mockGate) Allows(ctx context.Context, pubkey string) bool { + return pubkey == g.pubkey } func print(e *nostr.Event) error { @@ -39,9 +39,9 @@ func TestFirehose(t 
*testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() - checker := mockChecker{pubkey: pip} + gate := mockGate{pubkey: pip} config := NewFirehoseConfig() - Firehose(ctx, config, checker, print) + Firehose(ctx, config, gate, print) } func TestFetch(t *testing.T) {