diff --git a/broadcasting.go b/broadcasting.go index d435b3d..f35268c 100644 --- a/broadcasting.go +++ b/broadcasting.go @@ -7,5 +7,5 @@ import ( // BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions // it also doesn't attempt to store the event or trigger any reactions or callbacks func (rl *Relay) BroadcastEvent(evt *nostr.Event) { - notifyListeners(evt) + rl.notifyListeners(evt) } diff --git a/docs/cookbook/index.md b/docs/cookbook/index.md index 20301d0..ff15cbe 100644 --- a/docs/cookbook/index.md +++ b/docs/cookbook/index.md @@ -8,3 +8,4 @@ - [Live event generation](custom-live-events) - [Embedding `khatru` inside other Go HTTP servers](embed) - [Generating relays dynamically and serving them from the same path](dynamic) +- [Routing between multiple relays](routing) diff --git a/docs/cookbook/routing.md b/docs/cookbook/routing.md new file mode 100644 index 0000000..2a03034 --- /dev/null +++ b/docs/cookbook/routing.md @@ -0,0 +1,63 @@ +--- +outline: deep +--- + +# Routing + +If you have one (or more) set of policies that have to be executed in sequence (for example, first you check for the presence of a tag, then later in the next policies you use that tag without checking) and they only apply to some class of events, but you still want your relay to deal with other classes of events that can lead to cumbersome sets of rules, always having to check if an event meets the requirements and so on. There is where routing can help you. + +It also can be handy if you get a [`khatru.Relay`](https://pkg.go.dev/github.com/fiatjaf/khatru#Relay) from somewhere else, like a library such as [`relay29`](https://github.com/fiatjaf/relay29), and you want to combine it with other policies without some interfering with the others. As in the example below: + +```go +sk := os.Getenv("RELAY_SECRET_KEY") + +// a relay for NIP-29 groups +groupsStore := badger.BadgerBackend{} +groupsStore.Init() +groupsRelay, _ := khatru29.Init(relay29.Options{Domain: "example.com", DB: groupsStore, SecretKey: sk}) +// ... + +// a relay for everything else +publicStore := slicestore.SliceStore{} +publicStore.Init() +publicRelay := khatru.NewRelay() +publicRelay.StoreEvent = append(publicRelay.StoreEvent, publicStore.SaveEvent) +publicRelay.QueryEvents = append(publicRelay.QueryEvents, publicStore.QueryEvents) +publicRelay.CountEvents = append(publicRelay.CountEvents, publicStore.CountEvents) +publicRelay.DeleteEvent = append(publicRelay.DeleteEvent, publicStore.DeleteEvent) +// ... + +// a higher-level relay that just routes between the two above +router := khatru.NewRouter() + +// route requests and events to the groups relay +router.Route(). + Req(func (filter nostr.Filter) bool { + _, hasHTag := filter.Tags["h"] + if hasHTag { + return true + } + return slices.Contains(filter.Kinds, func (k int) bool { return k == 39000 || k == 39001 || k == 39002 }) + }). + Event(func (event *nostr.Event) bool { + switch { + case event.Kind <= 9021 && event.Kind >= 9000: + return true + case event.Kind <= 39010 && event.Kind >= 39000: + return true + case event.Kind <= 12 && event.Kind >= 9: + return true + case event.Tags.GetFirst([]string{"h", ""}) != nil: + return true + default: + return false + } + }). + Relay(groupsRelay) + +// route requests and events to the other +router.Route(). + Req(func (filter nostr.Filter) bool { return true }). + Event(func (event *nostr.Event) bool { return true }). + Relay(publicRelay) +``` diff --git a/examples/routing/main.go b/examples/routing/main.go new file mode 100644 index 0000000..f2bf9af --- /dev/null +++ b/examples/routing/main.go @@ -0,0 +1,70 @@ +package main + +import ( + "fmt" + "net/http" + "slices" + + "github.com/fiatjaf/eventstore/slicestore" + "github.com/fiatjaf/eventstore/sqlite3" + "github.com/fiatjaf/khatru" + "github.com/nbd-wtf/go-nostr" +) + +func main() { + db1 := slicestore.SliceStore{} + db1.Init() + r1 := khatru.NewRelay() + r1.StoreEvent = append(r1.StoreEvent, db1.SaveEvent) + r1.QueryEvents = append(r1.QueryEvents, db1.QueryEvents) + r1.CountEvents = append(r1.CountEvents, db1.CountEvents) + r1.DeleteEvent = append(r1.DeleteEvent, db1.DeleteEvent) + + db2 := sqlite3.SQLite3Backend{DatabaseURL: "/tmp/t"} + db2.Init() + r2 := khatru.NewRelay() + r2.StoreEvent = append(r2.StoreEvent, db2.SaveEvent) + r2.QueryEvents = append(r2.QueryEvents, db2.QueryEvents) + r2.CountEvents = append(r2.CountEvents, db2.CountEvents) + r2.DeleteEvent = append(r2.DeleteEvent, db2.DeleteEvent) + + db3 := slicestore.SliceStore{} + db3.Init() + r3 := khatru.NewRelay() + r3.StoreEvent = append(r3.StoreEvent, db3.SaveEvent) + r3.QueryEvents = append(r3.QueryEvents, db3.QueryEvents) + r3.CountEvents = append(r3.CountEvents, db3.CountEvents) + r3.DeleteEvent = append(r3.DeleteEvent, db3.DeleteEvent) + + router := khatru.NewRouter() + + router.Route(). + Req(func(filter nostr.Filter) bool { + return slices.Contains(filter.Kinds, 30023) + }). + Event(func(event *nostr.Event) bool { + return event.Kind == 30023 + }). + Relay(r1) + + router.Route(). + Req(func(filter nostr.Filter) bool { + return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam") + }). + Event(func(event *nostr.Event) bool { + return event.Kind == 1 && event.Tags.GetFirst([]string{"t", "spam"}) != nil + }). + Relay(r2) + + router.Route(). + Req(func(filter nostr.Filter) bool { + return slices.Contains(filter.Kinds, 1) + }). + Event(func(event *nostr.Event) bool { + return event.Kind == 1 + }). + Relay(r3) + + fmt.Println("running on :3334") + http.ListenAndServe(":3334", router) +} diff --git a/get-started.go b/get-started.go index 29b243f..fcc5503 100644 --- a/get-started.go +++ b/get-started.go @@ -49,11 +49,12 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) error { // Shutdown sends a websocket close control message to all connected clients. func (rl *Relay) Shutdown(ctx context.Context) { rl.httpServer.Shutdown(ctx) - - rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { - conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second)) - conn.Close() - rl.clients.Delete(conn) - return true - }) + rl.clientsMutex.Lock() + defer rl.clientsMutex.Unlock() + for ws := range rl.clients { + ws.conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second)) + ws.conn.Close() + } + clear(rl.clients) + rl.listeners = rl.listeners[:0] } diff --git a/go.mod b/go.mod index c9df1cd..2fc2d4c 100644 --- a/go.mod +++ b/go.mod @@ -54,3 +54,5 @@ require ( golang.org/x/sys v0.20.0 // indirect google.golang.org/protobuf v1.31.0 // indirect ) + +replace github.com/nbd-wtf/go-nostr => ../go-nostr diff --git a/go.sum b/go.sum index 1b5bd6a..c4d8912 100644 --- a/go.sum +++ b/go.sum @@ -113,8 +113,6 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= -github.com/nbd-wtf/go-nostr v0.34.3 h1:JfDOHOje7gzUhisbZD0v2Y9b9vh2PmP6eHsU/GfU8QE= -github.com/nbd-wtf/go-nostr v0.34.3/go.mod h1:NZQkxl96ggbO8rvDpVjcsojJqKTPwqhP4i82O7K5DJs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/handlers.go b/handlers.go index 3145f29..631c439 100644 --- a/handlers.go +++ b/handlers.go @@ -47,7 +47,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { rl.Log.Printf("failed to upgrade websocket: %v\n", err) return } - rl.clients.Store(conn, struct{}{}) + ticker := time.NewTicker(rl.PingPeriod) // NIP-42 challenge @@ -60,6 +60,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { Challenge: hex.EncodeToString(challenge), } + rl.clientsMutex.Lock() + rl.clients[ws] = make([]listenerSpec, 0, 2) + rl.clientsMutex.Unlock() + ctx, cancel := context.WithCancel( context.WithValue( context.Background(), @@ -74,11 +78,22 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ticker.Stop() cancel() - if _, ok := rl.clients.Load(conn); ok { - conn.Close() - rl.clients.Delete(conn) - removeListener(ws) + conn.Close() + + rl.clientsMutex.Lock() + defer rl.clientsMutex.Unlock() + if specs, ok := rl.clients[ws]; ok { + // swap delete listeners and delete client + for s, spec := range specs { + // no need to cancel contexts since they inherit from the main connection context + // just delete the listeners + srl := spec.subrelay + srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1] + specs[s] = specs[len(specs)-1] + srl.listeners = srl.listeners[0:len(srl.listeners)] + } } + delete(rl.clients, ws) } go func() { @@ -167,25 +182,30 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { } } + srl := rl + if rl.getSubRelayFromEvent != nil { + srl = rl.getSubRelayFromEvent(&env.Event) + } + var ok bool var writeErr error var skipBroadcast bool if env.Event.Kind == 5 { // this always returns "blocked: " whenever it returns an error - writeErr = rl.handleDeleteRequest(ctx, &env.Event) + writeErr = srl.handleDeleteRequest(ctx, &env.Event) } else { // this will also always return a prefixed reason - skipBroadcast, writeErr = rl.AddEvent(ctx, &env.Event) + skipBroadcast, writeErr = srl.AddEvent(ctx, &env.Event) } var reason string if writeErr == nil { ok = true - for _, ovw := range rl.OverwriteResponseEvent { + for _, ovw := range srl.OverwriteResponseEvent { ovw(ctx, &env.Event) } if !skipBroadcast { - notifyListeners(&env.Event) + srl.notifyListeners(&env.Event) } } else { reason = writeErr.Error() @@ -199,9 +219,14 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"}) return } + var total int64 for _, filter := range env.Filters { - total += rl.handleCountRequest(ctx, ws, filter) + srl := rl + if rl.getSubRelayFromFilter != nil { + srl = rl.getSubRelayFromFilter(filter) + } + total += srl.handleCountRequest(ctx, ws, filter) } ws.WriteJSON(nostr.CountEnvelope{SubscriptionID: env.SubscriptionID, Count: &total}) case *nostr.ReqEnvelope: @@ -216,7 +241,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // handle each filter separately -- dispatching events as they're loaded from databases for _, filter := range env.Filters { - err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) + srl := rl + if rl.getSubRelayFromFilter != nil { + srl = rl.getSubRelayFromFilter(filter) + } + err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) if err != nil { // fail everything if any filter is rejected reason := err.Error() @@ -226,6 +255,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) cancelReqCtx(errors.New("filter rejected")) return + } else { + rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx) } } @@ -236,10 +267,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { cancelReqCtx(nil) ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID)) }() - - setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx) case *nostr.CloseEnvelope: - removeListenerId(ws, string(*env)) + id := string(*env) + rl.removeListenerId(ws, id) case *nostr.AuthEnvelope: wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1) if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok { diff --git a/listener.go b/listener.go index 669dacd..13fab40 100644 --- a/listener.go +++ b/listener.go @@ -2,86 +2,91 @@ package khatru import ( "context" - "fmt" + "errors" "github.com/nbd-wtf/go-nostr" - "github.com/puzpuzpuz/xsync/v3" ) -type Listener struct { - filters nostr.Filters - cancel context.CancelCauseFunc +var ErrSubscriptionClosedByClient = errors.New("subscription closed by client") + +type listenerSpec struct { + subscriptionId string // kept here so we can easily match against it removeListenerId + cancel context.CancelCauseFunc + index int + subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same } -var listeners = xsync.NewMapOf[*WebSocket, *xsync.MapOf[string, *Listener]]() +type listener struct { + subscriptionId string // duplicated here so we can easily send it on notifyListeners + filter nostr.Filter + ws *WebSocket +} -func GetListeningFilters() nostr.Filters { - respfilters := make(nostr.Filters, 0, listeners.Size()*2) - - // here we go through all the existing listeners - listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { - subs.Range(func(_ string, listener *Listener) bool { - for _, listenerfilter := range listener.filters { - for _, respfilter := range respfilters { - // check if this filter specifically is already added to respfilters - if nostr.FilterEqual(listenerfilter, respfilter) { - goto nextconn - } - } - - // field not yet present on respfilters, add it - respfilters = append(respfilters, listenerfilter) - - // continue to the next filter - nextconn: - continue - } - - return true - }) - - return true - }) - - // respfilters will be a slice with all the distinct filter we currently have active +func (rl *Relay) GetListeningFilters() []nostr.Filter { + respfilters := make([]nostr.Filter, len(rl.listeners)) + for i, l := range rl.listeners { + respfilters[i] = l.filter + } return respfilters } -func setListener(id string, ws *WebSocket, filters nostr.Filters, cancel context.CancelCauseFunc) { - subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { - return xsync.NewMapOf[string, *Listener]() - }) - subs.Store(id, &Listener{filters: filters, cancel: cancel}) +// addListener may be called multiple times for each id and ws -- in which case each filter will +// be added as an independent listener +func (rl *Relay) addListener( + ws *WebSocket, + id string, + subrelay *Relay, + filter nostr.Filter, + cancel context.CancelCauseFunc, +) { + rl.clientsMutex.Lock() + defer rl.clientsMutex.Unlock() + + if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ { + idx := len(subrelay.listeners) + rl.clients[ws] = append(specs, listenerSpec{ + subscriptionId: id, + cancel: cancel, + subrelay: subrelay, + index: idx, + }) + subrelay.listeners = append(subrelay.listeners, listener{ + ws: ws, + subscriptionId: id, + filter: filter, + }) + } } // remove a specific subscription id from listeners for a given ws client // and cancel its specific context -func removeListenerId(ws *WebSocket, id string) { - if subs, ok := listeners.Load(ws); ok { - if listener, ok := subs.LoadAndDelete(id); ok { - listener.cancel(fmt.Errorf("subscription closed by client")) - } - if subs.Size() == 0 { - listeners.Delete(ws) +func (rl *Relay) removeListenerId(ws *WebSocket, id string) { + rl.clientsMutex.Lock() + defer rl.clientsMutex.Unlock() + + if specs, ok := rl.clients[ws]; ok { + // swap delete specs that match this id + nswaps := 0 + for s, spec := range specs { + if spec.subscriptionId == id { + spec.cancel(ErrSubscriptionClosedByClient) + specs[s] = specs[len(specs)-1-nswaps] + nswaps++ + + // swap delete listeners one at a time, as they may be each in a different subrelay + srl := spec.subrelay // == rl in normal cases, but different when this came from a route + srl.listeners[spec.index] = srl.listeners[len(srl.listeners)-1] + srl.listeners = srl.listeners[0 : len(srl.listeners)-1] + } } + rl.clients[ws] = specs[0 : len(specs)-nswaps] } } -// remove WebSocket conn from listeners -// (no need to cancel contexts as they are all inherited from the main connection context) -func removeListener(ws *WebSocket) { - listeners.Delete(ws) -} - -func notifyListeners(event *nostr.Event) { - listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { - subs.Range(func(id string, listener *Listener) bool { - if !listener.filters.Match(event) { - return true - } - ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) - return true - }) - return true - }) +func (rl *Relay) notifyListeners(event *nostr.Event) { + for _, listener := range rl.listeners { + if listener.filter.Matches(event) { + listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.subscriptionId, Event: *event}) + } + } } diff --git a/relay.go b/relay.go index f7e72be..1bd48dc 100644 --- a/relay.go +++ b/relay.go @@ -5,16 +5,16 @@ import ( "log" "net/http" "os" + "sync" "time" "github.com/fasthttp/websocket" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip11" - "github.com/puzpuzpuz/xsync/v3" ) func NewRelay() *Relay { - return &Relay{ + rl := &Relay{ Log: log.New(os.Stderr, "[khatru-relay] ", log.LstdFlags), Info: &nip11.RelayInformationDocument{ @@ -29,7 +29,9 @@ func NewRelay() *Relay { CheckOrigin: func(r *http.Request) bool { return true }, }, - clients: xsync.NewMapOf[*websocket.Conn, struct{}](), + clients: make(map[*WebSocket][]listenerSpec, 100), + listeners: make([]listener, 0, 100), + serveMux: &http.ServeMux{}, WriteWait: 10 * time.Second, @@ -37,28 +39,36 @@ func NewRelay() *Relay { PingPeriod: 30 * time.Second, MaxMessageSize: 512000, } + + return rl } type Relay struct { ServiceURL string + // these structs keeps track of all the things that can be customized when handling events or requests RejectEvent []func(ctx context.Context, event *nostr.Event) (reject bool, msg string) - RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) - RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) - RejectConnection []func(r *http.Request) bool OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string) - OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event) - OverwriteFilter []func(ctx context.Context, filter *nostr.Filter) - OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter) - OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument StoreEvent []func(ctx context.Context, event *nostr.Event) error DeleteEvent []func(ctx context.Context, event *nostr.Event) error - QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) - CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error) - OnConnect []func(ctx context.Context) - OnDisconnect []func(ctx context.Context) OnEventSaved []func(ctx context.Context, event *nostr.Event) OnEphemeralEvent []func(ctx context.Context, event *nostr.Event) + RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) + RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string) + OverwriteFilter []func(ctx context.Context, filter *nostr.Filter) + OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter) + QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error) + CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error) + RejectConnection []func(r *http.Request) bool + OnConnect []func(ctx context.Context) + OnDisconnect []func(ctx context.Context) + OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument + OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event) + + // these are used when this relays acts as a router + routes []Route + getSubRelayFromEvent func(*nostr.Event) *Relay // used for handling EVENTs + getSubRelayFromFilter func(nostr.Filter) *Relay // used for handling REQs // setting up handlers here will enable these methods ManagementAPI RelayManagementAPI @@ -74,7 +84,10 @@ type Relay struct { upgrader websocket.Upgrader // keep a connection reference to all connected clients for Server.Shutdown - clients *xsync.MapOf[*websocket.Conn, struct{}] + // also used for keeping track of who is listening to what + clients map[*WebSocket][]listenerSpec + listeners []listener + clientsMutex sync.Mutex // in case you call Server.Start Addr string diff --git a/router.go b/router.go new file mode 100644 index 0000000..bab833b --- /dev/null +++ b/router.go @@ -0,0 +1,67 @@ +package khatru + +import ( + "github.com/nbd-wtf/go-nostr" +) + +type Router struct{ *Relay } + +type Route struct { + eventMatcher func(*nostr.Event) bool + filterMatcher func(nostr.Filter) bool + relay *Relay +} + +type routeBuilder struct { + router *Router + eventMatcher func(*nostr.Event) bool + filterMatcher func(nostr.Filter) bool +} + +func NewRouter() *Router { + rr := &Router{Relay: NewRelay()} + rr.routes = make([]Route, 0, 3) + rr.getSubRelayFromFilter = func(f nostr.Filter) *Relay { + for _, route := range rr.routes { + if route.filterMatcher(f) { + return route.relay + } + } + return rr.Relay + } + rr.getSubRelayFromEvent = func(e *nostr.Event) *Relay { + for _, route := range rr.routes { + if route.eventMatcher(e) { + return route.relay + } + } + return rr.Relay + } + return rr +} + +func (rr *Router) Route() routeBuilder { + return routeBuilder{ + router: rr, + filterMatcher: func(f nostr.Filter) bool { return false }, + eventMatcher: func(e *nostr.Event) bool { return false }, + } +} + +func (rb routeBuilder) Req(fn func(nostr.Filter) bool) routeBuilder { + rb.filterMatcher = fn + return rb +} + +func (rb routeBuilder) Event(fn func(*nostr.Event) bool) routeBuilder { + rb.eventMatcher = fn + return rb +} + +func (rb routeBuilder) Relay(relay *Relay) { + rb.router.routes = append(rb.router.routes, Route{ + filterMatcher: rb.filterMatcher, + eventMatcher: rb.eventMatcher, + relay: relay, + }) +} diff --git a/utils.go b/utils.go index 8c6265b..4060764 100644 --- a/utils.go +++ b/utils.go @@ -47,15 +47,3 @@ func GetIP(ctx context.Context) string { func GetSubscriptionID(ctx context.Context) string { return ctx.Value(subscriptionIdKey).(string) } - -func GetOpenSubscriptions(ctx context.Context) []nostr.Filter { - if subs, ok := listeners.Load(GetConnection(ctx)); ok { - res := make([]nostr.Filter, 0, listeners.Size()*2) - subs.Range(func(_ string, sub *Listener) bool { - res = append(res, sub.filters...) - return true - }) - return res - } - return nil -}