From 27e67b7d569ba98ea57feb261c06be8999cb9b29 Mon Sep 17 00:00:00 2001 From: Lucas Rouckhout Date: Sat, 18 Feb 2023 16:46:56 +0100 Subject: [PATCH] Create the channels during the Subscribe function call and make them buffered --- controllers/invoicestream.ctrl.go | 6 ++---- lib/service/grpc_server.go | 3 +-- lib/service/pubsub.go | 12 ++++++++---- lib/service/webhook.go | 6 ++---- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/controllers/invoicestream.ctrl.go b/controllers/invoicestream.ctrl.go index d07fc4c..bff6be8 100644 --- a/controllers/invoicestream.ctrl.go +++ b/controllers/invoicestream.ctrl.go @@ -6,7 +6,6 @@ import ( "time" "github.com/getAlby/lndhub.go/common" - "github.com/getAlby/lndhub.go/db/models" "github.com/getAlby/lndhub.go/lib/service" "github.com/getAlby/lndhub.go/lib/tokens" "github.com/gorilla/websocket" @@ -32,7 +31,6 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error if err != nil { return err } - invoiceChan := make(chan models.Invoice) ticker := time.NewTicker(30 * time.Second) ws, done, err := createWebsocketUpgrader(c) if err != nil { @@ -40,7 +38,7 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error } defer ws.Close() //start subscription - subId, err := controller.svc.InvoicePubSub.Subscribe(strconv.FormatInt(userId, 10), invoiceChan) + invoiceChan, subId, err := controller.svc.InvoicePubSub.Subscribe(strconv.FormatInt(userId, 10)) if err != nil { controller.svc.Logger.Error(err) return err @@ -97,7 +95,7 @@ SocketLoop: return nil } -//open the websocket and start listening for close messages in a goroutine +// open the websocket and start listening for close messages in a goroutine func createWebsocketUpgrader(c echo.Context) (conn *websocket.Conn, done chan struct{}, err error) { upgrader := websocket.Upgrader{} upgrader.CheckOrigin = func(r *http.Request) bool { return true } diff --git a/lib/service/grpc_server.go b/lib/service/grpc_server.go index 1133d01..96364a1 100644 --- a/lib/service/grpc_server.go +++ b/lib/service/grpc_server.go @@ -36,8 +36,7 @@ func NewGrpcServer(svc *LndhubService, ctx context.Context) (*Server, error) { } func (s *Server) SubsribeInvoices(req *lndhubrpc.SubsribeInvoicesRequest, srv lndhubrpc.InvoiceSubscription_SubsribeInvoicesServer) error { - incomingInvoices := make(chan models.Invoice) - _, err := s.svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) + incomingInvoices, _, err := s.svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) if err != nil { return err } diff --git a/lib/service/pubsub.go b/lib/service/pubsub.go index cd37353..348383d 100644 --- a/lib/service/pubsub.go +++ b/lib/service/pubsub.go @@ -6,6 +6,9 @@ import ( "github.com/getAlby/lndhub.go/db/models" ) +// This should give enough space to allow for some spike in traffic without flooding memory to much. +const DefaultChannelBufSize = 50 + type Pubsub struct { mu sync.RWMutex subs map[string]map[string]chan models.Invoice @@ -17,7 +20,7 @@ func NewPubsub() *Pubsub { return ps } -func (ps *Pubsub) Subscribe(topic string, ch chan models.Invoice) (subId string, err error) { +func (ps *Pubsub) Subscribe(topic string) (chan models.Invoice, string, error) { ps.mu.Lock() defer ps.mu.Unlock() if ps.subs[topic] == nil { @@ -26,11 +29,12 @@ func (ps *Pubsub) Subscribe(topic string, ch chan models.Invoice) (subId string, //re-use preimage code for a uuid preImageHex, err := makePreimageHex() if err != nil { - return "", err + return nil, "", err } - subId = string(preImageHex) + subId := string(preImageHex) + ch := make(chan models.Invoice, DefaultChannelBufSize) ps.subs[topic][subId] = ch - return subId, nil + return ch, subId, nil } func (ps *Pubsub) Unsubscribe(id string, topic string) { diff --git a/lib/service/webhook.go b/lib/service/webhook.go index c8d8b55..9586659 100644 --- a/lib/service/webhook.go +++ b/lib/service/webhook.go @@ -15,13 +15,11 @@ import ( func (svc *LndhubService) StartWebhookSubscribtion(ctx context.Context, url string) { svc.Logger.Infof("Starting webhook subscription with webhook url %s", svc.Config.WebhookUrl) - incomingInvoices := make(chan models.Invoice) - outgoingInvoices := make(chan models.Invoice) - _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) + incomingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) if err != nil { svc.Logger.Error(err.Error()) } - _, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing, outgoingInvoices) + outgoingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing) if err != nil { svc.Logger.Error(err.Error()) }