Initial working poc for publishing to rabbitmq

This commit is contained in:
Lucas Rouckhout
2023-02-04 13:03:41 +01:00
parent 0b9f53af7f
commit 5280e7308a
6 changed files with 161 additions and 31 deletions

1
go.mod
View File

@@ -131,6 +131,7 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect
github.com/rabbitmq/amqp091-go v1.6.1 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/rs/zerolog v1.28.0 // indirect github.com/rs/zerolog v1.28.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect

3
go.sum
View File

@@ -1014,6 +1014,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.6.1 h1:r6HybD9gOdWeUTP9TKIKdcAuFl4Va4p3OmWUUoeICAU=
github.com/rabbitmq/amqp091-go v1.6.1/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -1334,6 +1336,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=

View File

@@ -36,6 +36,8 @@ type Config struct {
MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"` MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"`
MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"`
MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"`
RabbitMQUri string `envconfig:"RABBITMQ_URI"`
RabbitMQInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoices"`
Branding BrandingConfig Branding BrandingConfig
} }

115
lib/service/rabbitmq.go Normal file
View File

@@ -0,0 +1,115 @@
package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/getAlby/lndhub.go/common"
"github.com/getAlby/lndhub.go/db/models"
amqp "github.com/rabbitmq/amqp091-go"
)
func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
conn, err := amqp.Dial(svc.Config.RabbitMQUri)
if err != nil {
return err
}
svc.RabbitMqConn = conn
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
err = ch.ExchangeDeclare(
//TODO: review exchange config
svc.Config.RabbitMQInvoiceExchange,
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
svc.Logger.Infof("Starting rabbitmq publisher")
incomingInvoices := make(chan models.Invoice)
outgoingInvoices := make(chan models.Invoice)
_, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices)
if err != nil {
svc.Logger.Error(err.Error())
}
_, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing, outgoingInvoices)
if err != nil {
svc.Logger.Error(err.Error())
}
for {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case incoming := <-incomingInvoices:
svc.publishInvoice(ctx, incoming, ch)
case outgoing := <-outgoingInvoices:
svc.publishInvoice(ctx, outgoing, ch)
}
}
}
func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Invoice, ch *amqp.Channel) {
key := fmt.Sprintf("%s.%s.invoice", invoice.Type, invoice.State)
//Look up the user's login to add it to the invoice
user, err := svc.FindUser(context.Background(), invoice.UserID)
if err != nil {
svc.Logger.Error(err)
return
}
payload := new(bytes.Buffer)
err = json.NewEncoder(payload).Encode(WebhookInvoicePayload{
ID: invoice.ID,
Type: invoice.Type,
UserLogin: user.Login,
Amount: invoice.Amount,
Fee: invoice.Fee,
Memo: invoice.Memo,
DescriptionHash: invoice.DescriptionHash,
PaymentRequest: invoice.PaymentRequest,
DestinationPubkeyHex: invoice.DestinationPubkeyHex,
DestinationCustomRecords: invoice.DestinationCustomRecords,
RHash: invoice.RHash,
Preimage: invoice.Preimage,
Keysend: invoice.Keysend,
State: invoice.State,
ErrorMessage: invoice.ErrorMessage,
CreatedAt: invoice.CreatedAt,
ExpiresAt: invoice.ExpiresAt.Time,
UpdatedAt: invoice.UpdatedAt.Time,
SettledAt: invoice.SettledAt.Time,
})
if err != nil {
svc.Logger.Error(err)
return
}
err = ch.PublishWithContext(ctx,
svc.Config.RabbitMQInvoiceExchange,
key,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: payload.Bytes(),
},
)
if err != nil {
svc.Logger.Error(err)
return
}
svc.Logger.Debugf("Succesfully published %s", payload.String())
}

View File

@@ -12,6 +12,8 @@ import (
"github.com/uptrace/bun" "github.com/uptrace/bun"
"github.com/ziflex/lecho/v3" "github.com/ziflex/lecho/v3"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
amqp "github.com/rabbitmq/amqp091-go"
) )
const alphaNumBytes = random.Alphanumeric const alphaNumBytes = random.Alphanumeric
@@ -23,6 +25,7 @@ type LndhubService struct {
Logger *lecho.Logger Logger *lecho.Logger
IdentityPubkey string IdentityPubkey string
InvoicePubSub *Pubsub InvoicePubSub *Pubsub
RabbitMqConn *amqp.Connection
} }
func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) { func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) {

View File

@@ -185,6 +185,12 @@ func main() {
go svc.StartWebhookSubscribtion(webhookCtx, svc.Config.WebhookUrl) go svc.StartWebhookSubscribtion(webhookCtx, svc.Config.WebhookUrl)
defer cancelWebhook() defer cancelWebhook()
} }
//Start rabbit publisher
if svc.Config.RabbitMQUri != "" {
rabbitCtx, cancelRabbit := context.WithCancel(context.Background())
go svc.StartRabbitMqPublisher(rabbitCtx)
defer cancelRabbit()
}
if svc.Config.EnableGRPC { if svc.Config.EnableGRPC {
//start grpc server //start grpc server