mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
using config specific kinds instead of global variable
This commit is contained in:
@@ -37,6 +37,8 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config.Fetcher.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds
|
||||||
|
|
||||||
events := make(chan *nostr.Event, config.EventsCapacity)
|
events := make(chan *nostr.Event, config.EventsCapacity)
|
||||||
pubkeys := make(chan string, config.PubkeysCapacity)
|
pubkeys := make(chan string, config.PubkeysCapacity)
|
||||||
|
|
||||||
@@ -81,7 +83,6 @@ func main() {
|
|||||||
go printStats(ctx, events, pubkeys)
|
go printStats(ctx, events, pubkeys)
|
||||||
}
|
}
|
||||||
|
|
||||||
pipe.Kinds = []int{nostr.KindFollowList} // no need to sync other event kinds
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(3)
|
wg.Add(3)
|
||||||
|
|
||||||
|
|||||||
@@ -13,11 +13,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
Kinds = []int{
|
|
||||||
nostr.KindProfileMetadata,
|
|
||||||
nostr.KindFollowList,
|
|
||||||
}
|
|
||||||
|
|
||||||
defaultRelays = []string{
|
defaultRelays = []string{
|
||||||
"wss://purplepag.es",
|
"wss://purplepag.es",
|
||||||
"wss://njump.me",
|
"wss://njump.me",
|
||||||
@@ -51,12 +46,14 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type FirehoseConfig struct {
|
type FirehoseConfig struct {
|
||||||
|
Kinds []int
|
||||||
Relays []string
|
Relays []string
|
||||||
Offset time.Duration
|
Offset time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFirehoseConfig() FirehoseConfig {
|
func NewFirehoseConfig() FirehoseConfig {
|
||||||
return FirehoseConfig{
|
return FirehoseConfig{
|
||||||
|
Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList},
|
||||||
Relays: defaultRelays,
|
Relays: defaultRelays,
|
||||||
Offset: time.Minute,
|
Offset: time.Minute,
|
||||||
}
|
}
|
||||||
@@ -100,7 +97,7 @@ func (b *buffer) Contains(ID string) bool {
|
|||||||
return slices.Contains(b.IDs, ID)
|
return slices.Contains(b.IDs, ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Firehose connects to a list of relays and pulls [Kinds] events that are newer than [FirehoseConfig.Since].
|
// Firehose connects to a list of relays and pulls config.Kinds events that are newer than [FirehoseConfig.Since].
|
||||||
// It discards events from unknown pubkeys as an anti-spam mechanism.
|
// It discards events from unknown pubkeys as an anti-spam mechanism.
|
||||||
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) {
|
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) {
|
||||||
defer log.Println("Firehose: shutting down...")
|
defer log.Println("Firehose: shutting down...")
|
||||||
@@ -109,7 +106,7 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s
|
|||||||
defer shutdown(pool)
|
defer shutdown(pool)
|
||||||
|
|
||||||
filter := nostr.Filter{
|
filter := nostr.Filter{
|
||||||
Kinds: Kinds,
|
Kinds: config.Kinds,
|
||||||
Since: config.Since(),
|
Since: config.Since(),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,6 +136,7 @@ func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FetcherConfig struct {
|
type FetcherConfig struct {
|
||||||
|
Kinds []int
|
||||||
Relays []string
|
Relays []string
|
||||||
Batch int
|
Batch int
|
||||||
Interval time.Duration
|
Interval time.Duration
|
||||||
@@ -146,6 +144,7 @@ type FetcherConfig struct {
|
|||||||
|
|
||||||
func NewFetcherConfig() FetcherConfig {
|
func NewFetcherConfig() FetcherConfig {
|
||||||
return FetcherConfig{
|
return FetcherConfig{
|
||||||
|
Kinds: []int{nostr.KindProfileMetadata, nostr.KindFollowList},
|
||||||
Relays: defaultRelays,
|
Relays: defaultRelays,
|
||||||
Batch: 100,
|
Batch: 100,
|
||||||
Interval: time.Minute,
|
Interval: time.Minute,
|
||||||
@@ -168,9 +167,6 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s
|
|||||||
batch := make([]string, 0, config.Batch)
|
batch := make([]string, 0, config.Batch)
|
||||||
timer := time.After(config.Interval)
|
timer := time.After(config.Interval)
|
||||||
|
|
||||||
pool := nostr.NewSimplePool(ctx)
|
|
||||||
defer shutdown(pool)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -186,7 +182,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := fetch(ctx, pool, config.Relays, batch)
|
events, err := fetch(ctx, config, batch)
|
||||||
if err != nil && ctx.Err() == nil {
|
if err != nil && ctx.Err() == nil {
|
||||||
log.Printf("Fetcher: %v", err)
|
log.Printf("Fetcher: %v", err)
|
||||||
continue
|
continue
|
||||||
@@ -202,7 +198,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s
|
|||||||
timer = time.After(config.Interval)
|
timer = time.After(config.Interval)
|
||||||
|
|
||||||
case <-timer:
|
case <-timer:
|
||||||
events, err := fetch(ctx, pool, config.Relays, batch)
|
events, err := fetch(ctx, config, batch)
|
||||||
if err != nil && ctx.Err() == nil {
|
if err != nil && ctx.Err() == nil {
|
||||||
log.Printf("Fetcher: %v", err)
|
log.Printf("Fetcher: %v", err)
|
||||||
continue
|
continue
|
||||||
@@ -220,8 +216,8 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetch queries the [Kinds] of the specified pubkeys.
|
// fetch queries the config.Relays for the config.Kinds of the specified pubkeys.
|
||||||
func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string) ([]*nostr.Event, error) {
|
func fetch(ctx context.Context, config FetcherConfig, pubkeys []string) ([]*nostr.Event, error) {
|
||||||
if len(pubkeys) == 0 {
|
if len(pubkeys) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@@ -229,13 +225,17 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string
|
|||||||
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
|
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
pool := nostr.NewSimplePool(ctx)
|
||||||
|
defer shutdown(pool)
|
||||||
|
|
||||||
filter := nostr.Filter{
|
filter := nostr.Filter{
|
||||||
Kinds: Kinds,
|
Kinds: config.Kinds,
|
||||||
Authors: pubkeys,
|
Authors: pubkeys,
|
||||||
|
Limit: len(config.Kinds) * len(pubkeys),
|
||||||
}
|
}
|
||||||
|
|
||||||
latest := make(map[string]*nostr.Event, len(pubkeys)*len(filter.Kinds))
|
latest := make(map[string]*nostr.Event, len(pubkeys)*len(config.Kinds))
|
||||||
for event := range pool.FetchMany(ctx, relays, filter) {
|
for event := range pool.FetchMany(ctx, config.Relays, filter) {
|
||||||
|
|
||||||
key := fmt.Sprintf("%s:%d", event.PubKey, event.Kind)
|
key := fmt.Sprintf("%s:%d", event.PubKey, event.Kind)
|
||||||
e, exists := latest[key]
|
e, exists := latest[key]
|
||||||
@@ -252,7 +252,7 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string
|
|||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetcher extracts pubkeys from the channel and queries the store for their events:
|
// FetcherDB extracts pubkeys from the channel and queries the store for their events:
|
||||||
// - when the batch is bigger than config.Batch
|
// - when the batch is bigger than config.Batch
|
||||||
// - after config.Interval since the last query.
|
// - after config.Interval since the last query.
|
||||||
func FetcherDB(
|
func FetcherDB(
|
||||||
@@ -283,9 +283,9 @@ func FetcherDB(
|
|||||||
}
|
}
|
||||||
|
|
||||||
filter := nostr.Filter{
|
filter := nostr.Filter{
|
||||||
Kinds: Kinds,
|
Kinds: config.Kinds,
|
||||||
Authors: batch,
|
Authors: batch,
|
||||||
Limit: len(Kinds) * len(batch),
|
Limit: len(config.Kinds) * len(batch),
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := store.Query(ctx, filter)
|
events, err := store.Query(ctx, filter)
|
||||||
@@ -308,9 +308,9 @@ func FetcherDB(
|
|||||||
}
|
}
|
||||||
|
|
||||||
filter := nostr.Filter{
|
filter := nostr.Filter{
|
||||||
Kinds: Kinds,
|
Kinds: config.Kinds,
|
||||||
Authors: batch,
|
Authors: batch,
|
||||||
Limit: len(Kinds) * len(batch),
|
Limit: len(config.Kinds) * len(batch),
|
||||||
}
|
}
|
||||||
|
|
||||||
events, err := store.Query(ctx, filter)
|
events, err := store.Query(ctx, filter)
|
||||||
|
|||||||
@@ -28,15 +28,15 @@ func TestFirehose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFetch(t *testing.T) {
|
func TestFetch(t *testing.T) {
|
||||||
pool := nostr.NewSimplePool(ctx)
|
|
||||||
pubkeys := []string{odell, calle, pip}
|
pubkeys := []string{odell, calle, pip}
|
||||||
|
config := NewFetcherConfig()
|
||||||
|
|
||||||
events, err := fetch(ctx, pool, defaultRelays, pubkeys)
|
events, err := fetch(ctx, config, pubkeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected error nil, got %v", err)
|
t.Fatalf("expected error nil, got %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expected := len(pubkeys) * len(Kinds)
|
expected := len(pubkeys) * 2
|
||||||
if len(events) != expected {
|
if len(events) != expected {
|
||||||
t.Fatalf("expected %d events, got %d", expected, len(events))
|
t.Fatalf("expected %d events, got %d", expected, len(events))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user