mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-17 07:24:21 +01:00
arbiter implemented
This commit is contained in:
@@ -29,6 +29,16 @@ type Node struct {
|
||||
Records []Record
|
||||
}
|
||||
|
||||
// Added returns the time a node was added to the database.
|
||||
func (n *Node) Added() (time.Time, bool) {
|
||||
for _, rec := range n.Records {
|
||||
if rec.Kind == Addition {
|
||||
return rec.Timestamp, true
|
||||
}
|
||||
}
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
// Record contains the timestamp of a node update.
|
||||
type Record struct {
|
||||
Kind int // either [Addition], [Promotion] or [Demotion]
|
||||
|
||||
211
pkg/pipe/engine.go
Normal file
211
pkg/pipe/engine.go
Normal file
@@ -0,0 +1,211 @@
|
||||
package pipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github/pippellia-btc/crawler/pkg/graph"
|
||||
"github/pippellia-btc/crawler/pkg/pagerank"
|
||||
"github/pippellia-btc/crawler/pkg/redb"
|
||||
"github/pippellia-btc/crawler/pkg/walks"
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// walksTracker tracks the number of walks that have been updated by [Processor].
// It's used to wake-up the [Arbiter], which performs work and then resets it to 0.
//
// NOTE(review): this is a nil *atomic.Int32 at package init; [Arbiter] calls
// walksTracker.Load(), which panics on a nil pointer. Confirm it is assigned
// elsewhere (e.g. by the Processor) before the Arbiter's first tick, or prefer
// a value declaration `var walksTracker atomic.Int32`, which is safe by default.
var walksTracker *atomic.Int32
|
||||
|
||||
type ArbiterConfig struct {
|
||||
Activation float64
|
||||
Promotion float64
|
||||
Demotion float64
|
||||
|
||||
PingPeriod time.Duration
|
||||
WaitPeriod time.Duration
|
||||
}
|
||||
|
||||
func NewArbiterConfig() ArbiterConfig {
|
||||
return ArbiterConfig{
|
||||
Activation: 0.01,
|
||||
Promotion: 0.1,
|
||||
Demotion: 1.05,
|
||||
|
||||
PingPeriod: time.Minute,
|
||||
WaitPeriod: time.Hour,
|
||||
}
|
||||
}
|
||||
|
||||
func (c ArbiterConfig) Print() {
|
||||
fmt.Printf("Arbiter\n")
|
||||
fmt.Printf(" Activation: %f\n", c.Activation)
|
||||
fmt.Printf(" Promotion: %f\n", c.Promotion)
|
||||
fmt.Printf(" Demotion: %f\n", c.Demotion)
|
||||
fmt.Printf(" WaitPeriod: %v\n", c.WaitPeriod)
|
||||
}
|
||||
|
||||
// Arbiter activates when the % of walks changed is greater than a threshold. Then it:
|
||||
// - scans through all the nodes in the database
|
||||
// - promotes or demotes nodes
|
||||
func Arbiter(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) {
|
||||
ticker := time.NewTicker(config.PingPeriod)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Arbiter: shutting down...")
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
total, err := db.TotalWalks(ctx)
|
||||
if err != nil {
|
||||
log.Printf("Arbiter: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
changed := walksTracker.Load()
|
||||
changeRatio := float64(changed) / float64(total)
|
||||
|
||||
if changeRatio > config.Activation {
|
||||
promoted, demoted, err := arbiterScan(ctx, config, db, send)
|
||||
if err != nil {
|
||||
log.Printf("Arbiter: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
walksTracker.Store(0) // resets tracker
|
||||
log.Printf("Arbiter: promoted %d, demoted %d", promoted, demoted)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// arbiterScan performs one entire database scan, promoting or demoting nodes
// based on their pagerank. It reports how many nodes were promoted and demoted.
// The whole scan must finish within maxTime, otherwise an error is returned.
func arbiterScan(ctx context.Context, config ArbiterConfig, db redb.RedisDB, send func(pk string) error) (promoted, demoted int, err error) {
	// bound the total duration of the scan; the ctx check at the top of the
	// loop enforces the deadline between batches.
	maxTime := 60 * time.Second
	ctx, cancel := context.WithTimeout(ctx, maxTime)
	defer cancel()

	baseRank, err := minPagerank(ctx, db)
	if err != nil {
		return promoted, demoted, err
	}

	// both thresholds are multiples of the smallest pagerank an active node
	// can have (see minPagerank).
	promotionThreshold := baseRank * config.Promotion
	demotionThreshold := baseRank * config.Demotion

	var IDs []graph.ID
	var cursor uint64

	for {
		select {
		case <-ctx.Done():
			return promoted, demoted, fmt.Errorf("failed to finish the scan in %v", maxTime)
		default:
			// proceed with the scan
		}

		IDs, cursor, err = db.ScanNodes(ctx, cursor, 3000) // ~1000 nodes
		if err != nil {
			return promoted, demoted, err
		}

		nodes, err := db.Nodes(ctx, IDs...)
		if err != nil {
			return promoted, demoted, err
		}

		// ranks[i] is the global pagerank of nodes[i] (same order as IDs)
		ranks, err := pagerank.Global(ctx, db, IDs...)
		if err != nil {
			return promoted, demoted, err
		}

		for i, node := range nodes {
			switch node.Status {
			case graph.StatusActive:
				// active --> inactive
				if ranks[i] < demotionThreshold {
					if err := demote(db, node.ID); err != nil {
						return promoted, demoted, err
					}

					demoted++
				}

			case graph.StatusInactive:
				// inactive --> active
				added, found := node.Added()
				if !found {
					return promoted, demoted, fmt.Errorf("node %s doesn't have an addition record", node.ID)
				}

				// only promote nodes that have been in the database
				// longer than WaitPeriod and rank high enough.
				if ranks[i] >= promotionThreshold && time.Since(added) > config.WaitPeriod {
					if err := promote(db, node.ID); err != nil {
						return promoted, demoted, err
					}

					promoted++
					// notify the caller about the newly promoted pubkey
					if err := send(node.Pubkey); err != nil {
						return promoted, demoted, err
					}
				}
			}
		}

		if cursor == 0 {
			// returns to 0, the scan is complete
			return promoted, demoted, nil
		}
	}
}
|
||||
|
||||
// minPagerank returns the minimum possible pagerank value for an active node.
|
||||
// An active node is visited by at least its own walks (the one starting from itself)
|
||||
// which are [walks.N].
|
||||
func minPagerank(ctx context.Context, db redb.RedisDB) (float64, error) {
|
||||
total, err := db.TotalVisits(ctx)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return float64(walks.N) / float64(total), nil
|
||||
}
|
||||
|
||||
// demote removes all walks that start from the node and changes its status to inactive.
|
||||
func demote(db redb.RedisDB, node graph.ID) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
visiting, err := db.WalksVisiting(ctx, node, -1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to demote node %s: %v", node, err)
|
||||
}
|
||||
|
||||
toRemove, err := walks.ToRemove(node, visiting)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to demote node %s: %v", node, err)
|
||||
}
|
||||
|
||||
if err := db.RemoveWalks(ctx, toRemove...); err != nil {
|
||||
return fmt.Errorf("failed to demote node %s: %v", node, err)
|
||||
}
|
||||
|
||||
return db.Demote(ctx, node)
|
||||
}
|
||||
|
||||
// promote generates random walks for the node and changes its status to active.
|
||||
func promote(db redb.RedisDB, node graph.ID) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
walks, err := walks.Generate(ctx, db, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to promote node %s: %v", node, err)
|
||||
}
|
||||
|
||||
if err := db.AddWalks(ctx, walks...); err != nil {
|
||||
return fmt.Errorf("failed to promote node %s: %v", node, err)
|
||||
}
|
||||
|
||||
return db.Promote(ctx, node)
|
||||
}
|
||||
@@ -103,6 +103,7 @@ func (b *buffer) Contains(ID string) bool {
|
||||
func Firehose(ctx context.Context, config FirehoseConfig, check PubkeyChecker, send func(*nostr.Event) error) {
|
||||
pool := nostr.NewSimplePool(ctx)
|
||||
defer close(pool)
|
||||
defer log.Println("Firehose: shutting down...")
|
||||
|
||||
filter := nostr.Filter{
|
||||
Kinds: relevantKinds,
|
||||
@@ -168,6 +169,7 @@ func Fetcher(ctx context.Context, config FetcherConfig, pubkeys <-chan string, s
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Fetcher: shutting down...")
|
||||
return
|
||||
|
||||
case pubkey := <-pubkeys:
|
||||
|
||||
@@ -68,6 +68,39 @@ func (r RedisDB) NodeCount(ctx context.Context) (int, error) {
|
||||
return int(nodes), nil
|
||||
}
|
||||
|
||||
// Nodes fetches a slice of nodes by their IDs.
|
||||
func (r RedisDB) Nodes(ctx context.Context, IDs ...graph.ID) ([]*graph.Node, error) {
|
||||
if len(IDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
pipe := r.client.Pipeline()
|
||||
cmds := make([]*redis.MapStringStringCmd, len(IDs))
|
||||
for i, ID := range IDs {
|
||||
cmds[i] = pipe.HGetAll(ctx, node(ID))
|
||||
}
|
||||
|
||||
var err error
|
||||
if _, err = pipe.Exec(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch %d nodes: %w", len(IDs), err)
|
||||
}
|
||||
|
||||
nodes := make([]*graph.Node, len(IDs))
|
||||
for i, cmd := range cmds {
|
||||
fields := cmd.Val()
|
||||
if len(fields) == 0 {
|
||||
return nil, fmt.Errorf("failed to fetch %s: %w", node(IDs[i]), ErrNodeNotFound)
|
||||
}
|
||||
|
||||
nodes[i], err = parseNode(fields)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch %s: %w", node(IDs[i]), err)
|
||||
}
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// NodeByID fetches a node by its ID
|
||||
func (r RedisDB) NodeByID(ctx context.Context, ID graph.ID) (*graph.Node, error) {
|
||||
fields, err := r.client.HGetAll(ctx, node(ID)).Result()
|
||||
@@ -174,7 +207,7 @@ func (r RedisDB) Promote(ctx context.Context, ID graph.ID) error {
|
||||
func (r RedisDB) Demote(ctx context.Context, ID graph.ID) error {
|
||||
err := r.client.HSet(ctx, node(ID), NodeStatus, graph.StatusInactive, NodeDemotionTS, time.Now().Unix()).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to promote %s: %w", node(ID), err)
|
||||
return fmt.Errorf("failed to demote %s: %w", node(ID), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"github/pippellia-btc/crawler/pkg/graph"
|
||||
"github/pippellia-btc/crawler/pkg/pagerank"
|
||||
"github/pippellia-btc/crawler/pkg/pipe"
|
||||
"github/pippellia-btc/crawler/pkg/walks"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -376,7 +375,7 @@ func TestInterfaces(t *testing.T) {
|
||||
var _ walks.Walker = RedisDB{}
|
||||
var _ pagerank.VisitCounter = RedisDB{}
|
||||
var _ pagerank.PersonalizedLoader = RedisDB{}
|
||||
var _ pipe.PubkeyChecker = RedisDB{}
|
||||
//var _ pipe.PubkeyChecker = RedisDB{}
|
||||
}
|
||||
|
||||
// ------------------------------------- HELPERS -------------------------------
|
||||
|
||||
@@ -3,6 +3,7 @@ package redb
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github/pippellia-btc/crawler/pkg/graph"
|
||||
"github/pippellia-btc/crawler/pkg/walks"
|
||||
"strconv"
|
||||
@@ -86,21 +87,21 @@ func parseNode(fields map[string]string) (*graph.Node, error) {
|
||||
case NodeAddedTS:
|
||||
ts, err := parseTimestamp(val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to parse node: %v", err)
|
||||
}
|
||||
node.Records = append(node.Records, graph.Record{Kind: graph.Addition, Timestamp: ts})
|
||||
|
||||
case NodePromotionTS:
|
||||
ts, err := parseTimestamp(val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to parse node: %v", err)
|
||||
}
|
||||
node.Records = append(node.Records, graph.Record{Kind: graph.Promotion, Timestamp: ts})
|
||||
|
||||
case NodeDemotionTS:
|
||||
ts, err := parseTimestamp(val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to parse node: %v", err)
|
||||
}
|
||||
node.Records = append(node.Records, graph.Record{Kind: graph.Demotion, Timestamp: ts})
|
||||
}
|
||||
@@ -113,7 +114,7 @@ func parseNode(fields map[string]string) (*graph.Node, error) {
|
||||
// parseTimestamp converts a unix-seconds string into a time.Time.
// It returns the zero time and a wrapped error when the string is not
// a valid base-10 integer.
func parseTimestamp(unix string) (time.Time, error) {
	ts, err := strconv.ParseInt(unix, 10, 64)
	if err != nil {
		// previously an unwrapped `return time.Time{}, err` preceded this
		// line, making the wrapped error unreachable; keep only the wrap.
		return time.Time{}, fmt.Errorf("failed to parse timestamp: %w", err)
	}
	return time.Unix(ts, 0), nil
}
|
||||
|
||||
@@ -308,6 +308,15 @@ func (r RedisDB) TotalVisits(ctx context.Context) (int, error) {
|
||||
return tot, nil
|
||||
}
|
||||
|
||||
// TotalWalks returns the total number of walks.
|
||||
func (r RedisDB) TotalWalks(ctx context.Context) (int, error) {
|
||||
total, err := r.client.HLen(ctx, KeyWalks).Result()
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("failed to get the total number of walks: %w", err)
|
||||
}
|
||||
return int(total), nil
|
||||
}
|
||||
|
||||
// Visits returns the number of times each specified node was visited during the walks.
|
||||
// The returned slice contains counts in the same order as the input nodes.
|
||||
// If a node is not found, it returns 0 visits.
|
||||
|
||||
@@ -114,7 +114,7 @@ func Generate(ctx context.Context, walker Walker, nodes ...graph.ID) ([]Walk, er
|
||||
for range N {
|
||||
path, err = generate(ctx, walker, node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to Generate: %w", err)
|
||||
return nil, fmt.Errorf("failed to generate walk: %w", err)
|
||||
}
|
||||
|
||||
walks = append(walks, Walk{Path: path})
|
||||
|
||||
Reference in New Issue
Block a user