From 768cec8fea82df80bb01e1c70e0ca9c593e56b3c Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Thu, 29 May 2025 17:05:57 +0200 Subject: [PATCH] Firehose working --- pkg/pipe/pipe.go | 144 +++++++++++++++++++++++++++++++++++++++++ pkg/pipe/pipe_test.go | 41 ++++++++++++ pkg/redb/graph.go | 9 +++ pkg/redb/graph_test.go | 2 + 4 files changed, 196 insertions(+) create mode 100644 pkg/pipe/pipe.go create mode 100644 pkg/pipe/pipe_test.go diff --git a/pkg/pipe/pipe.go b/pkg/pipe/pipe.go new file mode 100644 index 0000000..c8c0813 --- /dev/null +++ b/pkg/pipe/pipe.go @@ -0,0 +1,144 @@ +package pipe + +import ( + "context" + "fmt" + "log" + "slices" + "time" + + "github.com/nbd-wtf/go-nostr" +) + +var ( + relevantKinds = []int{ + nostr.KindProfileMetadata, + nostr.KindFollowList, + } + + defaultRelays = []string{ + "wss://purplepag.es", + "wss://njump.me", + "wss://relay.snort.social", + "wss://relay.damus.io", + "wss://relay.primal.net", + "wss://relay.nostr.band", + "wss://nostr-pub.wellorder.net", + "wss://relay.nostr.net", + "wss://nostr.lu.ke", + "wss://nostr.at", + "wss://e.nos.lol", + "wss://nostr.lopp.social", + "wss://nostr.vulpem.com", + "wss://relay.nostr.bg", + "wss://wot.utxo.one", + "wss://nostrelites.org", + "wss://wot.nostr.party", + "wss://wot.sovbit.host", + "wss://wot.girino.org", + "wss://relay.lnau.net", + "wss://wot.siamstr.com", + "wss://wot.sudocarlos.com", + "wss://relay.otherstuff.fyi", + "wss://relay.lexingtonbitcoin.org", + "wss://wot.azzamo.net", + "wss://wot.swarmstr.com", + "wss://zap.watch", + "wss://satsage.xyz", + } +) + +type FirehoseConfig struct { + Relays []string + Offset time.Duration +} + +func NewFirehoseConfig() FirehoseConfig { + return FirehoseConfig{ + Relays: defaultRelays, + Offset: 10 * time.Second, + } +} + +func (c FirehoseConfig) Since() *nostr.Timestamp { + since := nostr.Timestamp(time.Now().Add(-c.Offset).Unix()) + return &since +} + +func (c FirehoseConfig) Print() { + fmt.Printf("Firehose\n") + fmt.Printf(" Relays: 
%v\n", c.Relays) + fmt.Printf(" Offset: %v\n", c.Offset) +} + +type PubkeyChecker interface { + Exists(ctx context.Context, pubkey string) (bool, error) +} + +// buffer is a minimalistic ring buffer used to keep track of the latest event IDs +type buffer struct { + IDs []string + capacity int + write int +} + +func newBuffer(capacity int) *buffer { + return &buffer{ + IDs: make([]string, capacity), + capacity: capacity, + } +} + +func (b *buffer) Add(ID string) { + b.IDs[b.write] = ID + b.write = (b.write + 1) % b.capacity +} + +func (b *buffer) Contains(ID string) bool { + return slices.Contains(b.IDs, ID) +} + +// Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since]. +// It discards events from unknown pubkeys as an anti-spam mechanism. +func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(e *nostr.Event) error) { + pool := nostr.NewSimplePool(ctx) + defer close(pool) + + filter := nostr.Filter{ + Kinds: relevantKinds, + Since: config.Since(), + } + + seen := newBuffer(2048) + for event := range pool.SubscribeMany(ctx, config.Relays, filter) { + + if seen.Contains(event.ID) { + // event already seen, skip + continue + } + seen.Add(event.ID) + + exists, err := check.Exists(ctx, event.PubKey) + if err != nil { + log.Printf("Firehose: %v", err) + continue + } + + if !exists { + // event from unknown pubkey, skip + continue + } + + if err := send(event.Event); err != nil { + log.Printf("Firehose: %v", err) + } + } +} + +// close iterates over the relays in the pool and closes all connections. 
+func close(pool *nostr.SimplePool) { + pool.Relays.Range(func(_ string, relay *nostr.Relay) bool { + relay.Close() + return true + }) +} diff --git a/pkg/pipe/pipe_test.go b/pkg/pipe/pipe_test.go new file mode 100644 index 0000000..6d4c16e --- /dev/null +++ b/pkg/pipe/pipe_test.go @@ -0,0 +1,41 @@ +package pipe + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/nbd-wtf/go-nostr" +) + +var ( + ctx = context.Background() + + pip = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2" +) + +// Manually change pip's follow list and see if the events get printed. Works only with `go test` +func TestFirehose(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second*20) + defer cancel() + + checker := mockChecker{pubkey: pip} + config := FirehoseConfig{Relays: defaultRelays} + Firehose(ctx, config, checker, print) +} + +type mockChecker struct { + pubkey string +} + +func (c mockChecker) Exists(ctx context.Context, pubkey string) (bool, error) { + return pubkey == c.pubkey, nil +} + +func print(e *nostr.Event) error { + fmt.Printf("\nevent ID: %v", e.ID) + fmt.Printf("\nevent pubkey: %v", e.PubKey) + fmt.Printf("\nevent kind: %d\n", e.Kind) + return nil +} diff --git a/pkg/redb/graph.go b/pkg/redb/graph.go index 86e75a6..a660d6b 100644 --- a/pkg/redb/graph.go +++ b/pkg/redb/graph.go @@ -101,6 +101,15 @@ func (r RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, err return parseNode(fields) } +// Exists checks for the existence of the pubkey +func (r RedisDB) Exists(ctx context.Context, pubkey string) (bool, error) { + exists, err := r.client.HExists(ctx, KeyKeyIndex, pubkey).Result() + if err != nil { + return false, fmt.Errorf("failed to check existance of pubkey %s: %w", pubkey, err) + } + return exists, nil +} + func (r RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error { if len(IDs) == 0 { return nil diff --git a/pkg/redb/graph_test.go b/pkg/redb/graph_test.go index ea30be8..227bb16 100644 
--- a/pkg/redb/graph_test.go +++ b/pkg/redb/graph_test.go @@ -5,6 +5,7 @@ import ( "errors" "github/pippellia-btc/crawler/pkg/graph" "github/pippellia-btc/crawler/pkg/pagerank" + "github/pippellia-btc/crawler/pkg/pipe" "github/pippellia-btc/crawler/pkg/walks" "reflect" "testing" @@ -375,6 +376,7 @@ func TestInterfaces(t *testing.T) { var _ walks.Walker = RedisDB{} var _ pagerank.VisitCounter = RedisDB{} var _ pagerank.PersonalizedLoader = RedisDB{} + var _ pipe.PubkeyChecker = RedisDB{} } // ------------------------------------- HELPERS -------------------------------