mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
personalized pagerank
This commit is contained in:
85
pkg/pagerank/cache.go
Normal file
85
pkg/pagerank/cache.go
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
package pagerank
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github/pippellia-btc/crawler/pkg/graph"
|
||||||
|
"github/pippellia-btc/crawler/pkg/walks"
|
||||||
|
)
|
||||||
|
|
||||||
|
type walkerWithFallback struct {
|
||||||
|
follows map[graph.ID][]graph.ID
|
||||||
|
fallback walks.Walker
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWalkerWithFallback(followsMap map[graph.ID][]graph.ID, fallback walks.Walker) *walkerWithFallback {
|
||||||
|
return &walkerWithFallback{
|
||||||
|
follows: followsMap,
|
||||||
|
fallback: fallback,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walkerWithFallback) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
|
||||||
|
follows, exists := w.follows[node]
|
||||||
|
if !exists {
|
||||||
|
var err error
|
||||||
|
follows, err = w.fallback.Follows(ctx, node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.follows[node] = follows
|
||||||
|
}
|
||||||
|
|
||||||
|
return follows, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type walkPool struct {
|
||||||
|
walks []walks.Walk
|
||||||
|
|
||||||
|
// walkIndexes maps each node to the list of indices in walks where that node appears.
|
||||||
|
// That is, if walkIndexes[v] = [0, 2], then v is visited by walks[0] and walks[2].
|
||||||
|
walkIndexes map[graph.ID][]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWalkPool(walks []walks.Walk) *walkPool {
|
||||||
|
walkIndexes := make(map[graph.ID][]int, len(walks)/100)
|
||||||
|
|
||||||
|
// add the index of the walk to each node that it visits
|
||||||
|
// excluding the last one which will be cropped out anyway.
|
||||||
|
for i, walk := range walks {
|
||||||
|
for j := range walk.Len() - 1 {
|
||||||
|
node := walk.Path[j]
|
||||||
|
walkIndexes[node] = append(walkIndexes[node], i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &walkPool{
|
||||||
|
walks: walks,
|
||||||
|
walkIndexes: walkIndexes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) {
|
||||||
|
indexes, exists := w.walkIndexes[node]
|
||||||
|
if !exists || len(indexes) == 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, index := range indexes {
|
||||||
|
walk := w.walks[index]
|
||||||
|
if walk.Len() == 0 {
|
||||||
|
// walk already used, skip
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// zero the walk so it can't be reused, and reslice the walk indexes
|
||||||
|
// so we don't spend time looking at walks already used.
|
||||||
|
w.walks[index].Path = nil
|
||||||
|
w.walkIndexes[node] = indexes[i+1:]
|
||||||
|
return walk.Path, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// all walks where already used
|
||||||
|
delete(w.walkIndexes, node)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
@@ -17,10 +17,11 @@ type VisitCounter interface {
|
|||||||
|
|
||||||
// Visits returns the number of times each specified node was visited during the walks.
|
// Visits returns the number of times each specified node was visited during the walks.
|
||||||
// The returned slice contains counts in the same order as the input nodes.
|
// The returned slice contains counts in the same order as the input nodes.
|
||||||
|
// If a node is not found, it returns 0 visits.
|
||||||
Visits(ctx context.Context, nodes ...graph.ID) ([]int, error)
|
Visits(ctx context.Context, nodes ...graph.ID) ([]int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Global computes the global pagerank score for the specified nodes.
|
// Global computes the global pagerank score for each target node, as the frequency of visits.
|
||||||
// If a node is not found, its pagerank is assumed to be 0.
|
// If a node is not found, its pagerank is assumed to be 0.
|
||||||
func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float64, error) {
|
func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float64, error) {
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
@@ -50,6 +51,76 @@ func Global(ctx context.Context, count VisitCounter, nodes ...graph.ID) ([]float
|
|||||||
return pageranks, nil
|
return pageranks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PersonalizedLoader interface {
|
||||||
|
// Follows returns the follow-list of the node
|
||||||
|
Follows(ctx context.Context, node graph.ID) ([]graph.ID, error)
|
||||||
|
|
||||||
|
// BulkFollows returns the follow-lists of the specified nodes
|
||||||
|
BulkFollows(ctx context.Context, nodes []graph.ID) (map[graph.ID][]graph.ID, error)
|
||||||
|
|
||||||
|
// WalksVisitingAny returns up to limit IDs of walks that visit the specified nodes.
|
||||||
|
// The walks are distributed evenly among the nodes:
|
||||||
|
// - if limit == -1, all walks are returned.
|
||||||
|
// - if limit < len(nodes), no walks are returned
|
||||||
|
WalksVisitingAny(ctx context.Context, nodes []graph.ID, limit int) ([]walks.ID, error)
|
||||||
|
|
||||||
|
// Walks returns the walks associated with the given IDs.
|
||||||
|
Walks(ctx context.Context, IDs ...walks.ID) ([]walks.Walk, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Personalized computes the personalized pagerank of node by simulating a
|
||||||
|
long random walk starting at and resetting to itself. This long walk is generated
|
||||||
|
from the random walks stored in the storage layer.
|
||||||
|
|
||||||
|
# REFERENCES
|
||||||
|
|
||||||
|
[1] B. Bahmani, A. Chowdhury, A. Goel; "Fast Incremental and Personalized PageRank"
|
||||||
|
URL: http://snap.stanford.edu/class/cs224w-readings/bahmani10pagerank.pdf
|
||||||
|
*/
|
||||||
|
func Personalized(
|
||||||
|
ctx context.Context,
|
||||||
|
loader PersonalizedLoader,
|
||||||
|
source graph.ID,
|
||||||
|
targetLenght int) (map[graph.ID]float64, error) {
|
||||||
|
|
||||||
|
follows, err := loader.Follows(ctx, source)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Personalized: failed to fetch the follows of source: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(follows) == 0 {
|
||||||
|
// the special distribution of a dandling node
|
||||||
|
return map[graph.ID]float64{source: 1.0}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
followMap, err := loader.BulkFollows(ctx, follows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Personalized: failed to fetch the two-hop network of source: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
targetWalks := int(float64(targetLenght) * (1 - walks.Alpha))
|
||||||
|
IDs, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Personalized: failed to fetch the walk IDs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
walks, err := loader.Walks(ctx, IDs...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Personalized: failed to fetch the walks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
walker := newWalkerWithFallback(followMap, loader)
|
||||||
|
pool := newWalkPool(walks)
|
||||||
|
|
||||||
|
walk, err := personalizedWalk(ctx, walker, pool, source, targetLenght)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return frequencies(walk), nil
|
||||||
|
}
|
||||||
|
|
||||||
// pWalk is a personalized walk, which is a random walk that resets to a specified node
|
// pWalk is a personalized walk, which is a random walk that resets to a specified node
|
||||||
// and continues until it reaches a specified target length.
|
// and continues until it reaches a specified target length.
|
||||||
type pWalk struct {
|
type pWalk struct {
|
||||||
@@ -144,6 +215,22 @@ func personalizedWalk(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// frequencies returns the number of times each node is visited divided by the lenght of the path.
|
||||||
|
func frequencies(path []graph.ID) map[graph.ID]float64 {
|
||||||
|
if len(path) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
total := len(path)
|
||||||
|
freq := 1.0 / float64(total)
|
||||||
|
pp := make(map[graph.ID]float64, total/100)
|
||||||
|
for _, node := range path {
|
||||||
|
pp[node] += freq
|
||||||
|
}
|
||||||
|
|
||||||
|
return pp
|
||||||
|
}
|
||||||
|
|
||||||
// returns a random element of a slice. It panics if the slice is empty or nil.
|
// returns a random element of a slice. It panics if the slice is empty or nil.
|
||||||
func randomElement[S []E, E any](s S) E {
|
func randomElement[S []E, E any](s S) E {
|
||||||
return s[rand.IntN(len(s))]
|
return s[rand.IntN(len(s))]
|
||||||
|
|||||||
@@ -24,8 +24,7 @@ type Walk struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Walker interface {
|
type Walker interface {
|
||||||
// Follows returns the follow-list of the specified node, which will be used in
|
// Follows returns the follow-list of the node, used for generating random walks
|
||||||
// generating random walks
|
|
||||||
Follows(ctx context.Context, node graph.ID) ([]graph.ID, error)
|
Follows(ctx context.Context, node graph.ID) ([]graph.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,8 +80,9 @@ func (w *Walk) Graft(path []graph.ID) {
|
|||||||
w.Path = w.Path[:pos]
|
w.Path = w.Path[:pos]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate N random walks for the specified node, using dampening factor alpha.
|
// Generate [N] random walks for the specified node, using dampening factor [Alpha].
|
||||||
// A walk stops early if a cycle is encountered. Walk IDs will be overwritten by the storage layer.
|
// A walk stops early if a cycle is encountered.
|
||||||
|
// Walk IDs are not set, because it's the responsibility of the storage layer.
|
||||||
func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, error) {
|
func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, error) {
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -109,7 +109,7 @@ func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, er
|
|||||||
// Generate a random path of nodes, by:
|
// Generate a random path of nodes, by:
|
||||||
// - starting from one of the provided nodes, chosen at random
|
// - starting from one of the provided nodes, chosen at random
|
||||||
// - walking along the social graph
|
// - walking along the social graph
|
||||||
// - stopping with probability 1-alpha, on dangling nodes, and on cycles
|
// - stopping with probability 1-Alpha, on dangling nodes, and on cycles
|
||||||
func generate(ctx context.Context, walker Walker, start ...graph.ID) ([]graph.ID, error) {
|
func generate(ctx context.Context, walker Walker, start ...graph.ID) ([]graph.ID, error) {
|
||||||
if len(start) == 0 {
|
if len(start) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -158,7 +158,7 @@ func ToRemove(node graph.ID, walks []Walk) ([]ID, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(toRemove) != N {
|
if len(toRemove) != N {
|
||||||
return toRemove, fmt.Errorf("walks to be removed (%d) are less than expected (%d)", len(toRemove), N)
|
return toRemove, fmt.Errorf("walks to be removed (%d) are different than expected (%d)", len(toRemove), N)
|
||||||
}
|
}
|
||||||
|
|
||||||
return toRemove, nil
|
return toRemove, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user