add proper pubsub

This commit is contained in:
kiwiidb
2022-04-06 18:00:04 +02:00
parent 45e0c48f1f
commit a5ae01a5f2
5 changed files with 65 additions and 20 deletions

View File

@@ -33,7 +33,8 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error
return err return err
} }
invoiceChan := make(chan models.Invoice) invoiceChan := make(chan models.Invoice)
controller.svc.InvoiceSubscribers[userId] = invoiceChan reqId := c.Response().Header().Get(echo.HeaderXRequestID)
controller.svc.InvoicePubSub.Subscribe(reqId, userId, invoiceChan)
ctx := c.Request().Context() ctx := c.Request().Context()
upgrader := websocket.Upgrader{} upgrader := websocket.Upgrader{}
upgrader.CheckOrigin = func(r *http.Request) bool { return true } upgrader.CheckOrigin = func(r *http.Request) bool { return true }
@@ -80,5 +81,6 @@ SocketLoop:
} }
} }
} }
controller.svc.InvoicePubSub.Unsubscribe(reqId, userId)
return nil return nil
} }

View File

@@ -97,9 +97,7 @@ func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *
svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
return err return err
} }
if sub, ok := svc.InvoiceSubscribers[invoice.UserID]; ok { svc.InvoicePubSub.Publish(invoice.UserID, invoice)
sub <- invoice
}
return nil return nil
} }

48
lib/service/pubsub.go Normal file
View File

@@ -0,0 +1,48 @@
package service
import (
"sync"
"github.com/getAlby/lndhub.go/db/models"
)
type Pubsub struct {
mu sync.RWMutex
subs map[int64]map[string]chan models.Invoice
}
func NewPubsub() *Pubsub {
ps := &Pubsub{}
ps.subs = make(map[int64]map[string]chan models.Invoice)
return ps
}
func (ps *Pubsub) Subscribe(id string, topic int64, ch chan models.Invoice) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.subs[topic][id] = ch
}
func (ps *Pubsub) Unsubscribe(id string, topic int64) {
ps.mu.Lock()
defer ps.mu.Unlock()
delete(ps.subs[topic], id)
}
func (ps *Pubsub) Publish(topic int64, msg models.Invoice) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, ch := range ps.subs[topic] {
ch <- msg
}
}
func (ps *Pubsub) CloseAll() {
for _, subs := range ps.subs {
for _, ch := range subs {
close(ch)
}
}
}

View File

@@ -22,7 +22,7 @@ type LndhubService struct {
LndClient lnd.LightningClientWrapper LndClient lnd.LightningClientWrapper
Logger *lecho.Logger Logger *lecho.Logger
IdentityPubkey string IdentityPubkey string
InvoiceSubscribers map[int64]chan models.Invoice InvoicePubSub *Pubsub
} }
func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) { func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) {

View File

@@ -15,7 +15,6 @@ import (
"github.com/getAlby/lndhub.go/controllers" "github.com/getAlby/lndhub.go/controllers"
"github.com/getAlby/lndhub.go/db" "github.com/getAlby/lndhub.go/db"
"github.com/getAlby/lndhub.go/db/migrations" "github.com/getAlby/lndhub.go/db/migrations"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getAlby/lndhub.go/lib" "github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/responses" "github.com/getAlby/lndhub.go/lib/responses"
"github.com/getAlby/lndhub.go/lib/service" "github.com/getAlby/lndhub.go/lib/service"
@@ -124,7 +123,7 @@ func main() {
LndClient: lndClient, LndClient: lndClient,
Logger: logger, Logger: logger,
IdentityPubkey: getInfo.IdentityPubkey, IdentityPubkey: getInfo.IdentityPubkey,
InvoiceSubscribers: map[int64]chan models.Invoice{}, InvoicePubSub: service.NewPubsub(),
} }
strictRateLimitMiddleware := createRateLimitMiddleware(c.StrictRateLimit, c.BurstRateLimit) strictRateLimitMiddleware := createRateLimitMiddleware(c.StrictRateLimit, c.BurstRateLimit)
@@ -204,9 +203,7 @@ func main() {
e.Logger.Fatal(err) e.Logger.Fatal(err)
} }
//close all channels //close all channels
for _, sub := range svc.InvoiceSubscribers { svc.InvoicePubSub.CloseAll()
close(sub)
}
if echoPrometheus != nil { if echoPrometheus != nil {
if err := echoPrometheus.Shutdown(ctx); err != nil { if err := echoPrometheus.Shutdown(ctx); err != nil {
e.Logger.Fatal(err) e.Logger.Fatal(err)