personalized walk test

This commit is contained in:
pippellia-btc
2025-05-24 11:24:02 +02:00
parent b46f60225e
commit 7bb961e20c
3 changed files with 166 additions and 37 deletions

View File

@@ -6,19 +6,19 @@ import (
"github/pippellia-btc/crawler/pkg/walks"
)
type walkerWithFallback struct {
type cachedWalker struct {
follows map[graph.ID][]graph.ID
fallback walks.Walker
}
func newWalkerWithFallback(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *walkerWithFallback {
return &walkerWithFallback{
func newCachedWalker(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *cachedWalker {
return &cachedWalker{
follows: followsMap,
fallback: fallback,
}
}
func (w *walkerWithFallback) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
func (w *cachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
follows, exists := w.follows[node]
if !exists {
var err error
@@ -59,24 +59,28 @@ func newWalkPool(walks []walks.Walk) *walkPool {
}
}
// Next returns a path of nodes that starts immediately after node, making sure
// that the same walk is only used once to avoid bias in the sampling.
// For example, if the walk is [0,1,2,3,4], node = 1, it returns [2,3,4].
func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) {
indexes, exists := w.walkIndexes[node]
if !exists || len(indexes) == 0 {
return nil, false
}
for i, index := range indexes {
walk := w.walks[index]
if walk.Len() == 0 {
for i, idx := range indexes {
walk := w.walks[idx]
cut := walk.Index(node)
if cut == -1 {
// walk already used, skip
continue
}
// zero the walk so it can't be reused, and reslice the walk indexes
// so we don't spend time looking at walks already used.
w.walks[index].Path = nil
w.walks[idx].Path = nil
w.walkIndexes[node] = indexes[i+1:]
return walk.Path, true
return walk.Path[cut+1:], true
}
// all walks where already used

View File

@@ -7,6 +7,7 @@ import (
"github/pippellia-btc/crawler/pkg/graph"
"github/pippellia-btc/crawler/pkg/walks"
"math/rand/v2"
"slices"
)
var ErrEmptyWalkStore = errors.New("the walk store is empty")
@@ -23,8 +24,8 @@ type VisitCounter interface {
// Global computes the global pagerank score for each target node, as the frequency of visits.
// If a node is not found, its pagerank is assumed to be 0.
func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float64, error) {
if len(nodes) == 0 {
func Global(ctx context.Context, count VisitCounter, targets ...graph.ID) ([]float64, error) {
if len(targets) == 0 {
return nil, nil
}
@@ -37,7 +38,7 @@ func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float
return nil, ErrEmptyWalkStore
}
visits, err := count.Visits(ctx, nodes...)
visits, err := count.Visits(ctx, targets...)
if err != nil {
return nil, fmt.Errorf("Global: failed to get the nodes visits: %w", err)
}
@@ -58,14 +59,35 @@ type PersonalizedLoader interface {
// BulkFollows returns the follow-lists of the specified nodes
BulkFollows(ctx context.Context, nodes []graph.ID) (map[graph.ID][]graph.ID, error)
// WalksVisitingAny returns up to limit IDs of walks that visit the specified nodes.
// WalksVisitingAny returns up to limit walks that visit the specified nodes.
// The walks are distributed evenly among the nodes:
// - if limit == -1, all walks are returned.
// - if limit < len(nodes), no walks are returned
WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.ID, error)
WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.Walk, error)
}
// Walks returns the walks associated with the given IDs.
Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, error)
func PersonalizedWithTargets(
ctx context.Context,
loader PersonalizedLoader,
source graph.ID,
targets []graph.ID,
targetLenght int) ([]float64, error) {
if len(targets) == 0 {
return nil, nil
}
pp, err := Personalized(ctx, loader, source, targetLenght)
if err != nil {
return nil, err
}
pageranks := make([]float64, len(targets))
for i, t := range targets {
pageranks[i] = pp[t]
}
return pageranks, nil
}
/*
@@ -100,17 +122,12 @@ func Personalized(
}
targetWalks := int(float64(targetLenght) * (1 - walks.Alpha))
IDs, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
walks, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
if err != nil {
return nil, fmt.Errorf("Personalized: failed to fetch the walk IDs: %w", err)
return nil, fmt.Errorf("Personalized: failed to fetch the walk: %w", err)
}
walks, err := loader.Walks(ctx, IDs...)
if err != nil {
return nil, fmt.Errorf("Personalized: failed to fetch the walks: %w", err)
}
walker := newWalkerWithFallback(followMap, loader)
walker := newCachedWalker(followMap, loader)
pool := newWalkPool(walks)
walk, err := personalizedWalk(ctx, walker, pool, source, targetLenght)
@@ -118,7 +135,7 @@ func Personalized(
return nil, err
}
return frequencies(walk), nil
return frequencyMap(walk), nil
}
// pWalk is a personalized walk, which is a random walk that resets to a specified node
@@ -152,11 +169,11 @@ func (w *pWalk) Reset() {
w.node = w.start
}
// WalkPool makes sure a walk is returned only once, avoiding bias in the [Personalized]
type WalkPool interface {
// Next returns a path that starts with the provided node
Next(node graph.ID) ([]graph.ID, bool)
}
// // WalkPool makes sure a walk is returned only once, avoiding bias in the [Personalized]
// type WalkPool interface {
// // Next returns a path that starts with the provided node
// Next(node graph.ID) ([]graph.ID, bool)
// }
// The personalizedWalk() function simulates a long personalized random walk
// starting from a node with reset to itself. Whenever possible, walks from the
@@ -164,7 +181,7 @@ type WalkPool interface {
func personalizedWalk(
ctx context.Context,
walker walks.Walker,
pool WalkPool,
pool *walkPool,
start graph.ID,
lenght int) ([]graph.ID, error) {
@@ -185,7 +202,7 @@ func personalizedWalk(
path, exists = pool.Next(walk.node)
switch exists {
case true:
// graft the given path
// use the pre-computed walk when available
walk.ongoing.Graft(path)
walk.Reset()
@@ -215,20 +232,43 @@ func personalizedWalk(
}
}
// frequencies returns the number of times each node is visited divided by the lenght of the path.
func frequencies(path []graph.ID) map[graph.ID]float64 {
// frequencyMap returns a map node --> frequency of visits.
func frequencyMap(path []graph.ID) map[graph.ID]float64 {
if len(path) == 0 {
return nil
}
total := len(path)
freq := 1.0 / float64(total)
pp := make(map[graph.ID]float64, total/100)
freqs := make(map[graph.ID]float64, total/100)
for _, node := range path {
pp[node] += freq
freqs[node] += freq
}
return pp
return freqs
}
// targetFrequency returns the frequency of visits for each target
func targetFrequency(targets []graph.ID, path []graph.ID) []float64 {
if len(targets) == 0 || len(path) == 0 {
return nil
}
total := len(path)
freq := 1.0 / float64(total)
freqs := make([]float64, len(targets))
for _, node := range path {
idx := slices.Index(targets, node)
if idx == -1 {
continue
}
freqs[idx] += freq
}
return freqs
}
// returns a random element of a slice. It panics if the slice is empty or nil.

View File

@@ -0,0 +1,85 @@
package pagerank
import (
"context"
"fmt"
"github/pippellia-btc/crawler/pkg/graph"
"github/pippellia-btc/crawler/pkg/walks"
"math/rand/v2"
"reflect"
"strconv"
"testing"
)
func TestPersonalized(t *testing.T) {
ctx := context.Background()
walks.Alpha = 1 // making the test deterministic
walker := walks.NewCyclicWalker(3)
pool := newWalkPool([]walks.Walk{
{Path: []graph.ID{"0", "1", "X"}},
{Path: []graph.ID{"0", "1", "Y"}},
})
expected := []graph.ID{
"0", "1", "X",
"0", "1", "Y",
"0", "1", "2",
"0", "1", "2",
"0", "1", "2"}
walk, err := personalizedWalk(ctx, walker, pool, "0", 13)
if err != nil {
t.Fatalf("expected nil, got %v", err)
}
if !reflect.DeepEqual(walk, expected) {
t.Fatalf("expected %v, got %v", expected, walk)
}
}
func BenchmarkFrequencyMap(b *testing.B) {
sizes := []int{10000, 100000, 1000000}
for _, size := range sizes {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
path := make([]graph.ID, size)
for i := range sizes {
n := rand.IntN(size / 10)
path[i] = graph.ID(strconv.Itoa(n))
}
b.ResetTimer()
for range b.N {
frequencyMap(path)
}
})
}
}
func BenchmarkTargetFrequency(b *testing.B) {
targets := make([]graph.ID, 10)
for i := range 10 {
targets[i] = randomID(1000)
}
sizes := []int{10000, 100000, 1000000}
for _, size := range sizes {
b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
path := make([]graph.ID, size)
for i := range sizes {
path[i] = randomID(size / 10)
}
b.ResetTimer()
for range b.N {
targetFrequency(targets, path)
}
})
}
}
func randomID(n int) graph.ID {
return graph.ID(strconv.Itoa(rand.IntN(n)))
}