diff --git a/go.mod b/go.mod index b324f29..43bfaa6 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.24.3 require ( github.com/joho/godotenv v1.5.1 + github.com/kelseyhightower/envconfig v1.4.0 github.com/nbd-wtf/go-nostr v0.51.12 github.com/pippellia-btc/nastro v0.3.0 github.com/pippellia-btc/slicex v0.2.4 diff --git a/go.sum b/go.sum index 4375925..6a5d935 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= diff --git a/pkg/config/config.go b/pkg/config/config.go index e6833de..df3fb18 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -2,12 +2,10 @@ package config import ( + "errors" "fmt" - "os" - "strconv" - "strings" - "time" + "github.com/kelseyhightower/envconfig" "github.com/vertex-lab/crawler_v2/pkg/pipe" _ "github.com/joho/godotenv/autoload" // autoloading .env @@ -15,14 +13,12 @@ import ( ) type SystemConfig struct { - RedisAddress string - SQLiteURL string - - EventsCapacity int - PubkeysCapacity int - - InitPubkeys []string // only used during initialization - PrintStats bool + RedisAddress string `envconfig:"REDIS_ADDRESS"` + SQLiteURL string `envconfig:"SQLITE_URL"` + EventsCapacity int `envconfig:"EVENTS_CAPACITY"` + PubkeysCapacity int `envconfig:"PUBKEYS_CAPACITY"` + InitPubkeys []string `envconfig:"INIT_PUBKEYS"` + PrintStats bool `envconfig:"PRINT_STATS"` } func NewSystemConfig() SystemConfig { @@ -34,6 +30,19 @@ func NewSystemConfig() SystemConfig { } } +func (c SystemConfig) Validate() error { + if c.EventsCapacity < 0 { + return errors.New("events: value cannot be negative") + } + + for _, pk := range c.InitPubkeys { + if !nostr.IsValidPublicKey(pk) { + return fmt.Errorf("init pubkeys: \"%s\" is not valid hex pubkey", pk) + } + } + return nil +} + func (c SystemConfig) Print() { fmt.Println("System:") fmt.Printf(" RedisAddress: %s\n", c.RedisAddress) @@ -54,8 +63,8 @@ type Config struct { } // New returns a config with default parameters -func New() *Config { - return &Config{ +func New() Config { + return Config{ SystemConfig: NewSystemConfig(), Firehose: pipe.NewFirehoseConfig(), Fetcher: pipe.NewFetcherConfig(), @@ -64,7 +73,30 @@ func New() *Config { } } -func (c *Config) Print() { +func (c Config) Validate() error { + if err := c.SystemConfig.Validate(); err != nil { + return fmt.Errorf("System: %w", err) + } + + if err := c.Firehose.Validate(); err != nil { + return fmt.Errorf("Firehose: %w", err) + } + + if err := c.Fetcher.Validate(); err != nil { + return fmt.Errorf("Fetcher: %w", err) + } + + if err := c.Arbiter.Validate(); err != nil { + return fmt.Errorf("Arbiter: %w", err) + } + + if err := c.Engine.Validate(); err != nil { + return fmt.Errorf("Engine: %w", err) + } + return nil +} + +func (c Config) Print() { c.SystemConfig.Print() c.Firehose.Print() c.Fetcher.Print() @@ -72,138 +104,17 @@ func (c *Config) Print() { c.Engine.Print() } -// Load reads the enviroment variables and parses them into a [Config] struct -func Load() (*Config, error) { - var config = New() - var err error +// Load creates a new [Config] with default parameters. +// Then, if the corresponding environment variable is set, it overwrites them. +func Load() (Config, error) { + config := New() - for _, item := range os.Environ() { - keyVal := strings.SplitN(item, "=", 2) - key, val := keyVal[0], keyVal[1] + if err := envconfig.Process("", &config); err != nil { + return Config{}, fmt.Errorf("config.Load: %w", err) + } - switch key { - case "REDIS_ADDRESS": - config.RedisAddress = val - - case "SQLITE_URL": - config.SQLiteURL = val - - case "EVENTS_CAPACITY": - config.EventsCapacity, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "PUBKEYS_CAPACITY": - config.PubkeysCapacity, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "INIT_PUBKEYS": - pubkeys := strings.Split(val, ",") - for _, pk := range pubkeys { - if !nostr.IsValidPublicKey(pk) { - return nil, fmt.Errorf("pubkey %s is not valid", pk) - } - } - - config.InitPubkeys = pubkeys - - case "PRINT_STATS": - config.PrintStats, err = strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "FIREHOSE_OFFSET": - offset, err := strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - config.Fetcher.Interval = time.Duration(offset) * time.Second - - case "RELAYS": - relays := strings.Split(val, ",") - if len(relays) == 0 { - return nil, fmt.Errorf("relay list is empty") - } - - for _, relay := range relays { - if !nostr.IsValidRelayURL(relay) { - return nil, fmt.Errorf("relay \"%s\" is not a valid url", relay) - } - } - - config.Firehose.Relays = relays - config.Fetcher.Relays = relays - - case "FETCHER_BATCH": - config.Fetcher.Batch, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "FETCHER_INTERVAL": - interval, err := strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - config.Fetcher.Interval = time.Duration(interval) * time.Second - - case "ARBITER_ACTIVATION": - config.Arbiter.Activation, err = strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "ARBITER_PROMOTION": - config.Arbiter.Promotion, err = strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "ARBITER_DEMOTION": - config.Arbiter.Demotion, err = strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "ARBITER_PING_WAIT": - wait, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - config.Arbiter.PingWait = time.Duration(wait) * time.Second - - case "ARBITER_PROMOTION_WAIT": - wait, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - config.Arbiter.PromotionWait = time.Duration(wait) * time.Second - - case "ENGINE_PRINT_EVERY": - printEvery, err := strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - config.Engine.Archiver.PrintEvery = printEvery - config.Engine.Builder.PrintEvery = printEvery - - case "ENGINE_BUILDER_CAPACITY": - config.Engine.ChannelCapacity, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - - case "ENGINE_CACHE_CAPACITY": - config.Engine.Builder.CacheCapacity, err = strconv.Atoi(val) - if err != nil { - return nil, fmt.Errorf("error parsing %v: %v", keyVal, err) - } - } + if err := config.Validate(); err != nil { + return Config{}, fmt.Errorf("config.Load: %w", err) } return config, nil diff --git a/pkg/pipe/arbiter.go b/pkg/pipe/arbiter.go index e6f8f41..a4f5072 100644 --- a/pkg/pipe/arbiter.go +++ b/pkg/pipe/arbiter.go @@ -2,6 +2,7 @@ package pipe import ( "context" + "errors" "fmt" "log" "sync/atomic" @@ -18,12 +19,11 @@ import ( var WalksTracker atomic.Int32 type ArbiterConfig struct { - Activation float64 - Promotion float64 - Demotion float64 - - PromotionWait time.Duration - PingWait time.Duration + Activation float64 `envconfig:"ARBITER_ACTIVATION"` + Promotion float64 `envconfig:"ARBITER_PROMOTION"` + Demotion float64 `envconfig:"ARBITER_DEMOTION"` + PromotionWait time.Duration `envconfig:"ARBITER_PROMOTION_WAIT"` + PingWait time.Duration `envconfig:"ARBITER_PING_WAIT"` } func NewArbiterConfig() ArbiterConfig { @@ -36,8 +36,38 @@ func NewArbiterConfig() ArbiterConfig { } } +func (c ArbiterConfig) Validate() error { + if c.Activation < 0 { + return errors.New("activation ratio cannot be negative") + } + + if c.Promotion < 0 { + return errors.New("promotion multiplier cannot be negative") + } + + if c.Demotion < 0 { + return errors.New("demotion multiplier cannot be negative") + } + + if c.Demotion <= 1 { + log.Println("WARN: Arbiter: demotion multiplier is smaller than 1." + + "This implies it's impossible for an active node to be demoted") + } + + if 1+c.Promotion <= c.Demotion { + log.Println("WARN: Arbiter: the inequality (1 + promotion) > demotion is not satisfied." + + "This implies there will be cyclical promotions -> demotions -> promotions...") + } + + if c.PromotionWait < 24*time.Hour { + log.Println("WARN: Arbiter: the promotion wait is less than 24hrs." + + "This implies a reputable attacker could add to the db several bots in a short period of time") + } + return nil +} + func (c ArbiterConfig) Print() { - fmt.Printf("Arbiter\n") + fmt.Printf("Arbiter: \n") fmt.Printf(" Activation: %f\n", c.Activation) fmt.Printf(" Promotion: %f\n", c.Promotion) fmt.Printf(" Demotion: %f\n", c.Demotion) diff --git a/pkg/pipe/engine.go b/pkg/pipe/engine.go index d6a4233..a912b3b 100644 --- a/pkg/pipe/engine.go +++ b/pkg/pipe/engine.go @@ -3,6 +3,7 @@ package pipe import ( "context" + "errors" "fmt" "log" "slices" @@ -20,7 +21,7 @@ import ( type EngineConfig struct { Archiver ArchiverConfig Builder GraphBuilderConfig - ChannelCapacity int + ChannelCapacity int `envconfig:"ENGINE_BUILDER_CAPACITY"` } func NewEngineConfig() EngineConfig { @@ -30,8 +31,20 @@ func NewEngineConfig() EngineConfig { } } +func (c EngineConfig) Validate() error { + if err := c.Archiver.Validate(); err != nil { + return fmt.Errorf("Archiver: %w", err) + } + + if err := c.Builder.Validate(); err != nil { + return fmt.Errorf("GraphBuilder: %w", err) + } + + return nil +} + func (c EngineConfig) Print() { - fmt.Printf("Engine\n") + fmt.Printf("Engine:\n") fmt.Printf(" ChannelCapacity: %d\n", c.ChannelCapacity) c.Archiver.Print() c.Builder.Print() @@ -60,8 +73,8 @@ func Engine( } type ArchiverConfig struct { - Kinds []int - PrintEvery int + Kinds []int `envconfig:"ARCHIVER_KINDS"` + PrintEvery int `envconfig:"ENGINE_PRINT_EVERY"` } func NewArchiverConfig() ArchiverConfig { @@ -71,8 +84,19 @@ func NewArchiverConfig() ArchiverConfig { } } +func (c ArchiverConfig) Validate() error { + if len(c.Kinds) < 1 { + return errors.New("kind list cannot be empty") + } + + if c.PrintEvery < 0 { + return errors.New("print every cannot be negative") + } + return nil +} + func (c ArchiverConfig) Print() { - fmt.Printf("Archiver\n") + fmt.Printf("Archiver:\n") fmt.Printf(" Kinds: %v\n", c.Kinds) fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } @@ -149,8 +173,8 @@ func archive( } type GraphBuilderConfig struct { - CacheCapacity int - PrintEvery int + CacheCapacity int `envconfig:"ENGINE_CACHE_CAPACITY"` + PrintEvery int `envconfig:"ENGINE_PRINT_EVERY"` } func NewGraphBuilderConfig() GraphBuilderConfig { @@ -160,8 +184,19 @@ func NewGraphBuilderConfig() GraphBuilderConfig { } } +func (c GraphBuilderConfig) Validate() error { + if c.CacheCapacity < 0 { + return errors.New("cache capacity cannot be negative") + } + + if c.PrintEvery < 0 { + return errors.New("print every cannot be negative") + } + return nil +} + func (c GraphBuilderConfig) Print() { - fmt.Printf("GraphBuilder\n") + fmt.Printf("GraphBuilder:\n") fmt.Printf(" CacheCapacity: %v\n", c.CacheCapacity) fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } diff --git a/pkg/pipe/fetcher.go b/pkg/pipe/fetcher.go index 97cd0db..15a055b 100644 --- a/pkg/pipe/fetcher.go +++ b/pkg/pipe/fetcher.go @@ -2,6 +2,7 @@ package pipe import ( "context" + "errors" "fmt" "log" "time" @@ -11,10 +12,10 @@ import ( ) type FetcherConfig struct { - Kinds []int - Relays []string - Batch int - Interval time.Duration + Kinds []int `envconfig:"FETCHER_KINDS"` + Relays []string `envconfig:"RELAYS"` + Batch int `envconfig:"FETCHER_BATCH"` + Interval time.Duration `envconfig:"FETCHER_INTERVAL"` } func NewFetcherConfig() FetcherConfig { @@ -26,8 +27,25 @@ func NewFetcherConfig() FetcherConfig { } } +func (c FetcherConfig) Validate() error { + if len(c.Kinds) < 1 { + return errors.New("kind list cannot be empty") + } + + if c.Batch <= 1 { + return errors.New("batch value must be positive") + } + + for _, relay := range c.Relays { + if !nostr.IsValidRelayURL(relay) { + return fmt.Errorf("\"%s\" is not a valid relay url", relay) + } + } + return nil +} + func (c FetcherConfig) Print() { - fmt.Printf("Fetcher\n") + fmt.Printf("Fetcher:\n") fmt.Printf(" Kinds: %v\n", c.Kinds) fmt.Printf(" Relays: %v\n", c.Relays) fmt.Printf(" Batch: %d\n", c.Batch) diff --git a/pkg/pipe/firehose.go b/pkg/pipe/firehose.go index 831e6d6..1b5d995 100644 --- a/pkg/pipe/firehose.go +++ b/pkg/pipe/firehose.go @@ -2,6 +2,7 @@ package pipe import ( "context" + "errors" "fmt" "log" "slices" @@ -76,9 +77,9 @@ var ( ) type FirehoseConfig struct { - Kinds []int - Relays []string - Offset time.Duration + Kinds []int `envconfig:"FIREHOSE_KINDS"` + Relays []string `envconfig:"RELAYS"` + Offset time.Duration `envconfig:"FIREHOSE_OFFSET"` } func NewFirehoseConfig() FirehoseConfig { @@ -89,13 +90,26 @@ func NewFirehoseConfig() FirehoseConfig { } } +func (c FirehoseConfig) Validate() error { + if len(c.Kinds) < 1 { + return errors.New("kind list cannot be empty") + } + + for _, relay := range c.Relays { + if !nostr.IsValidRelayURL(relay) { + return fmt.Errorf("\"%s\" is not a valid relay url", relay) + } + } + return nil +} + func (c FirehoseConfig) Since() *nostr.Timestamp { since := nostr.Timestamp(time.Now().Add(-c.Offset).Unix()) return &since } func (c FirehoseConfig) Print() { - fmt.Printf("Firehose\n") + fmt.Printf("Firehose:\n") fmt.Printf(" Kinds: %v\n", c.Kinds) fmt.Printf(" Relays: %v\n", c.Relays) fmt.Printf(" Offset: %v\n", c.Offset)