mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
added fetcher
This commit is contained in:
111
pkg/pipe/pipe.go
111
pkg/pipe/pipe.go
@@ -100,7 +100,7 @@ func (b *buffer) Contains(ID string) bool {
|
|||||||
|
|
||||||
// Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since].
|
// Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since].
|
||||||
// It discards events from unknown pubkeys as an anti-spam mechanism.
|
// It discards events from unknown pubkeys as an anti-spam mechanism.
|
||||||
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(e *nostr.Event) error) {
|
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) {
|
||||||
pool := nostr.NewSimplePool(ctx)
|
pool := nostr.NewSimplePool(ctx)
|
||||||
defer close(pool)
|
defer close(pool)
|
||||||
|
|
||||||
@@ -111,7 +111,6 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s
|
|||||||
|
|
||||||
seen := newBuffer(2048)
|
seen := newBuffer(2048)
|
||||||
for event := range pool.SubscribeMany(ctx, config.Relays, filter) {
|
for event := range pool.SubscribeMany(ctx, config.Relays, filter) {
|
||||||
|
|
||||||
if seen.Contains(event.ID) {
|
if seen.Contains(event.ID) {
|
||||||
// event already seen, skip
|
// event already seen, skip
|
||||||
continue
|
continue
|
||||||
@@ -135,6 +134,114 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FetcherConfig holds the settings used by [Fetcher].
type FetcherConfig struct {
	// Relays is the list of relay URLs that are queried.
	Relays []string

	// Batch is the number of queued pubkeys that triggers a query.
	Batch int

	// Interval is the time after the last query at which a new query is
	// triggered, regardless of how many pubkeys are queued.
	Interval time.Duration
}
|
||||||
|
|
||||||
|
func NewFetcherConfig() FetcherConfig {
|
||||||
|
return FetcherConfig{
|
||||||
|
Relays: defaultRelays,
|
||||||
|
Batch: 100,
|
||||||
|
Interval: time.Minute,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c FetcherConfig) Print() {
|
||||||
|
fmt.Printf("Fetcher\n")
|
||||||
|
fmt.Printf(" Relays: %v\n", c.Relays)
|
||||||
|
fmt.Printf(" Batch: %d\n", c.Batch)
|
||||||
|
fmt.Printf(" Interval: %v\n", c.Interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetcher extracts pubkeys from the channel and queries for their events when either:
|
||||||
|
// - the batch is bigger than config.Batch
|
||||||
|
// - after config.Interval since the last query.
|
||||||
|
func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, send func(*nostr.Event) error) {
|
||||||
|
batch := make([]string, 0, config.Batch)
|
||||||
|
timer := time.After(config.Interval)
|
||||||
|
|
||||||
|
pool := nostr.NewSimplePool(ctx)
|
||||||
|
defer close(pool)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case pubkey := <-pubkeys:
|
||||||
|
batch = append(batch, pubkey)
|
||||||
|
if len(batch) < config.Batch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
events, err := fetch(ctx, pool, config.Relays, batch)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Fetcher: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if err := send(event); err != nil {
|
||||||
|
log.Printf("Fetcher: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
batch = make([]string, 0, config.Batch)
|
||||||
|
timer = time.After(config.Interval)
|
||||||
|
|
||||||
|
case <-timer:
|
||||||
|
events, err := fetch(ctx, pool, config.Relays, batch)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Fetcher: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if err := send(event); err != nil {
|
||||||
|
log.Printf("Fetcher: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
batch = make([]string, 0, config.Batch)
|
||||||
|
timer = time.After(config.Interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetch queries the [relevantKinds] of the specified pubkeys.
|
||||||
|
func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string) ([]*nostr.Event, error) {
|
||||||
|
if len(pubkeys) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
filter := nostr.Filter{
|
||||||
|
Kinds: relevantKinds,
|
||||||
|
Authors: pubkeys,
|
||||||
|
}
|
||||||
|
|
||||||
|
latest := make(map[string]*nostr.Event, len(pubkeys)*len(filter.Kinds))
|
||||||
|
for event := range pool.FetchMany(ctx, relays, filter) {
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s:%d", event.PubKey, event.Kind)
|
||||||
|
e, exists := latest[key]
|
||||||
|
if !exists || event.CreatedAt > e.CreatedAt {
|
||||||
|
latest[key] = event.Event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events := make([]*nostr.Event, 0, len(latest))
|
||||||
|
for _, event := range latest {
|
||||||
|
events = append(events, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return events, nil
|
||||||
|
}
|
||||||
|
|
||||||
// close iterates over the relays in the pool and closes all connections.
|
// close iterates over the relays in the pool and closes all connections.
|
||||||
func close(pool *nostr.SimplePool) {
|
func close(pool *nostr.SimplePool) {
|
||||||
pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {
|
pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {
|
||||||
|
|||||||
@@ -12,7 +12,9 @@ import (
|
|||||||
// Shared fixtures for the tests in this file.
var (
	ctx = context.Background()

	// Pubkeys passed to the functions under test.
	odell = "04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9"
	calle = "50d94fc2d8580c682b071a542f8b1e31a200b0508bab95a33bef0855df281d63"
	pip   = "f683e87035f7ad4f44e0b98cfbd9537e16455a92cd38cefc4cb31db7557f5ef2"
)
|
||||||
|
|
||||||
// Manually change pip's follow list and see if the events get printed. Works only with `go test`
|
// Manually change pip's follow list and see if the events get printed. Works only with `go test`
|
||||||
@@ -22,9 +24,25 @@ func TestFirehose(t *testing.T) {
|
|||||||
|
|
||||||
checker := mockChecker{pubkey: pip}
|
checker := mockChecker{pubkey: pip}
|
||||||
config := FirehoseConfig{Relays: defaultRelays}
|
config := FirehoseConfig{Relays: defaultRelays}
|
||||||
|
|
||||||
Firehose(ctx, config, checker, print)
|
Firehose(ctx, config, checker, print)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFetch(t *testing.T) {
|
||||||
|
pool := nostr.NewSimplePool(ctx)
|
||||||
|
pubkeys := []string{odell, calle, pip}
|
||||||
|
|
||||||
|
events, err := fetch(ctx, pool, defaultRelays, pubkeys)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected error nil, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := len(pubkeys) * len(relevantKinds)
|
||||||
|
if len(events) != expected {
|
||||||
|
t.Fatalf("expected %d events, got %d", expected, len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// mockChecker is the test double handed to Firehose as its PubkeyChecker.
// NOTE(review): its methods are declared outside this view; presumably they
// consult the single configured pubkey — confirm against the implementation.
type mockChecker struct {
	pubkey string // the pubkey the mock is configured with
}
|
||||||
|
|||||||
Reference in New Issue
Block a user