added e2e tests

Author: pippellia-btc
Date: 2025-06-05 16:41:19 +02:00
Parent: 6cbd20f452
Commit: 1a9bea8005

13 changed files with 358 additions and 74 deletions
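The new end-to-end tests live in tests/e2e/e2e_test.go and, judging from their setup, expect a populated Redis instance reachable at localhost:6380. Assuming the standard Go toolchain, they would typically be invoked with something like:

    go test -v ./tests/e2e/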

.gitignore (new file, 16 lines added)

@@ -0,0 +1,16 @@
# Ignore all log files
**/*.log

# Ignore all .env files
**/*.env

# Ignore all sqlite files
**/*.sqlite
**/*-shm
**/*-wal

# Ignore all jpeg images
**/*.jpeg

# Ignore editor debug configs
.vscode


@@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"github/pippellia-btc/crawler/pkg/graph"
"github/pippellia-btc/crawler/pkg/pipe"
"github/pippellia-btc/crawler/pkg/redb"
@@ -9,13 +10,18 @@ import (
"log"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"github.com/nbd-wtf/go-nostr"
"github.com/redis/go-redis/v9"
)
var events chan *nostr.Event
var pubkeys chan string
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -26,8 +32,8 @@ func main() {
panic(err)
}
events := make(chan *nostr.Event, config.EventsCapacity)
pubkeys := make(chan string, config.PubkeysCapacity)
events = make(chan *nostr.Event, config.EventsCapacity)
pubkeys = make(chan string, config.PubkeysCapacity)
db := redb.New(&redis.Options{Addr: config.RedisAddress})
count, err := db.NodeCount(ctx)
@@ -60,13 +66,6 @@ func main() {
log.Printf("correctly added %d init pubkeys", len(config.InitPubkeys))
}
_ = events
// eventStore, err := eventstore.New(config.SQLiteURL)
// if err != nil {
// panic("failed to connect to the sqlite eventstore: " + err.Error())
// }
var wg sync.WaitGroup
wg.Add(3)
@@ -106,7 +105,7 @@ func main() {
})
}()
log.Println("ready to process events")
go printStats(ctx)
pipe.Processor(ctx, config.Processor, db, events)
wg.Wait()
}
@@ -120,3 +119,37 @@ func handleSignals(cancel context.CancelFunc) {
log.Println(" Signal received. Shutting down...")
cancel()
}
func printStats(ctx context.Context) {
filename := "stats.log"
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
panic(fmt.Errorf("failed to open log file %s: %w", filename, err))
}
defer file.Close()
log := log.New(file, "stats: ", log.LstdFlags)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
goroutines := runtime.NumGoroutine()
memStats := new(runtime.MemStats)
runtime.ReadMemStats(memStats)
log.Println("---------------------------------------")
log.Printf("events queue: %d/%d\n", len(events), cap(events))
log.Printf("pubkeys queue: %d/%d\n", len(pubkeys), cap(pubkeys))
log.Printf("walks tracker: %v\n", pipe.WalksTracker.Load())
log.Printf("goroutines: %d\n", goroutines)
log.Printf("memory usage: %.2f MB\n", float64(memStats.Alloc)/(1024*1024))
log.Println("---------------------------------------")
}
}
}
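For reference, a rough sketch of one stats.log entry produced by these Printf calls; the timestamps, counts, and queue capacities below are purely illustrative:

stats: 2025/06/05 16:50:00 ---------------------------------------
stats: 2025/06/05 16:50:00 events queue: 42/1000
stats: 2025/06/05 16:50:00 pubkeys queue: 17/1000
stats: 2025/06/05 16:50:00 walks tracker: 12345
stats: 2025/06/05 16:50:00 goroutines: 38
stats: 2025/06/05 16:50:00 memory usage: 150.25 MB
stats: 2025/06/05 16:50:00 ---------------------------------------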


@@ -12,9 +12,9 @@ import (
"time"
)
// walksTracker tracks the number of walks that have been updated by [Processor].
// WalksTracker tracks the number of walks that have been updated by [Processor].
// It's used to wake-up the [Arbiter], which performs work and then resets it to 0.
var walksTracker atomic.Int32
var WalksTracker atomic.Int32
type ArbiterConfig struct {
Activation float64
@@ -51,7 +51,7 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu
ticker := time.NewTicker(config.PingWait)
defer ticker.Stop()
walksTracker.Add(1000_000_000) // trigger a scan at startup
WalksTracker.Add(1000_000_000) // trigger a scan at startup
for {
select {
@@ -66,17 +66,16 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu
continue
}
changed := walksTracker.Load()
changed := WalksTracker.Load()
changeRatio := float64(changed) / float64(total)
if changeRatio > config.Activation {
promoted, demoted, err := arbiterScan(ctx, config, db, send)
if err != nil {
log.Printf("Arbiter: %v", err)
continue
}
walksTracker.Store(0) // resets tracker
WalksTracker.Store(0) // resets tracker
log.Printf("Arbiter: promoted %d, demoted %d", promoted, demoted)
}
}
@@ -85,7 +84,7 @@ func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send fu
// ArbiterScan performs one entire database scan, promoting or demoting nodes based on their pagerank.
func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) (promoted, demoted int, err error) {
maxTime := 60 * time.Second
maxTime := 2 * time.Minute
ctx, cancel := context.WithTimeout(ctx, maxTime)
defer cancel()


@@ -30,7 +30,6 @@ func (c ProcessorConfig) Print() {
fmt.Printf(" PrintEvery: %d\n", c.PrintEvery)
}
// Processor() process one event at the time from the eventChannel, based on their kind.
func Processor(
ctx context.Context,
config ProcessorConfig,
@@ -44,8 +43,11 @@ func Processor(
cache := walks.NewWalker(
walks.WithCapacity(10000),
walks.WithFallback(db),
walks.WithLogFile("cache.log"),
)
log.Println("Processor: ready to process events")
for {
select {
case <-ctx.Done():
@@ -126,7 +128,7 @@ func processFollowList(cache *walks.CachedWalker, db redb.RedisDB, event *nostr.
return err
}
walksTracker.Add(int32(len(new)))
WalksTracker.Add(int32(len(new)))
return cache.Update(ctx, delta)
}


@@ -34,11 +34,11 @@ const (
)
type RedisDB struct {
client *redis.Client
Client *redis.Client
}
func New(opt *redis.Options) RedisDB {
db := RedisDB{client: redis.NewClient(opt)}
db := RedisDB{Client: redis.NewClient(opt)}
if err := db.init(); err != nil {
panic(err)
}
@@ -47,7 +47,7 @@ func New(opt *redis.Options) RedisDB {
// Size returns the DBSize of redis, which is the total number of keys
func (db RedisDB) Size(ctx context.Context) (int, error) {
size, err := db.client.DBSize(ctx).Result()
size, err := db.Client.DBSize(ctx).Result()
if err != nil {
return 0, fmt.Errorf("failed to fetch the db size: %w", err)
}
@@ -56,7 +56,7 @@ func (db RedisDB) Size(ctx context.Context) (int, error) {
// NodeCount returns the number of nodes stored in redis (in the keyIndex)
func (db RedisDB) NodeCount(ctx context.Context) (int, error) {
nodes, err := db.client.HLen(ctx, KeyKeyIndex).Result()
nodes, err := db.Client.HLen(ctx, KeyKeyIndex).Result()
if err != nil {
return 0, fmt.Errorf("failed to fetch the node count: %w", err)
}
@@ -69,7 +69,7 @@ func (db RedisDB) Nodes(ctx context.Context, IDs ...graph.ID) ([]*graph.Node, er
return nil, nil
}
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.MapStringStringCmd, len(IDs))
for i, ID := range IDs {
cmds[i] = pipe.HGetAll(ctx, node(ID))
@@ -98,7 +98,7 @@ func (db RedisDB) Nodes(ctx context.Context, IDs ...graph.ID) ([]*graph.Node, er
// NodeByID fetches a node by its ID
func (db RedisDB) NodeByID(ctx context.Context, ID graph.ID) (*graph.Node, error) {
fields, err := db.client.HGetAll(ctx, node(ID)).Result()
fields, err := db.Client.HGetAll(ctx, node(ID)).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", node(ID), err)
}
@@ -112,12 +112,12 @@ func (db RedisDB) NodeByID(ctx context.Context, ID graph.ID) (*graph.Node, error
// NodeByKey fetches a node by its pubkey
func (db RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, error) {
ID, err := db.client.HGet(ctx, KeyKeyIndex, pubkey).Result()
ID, err := db.Client.HGet(ctx, KeyKeyIndex, pubkey).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch ID of node with pubkey %s: %w", pubkey, err)
}
fields, err := db.client.HGetAll(ctx, node(ID)).Result()
fields, err := db.Client.HGetAll(ctx, node(ID)).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch node with pubkey %s: %w", pubkey, err)
}
@@ -131,7 +131,7 @@ func (db RedisDB) NodeByKey(ctx context.Context, pubkey string) (*graph.Node, er
// Exists checks for the existance of the pubkey
func (db RedisDB) Exists(ctx context.Context, pubkey string) (bool, error) {
exists, err := db.client.HExists(ctx, KeyKeyIndex, pubkey).Result()
exists, err := db.Client.HExists(ctx, KeyKeyIndex, pubkey).Result()
if err != nil {
return false, fmt.Errorf("failed to check existance of pubkey %s: %w", pubkey, err)
}
@@ -148,7 +148,7 @@ func (db RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error {
nodes[i] = node(ID)
}
exists, err := db.client.Exists(ctx, nodes...).Result()
exists, err := db.Client.Exists(ctx, nodes...).Result()
if err != nil {
return fmt.Errorf("failed to check for the existence of %d nodes: %w", len(IDs), err)
}
@@ -162,7 +162,7 @@ func (db RedisDB) ensureExists(ctx context.Context, IDs ...graph.ID) error {
// AddNode adds a new inactive node to the database and returns its assigned ID
func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error) {
exists, err := db.client.HExists(ctx, KeyKeyIndex, pubkey).Result()
exists, err := db.Client.HExists(ctx, KeyKeyIndex, pubkey).Result()
if err != nil {
return "", fmt.Errorf("failed to check for existence of pubkey %s: %w", pubkey, err)
}
@@ -173,13 +173,13 @@ func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error)
// get the ID outside the transaction, which implies there might be "holes",
// meaning IDs not associated with any node
next, err := db.client.HIncrBy(ctx, KeyDatabase, KeyLastNodeID, 1).Result()
next, err := db.Client.HIncrBy(ctx, KeyDatabase, KeyLastNodeID, 1).Result()
if err != nil {
return "", fmt.Errorf("failed to add node with pubkey %s: failed to increment ID", pubkey)
}
ID := strconv.FormatInt(next-1, 10)
pipe := db.client.TxPipeline()
pipe := db.Client.TxPipeline()
pipe.HSetNX(ctx, KeyKeyIndex, pubkey, ID)
pipe.HSet(ctx, node(ID), NodeID, ID, NodePubkey, pubkey, NodeStatus, graph.StatusInactive, NodeAddedTS, time.Now().Unix())
if _, err := pipe.Exec(ctx); err != nil {
@@ -191,7 +191,7 @@ func (db RedisDB) AddNode(ctx context.Context, pubkey string) (graph.ID, error)
// Promote changes the node status to active
func (db RedisDB) Promote(ctx context.Context, ID graph.ID) error {
err := db.client.HSet(ctx, node(ID), NodeStatus, graph.StatusActive, NodePromotionTS, time.Now().Unix()).Err()
err := db.Client.HSet(ctx, node(ID), NodeStatus, graph.StatusActive, NodePromotionTS, time.Now().Unix()).Err()
if err != nil {
return fmt.Errorf("failed to promote %s: %w", node(ID), err)
}
@@ -200,7 +200,7 @@ func (db RedisDB) Promote(ctx context.Context, ID graph.ID) error {
// Demote changes the node status to inactive
func (db RedisDB) Demote(ctx context.Context, ID graph.ID) error {
err := db.client.HSet(ctx, node(ID), NodeStatus, graph.StatusInactive, NodeDemotionTS, time.Now().Unix()).Err()
err := db.Client.HSet(ctx, node(ID), NodeStatus, graph.StatusInactive, NodeDemotionTS, time.Now().Unix()).Err()
if err != nil {
return fmt.Errorf("failed to demote %s: %w", node(ID), err)
}
@@ -218,7 +218,7 @@ func (db RedisDB) Followers(ctx context.Context, node graph.ID) ([]graph.ID, err
}
func (db RedisDB) members(ctx context.Context, key func(graph.ID) string, node graph.ID) ([]graph.ID, error) {
members, err := db.client.SMembers(ctx, key(node)).Result()
members, err := db.Client.SMembers(ctx, key(node)).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", key(node), err)
}
@@ -245,7 +245,7 @@ func (db RedisDB) bulkMembers(ctx context.Context, key func(graph.ID) string, no
return nil, nil
case len(nodes) < 10000:
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.StringSliceCmd, len(nodes))
for i, node := range nodes {
@@ -310,7 +310,7 @@ func (db RedisDB) counts(ctx context.Context, key func(graph.ID) string, nodes .
return nil, nil
}
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.IntCmd, len(nodes))
for i, node := range nodes {
@@ -356,7 +356,7 @@ func (db RedisDB) Update(ctx context.Context, delta graph.Delta) error {
}
func (db RedisDB) updateFollows(ctx context.Context, delta graph.Delta) error {
pipe := db.client.TxPipeline()
pipe := db.Client.TxPipeline()
if len(delta.Add) > 0 {
// add all node --> added
pipe.SAdd(ctx, follows(delta.Node), toStrings(delta.Add))
@@ -389,7 +389,7 @@ func (db RedisDB) NodeIDs(ctx context.Context, pubkeys ...string) ([]graph.ID, e
return nil, nil
}
IDs, err := db.client.HMGet(ctx, KeyKeyIndex, pubkeys...).Result()
IDs, err := db.Client.HMGet(ctx, KeyKeyIndex, pubkeys...).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch the node IDs of %v: %w", pubkeys, err)
}
@@ -417,7 +417,7 @@ func (db RedisDB) Pubkeys(ctx context.Context, nodes ...graph.ID) ([]string, err
return nil, nil
}
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.StringCmd, len(nodes))
for i, ID := range nodes {
cmds[i] = pipe.HGet(ctx, node(ID), NodePubkey)
@@ -480,12 +480,12 @@ func (db RedisDB) Resolve(ctx context.Context, pubkeys []string, onMissing Missi
default:
if j != i {
// write only if necessary
IDs[j] = ID
IDs[j] = ID // write only if necessary
}
j++
}
}
return IDs[:j], nil
}
@@ -494,7 +494,7 @@ func (db RedisDB) Resolve(ctx context.Context, pubkeys []string, onMissing Missi
// Learn more about scan: https://redis.io/docs/latest/commands/scan/
func (db RedisDB) ScanNodes(ctx context.Context, cursor uint64, limit int) ([]graph.ID, uint64, error) {
match := KeyNodePrefix + "*"
keys, cursor, err := db.client.Scan(ctx, cursor, match, int64(limit)).Result()
keys, cursor, err := db.Client.Scan(ctx, cursor, match, int64(limit)).Result()
if err != nil {
return nil, 0, fmt.Errorf("failed to scan for keys matching %s: %w", match, err)
}
@@ -511,3 +511,33 @@ func (db RedisDB) ScanNodes(ctx context.Context, cursor uint64, limit int) ([]gr
return nodes, cursor, nil
}
// AllNodes returns all the node IDs stored in the database. It iterates with [ScanNodes], so it never blocks Redis with a single long-running command.
func (db RedisDB) AllNodes(ctx context.Context) ([]graph.ID, error) {
nodes := make([]graph.ID, 0, 100000)
var batch []graph.ID
var cursor uint64
var err error
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("failed to fetch all nodes: %w", ctx.Err())
default:
// proceed with the scan
}
batch, cursor, err = db.ScanNodes(ctx, cursor, 10000)
if err != nil {
return nil, fmt.Errorf("failed to fetch all nodes: %w", err)
}
nodes = append(nodes, batch...)
if cursor == 0 {
break
}
}
return nodes, nil
}


@@ -432,11 +432,11 @@ func TestInterfaces(t *testing.T) {
// ------------------------------------- HELPERS -------------------------------
func Empty() (RedisDB, error) {
return RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})}, nil
return RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})}, nil
}
func OneNode() (RedisDB, error) {
db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})}
db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})}
if _, err := db.AddNode(ctx, "0"); err != nil {
db.flushAll()
return RedisDB{}, err
@@ -446,7 +446,7 @@ func OneNode() (RedisDB, error) {
}
func Simple() (RedisDB, error) {
db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})}
db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})}
for _, pk := range []string{"0", "1", "2"} {
if _, err := db.AddNode(ctx, pk); err != nil {
db.flushAll()
@@ -455,12 +455,12 @@ func Simple() (RedisDB, error) {
}
// 0 ---> 1
if err := db.client.SAdd(ctx, follows("0"), "1").Err(); err != nil {
if err := db.Client.SAdd(ctx, follows("0"), "1").Err(); err != nil {
db.flushAll()
return RedisDB{}, err
}
if err := db.client.SAdd(ctx, followers("1"), "0").Err(); err != nil {
if err := db.Client.SAdd(ctx, followers("1"), "0").Err(); err != nil {
db.flushAll()
return RedisDB{}, err
}


@@ -20,7 +20,7 @@ var (
// flushAll deletes all the keys of all existing databases. This command never fails.
func (r RedisDB) flushAll() {
r.client.FlushAll(context.Background())
r.Client.FlushAll(context.Background())
}
func node[ID string | graph.ID](id ID) string {


@@ -34,7 +34,7 @@ var (
// If it doesn't, store [walks.Alpha] and [walks.N]
func (db RedisDB) init() error {
ctx := context.Background()
exists, err := db.client.Exists(ctx, KeyRWS).Result()
exists, err := db.Client.Exists(ctx, KeyRWS).Result()
if err != nil {
return fmt.Errorf("failed to check for existence of %s %w", KeyRWS, err)
}
@@ -42,7 +42,7 @@ func (db RedisDB) init() error {
switch exists {
case 1:
// exists, check the values
vals, err := db.client.HMGet(ctx, KeyRWS, KeyAlpha, KeyWalksPerNode).Result()
vals, err := db.Client.HMGet(ctx, KeyRWS, KeyAlpha, KeyWalksPerNode).Result()
if err != nil {
return fmt.Errorf("failed to fetch alpha and walksPerNode %w", err)
}
@@ -67,7 +67,7 @@ func (db RedisDB) init() error {
case 0:
// doesn't exists, seed the values
err := db.client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err()
err := db.Client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err()
if err != nil {
return fmt.Errorf("failed to set alpha and walksPerNode %w", err)
}
@@ -86,7 +86,7 @@ func (db RedisDB) Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, err
return nil, nil
case len(IDs) <= 100000:
vals, err := db.client.HMGet(ctx, KeyWalks, toStrings(IDs)...).Result()
vals, err := db.Client.HMGet(ctx, KeyWalks, toStrings(IDs)...).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch walks: %w", err)
}
@@ -127,7 +127,7 @@ func (db RedisDB) WalksVisiting(ctx context.Context, node graph.ID, limit int) (
switch {
case limit == -1:
// return all walks visiting node
IDs, err := db.client.SMembers(ctx, walksVisiting(node)).Result()
IDs, err := db.Client.SMembers(ctx, walksVisiting(node)).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", walksVisiting(node), err)
}
@@ -135,7 +135,7 @@ func (db RedisDB) WalksVisiting(ctx context.Context, node graph.ID, limit int) (
return db.Walks(ctx, toWalks(IDs)...)
case limit > 0:
IDs, err := db.client.SRandMemberN(ctx, walksVisiting(node), int64(limit)).Result()
IDs, err := db.Client.SRandMemberN(ctx, walksVisiting(node), int64(limit)).Result()
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", walksVisiting(node), err)
}
@@ -155,7 +155,7 @@ func (db RedisDB) WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit
switch {
case limit == -1:
// return all walks visiting all nodes
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.StringSliceCmd, len(nodes))
for i, node := range nodes {
@@ -181,7 +181,7 @@ func (db RedisDB) WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit
return nil, nil
}
pipe := db.client.Pipeline()
pipe := db.Client.Pipeline()
cmds := make([]*redis.StringSliceCmd, len(nodes))
for i, node := range nodes {
@@ -214,13 +214,13 @@ func (db RedisDB) AddWalks(ctx context.Context, walks ...walks.Walk) error {
// get the IDs outside the transaction, which implies there might be "holes",
// meaning IDs not associated with any walk
next, err := db.client.HIncrBy(ctx, KeyRWS, KeyLastWalkID, int64(len(walks))).Result()
next, err := db.Client.HIncrBy(ctx, KeyRWS, KeyLastWalkID, int64(len(walks))).Result()
if err != nil {
return fmt.Errorf("failed to add walks: failed to increment ID: %w", err)
}
var visits, ID int
pipe := db.client.TxPipeline()
pipe := db.Client.TxPipeline()
for i, walk := range walks {
visits += walk.Len()
@@ -248,7 +248,7 @@ func (db RedisDB) RemoveWalks(ctx context.Context, walks ...walks.Walk) error {
}
var visits int
pipe := db.client.TxPipeline()
pipe := db.Client.TxPipeline()
for _, walk := range walks {
pipe.HDel(ctx, KeyWalks, string(walk.ID))
@@ -275,7 +275,7 @@ func (db RedisDB) ReplaceWalks(ctx context.Context, before, after []walks.Walk)
}
var visits int64
pipe := db.client.TxPipeline()
pipe := db.Client.TxPipeline()
for i := range before {
div := walks.Divergence(before[i], after[i])
@@ -308,7 +308,7 @@ func (db RedisDB) ReplaceWalks(ctx context.Context, before, after []walks.Walk)
return fmt.Errorf("failed to replace walks: pipeline failed %w", err)
}
pipe = db.client.TxPipeline()
pipe = db.Client.TxPipeline()
visits = 0
}
}
@@ -345,7 +345,7 @@ func validateReplacement(old, new []walks.Walk) error {
// TotalVisits returns the total number of visits, which is the sum of the lengths of all walks.
func (db RedisDB) TotalVisits(ctx context.Context) (int, error) {
total, err := db.client.HGet(ctx, KeyRWS, KeyTotalVisits).Result()
total, err := db.Client.HGet(ctx, KeyRWS, KeyTotalVisits).Result()
if err != nil {
return -1, fmt.Errorf("failed to get the total number of visits: %w", err)
}
@@ -360,7 +360,7 @@ func (db RedisDB) TotalVisits(ctx context.Context) (int, error) {
// TotalWalks returns the total number of walks.
func (db RedisDB) TotalWalks(ctx context.Context) (int, error) {
total, err := db.client.HLen(ctx, KeyWalks).Result()
total, err := db.Client.HLen(ctx, KeyWalks).Result()
if err != nil {
return -1, fmt.Errorf("failed to get the total number of walks: %w", err)
}
@@ -374,6 +374,29 @@ func (db RedisDB) Visits(ctx context.Context, nodes ...graph.ID) ([]int, error)
return db.counts(ctx, walksVisiting, nodes...)
}
// ScanWalks returns a batch of walks whose size is roughly proportional to limit.
// Limit controls how much "work" is invested in fetching the batch, so the batch size is not exact.
// Learn more about scan: https://redis.io/docs/latest/commands/hscan/
func (db RedisDB) ScanWalks(ctx context.Context, cursor uint64, limit int) ([]walks.Walk, uint64, error) {
keyVals, cursor, err := db.Client.HScan(ctx, KeyWalks, cursor, "*", int64(limit)).Result()
if err != nil {
return nil, 0, fmt.Errorf("failed to scan for walks: %w", err)
}
if len(keyVals)%2 != 0 {
return nil, 0, fmt.Errorf("unexpected HSCAN result length: got %d elements", len(keyVals))
}
batch := make([]walks.Walk, 0, len(keyVals)/2)
for i := 0; i < len(keyVals); i += 2 {
walk := parseWalk(keyVals[i+1])
walk.ID = walks.ID(keyVals[i])
batch = append(batch, walk)
}
return batch, cursor, nil
}
// unique returns a slice of unique elements of the input slice.
func unique[E cmp.Ordered](slice []E) []E {
if len(slice) == 0 {


@@ -388,8 +388,8 @@ var defaultWalk = walks.Walk{Path: []graph.ID{"0", "1"}}
func SomeWalks(n int) func() (RedisDB, error) {
return func() (RedisDB, error) {
db := RedisDB{client: redis.NewClient(&redis.Options{Addr: testAddress})}
if err := db.client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err(); err != nil {
db := RedisDB{Client: redis.NewClient(&redis.Options{Addr: testAddress})}
if err := db.Client.HSet(ctx, KeyRWS, KeyAlpha, walks.Alpha, KeyWalksPerNode, walks.N).Err(); err != nil {
return RedisDB{}, err
}


@@ -6,6 +6,7 @@ import (
"fmt"
"github/pippellia-btc/crawler/pkg/graph"
"log"
"os"
"strconv"
)
@@ -26,13 +27,16 @@ type CachedWalker struct {
capacity int
// for stats
calls, hits, misses int
calls, hits int
log *log.Logger
fallback Walker
}
type Option func(*CachedWalker)
func WithFallback(w Walker) Option { return func(c *CachedWalker) { c.fallback = w } }
func WithCapacity(cap int) Option {
return func(c *CachedWalker) {
c.lookup = make(map[uint32]*list.Element, cap)
@@ -40,7 +44,15 @@ func WithCapacity(cap int) Option {
}
}
func WithFallback(w Walker) Option { return func(c *CachedWalker) { c.fallback = w } }
func WithLogFile(filename string) Option {
return func(c *CachedWalker) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
panic(fmt.Errorf("failed to open log file %s: %w", filename, err))
}
c.log = log.New(file, "cache: ", log.LstdFlags)
}
}
func NewWalker(opts ...Option) *CachedWalker {
c := &CachedWalker{
@@ -109,8 +121,11 @@ func (c *CachedWalker) Size() int {
}
func (c *CachedWalker) logStats() {
log.Printf("cache: calls %d, hits %d, misses %d", c.calls, c.hits, c.misses)
c.calls, c.hits, c.misses = 0, 0, 0
if c.log != nil {
hitRatio := 100 * float64(c.hits) / float64(c.calls)
c.log.Printf("calls %d, hits %f%%", c.calls, hitRatio)
c.calls, c.hits = 0, 0
}
}
func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
@@ -120,7 +135,7 @@ func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID,
}
c.calls++
if c.calls >= 10000 {
if c.calls >= 1000_000 {
defer c.logStats()
}
@@ -131,7 +146,6 @@ func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID,
return nodes(element.Value.(edges).follows), nil
}
c.misses++
if c.fallback == nil {
return nil, fmt.Errorf("%w: %s", graph.ErrNodeNotFound, node)
}

tests/e2e/e2e_test.go (new file, 167 lines added)

@@ -0,0 +1,167 @@
package e2e_test
import (
"context"
"fmt"
"github/pippellia-btc/crawler/pkg/graph"
"github/pippellia-btc/crawler/pkg/pagerank"
"github/pippellia-btc/crawler/pkg/redb"
"github/pippellia-btc/crawler/pkg/walks"
"github/pippellia-btc/crawler/tests/random"
"math"
"testing"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
// TestWalks will perform multiple iterations of the following:
// - fetch walks in batches
// - verify their consistency, meaning each node in a walk should contain its walk ID
func TestWalks(t *testing.T) {
fmt.Println("-----------------------------")
fmt.Println("Testing the walks consistency")
fmt.Printf("-----------------------------\n\n")
db := redb.New(&redis.Options{Addr: "localhost:6380"})
var iteration int
var limit int = 10000
var batch []walks.Walk
var cursor uint64
var err error
for {
iteration++
fmt.Printf("\033[1A")
fmt.Print("\033[J")
fmt.Printf("iteration %d...\n", iteration)
batch, cursor, err = db.ScanWalks(ctx, cursor, limit)
if err != nil {
t.Fatal(err)
}
pipe := db.Client.Pipeline()
cmds := make(map[string]*redis.BoolCmd)
for _, walk := range batch {
for _, node := range walk.Path {
// check that the walks visiting node contain this walk ID
key := string(node) + ":" + string(walk.ID)
cmds[key] = pipe.SIsMember(ctx, redb.KeyWalksVisitingPrefix+string(node), walk.ID)
}
}
if _, err := pipe.Exec(ctx); err != nil {
t.Fatalf("pipeline failed: %v", err)
}
for key, cmd := range cmds {
if !cmd.Val() {
t.Errorf("expected true, got %v: %v", cmd.Val(), key)
}
}
if cursor == 0 {
break
}
}
fmt.Println("passed!")
fmt.Println("-----------------------------")
}
// TestPagerank regenerates all walks for all active nodes to compute pagerank.
// The resulting distribution is compared with the one in Redis.
func TestPagerank(t *testing.T) {
fmt.Println("---------------------------------")
fmt.Println("Testing the pagerank distribution")
db := redb.New(&redis.Options{Addr: "localhost:6380"})
nodes, err := db.AllNodes(ctx)
if err != nil {
t.Fatal(err)
}
original, err := pagerank.Global(ctx, db, nodes...)
if err != nil {
t.Fatal(err)
}
fmt.Println(" > original pagerank computed")
// copy the db into a map to speed up random walks generation
followMap := make(map[graph.ID][]graph.ID, len(nodes))
for _, node := range nodes {
follows, err := db.Follows(ctx, node)
if err != nil {
t.Fatal(err)
}
followMap[node] = follows
}
walker := walks.NewSimpleWalker(followMap)
store := random.NewWalkStore()
fmt.Println(" > db copied")
fmt.Printf(" > generating walks...\n")
fmt.Printf("---------------------------------\n\n")
var active int
for i, ID := range nodes {
if i%1000 == 0 {
fmt.Printf("\033[1A")
fmt.Print("\033[J")
fmt.Printf("progress %d/%d...\n", i+1, len(nodes))
}
node, err := db.NodeByID(ctx, ID)
if err != nil {
t.Fatal(err)
}
if node.Status == graph.StatusActive {
walks, err := walks.Generate(ctx, walker, ID)
if err != nil {
t.Fatal(err)
}
store.AddWalks(walks)
active++
}
}
recomputed, err := pagerank.Global(ctx, store, nodes...)
if err != nil {
t.Fatal(err)
}
expected := expectedDistance(active, len(nodes))
distance := random.Distance(original, recomputed)
fmt.Printf("expected distance %f, got %f\n", expected, distance)
if distance > expected {
t.Fatalf("distance is higher than expected!")
}
fmt.Println("passed!")
fmt.Println("-----------------------------")
}
/*
ExpectedDistance between the real pagerank and the Monte-Carlo pagerank.
Such distance goes as ~N/sqrt(R), where N is the number of nodes and R is the number of walks.
# REFERENCES:
[1] K. Avrachenkov, N. Litvak, D. Nemirovsky, N. Osipova; "Monte Carlo methods in PageRank computation"
URL: https://www-sop.inria.fr/members/Konstantin.Avratchenkov/pubs/mc.pdf
*/
func expectedDistance(activeNodes, totalNodes int) float64 {
const errorConstant = 0.00035 // empirically derived
walks := float64(activeNodes * walks.N)
return errorConstant * float64(totalNodes) / math.Sqrt(walks)
}
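To get a feel for this bound, with purely hypothetical numbers (100,000 total nodes, 50,000 of them active, and walks.N = 10 walks per node, so R = 500,000 walks), the expected distance would be about 0.00035 * 100,000 / sqrt(500,000) ≈ 0.05; the recomputed pagerank must land within roughly that distance of the original for the test to pass.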


@@ -1,4 +1,4 @@
package random_test
package random
import (
"context"


@@ -1,4 +1,4 @@
package random_test
package random
import (
"context"