mirror of
https://github.com/aljazceru/crawler_v2.git
synced 2025-12-16 23:14:19 +01:00
moved back to a single db writer
This commit is contained in:
@@ -200,12 +200,6 @@ func Load() (*Config, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
|
||||
}
|
||||
|
||||
case "ENGINE_ARCHIVERS":
|
||||
config.Engine.Archivers, err = strconv.Atoi(val)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing %v: %v", keyVal, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ type EngineConfig struct {
|
||||
// GraphBuilder params
|
||||
BuilderCapacity int
|
||||
CacheCapacity int
|
||||
|
||||
// Archiver params
|
||||
Archivers int
|
||||
}
|
||||
|
||||
func NewEngineConfig() EngineConfig {
|
||||
@@ -34,7 +31,6 @@ func NewEngineConfig() EngineConfig {
|
||||
PrintEvery: 5000,
|
||||
BuilderCapacity: 1000,
|
||||
CacheCapacity: 100_000,
|
||||
Archivers: 4,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,8 +52,10 @@ func Engine(
|
||||
graphEvents := make(chan *nostr.Event, config.BuilderCapacity)
|
||||
defer close(graphEvents)
|
||||
|
||||
go GraphBuilder(ctx, config, db, graphEvents)
|
||||
log.Println("Engine: ready to process events")
|
||||
defer log.Println("Engine: shutting down...")
|
||||
|
||||
go GraphBuilder(ctx, config, db, graphEvents)
|
||||
|
||||
Archiver(ctx, config, store, events, func(e *nostr.Event) error {
|
||||
if e.Kind == nostr.KindFollowList {
|
||||
@@ -69,8 +67,6 @@ func Engine(
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
log.Println("Engine: shutting down...")
|
||||
}
|
||||
|
||||
// Archiver stores events in the event store.
|
||||
@@ -81,7 +77,6 @@ func Archiver(
|
||||
events chan *nostr.Event,
|
||||
onReplace func(*nostr.Event) error) {
|
||||
|
||||
sem := make(chan struct{}, config.Archivers)
|
||||
var processed int
|
||||
|
||||
for {
|
||||
@@ -94,16 +89,34 @@ func Archiver(
|
||||
return
|
||||
}
|
||||
|
||||
sem <- struct{}{}
|
||||
go func() {
|
||||
err := archive(ctx, store, event, onReplace)
|
||||
<-sem
|
||||
err := func() error {
|
||||
opctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
|
||||
switch {
|
||||
case nostr.IsRegularKind(event.Kind):
|
||||
return store.Save(opctx, event)
|
||||
|
||||
case nostr.IsReplaceableKind(event.Kind):
|
||||
replaced, err := store.Replace(opctx, event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if replaced {
|
||||
return onReplace(event)
|
||||
}
|
||||
return nil
|
||||
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Archiver: event ID %s, kind %d by %s: %v", event.ID, event.Kind, event.PubKey, err)
|
||||
}
|
||||
|
||||
processed++
|
||||
if processed%config.PrintEvery == 0 {
|
||||
log.Printf("Archiver: processed %d events", processed)
|
||||
@@ -112,36 +125,6 @@ func Archiver(
|
||||
}
|
||||
}
|
||||
|
||||
// Archive an event based on its kind.
|
||||
func archive(
|
||||
ctx context.Context,
|
||||
store *eventstore.Store,
|
||||
event *nostr.Event,
|
||||
onReplace func(*nostr.Event) error) error {
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
switch {
|
||||
case nostr.IsRegularKind(event.Kind):
|
||||
return store.Save(ctx, event)
|
||||
|
||||
case nostr.IsReplaceableKind(event.Kind):
|
||||
replaced, err := store.Replace(ctx, event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if replaced {
|
||||
return onReplace(event)
|
||||
}
|
||||
return nil
|
||||
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// GraphBuilder consumes events to update the graph and random walks.
|
||||
func GraphBuilder(
|
||||
ctx context.Context,
|
||||
|
||||
Reference in New Issue
Block a user