From 018e74af81a913848e3fdab7055b89d51164fb50 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Sun, 14 Sep 2025 17:21:30 +0200 Subject: [PATCH] style cleanup --- pkg/pipe/arbiter.go | 28 ++++-- pkg/pipe/engine.go | 9 +- pkg/pipe/{intake.go => fetcher.go} | 156 ++++------------------------- pkg/pipe/firehose.go | 122 ++++++++++++++++++++++ pkg/pipe/utils.go | 29 ++++++ 5 files changed, 197 insertions(+), 147 deletions(-) rename pkg/pipe/{intake.go => fetcher.go} (52%) create mode 100644 pkg/pipe/firehose.go diff --git a/pkg/pipe/arbiter.go b/pkg/pipe/arbiter.go index 3da310f..a58615a 100644 --- a/pkg/pipe/arbiter.go +++ b/pkg/pipe/arbiter.go @@ -48,16 +48,22 @@ func (c ArbiterConfig) Print() { // Arbiter activates when the % of walks changed is greater than a threshold. Then it: // - scans through all the nodes in the database // - promotes or demotes nodes -func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) { +func Arbiter( + ctx context.Context, + config ArbiterConfig, + db redb.RedisDB, + forward Forward[string], +) { + defer log.Println("Arbiter: shut down") + ticker := time.NewTicker(config.PingWait) defer ticker.Stop() - WalksTracker.Add(1000_000_000) // trigger a scan at startup + WalksTracker.Add(100_000_000) // trigger a scan at startup for { select { case <-ctx.Done(): - log.Println("Arbiter: shut down") return case <-ticker.C: @@ -71,7 +77,7 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu changeRatio := float64(changed) / float64(total) if changeRatio > config.Activation { - promoted, demoted, err := arbiterScan(ctx, config, db, send) + promoted, demoted, err := arbiterScan(ctx, config, db, forward) if err != nil && ctx.Err() == nil { log.Printf("Arbiter: %v", err) } @@ -84,14 +90,20 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu } // ArbiterScan performs one entire database scan, promoting or demoting nodes based on their pagerank. -func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) (promoted, demoted int, err error) { +func arbiterScan( + ctx context.Context, + config ArbiterConfig, + db redb.RedisDB, + forward Forward[string], +) (promoted, demoted int, err error) { + maxTime := 2 * time.Minute ctx, cancel := context.WithTimeout(ctx, maxTime) defer cancel() baseRank, err := minPagerank(ctx, db) if err != nil { - return promoted, demoted, err + return 0, 0, err } promotionThreshold := baseRank * config.Promotion @@ -148,7 +160,7 @@ func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, sen } promoted++ - if err := send(node.Pubkey); err != nil { + if err := forward(node.Pubkey); err != nil { return promoted, demoted, err } } @@ -156,7 +168,7 @@ func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, sen } if cursor == 0 { - // returns to 0, the scan is complete + // the scan is complete return promoted, demoted, nil } } diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index e1fc576..7544daf 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -46,7 +46,8 @@ func Engine( config EngineConfig, store nastro.Store, db redb.RedisDB, - events chan *nostr.Event) { + events chan *nostr.Event, +) { graphEvents := make(chan *nostr.Event, config.BuilderCapacity) defer close(graphEvents) @@ -74,7 +75,8 @@ func Archiver( config EngineConfig, store nastro.Store, events chan *nostr.Event, - onReplace func(*nostr.Event) error) { + onReplace func(*nostr.Event) error, +) { var processed int @@ -129,7 +131,8 @@ func GraphBuilder( ctx context.Context, config EngineConfig, db redb.RedisDB, - events chan *nostr.Event) { + events chan *nostr.Event, +) { cache := walks.NewWalker( walks.WithCapacity(config.CacheCapacity), diff --git a/pkg/pipe/intake.go b/pkg/pipe/fetcher.go similarity index 52% rename from pkg/pipe/intake.go rename to pkg/pipe/fetcher.go index 9f01bae..a87d1da 100644 --- a/pkg/pipe/intake.go +++ b/pkg/pipe/fetcher.go @@ -2,7 +2,6 @@ package pipe import ( "context" - "errors" "fmt" "log" "time" @@ -11,106 +10,6 @@ import ( "github.com/pippellia-btc/nastro" ) -var ( - defaultRelays = []string{ - "wss://purplepag.es", - "wss://njump.me", - "wss://relay.snort.social", - "wss://relay.damus.io", - "wss://relay.primal.net", - "wss://relay.nostr.band", - "wss://nostr-pub.wellorder.net", - "wss://relay.nostr.net", - "wss://nostr.lu.ke", - "wss://nostr.at", - "wss://e.nos.lol", - "wss://nostr.lopp.social", - "wss://nostr.vulpem.com", - "wss://relay.nostr.bg", - "wss://wot.utxo.one", - "wss://nostrelites.org", - "wss://wot.nostr.party", - "wss://wot.sovbit.host", - "wss://wot.girino.org", - "wss://relay.lnau.net", - "wss://wot.siamstr.com", - "wss://wot.sudocarlos.com", - "wss://relay.otherstuff.fyi", - "wss://relay.lexingtonbitcoin.org", - "wss://wot.azzamo.net", - "wss://wot.swarmstr.com", - "wss://zap.watch", - "wss://satsage.xyz", - } -) - -type FirehoseConfig struct { - Kinds []int - Relays []string - Offset time.Duration -} - -func NewFirehoseConfig() FirehoseConfig { - return FirehoseConfig{ - Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList}, - Relays: defaultRelays, - Offset: time.Minute, - } -} - -func (c FirehoseConfig) Since() *nostr.Timestamp { - since := nostr.Timestamp(time.Now().Add(-c.Offset).Unix()) - return &since -} - -func (c FirehoseConfig) Print() { - fmt.Printf("Firehose\n") - fmt.Printf(" Relays: %v\n", c.Relays) - fmt.Printf(" Offset: %v\n", c.Offset) -} - -type PubkeyChecker interface { - Exists(ctx context.Context, pubkey string) (bool, error) -} - -// Firehose connects to a list of relays and pulls config.Kinds events that are newer than [FirehoseConfig.Since]. -// It discards events from unknown pubkeys as an anti-spam mechanism. -func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) { - defer log.Println("Firehose: shut down") - - pool := nostr.NewSimplePool(ctx) - defer shutdown(pool) - - filter := nostr.Filter{ - Kinds: config.Kinds, - Since: config.Since(), - } - - seen := newBuffer(1024) - for event := range pool.SubscribeMany(ctx, config.Relays, filter) { - if seen.Contains(event.ID) { - // event already seen, skip - continue - } - seen.Add(event.ID) - - exists, err := check.Exists(ctx, event.PubKey) - if err != nil { - log.Printf("Firehose: %v", err) - continue - } - - if !exists { - // event from unknown pubkey, skip - continue - } - - if err := send(event.Event); err != nil { - log.Printf("Firehose: %v", err) - } - } -} - type FetcherConfig struct { Kinds []int Relays []string @@ -120,7 +19,10 @@ type FetcherConfig struct { func NewFetcherConfig() FetcherConfig { return FetcherConfig{ - Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList}, + Kinds: []int{ + nostr.KindProfileMetadata, + nostr.KindFollowList, + }, Relays: defaultRelays, Batch: 100, Interval: time.Minute, @@ -129,15 +31,21 @@ func NewFetcherConfig() FetcherConfig { func (c FetcherConfig) Print() { fmt.Printf("Fetcher\n") + fmt.Printf(" Kinds: %v\n", c.Kinds) fmt.Printf(" Relays: %v\n", c.Relays) fmt.Printf(" Batch: %d\n", c.Batch) fmt.Printf(" Interval: %v\n", c.Interval) } -// Fetcher extracts pubkeys from the channel and queries relays for their events: -// - when the batch is bigger than config.Batch -// - after config.Interval since the last query. -func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, send func(*nostr.Event) error) { +// Fetcher extracts pubkeys from the channel and queries relays for their events when: +// - the batch is bigger than config.Batch +// - more than config.Interval has passed since the last query +func Fetcher( + ctx context.Context, + config FetcherConfig, + pubkeys <-chan string, + forward Forward[*nostr.Event], +) { defer log.Println("Fetcher: shut down") batch := make([]string, 0, config.Batch) @@ -165,7 +73,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s } for _, event := range events { - if err := send(event); err != nil { + if err := forward(event); err != nil { log.Printf("Fetcher: %v", err) } } @@ -181,7 +89,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s } for _, event := range events { - if err := send(event); err != nil { + if err := forward(event); err != nil { log.Printf("Fetcher: %v", err) } } @@ -236,7 +144,8 @@ func FetcherDB( config FetcherConfig, store nastro.Store, pubkeys <-chan string, - send func(*nostr.Event) error) { + forward Forward[*nostr.Event], +) { defer log.Println("FetcherDB: shut down") @@ -270,7 +179,7 @@ func FetcherDB( } for _, event := range events { - if err := send(&event); err != nil { + if err := forward(&event); err != nil { log.Printf("FetcherDB: %v", err) } } @@ -295,7 +204,7 @@ func FetcherDB( } for _, event := range events { - if err := send(&event); err != nil { + if err := forward(&event); err != nil { log.Printf("FetcherDB: %v", err) } } @@ -305,28 +214,3 @@ func FetcherDB( } } } - -// Shutdown iterates over the relays in the pool and closes all connections. -func shutdown(pool *nostr.SimplePool) { - pool.Relays.Range(func(_ string, relay *nostr.Relay) bool { - relay.Close() - return true - }) -} - -var ( - ErrEventTooBig = errors.New("event is too big") - maxTags = 20_000 - maxContent = 50_000 -) - -// EventTooBig is a [nastro.EventPolicy] that errs if the event is too big. -func EventTooBig(e *nostr.Event) error { - if len(e.Tags) > maxTags { - return fmt.Errorf("%w: event with ID %s has too many tags: %d", ErrEventTooBig, e.ID, len(e.Tags)) - } - if len(e.Content) > maxContent { - return fmt.Errorf("%w: event with ID %s has too much content: %d", ErrEventTooBig, e.ID, len(e.Content)) - } - return nil -} diff --git a/pkg/pipe/firehose.go b/pkg/pipe/firehose.go new file mode 100644 index 0000000..3efad8d --- /dev/null +++ b/pkg/pipe/firehose.go @@ -0,0 +1,122 @@ +package pipe + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/nbd-wtf/go-nostr" +) + +var ( + defaultRelays = []string{ + "wss://purplepag.es", + "wss://njump.me", + "wss://relay.snort.social", + "wss://relay.damus.io", + "wss://relay.primal.net", + "wss://relay.nostr.band", + "wss://nostr-pub.wellorder.net", + "wss://relay.nostr.net", + "wss://nostr.lu.ke", + "wss://nostr.at", + "wss://e.nos.lol", + "wss://nostr.lopp.social", + "wss://nostr.vulpem.com", + "wss://relay.nostr.bg", + "wss://wot.utxo.one", + "wss://nostrelites.org", + "wss://wot.nostr.party", + "wss://wot.sovbit.host", + "wss://wot.girino.org", + "wss://relay.lnau.net", + "wss://wot.siamstr.com", + "wss://wot.sudocarlos.com", + "wss://relay.otherstuff.fyi", + "wss://relay.lexingtonbitcoin.org", + "wss://wot.azzamo.net", + "wss://wot.swarmstr.com", + "wss://zap.watch", + "wss://satsage.xyz", + } +) + +type FirehoseConfig struct { + Kinds []int + Relays []string + Offset time.Duration +} + +func NewFirehoseConfig() FirehoseConfig { + return FirehoseConfig{ + Kinds: []int{ + nostr.KindProfileMetadata, + nostr.KindFollowList, + }, + Relays: defaultRelays, + Offset: time.Minute, + } +} + +func (c FirehoseConfig) Since() *nostr.Timestamp { + since := nostr.Timestamp(time.Now().Add(-c.Offset).Unix()) + return &since +} + +func (c FirehoseConfig) Print() { + fmt.Printf("Firehose\n") + fmt.Printf(" Kinds: %v\n", c.Kinds) + fmt.Printf(" Relays: %v\n", c.Relays) + fmt.Printf(" Offset: %v\n", c.Offset) +} + +type PubkeyChecker interface { + Exists(ctx context.Context, pubkey string) (bool, error) +} + +type Forward[T any] func(T) error + +// Firehose connects to a list of relays and pulls config.Kinds events that are newer than config.Since. +// It deduplicate events using a simple ring-buffer. +// It discards events from unknown pubkeys as an anti-spam mechanism. +// It forwards the rest using the provided [Forward] function. +func Firehose( + ctx context.Context, + config FirehoseConfig, + check PubkeyChecker, + forward Forward[*nostr.Event], +) { + defer log.Println("Firehose: shut down") + + pool := nostr.NewSimplePool(ctx) + defer shutdown(pool) + + filter := nostr.Filter{ + Kinds: config.Kinds, + Since: config.Since(), + } + + seen := newBuffer(1024) + for event := range pool.SubscribeMany(ctx, config.Relays, filter) { + if seen.Contains(event.ID) { + continue + } + seen.Add(event.ID) + + exists, err := check.Exists(ctx, event.PubKey) + if err != nil { + log.Printf("Firehose: %v", err) + continue + } + + if !exists { + // event from unknown pubkey, skip + continue + } + + if err := forward(event.Event); err != nil { + log.Printf("Firehose: %v", err) + } + } +} diff --git a/pkg/pipe/utils.go b/pkg/pipe/utils.go index dfaf3f6..cebfd43 100644 --- a/pkg/pipe/utils.go +++ b/pkg/pipe/utils.go @@ -2,13 +2,34 @@ package pipe import ( "context" + "errors" + "fmt" "log" "os" "os/signal" "slices" "syscall" + + "github.com/nbd-wtf/go-nostr" ) +var ( + ErrEventTooBig = errors.New("event is too big") + maxTags = 20_000 + maxContent = 50_000 +) + +// EventTooBig is a [nastro.EventPolicy] that errs if the event is too big. +func EventTooBig(e *nostr.Event) error { + if len(e.Tags) > maxTags { + return fmt.Errorf("%w: event with ID %s has too many tags: %d", ErrEventTooBig, e.ID, len(e.Tags)) + } + if len(e.Content) > maxContent { + return fmt.Errorf("%w: event with ID %s has too much content: %d", ErrEventTooBig, e.ID, len(e.Content)) + } + return nil +} + // HandleSignals listens for OS signals and triggers context cancellation. func HandleSignals(cancel context.CancelFunc) { signals := make(chan os.Signal, 1) @@ -19,6 +40,14 @@ func HandleSignals(cancel context.CancelFunc) { cancel() } +// Shutdown iterates over the relays in the pool and closes all connections. +func shutdown(pool *nostr.SimplePool) { + pool.Relays.Range(func(_ string, relay *nostr.Relay) bool { + relay.Close() + return true + }) +} + type buffer struct { IDs []string capacity int