This commit is contained in:
Lucas Rouckhout
2023-03-04 17:52:03 +01:00
parent bdd9431527
commit dbe5ce83a3
8 changed files with 332 additions and 58 deletions

View File

@@ -91,7 +91,7 @@ func (suite *RabbitMQTestSuite) TestPublishInvoice() {
) )
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQInvoiceExchange, false, nil) err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQLndhubInvoiceExchange, false, nil)
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken) invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken)
@@ -155,7 +155,7 @@ func (suite *RabbitMQTestSuite) TearDownSuite() {
_, err = ch.QueueDelete(suite.testQueueName, false, false, false) _, err = ch.QueueDelete(suite.testQueueName, false, false, false)
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
err = ch.ExchangeDelete(suite.svc.Config.RabbitMQInvoiceExchange, true, false) err = ch.ExchangeDelete(suite.svc.Config.RabbitMQLndhubInvoiceExchange, true, false)
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
} }

View File

@@ -61,7 +61,7 @@ func LndHubTestServiceInit(lndClientMock lnd.LightningClientWrapper) (svc *servi
rabbitmqUri, ok := os.LookupEnv("RABBITMQ_URI") rabbitmqUri, ok := os.LookupEnv("RABBITMQ_URI")
if ok { if ok {
c.RabbitMQUri = rabbitmqUri c.RabbitMQUri = rabbitmqUri
c.RabbitMQInvoiceExchange = "test_lndhub_invoices" c.RabbitMQLndhubInvoiceExchange = "test_lndhub_invoices"
} }
dbConn, err := db.Open(c) dbConn, err := db.Open(c)

View File

@@ -42,7 +42,10 @@ type Config struct {
MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"`
MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"`
RabbitMQUri string `envconfig:"RABBITMQ_URI"` RabbitMQUri string `envconfig:"RABBITMQ_URI"`
RabbitMQInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoice"` RabbitMQLndhubInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoice"`
RabbitMQLndInvoiceExchange string `envconfig:"RABBITMQ_LND_INVOICE_EXCHANGE" default:"lnd_invoice"`
RabbitMQInvoiceConsumerQueueName string `envconfig:"RABBITMQ_INVOICE_CONSUMER_QUEUE_NAME" default:"lndhub_invoice_consumer"`
SubscriptionConsumerType string `envconfig:"SUBSCRIPTION_CONSUMER_TYPE" default:"grpc"`
Branding BrandingConfig Branding BrandingConfig
} }

View File

@@ -17,11 +17,6 @@ var bufPool sync.Pool = sync.Pool{
} }
func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
// It is recommended that, when possible, publishers and consumers
// use separate connections so that consumers are isolated from potential
// flow control messures that may be applied to publishing connections.
// We therefore start a single publishing connection here instead of storing
// one on the service object.
conn, err := amqp.Dial(svc.Config.RabbitMQUri) conn, err := amqp.Dial(svc.Config.RabbitMQUri)
if err != nil { if err != nil {
return err return err
@@ -37,7 +32,7 @@ func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
err = ch.ExchangeDeclare( err = ch.ExchangeDeclare(
// For the time being we simply declare a single exchange and start pushing to it. // For the time being we simply declare a single exchange and start pushing to it.
// Towards the future however this might become a more involved setup. // Towards the future however this might become a more involved setup.
svc.Config.RabbitMQInvoiceExchange, svc.Config.RabbitMQLndhubInvoiceExchange,
// topic is a type of exchange that allows routing messages to different queue's bases on a routing key // topic is a type of exchange that allows routing messages to different queue's bases on a routing key
"topic", "topic",
// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
@@ -57,7 +52,7 @@ func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error {
svc.Logger.Infof("Starting rabbitmq publisher") svc.Logger.Infof("Starting rabbitmq publisher")
incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices() incomingInvoices, outgoingInvoices, err := svc.SubscribeIncomingOutgoingInvoices()
if err != nil { if err != nil {
return err return err
} }
@@ -96,7 +91,7 @@ func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Inv
} }
err = ch.PublishWithContext(ctx, err = ch.PublishWithContext(ctx,
svc.Config.RabbitMQInvoiceExchange, svc.Config.RabbitMQLndhubInvoiceExchange,
key, key,
false, false,
false, false,

View File

@@ -3,6 +3,7 @@ package service
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/getAlby/lndhub.go/rabbitmq"
"strconv" "strconv"
"github.com/getAlby/lndhub.go/db/models" "github.com/getAlby/lndhub.go/db/models"
@@ -20,6 +21,7 @@ type LndhubService struct {
Config *Config Config *Config
DB *bun.DB DB *bun.DB
LndClient lnd.LightningClientWrapper LndClient lnd.LightningClientWrapper
RabbitMQClient rabbitmq.Client
Logger *lecho.Logger Logger *lecho.Logger
IdentityPubkey string IdentityPubkey string
InvoicePubSub *Pubsub InvoicePubSub *Pubsub

View File

@@ -14,7 +14,7 @@ import (
func (svc *LndhubService) StartWebhookSubscription(ctx context.Context, url string) { func (svc *LndhubService) StartWebhookSubscription(ctx context.Context, url string) {
svc.Logger.Infof("Starting webhook subscription with webhook url %s", svc.Config.WebhookUrl) svc.Logger.Infof("Starting webhook subscription with webhook url %s", svc.Config.WebhookUrl)
incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices() incomingInvoices, outgoingInvoices, err := svc.SubscribeIncomingOutgoingInvoices()
if err != nil { if err != nil {
svc.Logger.Error(err) svc.Logger.Error(err)
} }
@@ -81,7 +81,7 @@ type WebhookInvoicePayload struct {
SettledAt time.Time `json:"settled_at"` SettledAt time.Time `json:"settled_at"`
} }
func (svc *LndhubService) subscribeIncomingOutgoingInvoices() (incoming, outgoing chan models.Invoice, err error) { func (svc *LndhubService) SubscribeIncomingOutgoingInvoices() (incoming, outgoing chan models.Invoice, err error) {
incomingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) incomingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

41
main.go
View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"embed" "embed"
"fmt" "fmt"
"github.com/getAlby/lndhub.go/rabbitmq"
"log" "log"
"net" "net"
"net/http" "net/http"
@@ -157,10 +158,29 @@ func main() {
} }
logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey) logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey)
// If not RABBITMQ_URI was provided we will not attempt to create a client
// No rabbitmq features will be available in this case.
var rabbitmqClient rabbitmq.Client
if c.RabbitMQUri != "" {
rabbitmqClient, err = rabbitmq.Dial(c.RabbitMQUri,
rabbitmq.WithLogger(logger),
rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange),
rabbitmq.WithLndhubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange),
rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName),
)
if err != nil {
logger.Fatal(err)
}
// close the connection gently at the end of the runtime
defer rabbitmqClient.Close()
}
svc := &service.LndhubService{ svc := &service.LndhubService{
Config: c, Config: c,
DB: dbConn, DB: dbConn,
LndClient: lndClient, LndClient: lndClient,
RabbitMQClient: rabbitmqClient,
Logger: logger, Logger: logger,
IdentityPubkey: getInfo.IdentityPubkey, IdentityPubkey: getInfo.IdentityPubkey,
InvoicePubSub: service.NewPubsub(), InvoicePubSub: service.NewPubsub(),
@@ -186,11 +206,25 @@ func main() {
// Subscribe to LND invoice updates in the background // Subscribe to LND invoice updates in the background
backgroundWg.Add(1) backgroundWg.Add(1)
go func() { go func() {
switch svc.Config.SubscriptionConsumerType {
case "rabbitmq":
err = svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate)
if err != nil && err != context.Canceled {
// in case of an error in this routine, we want to restart LNDhub
svc.Logger.Fatal(err)
}
case "grpc":
err = svc.InvoiceUpdateSubscription(backGroundCtx) err = svc.InvoiceUpdateSubscription(backGroundCtx)
if err != nil && err != context.Canceled { if err != nil && err != context.Canceled {
// in case of an error in this routine, we want to restart LNDhub // in case of an error in this routine, we want to restart LNDhub
svc.Logger.Fatal(err) svc.Logger.Fatal(err)
} }
default:
svc.Logger.Fatalf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType)
}
svc.Logger.Info("Invoice routine done") svc.Logger.Info("Invoice routine done")
backgroundWg.Done() backgroundWg.Done()
}() }()
@@ -217,15 +251,16 @@ func main() {
}() }()
} }
//Start rabbit publisher //Start rabbit publisher
if svc.Config.RabbitMQUri != "" { if svc.RabbitMQClient != nil {
backgroundWg.Add(1) backgroundWg.Add(1)
go func() { go func() {
err = svc.StartRabbitMqPublisher(backGroundCtx) err = svc.RabbitMQClient.StartPublishInvoices(backGroundCtx, svc.SubscribeIncomingOutgoingInvoices)
if err != nil { if err != nil {
svc.Logger.Error(err) svc.Logger.Error(err)
sentry.CaptureException(err) sentry.CaptureException(err)
} }
svc.Logger.Info("Rabbit routine done")
svc.Logger.Info("Rabbit invoice publisher done")
backgroundWg.Done() backgroundWg.Done()
}() }()
} }

239
rabbitmq/rabbitmq.go Normal file
View File

@@ -0,0 +1,239 @@
package rabbitmq
import (
"context"
"encoding/json"
"github.com/getAlby/lndhub.go/db/models"
"github.com/getsentry/sentry-go"
"github.com/labstack/gommon/log"
"github.com/lightningnetwork/lnd/lnrpc"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/ziflex/lecho/v3"
"os"
)
type Client interface {
SubscribeToLndInvoices(context.Context, InvoiceHandler) error
StartPublishInvoices(context.Context, SubscribeToInvoicesFunc) error
Close() error
}
type DefaultClient struct {
conn *amqp.Connection
// It is recommended that, when possible, publishers and consumers
// use separate connections so that consumers are isolated from potential
// flow control measures that may be applied to publishing connections.
consumeChannel *amqp.Channel
produceChannel *amqp.Channel
logger *lecho.Logger
lndInvoiceConsumerQueueName string
lndInvoiceExchange string
lndhubInvoiceExchange string
}
type ClientOption = func(client *DefaultClient)
func WithLndInvoiceExchange(exchange string) ClientOption {
return func(client *DefaultClient) {
client.lndInvoiceExchange = exchange
}
}
func WithLndhubInvoiceExchange(exchange string) ClientOption {
return func(client *DefaultClient) {
client.lndhubInvoiceExchange = exchange
}
}
func WithLndInvoiceConsumerQueueName(name string) ClientOption {
return func(client *DefaultClient) {
client.lndInvoiceConsumerQueueName = name
}
}
func WithLogger(logger *lecho.Logger) ClientOption {
return func(client *DefaultClient) {
client.logger = logger
}
}
func Dial(uri string, options ...ClientOption) (Client, error) {
conn, err := amqp.Dial(uri)
if err != nil {
return nil, err
}
consumeChannel, err := conn.Channel()
if err != nil {
return nil, err
}
produceChannel, err := conn.Channel()
if err != nil {
return nil, err
}
client := &DefaultClient{
conn: conn,
consumeChannel: consumeChannel,
produceChannel: produceChannel,
logger: lecho.New(
os.Stdout,
lecho.WithLevel(log.DEBUG),
lecho.WithTimestamp(),
),
lndInvoiceConsumerQueueName: "lndhub_invoice_consumer",
lndInvoiceExchange: "lnd_invoice",
lndhubInvoiceExchange: "lndhub_invoice",
}
for _, opt := range options {
opt(client)
}
return client, nil
}
func (client *DefaultClient) Close() error { return client.conn.Close() }
type InvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error
func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler InvoiceHandler) error {
queue, err := client.consumeChannel.QueueDeclare(
client.lndInvoiceConsumerQueueName,
// Durable and Non-Auto-Deleted queues will survive server restarts and remain
// declared when there are no remaining bindings.
true,
false,
// None-Exclusive means other consumers can consume from this queue.
// Messages from queues are spread out and load balanced between consumers.
// So multiple lndhub.go instances will spread the load of invoices between them
false,
// Nowait: We set this to false as we want to wait for a server response
// to check whether the queue was created successfully
false,
nil,
)
if err != nil {
return err
}
err = client.consumeChannel.QueueBind(
queue.Name,
"#",
client.lndInvoiceExchange,
// Nowait: We set this to false as we want to wait for a server response
// to check whether the queue was created successfully
false,
nil,
)
if err != nil {
return err
}
deliveryChan, err := client.consumeChannel.Consume(
queue.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return context.Canceled
case delivery := <-deliveryChan:
var invoice lnrpc.Invoice
err := json.Unmarshal(delivery.Body, &invoice)
if err != nil {
client.logger.Error(err)
sentry.CaptureException(err)
err = delivery.Nack(false, false)
if err != nil {
client.logger.Error(err)
sentry.CaptureException(err)
}
continue
}
err = handler(ctx, &invoice)
if err != nil {
client.logger.Error(err)
sentry.CaptureException(err)
delivery.Nack(false, false)
continue
}
delivery.Ack(false)
}
}
}
type SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error)
func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc) error {
err := client.produceChannel.ExchangeDeclare(
client.lndInvoiceExchange,
// topic is a type of exchange that allows routing messages to different queue's bases on a routing key
"topic",
// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
// declared when there are no remaining bindings.
true,
false,
// Non-Internal exchange's accept direct publishing
false,
// Nowait: We set this to false as we want to wait for a server response
// to check wether the exchange was created succesfully
false,
nil,
)
if err != nil {
return err
}
client.logger.Info("Starting rabbitmq publisher")
in, out, err := invoicesSubscribeFunc()
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return context.Canceled
case incomingInvoice := <-in:
err = client.publishToLndhubExchange(ctx, incomingInvoice)
if err != nil {
client.logger.Error(err)
sentry.CaptureException(err)
}
case outgoing := <-out:
err = client.publishToLndhubExchange(ctx, outgoing)
if err != nil {
client.logger.Error(err)
sentry.CaptureException(err)
}
}
}
}
func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice) error {
return nil
}