diff --git a/controllers/invoicestream.ctrl.go b/controllers/invoicestream.ctrl.go index a355cee..bff6be8 100644 --- a/controllers/invoicestream.ctrl.go +++ b/controllers/invoicestream.ctrl.go @@ -6,7 +6,6 @@ import ( "time" "github.com/getAlby/lndhub.go/common" - "github.com/getAlby/lndhub.go/db/models" "github.com/getAlby/lndhub.go/lib/service" "github.com/getAlby/lndhub.go/lib/tokens" "github.com/gorilla/websocket" @@ -39,8 +38,7 @@ func (controller *InvoiceStreamController) StreamInvoices(c echo.Context) error } defer ws.Close() //start subscription - invoiceChan := make(chan models.Invoice, service.DefaultChannelBufSize) - subId, err := controller.svc.InvoicePubSub.Subscribe(strconv.FormatInt(userId, 10), invoiceChan) + invoiceChan, subId, err := controller.svc.InvoicePubSub.Subscribe(strconv.FormatInt(userId, 10)) if err != nil { controller.svc.Logger.Error(err) return err diff --git a/lib/service/grpc_server.go b/lib/service/grpc_server.go index 1133d01..96364a1 100644 --- a/lib/service/grpc_server.go +++ b/lib/service/grpc_server.go @@ -36,8 +36,7 @@ func NewGrpcServer(svc *LndhubService, ctx context.Context) (*Server, error) { } func (s *Server) SubsribeInvoices(req *lndhubrpc.SubsribeInvoicesRequest, srv lndhubrpc.InvoiceSubscription_SubsribeInvoicesServer) error { - incomingInvoices := make(chan models.Invoice) - _, err := s.svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) + incomingInvoices, _, err := s.svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) if err != nil { return err } diff --git a/lib/service/pubsub.go b/lib/service/pubsub.go index 95fda59..003d047 100644 --- a/lib/service/pubsub.go +++ b/lib/service/pubsub.go @@ -6,7 +6,7 @@ import ( "github.com/getAlby/lndhub.go/db/models" ) -const DefaultChannelBufSize = 50 +const DefaultChannelBufSize = 20 type Pubsub struct { mu sync.RWMutex @@ -19,7 +19,7 @@ func NewPubsub() *Pubsub { return ps } -func (ps *Pubsub) Subscribe(topic string, ch chan models.Invoice) (subId string, err error) { +func (ps *Pubsub) Subscribe(topic string) (chan models.Invoice, string, error) { ps.mu.Lock() defer ps.mu.Unlock() if ps.subs[topic] == nil { @@ -28,11 +28,13 @@ func (ps *Pubsub) Subscribe(topic string, ch chan models.Invoice) (subId string, //re-use preimage code for a uuid preImageHex, err := makePreimageHex() if err != nil { - return "", err + return nil, "", err } - subId = string(preImageHex) + subId := string(preImageHex) + + ch := make(chan models.Invoice, DefaultChannelBufSize) ps.subs[topic][subId] = ch - return subId, nil + return ch, subId, nil } func (ps *Pubsub) Unsubscribe(id string, topic string) { diff --git a/lib/service/webhook.go b/lib/service/webhook.go index a5f0d1d..d127f13 100644 --- a/lib/service/webhook.go +++ b/lib/service/webhook.go @@ -82,13 +82,11 @@ type WebhookInvoicePayload struct { } func (svc *LndhubService) subscribeIncomingOutgoingInvoices() (incoming, outgoing chan models.Invoice, err error) { - incomingInvoices := make(chan models.Invoice, DefaultChannelBufSize) - outgoingInvoices := make(chan models.Invoice, DefaultChannelBufSize) - _, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) + incomingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) if err != nil { return nil, nil, err } - _, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing, outgoingInvoices) + outgoingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing) if err != nil { return nil, nil, err }