mirror of https://github.com/aljazceru/crawler_v2.git (synced 2025-12-17 07:24:21 +01:00)
refactored event store
@@ -101,10 +101,11 @@ func (b *buffer) Contains(ID string) bool {
 // Firehose connects to a list of relays and pulls [relevantKinds] events that are newer than [FirehoseConfig.Since].
 // It discards events from unknown pubkeys as an anti-spam mechanism.
 func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) {
-	pool := nostr.NewSimplePool(ctx)
-	defer close(pool)
 	defer log.Println("Firehose: shutting down...")
+
+	pool := nostr.NewSimplePool(ctx)
+	defer shutdown(pool)
 
 	filter := nostr.Filter{
 		Kinds: relevantKinds,
 		Since: config.Since(),
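A sketch of how a caller might consume this feed, using only what the hunk shows (Firehose, FirehoseConfig, PubkeyChecker); the helper name and channel capacity are assumptions, not code from this commit:

// runFirehose forwards the Firehose feed into a buffered channel consumed
// downstream; `checker` is whatever satisfies PubkeyChecker in this package.
func runFirehose(ctx context.Context, config FirehoseConfig, checker PubkeyChecker) <-chan *nostr.Event {
	events := make(chan *nostr.Event, 1024) // capacity is an assumption

	go Firehose(ctx, config, checker, func(e *nostr.Event) error {
		select {
		case events <- e:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	return events
}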
@@ -160,19 +161,24 @@ func (c FetcherConfig) Print() {
 // - when the batch is bigger than config.Batch
 // - after config.Interval since the last query.
 func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, send func(*nostr.Event) error) {
+	defer log.Println("Fetcher: shutting down...")
+
 	batch := make([]string, 0, config.Batch)
 	timer := time.After(config.Interval)
 
 	pool := nostr.NewSimplePool(ctx)
-	defer close(pool)
+	defer shutdown(pool)
 
 	for {
 		select {
 		case <-ctx.Done():
-			log.Println("Fetcher: shutting down...")
 			return
 
-		case pubkey := <-pubkeys:
+		case pubkey, ok := <-pubkeys:
+			if !ok {
+				return
+			}
+
 			batch = append(batch, pubkey)
 			if len(batch) < config.Batch {
 				continue
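The hunk truncates before the flush path. For readers unfamiliar with the batch-or-timeout idiom Fetcher uses, here is a self-contained generic sketch with hypothetical names (not the repo's code): flush when the batch fills, or when the interval elapses, whichever comes first.

// batchOrTimeout drains keys, flushing whenever `size` items accumulate or
// `interval` has elapsed since the last flush window started.
func batchOrTimeout(ctx context.Context, keys <-chan string, size int, interval time.Duration, flush func([]string)) {
	batch := make([]string, 0, size)
	timer := time.After(interval)

	for {
		select {
		case <-ctx.Done():
			return

		case k, ok := <-keys:
			if !ok {
				return
			}
			batch = append(batch, k)
			if len(batch) < size {
				continue // keep accumulating until full or timer fires
			}

		case <-timer:
		}

		if len(batch) > 0 {
			flush(batch)
			batch = batch[:0]
		}
		timer = time.After(interval) // re-arm for the next window
	}
}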
@@ -244,8 +250,8 @@ func fetch(ctx context.Context, pool *nostr.SimplePool, relays, pubkeys []string
 	return events, nil
 }
 
-// Close iterates over the relays in the pool and closes all connections.
-func close(pool *nostr.SimplePool) {
+// Shutdown iterates over the relays in the pool and closes all connections.
+func shutdown(pool *nostr.SimplePool) {
 	pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {
 		relay.Close()
 		return true
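The rename is not cosmetic: a package-level function named close shadows Go's builtin close throughout the package, which would break the channel cleanup introduced in the Engine below (defer close(graphEvents)). An illustrative fragment that deliberately does not compile:

// With a package-level func named close, the builtin is shadowed:
func close(pool *nostr.SimplePool) { /* ... */ }

func cleanup(ch chan *nostr.Event) {
	close(ch) // resolves to the function above, not the builtin: compile error
}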
@@ -10,39 +10,140 @@ import (
 	"github.com/pippellia-btc/crawler/pkg/walks"
 	"log"
 	"slices"
 	"sync/atomic"
 	"time"
 
 	"github.com/nbd-wtf/go-nostr"
+	"github.com/vertex-lab/relay/pkg/eventstore"
 )
 
 // EventTracker tracks the number of events processed
 var EventTracker atomic.Int32
 
 var ErrUnsupportedKind = errors.New("unsupported event kind")
 
-type ProcessorConfig struct {
-	CacheCapacity int
-	PrintEvery    int
+type EngineConfig struct {
+	PrintEvery int
+
+	// for the GraphUpdater
+	UpdaterCapacity int
+	CacheCapacity   int
+
+	// for the Archiver
+	ArchiverCapacity int
 }
 
-func NewProcessorConfig() ProcessorConfig {
-	return ProcessorConfig{
-		CacheCapacity: 10000,
-		PrintEvery:    5000}
+func NewEngineConfig() EngineConfig {
+	return EngineConfig{
+		PrintEvery:       5000,
+		UpdaterCapacity:  1000,
+		CacheCapacity:    100_000,
+		ArchiverCapacity: 1000,
+	}
 }
 
-func (c ProcessorConfig) Print() {
-	fmt.Printf("Processor\n")
-	fmt.Printf("  CacheCapacity: %d\n", c.CacheCapacity)
+func (c EngineConfig) Print() {
+	fmt.Printf("Engine\n")
 	fmt.Printf("  PrintEvery: %d\n", c.PrintEvery)
+	fmt.Printf("  UpdaterCapacity: %d\n", c.UpdaterCapacity)
+	fmt.Printf("  CacheCapacity: %d\n", c.CacheCapacity)
+	fmt.Printf("  ArchiverCapacity: %d\n", c.ArchiverCapacity)
 }
 
-func Processor(
+// Engine is responsible for dispatching the correct events to the [Archiver] or [GraphUpdater].
+func Engine(
 	ctx context.Context,
-	config ProcessorConfig,
+	config EngineConfig,
+	store *eventstore.Store,
 	db redb.RedisDB,
-	//store *eventstore.Store,
 	events chan *nostr.Event) {
 
-	var err error
-	var processed int
+	defer log.Println("Engine: shutting down...")
+
+	graphEvents := make(chan *nostr.Event, config.UpdaterCapacity)
+	archiveEvents := make(chan *nostr.Event, config.ArchiverCapacity)
+	defer close(graphEvents)
+	defer close(archiveEvents)
+
+	go GraphUpdater(ctx, config, store, db, graphEvents)
+	go Archiver(ctx, config, store, archiveEvents)
+
+	log.Println("Engine: ready to process events")
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case event, ok := <-events:
+			if !ok {
+				return
+			}
+
+			switch event.Kind {
+			case nostr.KindFollowList:
+				graphEvents <- event
+
+			case nostr.KindProfileMetadata:
+				archiveEvents <- event
+
+			default:
+				logEvent(event, ErrUnsupportedKind)
+			}
+		}
+	}
+}
+
+// Archiver consumes events that are not graph-related and stores them.
+func Archiver(
+	ctx context.Context,
+	config EngineConfig,
+	store *eventstore.Store,
+	events chan *nostr.Event) {
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case event, ok := <-events:
+			if !ok {
+				return
+			}
+
+			err := func() error {
+				opctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+				defer cancel()
+
+				switch {
+				case nostr.IsRegularKind(event.Kind):
+					return store.Save(opctx, event)
+
+				case nostr.IsReplaceableKind(event.Kind):
+					_, err := store.Replace(opctx, event)
+					return err
+
+				default:
+					return nil
+				}
+			}()
+
+			if err != nil {
+				logEvent(event, err)
+			}
+
+			EventTracker.Add(1)
+		}
+	}
+}
+
+// GraphUpdater consumes events to update the graph and random walks.
+func GraphUpdater(
+	ctx context.Context,
+	config EngineConfig,
+	store *eventstore.Store,
+	db redb.RedisDB,
+	events chan *nostr.Event) {
 
 	cache := walks.NewWalker(
 		walks.WithCapacity(config.CacheCapacity),
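A sketch of how the refactored pipeline might be assembled end to end. Initialization of store and db is not part of this commit, and the helper name and channel capacity are assumptions; only Engine, Archiver, GraphUpdater and NewEngineConfig come from the diff:

// runPipeline starts the Engine, which fans events out to GraphUpdater and
// Archiver internally; producers like Firehose and Fetcher feed the returned
// channel through their send callback.
func runPipeline(ctx context.Context, store *eventstore.Store, db redb.RedisDB) chan<- *nostr.Event {
	config := NewEngineConfig()
	events := make(chan *nostr.Event, 1024) // capacity is an assumption

	go Engine(ctx, config, store, db, events)
	return events
}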
@@ -50,34 +151,36 @@ func Processor(
 		walks.WithLogFile("cache.log"),
 	)
 
-	log.Println("Processor: ready to process events")
-
 	for {
 		select {
 		case <-ctx.Done():
-			log.Println("Processor: shutting down...")
 			return
 
-		case event := <-events:
-			switch event.Kind {
-			case nostr.KindFollowList:
-				err = processFollowList(cache, db, event)
-
-			case nostr.KindProfileMetadata:
-				err = nil
-
-			default:
-				err = ErrUnsupportedKind
+		case event, ok := <-events:
+			if !ok {
+				return
 			}
 
+			err := func() error {
+				opctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+				defer cancel()
+
+				replaced, err := store.Replace(opctx, event)
+				if err != nil {
+					return err
+				}
+
+				if replaced {
+					return processFollowList(opctx, db, cache, event)
+				}
+				return nil
+			}()
+
 			if err != nil {
-				log.Printf("Processor: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
+				logEvent(event, err)
 			}
 
-			processed++
-			if processed%config.PrintEvery == 0 {
-				log.Printf("Processor: processed %d events", processed)
-			}
+			EventTracker.Add(1)
 		}
 	}
 }
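The gate on `replaced` is what makes the refactor correct for replaceable kinds: stale follow lists are stored away (or dropped) without ever touching the graph. A sketch of the contract this code assumes of store.Replace, which is not itself shown in the diff; the wrapper name is hypothetical:

// Assumed semantics: Replace writes the event only if it is newer than the
// currently stored event for (pubkey, kind), and reports whether a write happened.
func handleFollowList(ctx context.Context, store *eventstore.Store, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) error {
	replaced, err := store.Replace(ctx, event)
	if err != nil {
		return err
	}
	if !replaced {
		return nil // an equal-or-newer follow list is already stored; skip graph work
	}
	return processFollowList(ctx, db, cache, event) // event is now canonical
}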
@@ -85,10 +188,7 @@ func Processor(
 // processFollowList parses the pubkeys listed in the event, and uses them to:
 // - update the follows of the author (db and cache)
 // - update the author's random walks and signal the number to the [WalksTracker]
-func processFollowList(cache *walks.CachedWalker, db redb.RedisDB, event *nostr.Event) error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-
+func processFollowList(ctx context.Context, db redb.RedisDB, cache *walks.CachedWalker, event *nostr.Event) error {
 	author, err := db.NodeByKey(ctx, event.PubKey)
 	if err != nil {
 		return err
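The signature change moves deadline ownership from processFollowList (previously a hard-coded 10 seconds) to its caller, so one context can bound the store write and the graph update together. A minimal caller fragment mirroring the GraphUpdater hunk above:

opctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

// both the store.Replace call and this update now share one deadline
if err := processFollowList(opctx, db, cache, event); err != nil {
	logEvent(event, err)
}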
@@ -173,6 +273,11 @@ func parsePubkeys(event *nostr.Event) []string {
 	return unique(pubkeys)
 }
 
+func logEvent(e *nostr.Event, extra any) {
+	msg := fmt.Sprintf("Engine: event ID %s, kind %d by %s: ", e.ID, e.Kind, e.PubKey)
+	log.Printf(msg+"%v", extra)
+}
+
 // Unique returns a slice of unique elements of the input slice.
 func unique[E cmp.Ordered](slice []E) []E {
 	if len(slice) == 0 {
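The hunk cuts off inside unique. For reference, a plausible implementation consistent with the cmp.Ordered constraint and the file's slices import; this is a guess at the shape, not the repo's code:

func uniqueSketch[E cmp.Ordered](slice []E) []E {
	if len(slice) == 0 {
		return nil
	}
	out := slices.Clone(slice)
	slices.Sort(out)
	return slices.Compact(out) // drops adjacent duplicates after sorting
}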
@@ -447,9 +447,13 @@ func (db RedisDB) Pubkeys(ctx context.Context, nodes ...graph.ID) ([]string, err
 
 type MissingHandler func(ctx context.Context, db RedisDB, pubkey string) (graph.ID, error)
 
-func Ignore(context.Context, RedisDB, string) (graph.ID, error) { return "", nil }
+// Ignore pubkeys that are not found
+func Ignore(context.Context, RedisDB, string) (graph.ID, error) { return "", nil }
 
+// Return a sentinel value ("-1") as the node ID of pubkeys not found
 func Sentinel(context.Context, RedisDB, string) (graph.ID, error) { return "-1", nil }
 
+// AddValid pubkeys to the database if they were not already present
 func AddValid(ctx context.Context, db RedisDB, pubkey string) (graph.ID, error) {
 	if !nostr.IsValidPublicKey(pubkey) {
 		return "", nil
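A hedged sketch of how a caller might plug in one of these policies; the call site and the empty-ID skip convention are inferred from the snippets above, and only MissingHandler, Ignore, Sentinel and AddValid come from the diff:

var onMissing MissingHandler = AddValid // or Ignore / Sentinel, per caller policy

id, err := onMissing(ctx, db, pubkey)
if err != nil {
	return err
}
if id == "" {
	return nil // Ignore (and AddValid on invalid keys) signals "skip" with an empty ID
}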