diff --git a/pkg/pagerank/cache.go b/pkg/pagerank/cache.go new file mode 100644 index 0000000..bf4f2b4 --- /dev/null +++ b/pkg/pagerank/cache.go @@ -0,0 +1,85 @@ +package pagerank + +import ( + "context" + "github/pippellia-btc/crawler/pkg/graph" + "github/pippellia-btc/crawler/pkg/walks" +) + +type walkerWithFallback struct { + follows map[graph.ID][]graph.ID + fallback walks.Walker +} + +func newWalkerWithFallback(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *walkerWithFallback { + return &walkerWithFallback{ + follows: followsMap, + fallback: fallback, + } +} + +func (w *walkerWithFallback) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) { + follows, exists := w.follows[node] + if !exists { + var err error + follows, err = w.fallback.Follows(ctx, node) + if err != nil { + return nil, err + } + + w.follows[node] = follows + } + + return follows, nil +} + +type walkPool struct { + walks []walks.Walk + + // walkIndexes maps each node to the list of indices in walks where that node appears. + // That is, if walkIndexes[v] = [0, 2], then v is visited by walks[0] and walks[2]. + walkIndexes map[graph.ID][]int +} + +func newWalkPool(walks []walks.Walk) *walkPool { + walkIndexes := make(map[graph.ID][]int, len(walks)/100) + + // add the index of the walk to each node that it visits + // excluding the last one which will be cropped out anyway. 
+ for i, walk := range walks { + for j := range walk.Len() - 1 { + node := walk.Path[j] + walkIndexes[node] = append(walkIndexes[node], i) + } + } + + return &walkPool{ + walks: walks, + walkIndexes: walkIndexes, + } +} + +func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) { + indexes, exists := w.walkIndexes[node] + if !exists || len(indexes) == 0 { + return nil, false + } + + for i, index := range indexes { + walk := w.walks[index] + if walk.Len() == 0 { + // walk already used, skip + continue + } + + // zero the walk so it can't be reused, and reslice the walk indexes + // so we don't spend time looking at walks already used. + w.walks[index].Path = nil + w.walkIndexes[node] = indexes[i+1:] + return walk.Path, true + } + + // all walks were already used + delete(w.walkIndexes, node) + return nil, false +} diff --git a/pkg/pagerank/pagerank.go b/pkg/pagerank/pagerank.go index e869269..d0e2949 100644 --- a/pkg/pagerank/pagerank.go +++ b/pkg/pagerank/pagerank.go @@ -17,10 +17,11 @@ type VisitCounter interface { // Visits returns the number of times each specified node was visited during the walks. // The returned slice contains counts in the same order as the input nodes. + // If a node is not found, it returns 0 visits. Visits(ctx context.Context, nodes ...graph.ID) ([]int, error) } -// Global computes the global pagerank score for the specified nodes. +// Global computes the global pagerank score for each target node, as the frequency of visits. // If a node is not found, its pagerank is assumed to be 0. 
func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float64, error) { if len(nodes) == 0 { @@ -50,6 +51,76 @@ func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float return pageranks, nil } +type PersonalizedLoader interface { + // Follows returns the follow-list of the node + Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) + + // BulkFollows returns the follow-lists of the specified nodes + BulkFollows(ctx context.Context, nodes []graph.ID) (map[graph.ID][]graph.ID, error) + + // WalksVisitingAny returns up to limit IDs of walks that visit the specified nodes. + // The walks are distributed evenly among the nodes: + // - if limit == -1, all walks are returned. + // - if limit < len(nodes), no walks are returned + WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.ID, error) + + // Walks returns the walks associated with the given IDs. + Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, error) +} + +/* +Personalized computes the personalized pagerank of node by simulating a +long random walk starting at and resetting to itself. This long walk is generated +from the random walks stored in the storage layer. + +# REFERENCES + +[1] B. Bahmani, A. Chowdhury, A. 
Goel; "Fast Incremental and Personalized PageRank" +URL: http://snap.stanford.edu/class/cs224w-readings/bahmani10pagerank.pdf +*/ +func Personalized( + ctx context.Context, + loader PersonalizedLoader, + source graph.ID, + targetLenght int) (map[graph.ID]float64, error) { + + follows, err := loader.Follows(ctx, source) + if err != nil { + return nil, fmt.Errorf("Personalized: failed to fetch the follows of source: %w", err) + } + + if len(follows) == 0 { + // the special distribution of a dangling node + return map[graph.ID]float64{source: 1.0}, nil + } + + followMap, err := loader.BulkFollows(ctx, follows) + if err != nil { + return nil, fmt.Errorf("Personalized: failed to fetch the two-hop network of source: %w", err) + } + + targetWalks := int(float64(targetLenght) * (1 - walks.Alpha)) + IDs, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks) + if err != nil { + return nil, fmt.Errorf("Personalized: failed to fetch the walk IDs: %w", err) + } + + walks, err := loader.Walks(ctx, IDs...) + if err != nil { + return nil, fmt.Errorf("Personalized: failed to fetch the walks: %w", err) + } + + walker := newWalkerWithFallback(followMap, loader) + pool := newWalkPool(walks) + + walk, err := personalizedWalk(ctx, walker, pool, source, targetLenght) + if err != nil { + return nil, err + } + + return frequencies(walk), nil +} + // pWalk is a personalized walk, which is a random walk that resets to a specified node // and continues until it reaches a specified target lenght. type pWalk struct { @@ -144,6 +215,22 @@ func personalizedWalk( } } +// frequencies returns the number of times each node is visited divided by the length of the path. +func frequencies(path []graph.ID) map[graph.ID]float64 { + if len(path) == 0 { + return nil + } + + total := len(path) + freq := 1.0 / float64(total) + pp := make(map[graph.ID]float64, total/100) + for _, node := range path { + pp[node] += freq + } + + return pp +} + // returns a random element of a slice. 
It panics if the slice is empty or nil. func randomElement[S []E, E any](s S) E { return s[rand.IntN(len(s))] diff --git a/pkg/walks/walks.go b/pkg/walks/walks.go index 2436df9..c6d3ea1 100644 --- a/pkg/walks/walks.go +++ b/pkg/walks/walks.go @@ -24,8 +24,7 @@ type Walk struct { } type Walker interface { - // Follows returns the follow-list of the specified node, which will be used in - // generating random walks + // Follows returns the follow-list of the node, used for generating random walks Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) } @@ -81,8 +80,9 @@ func (w *Walk) Graft(path []graph.ID) { w.Path = w.Path[:pos] } -// Generate N random walks for the specified node, using dampening factor alpha. -// A walk stops early if a cycle is encountered. Walk IDs will be overwritten by the storage layer. +// Generate [N] random walks for the specified node, using damping factor [Alpha]. +// A walk stops early if a cycle is encountered. +// Walk IDs are not set, because it's the responsibility of the storage layer. 
func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, error) { if len(nodes) == 0 { return nil, nil @@ -109,7 +109,7 @@ func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, er // Generate a random path of nodes, by: // - starting from one of the provided nodes, chosen at random // - walking along the social graph -// - stopping with probabiliy 1-alpha, on dandling nodes, and on cycles +// - stopping with probability 1-Alpha, on dangling nodes, and on cycles func generate(ctx context.Context, walker Walker, start ...graph.ID) ([]graph.ID, error) { if len(start) == 0 { return nil, nil @@ -158,7 +158,7 @@ func ToRemove(node graph.ID, walks []Walk) ([]ID, error) { } if len(toRemove) != N { - return toRemove, fmt.Errorf("walks to be removed (%d) are less than expected (%d)", len(toRemove), N) + return toRemove, fmt.Errorf("walks to be removed (%d) are different than expected (%d)", len(toRemove), N) } return toRemove, nil