Files
lndhub.go/lib/service/rabbitmq.go
2023-02-20 14:42:49 +01:00

114 lines
2.9 KiB
Go

package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getsentry/sentry-go"
amqp "github.com/rabbitmq/amqp091-go"
)
var bufPool sync.Pool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
// It is recommended that, when possible, publishers and consumers
// use separate connections so that consumers are isolated from potential
// flow control messures that may be applied to publishing connections.
// We therefore start a single publishing connection here instead of storing
// one on the service object.
conn, err := amqp.Dial(svc.Config.RabbitMQUri)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
err = ch.ExchangeDeclare(
// For the time being we simply declare a single exchange and start pushing to it.
// Towards the future however this might become a more involved setup.
svc.Config.RabbitMQInvoiceExchange,
// topic is a type of exchange that allows routing messages to different queue's bases on a routing key
"topic",
// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
// declared when there are no remaining bindings.
true,
false,
// Non-Internal exchange's accept direct publishing
false,
// Nowait: We set this to false as we want to wait for a server response
// to check wether the exchange was created succesfully
false,
nil,
)
if err != nil {
return err
}
svc.Logger.Infof("Starting rabbitmq publisher")
incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices()
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return context.Canceled
case incoming := <-incomingInvoices:
err = svc.publishInvoice(ctx, incoming, ch)
if err != nil {
svc.Logger.Error(err)
sentry.CaptureException(err)
}
case outgoing := <-outgoingInvoices:
err = svc.publishInvoice(ctx, outgoing, ch)
if err != nil {
svc.Logger.Error(err)
sentry.CaptureException(err)
}
}
}
}
func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Invoice, ch *amqp.Channel) error {
key := fmt.Sprintf("invoice.%s.%s", invoice.Type, invoice.State)
user, err := svc.FindUser(context.Background(), invoice.UserID)
if err != nil {
return err
}
payload := bufPool.Get().(*bytes.Buffer)
err = json.NewEncoder(payload).Encode(convertPayload(invoice, user))
if err != nil {
return err
}
err = ch.PublishWithContext(ctx,
svc.Config.RabbitMQInvoiceExchange,
key,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: payload.Bytes(),
},
)
if err != nil {
return err
}
svc.Logger.Debugf("Succesfully published invoice to rabbitmq with RHash %s", invoice.RHash)
return nil
}