From 1a9bea800588cc6a48b9ee1e6ab11ec004011230 Mon Sep 17 00:00:00 2001 From: pippellia-btc Date: Thu, 5 Jun 2025 16:41:19 +0200 Subject: [PATCH] added e2e tests --- .gitignore | 16 +++ cmd/crawler.go | 53 +++++-- pkg/pipe/arbiter.go | 13 +- pkg/pipe/processor.go | 6 +- pkg/redb/graph.go | 78 +++++++---- pkg/redb/graph_test.go | 10 +- pkg/redb/utils.go | 2 +- pkg/redb/walks.go | 53 +++++-- pkg/redb/walks_test.go | 4 +- pkg/walks/walker.go | 26 +++- tests/e2e/e2e_test.go | 167 +++++++++++++++++++++++ tests/random/pagerank_test.go | 2 +- tests/random/{utils_test.go => utils.go} | 2 +- 13 files changed, 358 insertions(+), 74 deletions(-) create mode 100644 .gitignore create mode 100644 tests/e2e/e2e_test.go rename tests/random/{utils_test.go => utils.go} (99%) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dec9e4d --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# Ignore all log files +**/*.log + +# Ignore all the .env +**/*.env + +# Ignore all sqlite files +**/*.sqlite +**/*-shm +**/*-wal + +# Ignore all images +**/*.jpeg + +# Ignore the debugs +.vscode \ No newline at end of file diff --git a/cmd/crawler.go b/cmd/crawler.go index 0272110..ea31610 100644 --- a/cmd/crawler.go +++ b/cmd/crawler.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "github/pippellia-btc/crawler/pkg/graph" "github/pippellia-btc/crawler/pkg/pipe" "github/pippellia-btc/crawler/pkg/redb" @@ -9,13 +10,18 @@ import ( "log" "os" "os/signal" + "runtime" "sync" "syscall" + "time" "github.com/nbd-wtf/go-nostr" "github.com/redis/go-redis/v9" ) +var events chan *nostr.Event +var pubkeys chan string + func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -26,8 +32,8 @@ func main() { panic(err) } - events := make(chan *nostr.Event, config.EventsCapacity) - pubkeys := make(chan string, config.PubkeysCapacity) + events = make(chan *nostr.Event, config.EventsCapacity) + pubkeys = make(chan string, config.PubkeysCapacity) db := redb.New(&redis.Options{Addr: config.RedisAddress}) count, err := db.NodeCount(ctx) @@ -60,13 +66,6 @@ func main() { log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys)) } - _ = events - - // eventStore, err := eventstore.New(config.SQLiteURL) - // if err != nil { - // panic("failed to connect to the sqlite eventstore: " + err.Error()) - // } - var wg sync.WaitGroup wg.Add(3) @@ -106,7 +105,7 @@ func main() { }) }() - log.Println("ready to process events") + go printStats(ctx) pipe.Processor(ctx, config.Processor, db, events) wg.Wait() } @@ -120,3 +119,37 @@ func handleSignals(cancel context.CancelFunc) { log.Println(" Signal received. Shutting down...") cancel() } + +func printStats(ctx context.Context) { + filename := "stats.log" + file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + panic(fmt.Errorf("failed to open log file %s: %w", filename, err)) + } + + defer file.Close() + log := log.New(file, "stats: ", log.LstdFlags) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + goroutines := runtime.NumGoroutine() + memStats := new(runtime.MemStats) + runtime.ReadMemStats(memStats) + + log.Println("---------------------------------------") + log.Printf("events queue: %d/%d\n", len(events), cap(events)) + log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys)) + log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load()) + log.Printf("goroutines: %d\n", goroutines) + log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024)) + log.Println("---------------------------------------") + } + } +} diff --git a/pkg/pipe/arbiter.go b/pkg/pipe/arbiter.go index 1570c2b..ffbe2ea 100644 --- a/pkg/pipe/arbiter.go +++ b/pkg/pipe/arbiter.go @@ -12,9 +12,9 @@ import ( "time" ) -// walksTracker tracks the number of walks that have been updated by [Processor]. +// WalksTracker tracks the number of walks that have been updated by [Processor]. // It's used to wake-up the [Arbiter], which performs work and then resets it to 0. -var walksTracker atomic.Int32 +var WalksTracker atomic.Int32 type ArbiterConfig struct { Activation float64 @@ -51,7 +51,7 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu ticker := time.NewTicker(config.PingWait) defer ticker.Stop() - walksTracker.Add(1000_000_000) // trigger a scan at startup + WalksTracker.Add(1000_000_000) // trigger a scan at startup for { select { @@ -66,17 +66,16 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu continue } - changed := walksTracker.Load() + changed := WalksTracker.Load() changeRatio := float64(changed) / float64(total) if changeRatio > config.Activation { promoted, demoted, err := arbiterScan(ctx, config, db, send) if err != nil { log.Printf("Arbiter: %v", err) - continue } - walksTracker.Store(0) // resets tracker + WalksTracker.Store(0) // resets tracker log.Printf("Arbiter: promoted %d, demoted %d", promoted, demoted) } } @@ -85,7 +84,7 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu // ArbiterScan performs one entire database scan, promoting or demoting nodes based on their pagerank. func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) (promoted, demoted int, err error) { - maxTime := 60 * time.Second + maxTime := 2 * time.Minute ctx, cancel := context.WithTimeout(ctx, maxTime) defer cancel() diff --git a/pkg/pipe/processor.go b/pkg/pipe/processor.go index 0866e55..b13e082 100644 --- a/pkg/pipe/processor.go +++ b/pkg/pipe/processor.go @@ -30,7 +30,6 @@ func (c ProcessorConfig) Print() { fmt.Printf(" PrintEvery: %d\n", c.PrintEvery) } -// Processor() process one event at the time from the eventChannel, based on their kind. func Processor( ctx context.Context, config ProcessorConfig, @@ -44,8 +43,11 @@ func Processor( cache := walks.NewWalker( walks.WithCapacity(10000), walks.WithFallback(db), + walks.WithLogFile("cache.log"), ) + log.Println("Processor: ready to process events") + for { select { case <-ctx.Done(): @@ -126,7 +128,7 @@ func processFollowList(cache *walks.CachedWalker, db redb.RedisDB, event *nostr. return err } - walksTracker.Add(int32(len(new))) + WalksTracker.Add(int32(len(new))) return cache.Update(ctx, delta) } diff --git a/pkg/redb/graph.go b/pkg/redb/graph.go index 329c8b3..839065d 100644 --- a/pkg/redb/graph.go +++ b/pkg/redb/graph.go @@ -34,11 +34,11 @@ const ( ) type RedisDB struct { - client *redis.Client + Client *redis.Client } func New(opt *redis.Options) RedisDB { - db := RedisDB{client: redis.NewClient(opt)} + db := RedisDB{Client: redis.NewClient(opt)} if err := db.init(); err != nil { panic(err) } @@ -47,7 +47,7 @@ func New(opt *redis.Options) RedisDB { // Size returns the DBSize of redis, which is the total number of keys func (db RedisDB) Size(ctx context.Context) (int, error) { - size, err := db.client.DBSize(ctx).Result() + size, err := db.Client.DBSize(ctx).Result() if err != nil { return 0, fmt.Errorf("failed to fetch the db size: %w", err) } @@ -56,7 +56,7 @@ func (db RedisDB) Size(ctx context.Context) (int, error) { // NodeCount returns the number of nodes stored in redis (in the keyIndex) func (db RedisDB) NodeCount(ctx context.Context) (int, error) { - nodes, err := db.client.HLen(ctx, KeyKeyIndex).Result() + nodes, err := db.Client.HLen(ctx, KeyKeyIndex).Result() if err != nil { return 0, fmt.Errorf("failed to fetch the node count: %w", err) } @@ -69,7 +69,7 @@ func (db RedisDB) Nodes(ctx context.Context, IDs ...graph.ID) ([]*graph.Node, er return nil, nil } - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.MapStringStringCmd, len(IDs)) for i, ID := range IDs { cmds[i] = pipe.HGetAll(ctx, node(ID)) @@ -98,7 +98,7 @@ func (db RedisDB) Nodes(ctx context.Context, IDs ...graph.ID) ([]*graph.Node, er // NodeByID fetches a node by its ID func (db RedisDB) NodeByID(ctx context.Context, ID graph.ID) (*graph.Node, error) { - fields, err := db.client.HGetAll(ctx, node(ID)).Result() + fields, err := db.Client.HGetAll(ctx, node(ID)).Result() if err != nil { return nil, fmt.Errorf("failed to fetch %s: %w", node(ID), err) } @@ -112,12 +112,12 @@ func (db RedisDB) NodeByID(ctx context.Context, ID graph.ID) (*graph.Node, error // NodeByKey fetches a node by its pubkey func (db RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, error) { - ID, err := db.client.HGet(ctx, KeyKeyIndex, pubkey).Result() + ID, err := db.Client.HGet(ctx, KeyKeyIndex, pubkey).Result() if err != nil { return nil, fmt.Errorf("failed to fetch ID of node with pubkey %s: %w", pubkey, err) } - fields, err := db.client.HGetAll(ctx, node(ID)).Result() + fields, err := db.Client.HGetAll(ctx, node(ID)).Result() if err != nil { return nil, fmt.Errorf("failed to fetch node with pubkey %s: %w", pubkey, err) } @@ -131,7 +131,7 @@ func (db RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, er // Exists checks for the existance of the pubkey func (db RedisDB) Exists(ctx context.Context, pubkey string) (bool, error) { - exists, err := db.client.HExists(ctx, KeyKeyIndex, pubkey).Result() + exists, err := db.Client.HExists(ctx, KeyKeyIndex, pubkey).Result() if err != nil { return false, fmt.Errorf("failed to check existance of pubkey %s: %w", pubkey, err) } @@ -148,7 +148,7 @@ func (db RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error { nodes[i] = node(ID) } - exists, err := db.client.Exists(ctx, nodes...).Result() + exists, err := db.Client.Exists(ctx, nodes...).Result() if err != nil { return fmt.Errorf("failed to check for the existence of %d nodes: %w", len(IDs), err) } @@ -162,7 +162,7 @@ func (db RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error { // AddNode adds a new inactive node to the database and returns its assigned ID func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error) { - exists, err := db.client.HExists(ctx, KeyKeyIndex, pubkey).Result() + exists, err := db.Client.HExists(ctx, KeyKeyIndex, pubkey).Result() if err != nil { return "", fmt.Errorf("failed to check for existence of pubkey %s: %w", pubkey, err) } @@ -173,13 +173,13 @@ func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error) // get the ID outside the transaction, which implies there might be "holes", // meaning IDs not associated with any node - next, err := db.client.HIncrBy(ctx, KeyDatabase, KeyLastNodeID, 1).Result() + next, err := db.Client.HIncrBy(ctx, KeyDatabase, KeyLastNodeID, 1).Result() if err != nil { return "", fmt.Errorf("failed to add node with pubkey %s: failed to increment ID", pubkey) } ID := strconv.FormatInt(next-1, 10) - pipe := db.client.TxPipeline() + pipe := db.Client.TxPipeline() pipe.HSetNX(ctx, KeyKeyIndex, pubkey, ID) pipe.HSet(ctx, node(ID), NodeID, ID, NodePubkey, pubkey, NodeStatus, graph.StatusInactive, NodeAddedTS, time.Now().Unix()) if _, err := pipe.Exec(ctx); err != nil { @@ -191,7 +191,7 @@ func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error) // Promote changes the node status to active func (db RedisDB) Promote(ctx context.Context, ID graph.ID) error { - err := db.client.HSet(ctx, node(ID), NodeStatus, graph.StatusActive, NodePromotionTS, time.Now().Unix()).Err() + err := db.Client.HSet(ctx, node(ID), NodeStatus, graph.StatusActive, NodePromotionTS, time.Now().Unix()).Err() if err != nil { return fmt.Errorf("failed to promote %s: %w", node(ID), err) } @@ -200,7 +200,7 @@ func (db RedisDB) Promote(ctx context.Context, ID graph.ID) error { // Demote changes the node status to inactive func (db RedisDB) Demote(ctx context.Context, ID graph.ID) error { - err := db.client.HSet(ctx, node(ID), NodeStatus, graph.StatusInactive, NodeDemotionTS, time.Now().Unix()).Err() + err := db.Client.HSet(ctx, node(ID), NodeStatus, graph.StatusInactive, NodeDemotionTS, time.Now().Unix()).Err() if err != nil { return fmt.Errorf("failed to demote %s: %w", node(ID), err) } @@ -218,7 +218,7 @@ func (db RedisDB) Followers(ctx context.Context, node graph.ID) ([]graph.ID, err } func (db RedisDB) members(ctx context.Context, key func(graph.ID) string, node graph.ID) ([]graph.ID, error) { - members, err := db.client.SMembers(ctx, key(node)).Result() + members, err := db.Client.SMembers(ctx, key(node)).Result() if err != nil { return nil, fmt.Errorf("failed to fetch %s: %w", key(node), err) } @@ -245,7 +245,7 @@ func (db RedisDB) bulkMembers(ctx context.Context, key func(graph.ID) string, no return nil, nil case len(nodes) < 10000: - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.StringSliceCmd, len(nodes)) for i, node := range nodes { @@ -310,7 +310,7 @@ func (db RedisDB) counts(ctx context.Context, key func(graph.ID) string, nodes . return nil, nil } - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.IntCmd, len(nodes)) for i, node := range nodes { @@ -356,7 +356,7 @@ func (db RedisDB) Update(ctx context.Context, delta graph.Delta) error { } func (db RedisDB) updateFollows(ctx context.Context, delta graph.Delta) error { - pipe := db.client.TxPipeline() + pipe := db.Client.TxPipeline() if len(delta.Add) > 0 { // add all node --> added pipe.SAdd(ctx, follows(delta.Node), toStrings(delta.Add)) @@ -389,7 +389,7 @@ func (db RedisDB) NodeIDs(ctx context.Context, pubkeys ...string) ([]graph.ID, e return nil, nil } - IDs, err := db.client.HMGet(ctx, KeyKeyIndex, pubkeys...).Result() + IDs, err := db.Client.HMGet(ctx, KeyKeyIndex, pubkeys...).Result() if err != nil { return nil, fmt.Errorf("failed to fetch the node IDs of %v: %w", pubkeys, err) } @@ -417,7 +417,7 @@ func (db RedisDB) Pubkeys(ctx context.Context, nodes ...graph.ID) ([]string, err return nil, nil } - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.StringCmd, len(nodes)) for i, ID := range nodes { cmds[i] = pipe.HGet(ctx, node(ID), NodePubkey) @@ -480,12 +480,12 @@ func (db RedisDB) Resolve(ctx context.Context, pubkeys []string, onMissing Missi default: if j != i { - // write only if necessary - IDs[j] = ID + IDs[j] = ID // write only if necessary } j++ } } + return IDs[:j], nil } @@ -494,7 +494,7 @@ func (db RedisDB) Resolve(ctx context.Context, pubkeys []string, onMissing Missi // Learn more about scan: https://redis.io/docs/latest/commands/scan/ func (db RedisDB) ScanNodes(ctx context.Context, cursor uint64, limit int) ([]graph.ID, uint64, error) { match := KeyNodePrefix + "*" - keys, cursor, err := db.client.Scan(ctx, cursor, match, int64(limit)).Result() + keys, cursor, err := db.Client.Scan(ctx, cursor, match, int64(limit)).Result() if err != nil { return nil, 0, fmt.Errorf("failed to scan for keys matching %s: %w", match, err) } @@ -511,3 +511,33 @@ func (db RedisDB) ScanNodes(ctx context.Context, cursor uint64, limit int) ([]gr return nodes, cursor, nil } + +// AllNodes returns all the node IDs stored in the database. It's not a blocking operation. +func (db RedisDB) AllNodes(ctx context.Context) ([]graph.ID, error) { + nodes := make([]graph.ID, 0, 100000) + var batch []graph.ID + var cursor uint64 + var err error + + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("failed to fetch all nodes: %w", ctx.Err()) + + default: + // proceed with the scan + } + + batch, cursor, err = db.ScanNodes(ctx, cursor, 10000) + if err != nil { + return nil, fmt.Errorf("failed to fetch all nodes: %w", err) + } + + nodes = append(nodes, batch...) + if cursor == 0 { + break + } + } + + return nodes, nil +} diff --git a/pkg/redb/graph_test.go b/pkg/redb/graph_test.go index d8b79de..5b175ad 100644 --- a/pkg/redb/graph_test.go +++ b/pkg/redb/graph_test.go @@ -432,11 +432,11 @@ func TestInterfaces(t *testing.T) { // ------------------------------------- HELPERS ------------------------------- func Empty() (RedisDB, error) { - return RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})}, nil + return RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})}, nil } func OneNode() (RedisDB, error) { - db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})} + db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})} if _, err := db.AddNode(ctx, "0"); err != nil { db.flushAll() return RedisDB{}, err @@ -446,7 +446,7 @@ func OneNode() (RedisDB, error) { } func Simple() (RedisDB, error) { - db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})} + db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})} for _, pk := range []string{"0", "1", "2"} { if _, err := db.AddNode(ctx, pk); err != nil { db.flushAll() @@ -455,12 +455,12 @@ func Simple() (RedisDB, error) { } // 0 ---> 1 - if err := db.client.SAdd(ctx, follows("0"), "1").Err(); err != nil { + if err := db.Client.SAdd(ctx, follows("0"), "1").Err(); err != nil { db.flushAll() return RedisDB{}, err } - if err := db.client.SAdd(ctx, followers("1"), "0").Err(); err != nil { + if err := db.Client.SAdd(ctx, followers("1"), "0").Err(); err != nil { db.flushAll() return RedisDB{}, err } diff --git a/pkg/redb/utils.go b/pkg/redb/utils.go index 1443588..1f0560b 100644 --- a/pkg/redb/utils.go +++ b/pkg/redb/utils.go @@ -20,7 +20,7 @@ var ( // flushAll deletes all the keys of all existing databases. This command never fails. func (r RedisDB) flushAll() { - r.client.FlushAll(context.Background()) + r.Client.FlushAll(context.Background()) } func node[ID string | graph.ID](id ID) string { diff --git a/pkg/redb/walks.go b/pkg/redb/walks.go index 208a43e..872bc83 100644 --- a/pkg/redb/walks.go +++ b/pkg/redb/walks.go @@ -34,7 +34,7 @@ var ( // If it doesn't, store [walks.Alpha] and [walks.N] func (db RedisDB) init() error { ctx := context.Background() - exists, err := db.client.Exists(ctx, KeyRWS).Result() + exists, err := db.Client.Exists(ctx, KeyRWS).Result() if err != nil { return fmt.Errorf("failed to check for existence of %s %w", KeyRWS, err) } @@ -42,7 +42,7 @@ func (db RedisDB) init() error { switch exists { case 1: // exists, check the values - vals, err := db.client.HMGet(ctx, KeyRWS, KeyAlpha, KeyWalksPerNode).Result() + vals, err := db.Client.HMGet(ctx, KeyRWS, KeyAlpha, KeyWalksPerNode).Result() if err != nil { return fmt.Errorf("failed to fetch alpha and walksPerNode %w", err) } @@ -67,7 +67,7 @@ func (db RedisDB) init() error { case 0: // doesn't exists, seed the values - err := db.client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err() + err := db.Client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err() if err != nil { return fmt.Errorf("failed to set alpha and walksPerNode %w", err) } @@ -86,7 +86,7 @@ func (db RedisDB) Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, err return nil, nil case len(IDs) <= 100000: - vals, err := db.client.HMGet(ctx, KeyWalks, toStrings(IDs)...).Result() + vals, err := db.Client.HMGet(ctx, KeyWalks, toStrings(IDs)...).Result() if err != nil { return nil, fmt.Errorf("failed to fetch walks: %w", err) } @@ -127,7 +127,7 @@ func (db RedisDB) WalksVisiting(ctx context.Context, node graph.ID, limit int) ( switch { case limit == -1: // return all walks visiting node - IDs, err := db.client.SMembers(ctx, walksVisiting(node)).Result() + IDs, err := db.Client.SMembers(ctx, walksVisiting(node)).Result() if err != nil { return nil, fmt.Errorf("failed to fetch %s: %w", walksVisiting(node), err) } @@ -135,7 +135,7 @@ func (db RedisDB) WalksVisiting(ctx context.Context, node graph.ID, limit int) ( return db.Walks(ctx, toWalks(IDs)...) case limit > 0: - IDs, err := db.client.SRandMemberN(ctx, walksVisiting(node), int64(limit)).Result() + IDs, err := db.Client.SRandMemberN(ctx, walksVisiting(node), int64(limit)).Result() if err != nil { return nil, fmt.Errorf("failed to fetch %s: %w", walksVisiting(node), err) } @@ -155,7 +155,7 @@ func (db RedisDB) WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit switch { case limit == -1: // return all walks visiting all nodes - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.StringSliceCmd, len(nodes)) for i, node := range nodes { @@ -181,7 +181,7 @@ func (db RedisDB) WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit return nil, nil } - pipe := db.client.Pipeline() + pipe := db.Client.Pipeline() cmds := make([]*redis.StringSliceCmd, len(nodes)) for i, node := range nodes { @@ -214,13 +214,13 @@ func (db RedisDB) AddWalks(ctx context.Context, walks ...walks.Walk) error { // get the IDs outside the transaction, which implies there might be "holes", // meaning IDs not associated with any walk - next, err := db.client.HIncrBy(ctx, KeyRWS, KeyLastWalkID, int64(len(walks))).Result() + next, err := db.Client.HIncrBy(ctx, KeyRWS, KeyLastWalkID, int64(len(walks))).Result() if err != nil { return fmt.Errorf("failed to add walks: failed to increment ID: %w", err) } var visits, ID int - pipe := db.client.TxPipeline() + pipe := db.Client.TxPipeline() for i, walk := range walks { visits += walk.Len() @@ -248,7 +248,7 @@ func (db RedisDB) RemoveWalks(ctx context.Context, walks ...walks.Walk) error { } var visits int - pipe := db.client.TxPipeline() + pipe := db.Client.TxPipeline() for _, walk := range walks { pipe.HDel(ctx, KeyWalks, string(walk.ID)) @@ -275,7 +275,7 @@ func (db RedisDB) ReplaceWalks(ctx context.Context, before, after []walks.Walk) } var visits int64 - pipe := db.client.TxPipeline() + pipe := db.Client.TxPipeline() for i := range before { div := walks.Divergence(before[i], after[i]) @@ -308,7 +308,7 @@ func (db RedisDB) ReplaceWalks(ctx context.Context, before, after []walks.Walk) return fmt.Errorf("failed to replace walks: pipeline failed %w", err) } - pipe = db.client.TxPipeline() + pipe = db.Client.TxPipeline() visits = 0 } } @@ -345,7 +345,7 @@ func validateReplacement(old, new []walks.Walk) error { // TotalVisits returns the total number of visits, which is the sum of the lengths of all walks. func (db RedisDB) TotalVisits(ctx context.Context) (int, error) { - total, err := db.client.HGet(ctx, KeyRWS, KeyTotalVisits).Result() + total, err := db.Client.HGet(ctx, KeyRWS, KeyTotalVisits).Result() if err != nil { return -1, fmt.Errorf("failed to get the total number of visits: %w", err) } @@ -360,7 +360,7 @@ func (db RedisDB) TotalVisits(ctx context.Context) (int, error) { // TotalWalks returns the total number of walks. func (db RedisDB) TotalWalks(ctx context.Context) (int, error) { - total, err := db.client.HLen(ctx, KeyWalks).Result() + total, err := db.Client.HLen(ctx, KeyWalks).Result() if err != nil { return -1, fmt.Errorf("failed to get the total number of walks: %w", err) } @@ -374,6 +374,29 @@ func (db RedisDB) Visits(ctx context.Context, nodes ...graph.ID) ([]int, error) return db.counts(ctx, walksVisiting, nodes...) } +// ScanWalks to return a batch of walks of size roughly proportional to limit. +// Limit controls how much "work" is invested in fetching the batch, hence it's not precise. +// Learn more about scan: https://redis.io/docs/latest/commands/hscan/ +func (db RedisDB) ScanWalks(ctx context.Context, cursor uint64, limit int) ([]walks.Walk, uint64, error) { + keyVals, cursor, err := db.Client.HScan(ctx, KeyWalks, cursor, "*", int64(limit)).Result() + if err != nil { + return nil, 0, fmt.Errorf("failed to scan for walks: %w", err) + } + + if len(keyVals)%2 != 0 { + return nil, 0, fmt.Errorf("unexpected HSCAN result length: got %d elements", len(keyVals)) + } + + batch := make([]walks.Walk, 0, len(keyVals)/2) + for i := 0; i < len(keyVals); i += 2 { + walk := parseWalk(keyVals[i+1]) + walk.ID = walks.ID(keyVals[i]) + batch = append(batch, walk) + } + + return batch, cursor, nil +} + // unique returns a slice of unique elements of the input slice. func unique[E cmp.Ordered](slice []E) []E { if len(slice) == 0 { diff --git a/pkg/redb/walks_test.go b/pkg/redb/walks_test.go index 5a00476..8266c7e 100644 --- a/pkg/redb/walks_test.go +++ b/pkg/redb/walks_test.go @@ -388,8 +388,8 @@ var defaultWalk = walks.Walk{Path: []graph.ID{"0", "1"}} func SomeWalks(n int) func() (RedisDB, error) { return func() (RedisDB, error) { - db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})} - if err := db.client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err(); err != nil { + db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})} + if err := db.Client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err(); err != nil { return RedisDB{}, err } diff --git a/pkg/walks/walker.go b/pkg/walks/walker.go index 4b65c4b..da13c29 100644 --- a/pkg/walks/walker.go +++ b/pkg/walks/walker.go @@ -6,6 +6,7 @@ import ( "fmt" "github/pippellia-btc/crawler/pkg/graph" "log" + "os" "strconv" ) @@ -26,13 +27,16 @@ type CachedWalker struct { capacity int // for stats - calls, hits, misses int + calls, hits int + log *log.Logger fallback Walker } type Option func(*CachedWalker) +func WithFallback(w Walker) Option { return func(c *CachedWalker) { c.fallback = w } } + func WithCapacity(cap int) Option { return func(c *CachedWalker) { c.lookup = make(map[uint32]*list.Element, cap) @@ -40,7 +44,15 @@ func WithCapacity(cap int) Option { } } -func WithFallback(w Walker) Option { return func(c *CachedWalker) { c.fallback = w } } +func WithLogFile(filename string) Option { + return func(c *CachedWalker) { + file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + panic(fmt.Errorf("failed to open log file %s: %w", filename, err)) + } + c.log = log.New(file, "cache: ", log.LstdFlags) + } +} func NewWalker(opts ...Option) *CachedWalker { c := &CachedWalker{ @@ -109,8 +121,11 @@ func (c *CachedWalker) Size() int { } func (c *CachedWalker) logStats() { - log.Printf("cache: calls %d, hits %d, misses %d", c.calls, c.hits, c.misses) - c.calls, c.hits, c.misses = 0, 0, 0 + if c.log != nil { + hitRatio := 100 * float64(c.hits) / float64(c.calls) + c.log.Printf("calls %d, hits %f%%", c.calls, hitRatio) + c.calls, c.hits = 0, 0 + } } func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) { @@ -120,7 +135,7 @@ func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, } c.calls++ - if c.calls >= 10000 { + if c.calls >= 1000_000 { defer c.logStats() } @@ -131,7 +146,6 @@ func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, return nodes(element.Value.(edges).follows), nil } - c.misses++ if c.fallback == nil { return nil, fmt.Errorf("%w: %s", graph.ErrNodeNotFound, node) } diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go new file mode 100644 index 0000000..ee86fd5 --- /dev/null +++ b/tests/e2e/e2e_test.go @@ -0,0 +1,167 @@ +package e2e_test + +import ( + "context" + "fmt" + "github/pippellia-btc/crawler/pkg/graph" + "github/pippellia-btc/crawler/pkg/pagerank" + "github/pippellia-btc/crawler/pkg/redb" + "github/pippellia-btc/crawler/pkg/walks" + "github/pippellia-btc/crawler/tests/random" + "math" + "testing" + + "github.com/redis/go-redis/v9" +) + +var ctx = context.Background() + +// TestWalks will perform multiple iterations of the following: +// - fetch walks in batches +// - verify their consistency, meaning each node in a walk should contain its walk ID +func TestWalks(t *testing.T) { + fmt.Println("-----------------------------") + fmt.Println("Testing the walks consistency") + fmt.Printf("-----------------------------\n\n") + + db := redb.New(&redis.Options{Addr: "localhost:6380"}) + + var iteration int + var limit int = 10000 + + var batch []walks.Walk + var cursor uint64 + var err error + + for { + iteration++ + fmt.Printf("\033[1A") + fmt.Print("\033[J") + fmt.Printf("iteration %d...\n", iteration) + + batch, cursor, err = db.ScanWalks(ctx, cursor, limit) + if err != nil { + t.Fatal(err) + } + + pipe := db.Client.Pipeline() + cmds := make(map[string]*redis.BoolCmd) + + for _, walk := range batch { + for _, node := range walk.Path { + // check that the walks visiting node contain this walk ID + key := string(node) + ":" + string(walk.ID) + cmds[key] = pipe.SIsMember(ctx, redb.KeyWalksVisitingPrefix+string(node), walk.ID) + } + } + + if _, err := pipe.Exec(ctx); err != nil { + t.Fatalf("pipeline failed: %v", err) + } + + for key, cmd := range cmds { + if !cmd.Val() { + t.Errorf("expected true, got %v: %v", cmd.Val(), key) + } + } + + if cursor == 0 { + break + } + } + + fmt.Println("passed!") + fmt.Println("-----------------------------") +} + +// TestPagerank regenerate all walks for all active nodes to compute pagerank. +// The resulting distribution is compared with the one in Redis. +func TestPagerank(t *testing.T) { + fmt.Println("---------------------------------") + fmt.Println("Testing the pagerank distribution") + + db := redb.New(&redis.Options{Addr: "localhost:6380"}) + nodes, err := db.AllNodes(ctx) + if err != nil { + t.Fatal(err) + } + + original, err := pagerank.Global(ctx, db, nodes...) + if err != nil { + t.Fatal(err) + } + fmt.Println(" > original pagerank computed") + + // copy the db into a map to speed up random walks generation + followMap := make(map[graph.ID][]graph.ID, len(nodes)) + for _, node := range nodes { + follows, err := db.Follows(ctx, node) + if err != nil { + t.Fatal(err) + } + + followMap[node] = follows + } + + walker := walks.NewSimpleWalker(followMap) + store := random.NewWalkStore() + + fmt.Println(" > db copied") + fmt.Printf(" > generating walks...\n") + fmt.Printf("---------------------------------\n\n") + + var active int + for i, ID := range nodes { + if i%1000 == 0 { + fmt.Printf("\033[1A") + fmt.Print("\033[J") + fmt.Printf("progress %d/%d...\n", i+1, len(nodes)) + } + + node, err := db.NodeByID(ctx, ID) + if err != nil { + t.Fatal(err) + } + + if node.Status == graph.StatusActive { + walks, err := walks.Generate(ctx, walker, ID) + if err != nil { + t.Fatal(err) + } + + store.AddWalks(walks) + active++ + } + } + + recomputed, err := pagerank.Global(ctx, store, nodes...) + if err != nil { + t.Fatal(err) + } + + expected := expectedDistance(active, len(nodes)) + distance := random.Distance(original, recomputed) + fmt.Printf("expected distance %f, got %f\n", expected, distance) + + if distance > expected { + t.Fatalf("distance is higher than expected!") + } + + fmt.Println("passed!") + fmt.Println("-----------------------------") +} + +/* +ExpectedDistance between the real pagerank and the Monte-Carlo pagerank. +Such distance goes as ~N/sqrt(R), where N is the number of nodes and R is the number of walks. + +# REFERENCES: +[1] K. Avrachenkov, N. Litvak, D. Nemirovsky, N. Osipova; "Monte Carlo methods in PageRank computation" +URL: https://www-sop.inria.fr/members/Konstantin.Avratchenkov/pubs/mc.pdf +*/ +func expectedDistance(activeNodes, totalNodes int) float64 { + const errorConstant = 0.00035 // empirically derived + + walks := float64(activeNodes * walks.N) + return errorConstant * float64(totalNodes) / math.Sqrt(walks) +} diff --git a/tests/random/pagerank_test.go b/tests/random/pagerank_test.go index 10702f4..cc813a7 100644 --- a/tests/random/pagerank_test.go +++ b/tests/random/pagerank_test.go @@ -1,4 +1,4 @@ -package random_test +package random import ( "context" diff --git a/tests/random/utils_test.go b/tests/random/utils.go similarity index 99% rename from tests/random/utils_test.go rename to tests/random/utils.go index b0a2827..02a45de 100644 --- a/tests/random/utils_test.go +++ b/tests/random/utils.go @@ -1,4 +1,4 @@ -package random_test +package random import ( "context"