GetListenerFilters() helper and Injector interface for implementations.

This commit is contained in:
fiatjaf
2021-12-26 07:11:54 -03:00
parent b4f94b8fdd
commit ba0d99c131
3 changed files with 45 additions and 1 deletions

View File

@@ -13,3 +13,7 @@ type Relay interface {
SaveEvent(*event.Event) error
QueryEvents(*filter.EventFilter) ([]event.Event, error)
}
type Injector interface {
InjectEvents() chan event.Event
}

View File

@@ -15,6 +15,39 @@ type Listener struct {
var listeners = make(map[*websocket.Conn]map[string]*Listener)
var listenersMutex = sync.Mutex{}
func GetListeningFilters() filter.EventFilters {
var respfilters = make(filter.EventFilters, 0, len(listeners)*2)
listenersMutex.Lock()
defer func() {
listenersMutex.Unlock()
}()
// here we go through all the existing listeners
for _, connlisteners := range listeners {
for _, listener := range connlisteners {
for _, listenerfilter := range listener.filters {
for _, respfilter := range respfilters {
// check if this filter specifically is already added to respfilters
if filter.Equal(listenerfilter, respfilter) {
goto nextconn
}
}
// field not yet present on respfilters, add it
respfilters = append(respfilters, listenerfilter)
// continue to the next filter
nextconn:
continue
}
}
}
// respfilters will be a slice with all the distinct filter we currently have active
return respfilters
}
func setListener(id string, conn *websocket.Conn, filters filter.EventFilters) {
listenersMutex.Lock()
defer func() {

View File

@@ -27,9 +27,16 @@ func Start(relay Relay) {
Log.Fatal().Err(err).Msg("failed to start")
}
// NIP01
router.Path("/").Methods("GET").HandlerFunc(handleWebsocket(relay))
if inj, ok := relay.(Injector); ok {
go func() {
for event := range inj.InjectEvents() {
notifyListeners(&event)
}
}()
}
srv := &http.Server{
Handler: cors.Default().Handler(router),
Addr: s.Host + ":" + s.Port,