mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
moved cached walker to walks pkg
This commit is contained in:
@@ -1,95 +0,0 @@
|
||||
package pagerank
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github/pippellia-btc/crawler/pkg/graph"
|
||||
"github/pippellia-btc/crawler/pkg/walks"
|
||||
)
|
||||
|
||||
type cachedWalker struct {
|
||||
follows map[graph.ID][]graph.ID
|
||||
fallback walks.Walker
|
||||
}
|
||||
|
||||
func newCachedWalker(nodes []graph.ID, follows [][]graph.ID, fallback walks.Walker) *cachedWalker {
|
||||
w := cachedWalker{
|
||||
follows: make(map[graph.ID][]graph.ID, len(nodes)),
|
||||
fallback: fallback,
|
||||
}
|
||||
|
||||
for i, node := range nodes {
|
||||
w.follows[node] = follows[i]
|
||||
}
|
||||
|
||||
return &w
|
||||
}
|
||||
|
||||
func (w *cachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
|
||||
follows, exists := w.follows[node]
|
||||
if !exists {
|
||||
var err error
|
||||
follows, err = w.fallback.Follows(ctx, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.follows[node] = follows
|
||||
}
|
||||
|
||||
return follows, nil
|
||||
}
|
||||
|
||||
type walkPool struct {
|
||||
walks []walks.Walk
|
||||
|
||||
// walkIndexes maps each node to the list of indices in walks where that node appears.
|
||||
// That is, if walkIndexes[v] = [0, 2], then v is visited by walks[0] and walks[2].
|
||||
walkIndexes map[graph.ID][]int
|
||||
}
|
||||
|
||||
func newWalkPool(walks []walks.Walk) *walkPool {
|
||||
walkIndexes := make(map[graph.ID][]int, len(walks)/100)
|
||||
|
||||
// add the index of the walk to each node that it visits
|
||||
// excluding the last one which will be cropped out anyway.
|
||||
for i, walk := range walks {
|
||||
for j := range walk.Len() - 1 {
|
||||
node := walk.Path[j]
|
||||
walkIndexes[node] = append(walkIndexes[node], i)
|
||||
}
|
||||
}
|
||||
|
||||
return &walkPool{
|
||||
walks: walks,
|
||||
walkIndexes: walkIndexes,
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns a path of nodes that starts immediately after node, making sure
|
||||
// that the same walk is only used once to avoid bias in the sampling.
|
||||
// For example, if the walk is [0,1,2,3,4], node = 1, it returns [2,3,4].
|
||||
func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) {
|
||||
indexes, exists := w.walkIndexes[node]
|
||||
if !exists || len(indexes) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
for i, idx := range indexes {
|
||||
walk := w.walks[idx]
|
||||
cut := walk.Index(node)
|
||||
if cut == -1 {
|
||||
// walk already used, skip
|
||||
continue
|
||||
}
|
||||
|
||||
// zero the walk so it can't be reused, and reslice the walk indexes
|
||||
// so we don't spend time looking at walks already used.
|
||||
w.walks[idx].Path = nil
|
||||
w.walkIndexes[node] = indexes[i+1:]
|
||||
return walk.Path[cut+1:], true
|
||||
}
|
||||
|
||||
// all walks where already used
|
||||
delete(w.walkIndexes, node)
|
||||
return nil, false
|
||||
}
|
||||
@@ -122,13 +122,13 @@ func Personalized(
|
||||
return nil, fmt.Errorf("Personalized: failed to fetch the two-hop network of source: %w", err)
|
||||
}
|
||||
|
||||
walker := walks.NewCachedWalker(follows, followByNode, loader)
|
||||
targetWalks := int(float64(targetLenght) * (1 - walks.Alpha))
|
||||
|
||||
walks, err := loader.WalksVisitingAny(ctx, append(follows, source), targetWalks)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Personalized: failed to fetch the walk: %w", err)
|
||||
}
|
||||
|
||||
walker := newCachedWalker(follows, followByNode, loader)
|
||||
pool := newWalkPool(walks)
|
||||
|
||||
walk, err := personalizedWalk(ctx, walker, pool, source, targetLenght)
|
||||
@@ -170,11 +170,60 @@ func (w *pWalk) Reset() {
|
||||
w.node = w.start
|
||||
}
|
||||
|
||||
// // WalkPool makes sure a walk is returned only once, avoiding bias in the [Personalized]
|
||||
// type WalkPool interface {
|
||||
// // Next returns a path that starts with the provided node
|
||||
// Next(node graph.ID) ([]graph.ID, bool)
|
||||
// }
|
||||
type walkPool struct {
|
||||
walks []walks.Walk
|
||||
|
||||
// walkIndexes maps each node to the list of indices in walks where that node appears.
|
||||
// That is, if walkIndexes[v] = [0, 2], then v is visited by walks[0] and walks[2].
|
||||
walkIndexes map[graph.ID][]int
|
||||
}
|
||||
|
||||
func newWalkPool(walks []walks.Walk) *walkPool {
|
||||
walkIndexes := make(map[graph.ID][]int, len(walks)/100)
|
||||
|
||||
// add the index of the walk to each node that it visits
|
||||
// excluding the last one which will be cropped out anyway.
|
||||
for i, walk := range walks {
|
||||
for j := range walk.Len() - 1 {
|
||||
node := walk.Path[j]
|
||||
walkIndexes[node] = append(walkIndexes[node], i)
|
||||
}
|
||||
}
|
||||
|
||||
return &walkPool{
|
||||
walks: walks,
|
||||
walkIndexes: walkIndexes,
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns a path of nodes that starts immediately after node, making sure
|
||||
// that the same walk is only used once to avoid bias in the sampling.
|
||||
// For example, if the walk is [0,1,2,3,4], node = 1, it returns [2,3,4].
|
||||
func (w *walkPool) Next(node graph.ID) ([]graph.ID, bool) {
|
||||
indexes, exists := w.walkIndexes[node]
|
||||
if !exists || len(indexes) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
for i, idx := range indexes {
|
||||
walk := w.walks[idx]
|
||||
cut := walk.Index(node)
|
||||
if cut == -1 {
|
||||
// walk already used, skip
|
||||
continue
|
||||
}
|
||||
|
||||
// zero the walk so it can't be reused, and reslice the walk indexes
|
||||
// so we don't spend time looking at walks already used.
|
||||
w.walks[idx].Path = nil
|
||||
w.walkIndexes[node] = indexes[i+1:]
|
||||
return walk.Path[cut+1:], true
|
||||
}
|
||||
|
||||
// all walks where already used
|
||||
delete(w.walkIndexes, node)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// The personalizedWalk() function simulates a long personalized random walk
|
||||
// starting from a node with reset to itself. Whenever possible, walks from the
|
||||
|
||||
@@ -37,3 +37,38 @@ func NewCyclicWalker(n int) *MapWalker {
|
||||
|
||||
return &MapWalker{follows: follows}
|
||||
}
|
||||
|
||||
// CachedWalker is a walker with optional fallback that stores follow relationships
|
||||
// in a compact format (uint32) for reduced memory footprint.
|
||||
type cachedWalker struct {
|
||||
follows map[graph.ID][]graph.ID
|
||||
fallback Walker
|
||||
}
|
||||
|
||||
func NewCachedWalker(nodes []graph.ID, follows [][]graph.ID, fallback Walker) *cachedWalker {
|
||||
w := cachedWalker{
|
||||
follows: make(map[graph.ID][]graph.ID, len(nodes)),
|
||||
fallback: fallback,
|
||||
}
|
||||
|
||||
for i, node := range nodes {
|
||||
w.follows[node] = follows[i]
|
||||
}
|
||||
|
||||
return &w
|
||||
}
|
||||
|
||||
func (w *cachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
|
||||
follows, exists := w.follows[node]
|
||||
if !exists {
|
||||
var err error
|
||||
follows, err = w.fallback.Follows(ctx, node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.follows[node] = follows
|
||||
}
|
||||
|
||||
return follows, nil
|
||||
}
|
||||
|
||||
@@ -171,6 +171,31 @@ func TestFindCycle(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// func TestMemoryUsage(t *testing.T) {
|
||||
// var before runtime.MemStats
|
||||
// runtime.ReadMemStats(&before)
|
||||
|
||||
// follows := make(map[int][]int)
|
||||
// for i := 0; i < 1000000; i++ {
|
||||
// node := int(i)
|
||||
// follows[node] = randomFollows(100)
|
||||
// }
|
||||
|
||||
// var after runtime.MemStats
|
||||
// runtime.ReadMemStats(&after)
|
||||
|
||||
// used := float64(after.Alloc-before.Alloc) / 1024 / 1024
|
||||
// t.Fatalf("Approx. memory used by map: %.2f MB\n", used)
|
||||
// }
|
||||
|
||||
// func randomFollows(size int) []int {
|
||||
// follows := make([]int, size)
|
||||
// for i := range size {
|
||||
// follows[i] = rand.Int()
|
||||
// }
|
||||
// return follows
|
||||
// }
|
||||
|
||||
func BenchmarkFindCycle(b *testing.B) {
|
||||
sizes := []int{10, 100, 1000}
|
||||
for _, size := range sizes {
|
||||
|
||||
Reference in New Issue
Block a user