mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
Firehose working
This commit is contained in:
144
pkg/pipe/pipe.go
Normal file
144
pkg/pipe/pipe.go
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
package pipe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"slices"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
relevantKinds = []int{
|
||||||
|
nostr.KindProfileMetadata,
|
||||||
|
nostr.KindFollowList,
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultRelays = []string{
|
||||||
|
"wss://purplepag.es",
|
||||||
|
"wss://njump.me",
|
||||||
|
"wss://relay.snort.social",
|
||||||
|
"wss://relay.damus.io",
|
||||||
|
"wss://relay.primal.net",
|
||||||
|
"wss://relay.nostr.band",
|
||||||
|
"wss://nostr-pub.wellorder.net",
|
||||||
|
"wss://relay.nostr.net",
|
||||||
|
"wss://nostr.lu.ke",
|
||||||
|
"wss://nostr.at",
|
||||||
|
"wss://e.nos.lol",
|
||||||
|
"wss://nostr.lopp.social",
|
||||||
|
"wss://nostr.vulpem.com",
|
||||||
|
"wss://relay.nostr.bg",
|
||||||
|
"wss://wot.utxo.one",
|
||||||
|
"wss://nostrelites.org",
|
||||||
|
"wss://wot.nostr.party",
|
||||||
|
"wss://wot.sovbit.host",
|
||||||
|
"wss://wot.girino.org",
|
||||||
|
"wss://relay.lnau.net",
|
||||||
|
"wss://wot.siamstr.com",
|
||||||
|
"wss://wot.sudocarlos.com",
|
||||||
|
"wss://relay.otherstuff.fyi",
|
||||||
|
"wss://relay.lexingtonbitcoin.org",
|
||||||
|
"wss://wot.azzamo.net",
|
||||||
|
"wss://wot.swarmstr.com",
|
||||||
|
"wss://zap.watch",
|
||||||
|
"wss://satsage.xyz",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
type FirehoseConfig struct {
|
||||||
|
Relays []string
|
||||||
|
Offset time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFirehoseConfig() FirehoseConfig {
|
||||||
|
return FirehoseConfig{
|
||||||
|
Relays: defaultRelays,
|
||||||
|
Offset: 10 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c FirehoseConfig) Since() *nostr.Timestamp {
|
||||||
|
since := nostr.Timestamp(time.Now().Add(-c.Offset).Unix())
|
||||||
|
return &since
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c FirehoseConfig) Print() {
|
||||||
|
fmt.Printf("Firehose\n")
|
||||||
|
fmt.Printf(" Relays: %v\n", c.Relays)
|
||||||
|
fmt.Printf(" Offset: %v\n", c.Offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PubkeyChecker interface {
|
||||||
|
Exists(ctx context.Context, pubkey string) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// buffer is a minimalistic ring buffer used to keep track of the latest event IDs
|
||||||
|
type buffer struct {
|
||||||
|
IDs []string
|
||||||
|
capacity int
|
||||||
|
write int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBuffer(capacity int) *buffer {
|
||||||
|
return &buffer{
|
||||||
|
IDs: make([]string, capacity),
|
||||||
|
capacity: capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *buffer) Add(ID string) {
|
||||||
|
b.IDs[b.write] = ID
|
||||||
|
b.write = (b.write + 1) % b.capacity
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *buffer) Contains(ID string) bool {
|
||||||
|
return slices.Contains(b.IDs, ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since].
|
||||||
|
// It discards events from unknown pubkeys as an anti-spam mechanism.
|
||||||
|
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(e *nostr.Event) error) {
|
||||||
|
pool := nostr.NewSimplePool(ctx)
|
||||||
|
defer close(pool)
|
||||||
|
|
||||||
|
filter := nostr.Filter{
|
||||||
|
Kinds: relevantKinds,
|
||||||
|
Since: config.Since(),
|
||||||
|
}
|
||||||
|
|
||||||
|
seen := newBuffer(2048)
|
||||||
|
for event := range pool.SubscribeMany(ctx, config.Relays, filter) {
|
||||||
|
|
||||||
|
if seen.Contains(event.ID) {
|
||||||
|
// event already seen, skip
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen.Add(event.ID)
|
||||||
|
|
||||||
|
exists, err := check.Exists(ctx, event.PubKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Firehose: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
// event from unknown pubkey, skip
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := send(event.Event); err != nil {
|
||||||
|
log.Printf("Firehose: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close iterates over the relays in the pool and closes all connections.
|
||||||
|
func close(pool *nostr.SimplePool) {
|
||||||
|
pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {
|
||||||
|
relay.Close()
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
41
pkg/pipe/pipe_test.go
Normal file
41
pkg/pipe/pipe_test.go
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
package pipe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/nbd-wtf/go-nostr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx = context.Background()
|
||||||
|
|
||||||
|
pip = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manually change pip's follow list and see if the events gets printed. Works only with `go test`
|
||||||
|
func TestFirehose(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
checker := mockChecker{pubkey: pip}
|
||||||
|
config := FirehoseConfig{Relays: defaultRelays}
|
||||||
|
Firehose(ctx, config, checker, print)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockChecker struct {
|
||||||
|
pubkey string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c mockChecker) Exists(ctx context.Context, pubkey string) (bool, error) {
|
||||||
|
return pubkey == c.pubkey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func print(e *nostr.Event) error {
|
||||||
|
fmt.Printf("\nevent ID: %v", e.ID)
|
||||||
|
fmt.Printf("\nevent pubkey: %v", e.PubKey)
|
||||||
|
fmt.Printf("\nevent kind: %d\n", e.Kind)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -101,6 +101,15 @@ func (r RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, err
|
|||||||
return parseNode(fields)
|
return parseNode(fields)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Exists checks for the existance of the pubkey
|
||||||
|
func (r RedisDB) Exists(ctx context.Context, pubkey string) (bool, error) {
|
||||||
|
exists, err := r.client.HExists(ctx, KeyKeyIndex, pubkey).Result()
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to check existance of pubkey %s: %w", pubkey, err)
|
||||||
|
}
|
||||||
|
return exists, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error {
|
func (r RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error {
|
||||||
if len(IDs) == 0 {
|
if len(IDs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"github/pippellia-btc/crawler/pkg/graph"
|
"github/pippellia-btc/crawler/pkg/graph"
|
||||||
"github/pippellia-btc/crawler/pkg/pagerank"
|
"github/pippellia-btc/crawler/pkg/pagerank"
|
||||||
|
"github/pippellia-btc/crawler/pkg/pipe"
|
||||||
"github/pippellia-btc/crawler/pkg/walks"
|
"github/pippellia-btc/crawler/pkg/walks"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -375,6 +376,7 @@ func TestInterfaces(t *testing.T) {
|
|||||||
var _ walks.Walker = RedisDB{}
|
var _ walks.Walker = RedisDB{}
|
||||||
var _ pagerank.VisitCounter = RedisDB{}
|
var _ pagerank.VisitCounter = RedisDB{}
|
||||||
var _ pagerank.PersonalizedLoader = RedisDB{}
|
var _ pagerank.PersonalizedLoader = RedisDB{}
|
||||||
|
var _ pipe.PubkeyChecker = RedisDB{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------- HELPERS -------------------------------
|
// ------------------------------------- HELPERS -------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user