mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-16 23:14:19 +01:00
240 lines
5.4 KiB
Go
240 lines
5.4 KiB
Go
package walks
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
|
|
"github.com/vertex-lab/crawler_v2/pkg/graph"
|
|
)
|
|
|
|
type Walker interface {
|
|
// Follows returns the follow-list of the node, used for generating random walks
|
|
Follows(ctx context.Context, node graph.ID) ([]graph.ID, error)
|
|
}
|
|
|
|
// CachedWalker is a [Walker] with optional fallback that stores follow relationships
|
|
// in a compact format (uint32) for reduced memory footprint.
|
|
// If its size grows larger than capacity, the least recently used (LRU) key is evicted.
|
|
// It is not safe for concurrent use.
|
|
type CachedWalker struct {
|
|
lookup map[uint32]*list.Element
|
|
|
|
// newest at the front, oldest at the back
|
|
edgeList *list.List
|
|
capacity int
|
|
|
|
// for stats
|
|
calls, hits int
|
|
log *log.Logger
|
|
|
|
fallback Walker
|
|
}
|
|
|
|
// Option configures a [CachedWalker]. Options are applied by [NewWalker]
// in the order in which they are passed.
type Option func(*CachedWalker)
|
|
|
|
// WithFallback sets the [Walker] that is queried when a node is not found
// in the cache.
func WithFallback(w Walker) Option {
	return func(c *CachedWalker) {
		c.fallback = w
	}
}
|
|
|
|
func WithCapacity(cap int) Option {
|
|
return func(c *CachedWalker) {
|
|
c.lookup = make(map[uint32]*list.Element, cap)
|
|
c.capacity = cap
|
|
}
|
|
}
|
|
|
|
func WithLogFile(filename string) Option {
|
|
return func(c *CachedWalker) {
|
|
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed to open log file %s: %w", filename, err))
|
|
}
|
|
c.log = log.New(file, "cache: ", log.LstdFlags)
|
|
}
|
|
}
|
|
|
|
func NewWalker(opts ...Option) *CachedWalker {
|
|
c := &CachedWalker{
|
|
lookup: make(map[uint32]*list.Element, 10000),
|
|
edgeList: list.New(),
|
|
capacity: 10000,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(c)
|
|
}
|
|
return c
|
|
}
|
|
|
|
// edges is the value stored in each cache entry: a node together with its
// follow-list, both in compact (uint32) form.
type edges struct {
	node    uint32   // compacted ID of the node
	follows []uint32 // compacted IDs of the nodes it follows
}
|
|
|
|
// Add compresses node and follows and adds them to the cache.
|
|
// It evicts the LRU element if the capacity has been exeeded.
|
|
func (c *CachedWalker) Add(node graph.ID, follows []graph.ID) error {
|
|
ID, err := compactID(node)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to compress node %s: %w", node, err)
|
|
}
|
|
|
|
IDs, err := compactIDs(follows)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to compress follows of node %s: %w", node, err)
|
|
}
|
|
|
|
c.add(ID, IDs)
|
|
return nil
|
|
}
|
|
|
|
// Add node and follows as [edges]. It evicts the LRU element if the capacity has been exeeded.
|
|
func (c *CachedWalker) add(node uint32, follows []uint32) {
|
|
if e, ok := c.lookup[node]; ok {
|
|
// node already present, update value
|
|
e.Value = edges{node: node, follows: follows}
|
|
c.edgeList.MoveToFront(e)
|
|
return
|
|
}
|
|
|
|
c.lookup[node] = c.edgeList.PushFront(
|
|
edges{node: node, follows: follows},
|
|
)
|
|
|
|
if c.Size() > c.capacity {
|
|
oldest := c.edgeList.Back()
|
|
c.edgeList.Remove(oldest)
|
|
delete(c.lookup, oldest.Value.(edges).node)
|
|
}
|
|
}
|
|
|
|
func (c *CachedWalker) Update(ctx context.Context, delta graph.Delta) error {
|
|
if err := c.Add(delta.Node, delta.New()); err != nil {
|
|
return fmt.Errorf("failed to update: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *CachedWalker) Size() int {
|
|
return c.edgeList.Len()
|
|
}
|
|
|
|
func (c *CachedWalker) logStats() {
|
|
if c.log != nil {
|
|
hitRatio := 100 * float64(c.hits) / float64(c.calls)
|
|
c.log.Printf("calls %d, hits %f%%", c.calls, hitRatio)
|
|
}
|
|
c.calls, c.hits = 0, 0
|
|
}
|
|
|
|
func (c *CachedWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
|
|
ID, err := compactID(node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch follows of %s: %w", node, err)
|
|
}
|
|
|
|
c.calls++
|
|
if c.calls >= 1_000_000 {
|
|
defer c.logStats()
|
|
}
|
|
|
|
element, hit := c.lookup[ID]
|
|
if hit {
|
|
c.hits++
|
|
c.edgeList.MoveToFront(element)
|
|
return nodes(element.Value.(edges).follows), nil
|
|
}
|
|
|
|
if c.fallback == nil {
|
|
return nil, fmt.Errorf("%w: %s", graph.ErrNodeNotFound, node)
|
|
}
|
|
|
|
follows, err := c.fallback.Follows(ctx, node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
IDs, err := compactIDs(follows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch follows of %s: %w", node, err)
|
|
}
|
|
|
|
c.add(ID, IDs)
|
|
return follows, nil
|
|
}
|
|
|
|
func (c *CachedWalker) Load(nodes []graph.ID, follows [][]graph.ID) error {
|
|
if len(nodes) != len(follows) {
|
|
return fmt.Errorf("failed to load: nodes and follows must have the same lenght")
|
|
}
|
|
|
|
for i, node := range nodes {
|
|
if err := c.Add(node, follows[i]); err != nil {
|
|
return fmt.Errorf("failed to load: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func compactID(node graph.ID) (uint32, error) {
|
|
ID, err := strconv.ParseUint(string(node), 10, 32)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return uint32(ID), err
|
|
}
|
|
|
|
func compactIDs(nodes []graph.ID) ([]uint32, error) {
|
|
IDs := make([]uint32, len(nodes))
|
|
var err error
|
|
for i, node := range nodes {
|
|
IDs[i], err = compactID(node)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return IDs, nil
|
|
}
|
|
|
|
func node(ID uint32) graph.ID {
|
|
return graph.ID(strconv.FormatUint(uint64(ID), 10))
|
|
}
|
|
|
|
func nodes(IDs []uint32) []graph.ID {
|
|
nodes := make([]graph.ID, len(IDs))
|
|
for i, ID := range IDs {
|
|
nodes[i] = node(ID)
|
|
}
|
|
return nodes
|
|
}
|
|
|
|
type SimpleWalker struct {
|
|
follows map[graph.ID][]graph.ID
|
|
}
|
|
|
|
func NewSimpleWalker(m map[graph.ID][]graph.ID) *SimpleWalker {
|
|
return &SimpleWalker{follows: m}
|
|
}
|
|
|
|
func (w *SimpleWalker) Follows(ctx context.Context, node graph.ID) ([]graph.ID, error) {
|
|
return w.follows[node], nil
|
|
}
|
|
|
|
func (w *SimpleWalker) Update(ctx context.Context, delta graph.Delta) {
|
|
w.follows[delta.Node] = delta.New()
|
|
}
|
|
|
|
func NewCyclicWalker(n int) *SimpleWalker {
|
|
follows := make(map[graph.ID][]graph.ID, n)
|
|
for i := range n {
|
|
node := graph.ID(strconv.Itoa(i))
|
|
next := graph.ID(strconv.Itoa((i + 1) % n))
|
|
follows[node] = []graph.ID{next}
|
|
}
|
|
|
|
return &SimpleWalker{follows: follows}
|
|
}
|