diff --git a/pkg/pagerank/cache.go b/pkg/pagerank/cache.go
index bf4f2b4..d355911 100644
--- a/pkg/pagerank/cache.go
+++ b/pkg/pagerank/cache.go
@@ -6,19 +6,19 @@ import (
 	"github/pippellia-btc/crawler/pkg/walks"
 )
 
-type walkerWithFallback struct {
+type cachedWalker struct {
 	follows  map[graph.ID][]graph.ID
 	fallback walks.Walker
 }
 
-func newWalkerWithFallback(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *walkerWithFallback {
-	return &walkerWithFallback{
+func newCachedWalker(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *cachedWalker {
+	return &cachedWalker{
 		follows:  followsMap,
 		fallback: fallback,
 	}
 }
 
-func (w *walkerWithFallback) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
+func (w *cachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
 	follows, exists := w.follows[node]
 	if !exists {
 		var err error
@@ -59,24 +59,28 @@ func newWalkPool(walks []walks.Walk) *walkPool {
 	}
 }
 
+// Next returns a path of nodes that starts immediately after node, making sure
+// that the same walk is only used once to avoid bias in the sampling.
+// For example, if the walk is [0,1,2,3,4], node = 1, it returns [2,3,4].
 func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) {
 	indexes, exists := w.walkIndexes[node]
 	if !exists || len(indexes) == 0 {
 		return nil, false
 	}
 
-	for i, index := range indexes {
-		walk := w.walks[index]
-		if walk.Len() == 0 {
+	for i, idx := range indexes {
+		walk := w.walks[idx]
+		cut := walk.Index(node)
+		if cut == -1 {
 			// walk already used, skip
 			continue
 		}
 
 		// zero the walk so it can't be reused, and reslice the walk indexes
 		// so we don't spend time looking at walks already used.
-		w.walks[index].Path = nil
+		w.walks[idx].Path = nil
 		w.walkIndexes[node] = indexes[i+1:]
-		return walk.Path, true
+		return walk.Path[cut+1:], true
 	}
 
 	// all walks where already used
diff --git a/pkg/pagerank/pagerank.go b/pkg/pagerank/pagerank.go
index d0e2949..a0d5c48 100644
--- a/pkg/pagerank/pagerank.go
+++ b/pkg/pagerank/pagerank.go
@@ -7,6 +7,7 @@ import (
 	"github/pippellia-btc/crawler/pkg/graph"
 	"github/pippellia-btc/crawler/pkg/walks"
 	"math/rand/v2"
+	"slices"
 )
 
 var ErrEmptyWalkStore = errors.New("the walk store is empty")
@@ -23,8 +24,8 @@ type VisitCounter interface {
 
 // Global computes the global pagerank score for each target node, as the frequency of visits.
 // If a node is not found, its pagerank is assumed to be 0.
-func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float64, error) {
-	if len(nodes) == 0 {
+func Global(ctx context.Context, count VisitCounter, targets ...graph.ID) ([]float64, error) {
+	if len(targets) == 0 {
 		return nil, nil
 	}
 
@@ -37,7 +38,7 @@ func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float
 		return nil, ErrEmptyWalkStore
 	}
 
-	visits, err := count.Visits(ctx, nodes...)
+	visits, err := count.Visits(ctx, targets...)
 	if err != nil {
 		return nil, fmt.Errorf("Global: failed to get the nodes visits: %w", err)
 	}
@@ -58,14 +59,35 @@ type PersonalizedLoader interface {
 	// BulkFollows returns the follow-lists of the specified nodes
 	BulkFollows(ctx context.Context, nodes []graph.ID) (map[graph.ID][]graph.ID, error)
 
-	// WalksVisitingAny returns up to limit IDs of walks that visit the specified nodes.
+	// WalksVisitingAny returns up to limit walks that visit the specified nodes.
 	// The walks are distributed evenly among the nodes:
 	// - if limit == -1, all walks are returned.
 	// - if limit < len(nodes), no walks are returned
-	WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.ID, error)
+	WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.Walk, error)
+}
 
-	// Walks returns the walks associated with the given IDs.
-	Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, error)
+func PersonalizedWithTargets(
+	ctx context.Context,
+	loader PersonalizedLoader,
+	source graph.ID,
+	targets []graph.ID,
+	targetLenght int) ([]float64, error) {
+
+	if len(targets) == 0 {
+		return nil, nil
+	}
+
+	pp, err := Personalized(ctx, loader, source, targetLenght)
+	if err != nil {
+		return nil, err
+	}
+
+	pageranks := make([]float64, len(targets))
+	for i, t := range targets {
+		pageranks[i] = pp[t]
+	}
+
+	return pageranks, nil
 }
 
 /*
@@ -100,17 +122,12 @@ func Personalized(
 	}
 
 	targetWalks := int(float64(targetLenght) * (1 - walks.Alpha))
-	IDs, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
+	walks, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
 	if err != nil {
-		return nil, fmt.Errorf("Personalized: failed to fetch the walk IDs: %w", err)
+		return nil, fmt.Errorf("Personalized: failed to fetch the walks: %w", err)
 	}
 
-	walks, err := loader.Walks(ctx, IDs...)
-	if err != nil {
-		return nil, fmt.Errorf("Personalized: failed to fetch the walks: %w", err)
-	}
-
-	walker := newWalkerWithFallback(followMap, loader)
+	walker := newCachedWalker(followMap, loader)
 	pool := newWalkPool(walks)
 
 	walk, err := personalizedWalk(ctx, walker, pool, source, targetLenght)
@@ -118,7 +135,7 @@ func Personalized(
 		return nil, err
 	}
 
-	return frequencies(walk), nil
+	return frequencyMap(walk), nil
 }
 
 // pWalk is a personalized walk, which is a random walk that resets to a specified node
@@ -152,11 +169,11 @@ func (w *pWalk) Reset() {
 	w.node = w.start
 }
 
-// WalkPool makes sure a walk is returned only once, avoiding bias in the [Personalized]
-type WalkPool interface {
-	// Next returns a path that starts with the provided node
-	Next(node graph.ID) ([]graph.ID, bool)
-}
+// // WalkPool makes sure a walk is returned only once, avoiding bias in the [Personalized]
+// type WalkPool interface {
+// 	// Next returns a path that starts with the provided node
+// 	Next(node graph.ID) ([]graph.ID, bool)
+// }
 
 // The personalizedWalk() function simulates a long personalized random walk
 // starting from a node with reset to itself. Whenever possible, walks from the
@@ -164,7 +181,7 @@ type WalkPool interface {
 func personalizedWalk(
 	ctx context.Context,
 	walker walks.Walker,
-	pool WalkPool,
+	pool *walkPool,
 	start graph.ID,
 	lenght int) ([]graph.ID, error) {
 
@@ -185,7 +202,7 @@ func personalizedWalk(
 		path, exists = pool.Next(walk.node)
 		switch exists {
 		case true:
-			// graft the given path
+			// use the pre-computed walk when available
 			walk.ongoing.Graft(path)
 			walk.Reset()
 
@@ -215,20 +232,43 @@ func personalizedWalk(
 	}
 }
 
-// frequencies returns the number of times each node is visited divided by the lenght of the path.
-func frequencies(path []graph.ID) map[graph.ID]float64 {
+// frequencyMap returns a map node --> frequency of visits, i.e. visits divided by the length of the path.
+func frequencyMap(path []graph.ID) map[graph.ID]float64 {
 	if len(path) == 0 {
 		return nil
 	}
 
 	total := len(path)
 	freq := 1.0 / float64(total)
-	pp := make(map[graph.ID]float64, total/100)
+	freqs := make(map[graph.ID]float64, total/100)
+
 	for _, node := range path {
-		pp[node] += freq
+		freqs[node] += freq
 	}
 
-	return pp
+	return freqs
+}
+
+// targetFrequency returns the frequency of visits for each target, in the same order as targets.
+func targetFrequency(targets []graph.ID, path []graph.ID) []float64 {
+	if len(targets) == 0 || len(path) == 0 {
+		return nil
+	}
+
+	total := len(path)
+	freq := 1.0 / float64(total)
+	freqs := make([]float64, len(targets))
+
+	for _, node := range path {
+		idx := slices.Index(targets, node)
+		if idx == -1 {
+			continue
+		}
+
+		freqs[idx] += freq
+	}
+
+	return freqs
 }
 
 // returns a random element of a slice. It panics if the slice is empty or nil.
diff --git a/pkg/pagerank/pagerank_test.go b/pkg/pagerank/pagerank_test.go
new file mode 100644
index 0000000..80b0284
--- /dev/null
+++ b/pkg/pagerank/pagerank_test.go
@@ -0,0 +1,85 @@
+package pagerank
+
+import (
+	"context"
+	"fmt"
+	"github/pippellia-btc/crawler/pkg/graph"
+	"github/pippellia-btc/crawler/pkg/walks"
+	"math/rand/v2"
+	"reflect"
+	"strconv"
+	"testing"
+)
+
+func TestPersonalized(t *testing.T) {
+	ctx := context.Background()
+	walks.Alpha = 1 // making the test deterministic
+
+	walker := walks.NewCyclicWalker(3)
+	pool := newWalkPool([]walks.Walk{
+		{Path: []graph.ID{"0", "1", "X"}},
+		{Path: []graph.ID{"0", "1", "Y"}},
+	})
+
+	expected := []graph.ID{
+		"0", "1", "X",
+		"0", "1", "Y",
+		"0", "1", "2",
+		"0", "1", "2",
+		"0", "1", "2"}
+
+	walk, err := personalizedWalk(ctx, walker, pool, "0", 13)
+	if err != nil {
+		t.Fatalf("expected nil, got %v", err)
+	}
+
+	if !reflect.DeepEqual(walk, expected) {
+		t.Fatalf("expected %v, got %v", expected, walk)
+	}
+}
+
+func BenchmarkFrequencyMap(b *testing.B) {
+	sizes := []int{10000, 100000, 1000000}
+	for _, size := range sizes {
+		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
+
+			path := make([]graph.ID, size)
+			for i := range path {
+				n := rand.IntN(size / 10)
+				path[i] = graph.ID(strconv.Itoa(n))
+			}
+
+			b.ResetTimer()
+			for range b.N {
+				frequencyMap(path)
+			}
+		})
+	}
+}
+
+func BenchmarkTargetFrequency(b *testing.B) {
+	targets := make([]graph.ID, 10)
+	for i := range 10 {
+		targets[i] = randomID(1000)
+	}
+
+	sizes := []int{10000, 100000, 1000000}
+	for _, size := range sizes {
+		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
+
+			path := make([]graph.ID, size)
+			for i := range path {
+				path[i] = randomID(size / 10)
+			}
+
+			b.ResetTimer()
+			for range b.N {
+				targetFrequency(targets, path)
+			}
+		})
+	}
+}
+
+func randomID(n int) graph.ID {
+	return graph.ID(strconv.Itoa(rand.IntN(n)))
+}
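Not part of the patch: a minimal sketch of how the Next contract documented in cache.go could be pinned down in pagerank_test.go. It assumes newWalkPool indexes a walk under every node it visits, as the [0,1,2,3,4] example in the doc comment suggests; the test name and the concrete IDs are made up for illustration, and only identifiers that already appear in this diff (newWalkPool, walks.Walk, graph.ID, reflect.DeepEqual) are used.

func TestWalkPoolNext(t *testing.T) {
	// one walk visiting the nodes 0..4, as in the doc comment on Next
	pool := newWalkPool([]walks.Walk{
		{Path: []graph.ID{"0", "1", "2", "3", "4"}},
	})

	// for the walk [0,1,2,3,4] and node "1", the returned path
	// starts immediately after "1".
	path, ok := pool.Next("1")
	if !ok {
		t.Fatal("expected a walk visiting node 1")
	}
	if !reflect.DeepEqual(path, []graph.ID{"2", "3", "4"}) {
		t.Fatalf("expected [2 3 4], got %v", path)
	}

	// the same walk must not be handed out twice, to avoid bias in the sampling
	if _, ok := pool.Next("1"); ok {
		t.Fatal("expected the walk to be used only once")
	}
}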