refactored archiver:

- now it stores events whose kind is approved
- archiver and builder have separate configs
This commit is contained in:
pippellia-btc
2025-09-18 16:09:20 +02:00
parent fa3c2919d7
commit 60f33dd4fe
3 changed files with 96 additions and 46 deletions

View File

@@ -92,7 +92,7 @@ func main() {
go func() { go func() {
defer wg.Done() defer wg.Done()
pipe.GraphBuilder(ctx, config.Engine, db, builderQueue) pipe.GraphBuilder(ctx, config.Engine.Builder, db, builderQueue)
}() }()
wg.Wait() wg.Wait()

View File

@@ -184,19 +184,22 @@ func Load() (*Config, error) {
config.Arbiter.PromotionWait = time.Duration(wait) * time.Second config.Arbiter.PromotionWait = time.Duration(wait) * time.Second
case "ENGINE_PRINT_EVERY": case "ENGINE_PRINT_EVERY":
config.Engine.PrintEvery, err = strconv.Atoi(val) printEvery, err := strconv.Atoi(val)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
} }
config.Engine.Archiver.PrintEvery = printEvery
config.Engine.Builder.PrintEvery = printEvery
case "ENGINE_BUILDER_CAPACITY": case "ENGINE_BUILDER_CAPACITY":
config.Engine.BuilderCapacity, err = strconv.Atoi(val) config.Engine.ChannelCapacity, err = strconv.Atoi(val)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
} }
case "ENGINE_CACHE_CAPACITY": case "ENGINE_CACHE_CAPACITY":
config.Engine.CacheCapacity, err = strconv.Atoi(val) config.Engine.Builder.CacheCapacity, err = strconv.Atoi(val)
if err != nil { if err != nil {
return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
} }

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"slices"
"time" "time"
"github.com/pippellia-btc/nastro" "github.com/pippellia-btc/nastro"
@@ -17,26 +18,23 @@ import (
) )
type EngineConfig struct { type EngineConfig struct {
PrintEvery int Archiver ArchiverConfig
Builder GraphBuilderConfig
// GraphBuilder params ChannelCapacity int
BuilderCapacity int
CacheCapacity int
} }
func NewEngineConfig() EngineConfig { func NewEngineConfig() EngineConfig {
return EngineConfig{ return EngineConfig{
PrintEvery: 5000, Archiver: NewArchiverConfig(),
BuilderCapacity: 1000, Builder: NewGraphBuilderConfig(),
CacheCapacity: 100_000,
} }
} }
func (c EngineConfig) Print() { func (c EngineConfig) Print() {
fmt.Printf("Engine\n") fmt.Printf("Engine\n")
fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) fmt.Printf(" ChannelCapacity: %d\n", c.ChannelCapacity)
fmt.Printf(" BuilderCapacity: %d\n", c.BuilderCapacity) c.Archiver.Print()
fmt.Printf(" CacheCapacity: %d\n", c.CacheCapacity) c.Builder.Print()
} }
// Engine is responsible for cohordinating the [Archiver] with the [GraphBuilder]. // Engine is responsible for cohordinating the [Archiver] with the [GraphBuilder].
@@ -47,26 +45,45 @@ func Engine(
db redb.RedisDB, db redb.RedisDB,
events chan *nostr.Event, events chan *nostr.Event,
) { ) {
graphEvents := make(chan *nostr.Event, config.BuilderCapacity) graphEvents := make(chan *nostr.Event, config.ChannelCapacity)
defer close(graphEvents) defer close(graphEvents)
go GraphBuilder(ctx, config, db, graphEvents) sendFollowList := func(e *nostr.Event) error {
Archiver(ctx, config, store, events, func(e *nostr.Event) error {
if e.Kind == nostr.KindFollowList { if e.Kind == nostr.KindFollowList {
return Send(graphEvents)(e) return Send(graphEvents)(e)
} }
return nil return nil
}) }
go GraphBuilder(ctx, config.Builder, db, graphEvents)
Archiver(ctx, config.Archiver, store, events, sendFollowList)
}
type ArchiverConfig struct {
Kinds []int
PrintEvery int
}
func NewArchiverConfig() ArchiverConfig {
return ArchiverConfig{
Kinds: profileKinds,
PrintEvery: 10_000,
}
}
func (c ArchiverConfig) Print() {
fmt.Printf("Archiver\n")
fmt.Printf(" Kinds: %v\n", c.Kinds)
fmt.Printf(" PrintEvery: %d\n", c.PrintEvery)
} }
// Archiver stores events in the event store. // Archiver stores events in the event store.
func Archiver( func Archiver(
ctx context.Context, ctx context.Context,
config EngineConfig, config ArchiverConfig,
store nastro.Store, store nastro.Store,
events chan *nostr.Event, events chan *nostr.Event,
onReplace func(*nostr.Event) error, onReplace Forward[*nostr.Event],
) { ) {
log.Println("Archiver: ready") log.Println("Archiver: ready")
defer log.Println("Archiver: shut down") defer log.Println("Archiver: shut down")
@@ -83,30 +100,11 @@ func Archiver(
return return
} }
err := func() error { if !slices.Contains(config.Kinds, event.Kind) {
opctx, cancel := context.WithTimeout(ctx, 5*time.Second) continue
defer cancel()
switch {
case nostr.IsRegularKind(event.Kind):
return nil // do nothing
case nostr.IsReplaceableKind(event.Kind):
replaced, err := store.Replace(opctx, event)
if err != nil {
return err
} }
if replaced { err := archive(ctx, store, event, onReplace)
return onReplace(event)
}
return nil
default:
return nil
}
}()
if err != nil && ctx.Err() == nil { if err != nil && ctx.Err() == nil {
log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err) log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
} }
@@ -119,10 +117,59 @@ func Archiver(
} }
} }
// Archive an event based on its kind.
// If a replacement happened, it calls the provided onReplace
func archive(
ctx context.Context,
store nastro.Store,
event *nostr.Event,
onReplace Forward[*nostr.Event],
) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
switch {
case nostr.IsRegularKind(event.Kind):
return store.Save(ctx, event)
case nostr.IsReplaceableKind(event.Kind) || nostr.IsAddressableKind(event.Kind):
replaced, err := store.Replace(ctx, event)
if err != nil {
return err
}
if replaced && onReplace != nil {
return onReplace(event)
}
return nil
default:
return nil
}
}
type GraphBuilderConfig struct {
CacheCapacity int
PrintEvery int
}
func NewGraphBuilderConfig() GraphBuilderConfig {
return GraphBuilderConfig{
CacheCapacity: 100_000,
PrintEvery: 10_000,
}
}
func (c GraphBuilderConfig) Print() {
fmt.Printf("GraphBuilder\n")
fmt.Printf(" CacheCapacity: %v\n", c.CacheCapacity)
fmt.Printf(" PrintEvery: %d\n", c.PrintEvery)
}
// GraphBuilder consumes events to update the graph and random walks. // GraphBuilder consumes events to update the graph and random walks.
func GraphBuilder( func GraphBuilder(
ctx context.Context, ctx context.Context,
config EngineConfig, config GraphBuilderConfig,
db redb.RedisDB, db redb.RedisDB,
events chan *nostr.Event, events chan *nostr.Event,
) { ) {