diff --git a/examples/basic-badger/main.go b/examples/basic-badger/main.go index d3152e1..c3b8c37 100644 --- a/examples/basic-badger/main.go +++ b/examples/basic-badger/main.go @@ -20,6 +20,7 @@ func main() { relay.QueryEvents = append(relay.QueryEvents, db.QueryEvents) relay.CountEvents = append(relay.CountEvents, db.CountEvents) relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent) + relay.Negentropy = true fmt.Println("running on :3334") http.ListenAndServe(":3334", relay) diff --git a/go.mod b/go.mod index 889c7b6..b424ac8 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/fiatjaf/khatru go 1.23.1 require ( + github.com/bep/debounce v1.2.1 github.com/fasthttp/websocket v1.5.7 github.com/fiatjaf/eventstore v0.12.0 github.com/nbd-wtf/go-nostr v0.40.0 @@ -18,6 +19,7 @@ require ( github.com/aquasecurity/esquery v0.2.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect + github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect @@ -36,6 +38,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/greatroar/blobloom v0.8.0 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.10 // indirect diff --git a/go.sum b/go.sum index c1dd0fd..994cf16 100644 --- a/go.sum +++ b/go.sum @@ -2,17 +2,23 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT fiatjaf.com/lib v0.2.0 h1:TgIJESbbND6GjOgGHxF5jsO6EMjuAxIzZHPo5DXYexs= fiatjaf.com/lib v0.2.0/go.mod h1:Ycqq3+mJ9jAWu7XjbQI1cVr+OFgnHn79dQR5oTII47g= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU= github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA= github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao= +github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= +github.com/bep/debounce v1.2.1/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ= github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ= github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -87,6 +93,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4= +github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI= github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs= github.com/jgroeneveld/trial v2.0.0+incompatible h1:d59ctdgor+VqdZCAiUfVN8K13s0ALDioG5DWwZNtRuQ= @@ -126,6 +134,8 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/handlers.go b/handlers.go index 7a953bf..914f74a 100644 --- a/handlers.go +++ b/handlers.go @@ -10,9 +10,13 @@ import ( "sync" "time" + "github.com/bep/debounce" "github.com/fasthttp/websocket" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip42" + "github.com/nbd-wtf/go-nostr/nip77" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" + "github.com/puzpuzpuz/xsync/v3" "github.com/rs/cors" ) @@ -54,9 +58,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { rand.Read(challenge) ws := &WebSocket{ - conn: conn, - Request: r, - Challenge: hex.EncodeToString(challenge), + conn: conn, + Request: r, + Challenge: hex.EncodeToString(challenge), + negentropySessions: xsync.NewMapOf[string, *NegentropySession](), } ws.Context, ws.cancel = context.WithCancel(context.Background()) @@ -123,8 +128,14 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { go func(message []byte) { envelope := nostr.ParseMessage(message) if envelope == nil { - // stop silently - return + if !rl.Negentropy { + // stop silently + return + } + envelope = nip77.ParseNegMessage(message) + if envelope == nil { + return + } } switch env := envelope.(type) { @@ -272,6 +283,75 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } else { ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"}) } + case *nip77.OpenEnvelope: + srl := rl + if rl.getSubRelayFromFilter != nil { + srl = rl.getSubRelayFromFilter(env.Filter) + if !srl.Negentropy { + // ignore + return + } + } + vec, err := srl.startNegentropySession(ctx, env.Filter) + if err != nil { + // fail everything if any filter is rejected + reason := err.Error() + if strings.HasPrefix(reason, "auth-required:") { + RequestAuth(ctx) + } + ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) + return + } + + // reconcile to get the next message and return it + neg := negentropy.New(vec, 1024*1024) + out, err := neg.Reconcile(env.Message) + if err != nil { + ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: err.Error()}) + return + } + ws.WriteJSON(nip77.MessageEnvelope{SubscriptionID: env.SubscriptionID, Message: out}) + + // if the message is not empty that means we'll probably have more reconciliation sessions, so store this + if out != "" { + deb := debounce.New(time.Second * 7) + negSession := &NegentropySession{ + neg: neg, + postponeClose: func() { + deb(func() { + ws.negentropySessions.Delete(env.SubscriptionID) + }) + }, + } + negSession.postponeClose() + + ws.negentropySessions.Store(env.SubscriptionID, negSession) + } + case *nip77.MessageEnvelope: + negSession, ok := ws.negentropySessions.Load(env.SubscriptionID) + if !ok { + // bad luck, your request was destroyed + ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: "CLOSED"}) + return + } + // reconcile to get the next message and return it + out, err := negSession.neg.Reconcile(env.Message) + if err != nil { + ws.WriteJSON(nip77.ErrorEnvelope{SubscriptionID: env.SubscriptionID, Reason: err.Error()}) + ws.negentropySessions.Delete(env.SubscriptionID) + return + } + ws.WriteJSON(nip77.MessageEnvelope{SubscriptionID: env.SubscriptionID, Message: out}) + + // if there is more reconciliation to do, postpone this + if out != "" { + negSession.postponeClose() + } else { + // otherwise we can just close it + ws.negentropySessions.Delete(env.SubscriptionID) + } + case *nip77.CloseEnvelope: + ws.negentropySessions.Delete(env.SubscriptionID) } }(message) } diff --git a/negentropy.go b/negentropy.go new file mode 100644 index 0000000..4457153 --- /dev/null +++ b/negentropy.go @@ -0,0 +1,50 @@ +package khatru + +import ( + "context" + "errors" + "fmt" + + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" + "github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector" +) + +type NegentropySession struct { + neg *negentropy.Negentropy + postponeClose func() +} + +func (rl *Relay) startNegentropySession(ctx context.Context, filter nostr.Filter) (*vector.Vector, error) { + // do the same overwrite/reject flow we do in normal REQs + for _, ovw := range rl.OverwriteFilter { + ovw(ctx, &filter) + } + if filter.LimitZero { + return nil, fmt.Errorf("invalid limit 0") + } + for _, reject := range rl.RejectFilter { + if reject, msg := reject(ctx, filter); reject { + return nil, errors.New(nostr.NormalizeOKMessage(msg, "blocked")) + } + } + + // fetch events and add them to a negentropy Vector store + vec := vector.New() + for _, query := range rl.QueryEvents { + ch, err := query(ctx, filter) + if err != nil { + continue + } else if ch == nil { + continue + } + + for event := range ch { + // since the goal here is to sync databases we won't do fancy stuff like overwrite events + vec.Insert(event.CreatedAt, event.ID) + } + } + vec.Seal() + + return vec, nil +} diff --git a/nip11.go b/nip11.go index 87bc5c4..0f2bf8f 100644 --- a/nip11.go +++ b/nip11.go @@ -11,10 +11,13 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) { info := *rl.Info if len(rl.DeleteEvent) > 0 { - info.SupportedNIPs = append(info.SupportedNIPs, 9) + info.AddSupportedNIP(9) } if len(rl.CountEvents) > 0 { - info.SupportedNIPs = append(info.SupportedNIPs, 45) + info.AddSupportedNIP(45) + } + if rl.Negentropy { + info.AddSupportedNIP(77) } for _, ovw := range rl.OverwriteRelayInformation { diff --git a/relay.go b/relay.go index 69297db..c2c5f49 100644 --- a/relay.go +++ b/relay.go @@ -46,7 +46,7 @@ func NewRelay() *Relay { type Relay struct { ServiceURL string - // these structs keeps track of all the things that can be customized when handling events or requests + // hooks that will be called at various times RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string) OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string) StoreEvent []func(ctx context.Context, event *nostr.Event) error @@ -90,6 +90,9 @@ type Relay struct { listeners []listener clientsMutex sync.Mutex + // set this to true to support negentropy + Negentropy bool + // in case you call Server.Start Addr string serveMux *http.ServeMux diff --git a/websocket.go b/websocket.go index a09694d..2c684b4 100644 --- a/websocket.go +++ b/websocket.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/fasthttp/websocket" + "github.com/puzpuzpuz/xsync/v3" ) type WebSocket struct { @@ -24,6 +25,9 @@ type WebSocket struct { AuthedPublicKey string Authed chan struct{} + // nip77 + negentropySessions *xsync.MapOf[string, *NegentropySession] + authLock sync.Mutex }