package service import ( "bytes" "context" "encoding/json" "fmt" "sync" "github.com/getAlby/lndhub.go/db/models" "github.com/getsentry/sentry-go" amqp "github.com/rabbitmq/amqp091-go" ) var bufPool sync.Pool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { // It is recommended that, when possible, publishers and consumers // use separate connections so that consumers are isolated from potential // flow control messures that may be applied to publishing connections. // We therefore start a single publishing connection here instead of storing // one on the service object. conn, err := amqp.Dial(svc.Config.RabbitMQUri) if err != nil { return err } defer conn.Close() ch, err := conn.Channel() if err != nil { return err } defer ch.Close() err = ch.ExchangeDeclare( // For the time being we simply declare a single exchange and start pushing to it. // Towards the future however this might become a more involved setup. svc.Config.RabbitMQInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key "topic", // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain // declared when there are no remaining bindings. true, false, // Non-Internal exchange's accept direct publishing false, // Nowait: We set this to false as we want to wait for a server response // to check wether the exchange was created succesfully false, nil, ) if err != nil { return err } svc.Logger.Infof("Starting rabbitmq publisher") incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices() if err != nil { svc.Logger.Error(err) } for { select { case <-ctx.Done(): return context.Canceled case incoming := <-incomingInvoices: err = svc.publishInvoice(ctx, incoming, ch) if err != nil { svc.Logger.Error(err) sentry.CaptureException(err) } case outgoing := <-outgoingInvoices: err = svc.publishInvoice(ctx, outgoing, ch) if err != nil { svc.Logger.Error(err) sentry.CaptureException(err) } } } } func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Invoice, ch *amqp.Channel) error { key := fmt.Sprintf("invoice.%s.%s", invoice.Type, invoice.State) user, err := svc.FindUser(context.Background(), invoice.UserID) if err != nil { return err } payload := bufPool.Get().(*bytes.Buffer) err = json.NewEncoder(payload).Encode(convertPayload(invoice, user)) if err != nil { return err } err = ch.PublishWithContext(ctx, svc.Config.RabbitMQInvoiceExchange, key, false, false, amqp.Publishing{ ContentType: "application/json", Body: payload.Bytes(), }, ) if err != nil { return err } svc.Logger.Debugf("Succesfully published invoice to rabbitmq with RHash %s", invoice.RHash) return nil }