mirror of
https://github.com/getAlby/lndhub.go.git
synced 2025-12-23 15:44:51 +01:00
89 lines
1.9 KiB
Go
89 lines
1.9 KiB
Go
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/getAlby/lndhub.go/db/models"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
|
|
conn, err := amqp.Dial(svc.Config.RabbitMQUri)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
svc.RabbitMqConn = conn
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
|
|
err = ch.ExchangeDeclare(
|
|
//TODO: review exchange config
|
|
svc.Config.RabbitMQInvoiceExchange,
|
|
"topic", // type
|
|
true, // durable
|
|
false, // auto-deleted
|
|
false, // internal
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
svc.Logger.Infof("Starting rabbitmq publisher")
|
|
incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices()
|
|
if err != nil {
|
|
svc.Logger.Error(err)
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("context canceled")
|
|
case incoming := <-incomingInvoices:
|
|
svc.publishInvoice(ctx, incoming, ch)
|
|
case outgoing := <-outgoingInvoices:
|
|
svc.publishInvoice(ctx, outgoing, ch)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Invoice, ch *amqp.Channel) {
|
|
key := fmt.Sprintf("%s.%s.invoice", invoice.Type, invoice.State)
|
|
|
|
//Look up the user's login to add it to the invoice
|
|
user, err := svc.FindUser(context.Background(), invoice.UserID)
|
|
if err != nil {
|
|
svc.Logger.Error(err)
|
|
return
|
|
}
|
|
|
|
payload := new(bytes.Buffer)
|
|
err = json.NewEncoder(payload).Encode(convertPayload(invoice, user))
|
|
if err != nil {
|
|
svc.Logger.Error(err)
|
|
return
|
|
}
|
|
|
|
err = ch.PublishWithContext(ctx,
|
|
svc.Config.RabbitMQInvoiceExchange,
|
|
key,
|
|
false,
|
|
false,
|
|
amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: payload.Bytes(),
|
|
},
|
|
)
|
|
if err != nil {
|
|
svc.Logger.Error(err)
|
|
return
|
|
}
|
|
svc.Logger.Debugf("Succesfully published %s", payload.String())
|
|
}
|