DRYed up with InitGraph helper

This commit is contained in:
pippellia-btc
2025-09-14 19:37:44 +02:00
parent dc30c2242c
commit aad5523de8
3 changed files with 38 additions and 46 deletions

View File

@@ -11,7 +11,6 @@ import (
"github.com/pippellia-btc/nastro/sqlite" "github.com/pippellia-btc/nastro/sqlite"
"github.com/vertex-lab/crawler_v2/pkg/config" "github.com/vertex-lab/crawler_v2/pkg/config"
"github.com/vertex-lab/crawler_v2/pkg/graph"
"github.com/vertex-lab/crawler_v2/pkg/pipe" "github.com/vertex-lab/crawler_v2/pkg/pipe"
"github.com/vertex-lab/crawler_v2/pkg/redb" "github.com/vertex-lab/crawler_v2/pkg/redb"
"github.com/vertex-lab/crawler_v2/pkg/store" "github.com/vertex-lab/crawler_v2/pkg/store"
@@ -61,13 +60,17 @@ func main() {
} }
if nodes == 0 { if nodes == 0 {
if err := initGraph(ctx, db, config.InitPubkeys); err != nil { log.Println("initializing from empty database...")
if err := pipe.InitGraph(ctx, db, config.InitPubkeys); err != nil {
panic(err) panic(err)
} }
for _, pk := range config.InitPubkeys { for _, pk := range config.InitPubkeys {
fetcherQueue <- pk fetcherQueue <- pk
} }
log.Printf("correctly added %d pubkeys", len(config.InitPubkeys))
} }
if config.PrintStats { if config.PrintStats {
@@ -110,32 +113,6 @@ func main() {
consumers.Wait() consumers.Wait()
} }
func initGraph(ctx context.Context, db redb.RedisDB, pubkeys []string) error {
if len(pubkeys) == 0 {
panic("init pubkeys are empty: impossible to initialize")
}
log.Println("initialize from empty database...")
var initNodes = make([]graph.ID, len(pubkeys))
var err error
for i, pk := range pubkeys {
initNodes[i], err = db.AddNode(ctx, pk)
if err != nil {
panic(err)
}
}
for _, node := range initNodes {
if err := pipe.Promote(db, node); err != nil {
panic(err)
}
}
log.Printf("correctly added %d init pubkeys", len(pubkeys))
return nil
}
func printStats( func printStats(
ctx context.Context, ctx context.Context,
recorderQueue, engineQueue chan *nostr.Event, recorderQueue, engineQueue chan *nostr.Event,

View File

@@ -10,7 +10,6 @@ import (
"time" "time"
"github.com/vertex-lab/crawler_v2/pkg/config" "github.com/vertex-lab/crawler_v2/pkg/config"
"github.com/vertex-lab/crawler_v2/pkg/graph"
"github.com/vertex-lab/crawler_v2/pkg/pipe" "github.com/vertex-lab/crawler_v2/pkg/pipe"
"github.com/vertex-lab/crawler_v2/pkg/redb" "github.com/vertex-lab/crawler_v2/pkg/redb"
"github.com/vertex-lab/crawler_v2/pkg/store" "github.com/vertex-lab/crawler_v2/pkg/store"
@@ -57,30 +56,20 @@ func main() {
} }
if nodes != 0 { if nodes != 0 {
panic("refuse to run sync when redis is not empty") panic("refuse to sync when redis is not empty")
} }
if len(config.InitPubkeys) == 0 {
panic("init fetcherQueue are empty: impossible to initialize")
}
log.Println("initialize from empty database...") log.Println("initialize from empty database...")
initNodes := make([]graph.ID, len(config.InitPubkeys)) if err := pipe.InitGraph(ctx, db, config.InitPubkeys); err != nil {
for i, pk := range config.InitPubkeys { panic(err)
initNodes[i], err = db.AddNode(ctx, pk)
if err != nil {
panic(err)
}
fetcherQueue <- pk // add to queue
} }
for _, node := range initNodes { for _, pk := range config.InitPubkeys {
if err := pipe.Promote(db, node); err != nil { fetcherQueue <- pk
panic(err)
}
} }
log.Printf("correctly added %d init fetcherQueue", len(config.InitPubkeys))
log.Printf("correctly added %d pubkeys", len(config.InitPubkeys))
if config.PrintStats { if config.PrintStats {
go printStats(ctx, builderQueue, fetcherQueue) go printStats(ctx, builderQueue, fetcherQueue)

View File

@@ -11,6 +11,8 @@ import (
"syscall" "syscall"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/vertex-lab/crawler_v2/pkg/graph"
"github.com/vertex-lab/crawler_v2/pkg/redb"
) )
var ( var (
@@ -40,6 +42,30 @@ func HandleSignals(cancel context.CancelFunc) {
cancel() cancel()
} }
// InitGraph by adding and promoting the provided pubkeys.
func InitGraph(ctx context.Context, db redb.RedisDB, pubkeys []string) error {
if len(pubkeys) == 0 {
return fmt.Errorf("InitGraph: init pubkeys are empty")
}
var initNodes = make([]graph.ID, len(pubkeys))
var err error
for i, pk := range pubkeys {
initNodes[i], err = db.AddNode(ctx, pk)
if err != nil {
return fmt.Errorf("InitGraph: %v", err)
}
}
for _, node := range initNodes {
if err := Promote(db, node); err != nil {
return fmt.Errorf("InitGraph: %v", err)
}
}
return nil
}
// Shutdown iterates over the relays in the pool and closes all connections. // Shutdown iterates over the relays in the pool and closes all connections.
func shutdown(pool *nostr.SimplePool) { func shutdown(pool *nostr.SimplePool) {
pool.Relays.Range(func(_ string, relay *nostr.Relay) bool { pool.Relays.Range(func(_ string, relay *nostr.Relay) bool {