diff --git a/integration_tests/rabbitmq_test.go b/integration_tests/rabbitmq_test.go index 50076c4..57b0a36 100644 --- a/integration_tests/rabbitmq_test.go +++ b/integration_tests/rabbitmq_test.go @@ -91,7 +91,7 @@ func (suite *RabbitMQTestSuite) TestPublishInvoice() { ) assert.NoError(suite.T(), err) - err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQInvoiceExchange, false, nil) + err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQLndhubInvoiceExchange, false, nil) assert.NoError(suite.T(), err) invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken) @@ -155,7 +155,7 @@ func (suite *RabbitMQTestSuite) TearDownSuite() { _, err = ch.QueueDelete(suite.testQueueName, false, false, false) assert.NoError(suite.T(), err) - err = ch.ExchangeDelete(suite.svc.Config.RabbitMQInvoiceExchange, true, false) + err = ch.ExchangeDelete(suite.svc.Config.RabbitMQLndhubInvoiceExchange, true, false) assert.NoError(suite.T(), err) } diff --git a/integration_tests/util.go b/integration_tests/util.go index c270141..d1fb353 100644 --- a/integration_tests/util.go +++ b/integration_tests/util.go @@ -61,7 +61,7 @@ func LndHubTestServiceInit(lndClientMock lnd.LightningClientWrapper) (svc *servi rabbitmqUri, ok := os.LookupEnv("RABBITMQ_URI") if ok { c.RabbitMQUri = rabbitmqUri - c.RabbitMQInvoiceExchange = "test_lndhub_invoices" + c.RabbitMQLndhubInvoiceExchange = "test_lndhub_invoices" } dbConn, err := db.Open(c) diff --git a/lib/service/config.go b/lib/service/config.go index 87ca32b..1465565 100644 --- a/lib/service/config.go +++ b/lib/service/config.go @@ -6,44 +6,47 @@ import ( ) type Config struct { - DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` - DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"` - DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"` - DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes - DatabaseTimeout int `envconfig:"DATABASE_TIMEOUT" default:"60"` // 60 seconds - SentryDSN string `envconfig:"SENTRY_DSN"` - DatadogAgentUrl string `envconfig:"DATADOG_AGENT_URL"` - SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"` - LogFilePath string `envconfig:"LOG_FILE_PATH"` - JWTSecret []byte `envconfig:"JWT_SECRET" required:"true"` - AdminToken string `envconfig:"ADMIN_TOKEN"` - JWTRefreshTokenExpiry int `envconfig:"JWT_REFRESH_EXPIRY" default:"604800"` // in seconds, default 7 days - JWTAccessTokenExpiry int `envconfig:"JWT_ACCESS_EXPIRY" default:"172800"` // in seconds, default 2 days - LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` - LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` - LNDCertFile string `envconfig:"LND_CERT_FILE"` - LNDMacaroonHex string `envconfig:"LND_MACAROON_HEX"` - LNDCertHex string `envconfig:"LND_CERT_HEX"` - CustomName string `envconfig:"CUSTOM_NAME"` - Host string `envconfig:"HOST" default:"localhost:3000"` - Port int `envconfig:"PORT" default:"3000"` - EnableGRPC bool `envconfig:"ENABLE_GRPC" default:"false"` - GRPCPort int `envconfig:"GRPC_PORT" default:"10009"` - DefaultRateLimit int `envconfig:"DEFAULT_RATE_LIMIT" default:"10"` - StrictRateLimit int `envconfig:"STRICT_RATE_LIMIT" default:"10"` - BurstRateLimit int `envconfig:"BURST_RATE_LIMIT" default:"1"` - EnablePrometheus bool `envconfig:"ENABLE_PROMETHEUS" default:"false"` - PrometheusPort int `envconfig:"PROMETHEUS_PORT" default:"9092"` - WebhookUrl string `envconfig:"WEBHOOK_URL"` - FeeReserve bool `envconfig:"FEE_RESERVE" default:"false"` - AllowAccountCreation bool `envconfig:"ALLOW_ACCOUNT_CREATION" default:"true"` - MinPasswordEntropy int `envconfig:"MIN_PASSWORD_ENTROPY" default:"0"` - MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"` - MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` - MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` - RabbitMQUri string `envconfig:"RABBITMQ_URI"` - RabbitMQInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoice"` - Branding BrandingConfig + DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` + DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"` + DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"` + DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes + DatabaseTimeout int `envconfig:"DATABASE_TIMEOUT" default:"60"` // 60 seconds + SentryDSN string `envconfig:"SENTRY_DSN"` + DatadogAgentUrl string `envconfig:"DATADOG_AGENT_URL"` + SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"` + LogFilePath string `envconfig:"LOG_FILE_PATH"` + JWTSecret []byte `envconfig:"JWT_SECRET" required:"true"` + AdminToken string `envconfig:"ADMIN_TOKEN"` + JWTRefreshTokenExpiry int `envconfig:"JWT_REFRESH_EXPIRY" default:"604800"` // in seconds, default 7 days + JWTAccessTokenExpiry int `envconfig:"JWT_ACCESS_EXPIRY" default:"172800"` // in seconds, default 2 days + LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` + LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` + LNDCertFile string `envconfig:"LND_CERT_FILE"` + LNDMacaroonHex string `envconfig:"LND_MACAROON_HEX"` + LNDCertHex string `envconfig:"LND_CERT_HEX"` + CustomName string `envconfig:"CUSTOM_NAME"` + Host string `envconfig:"HOST" default:"localhost:3000"` + Port int `envconfig:"PORT" default:"3000"` + EnableGRPC bool `envconfig:"ENABLE_GRPC" default:"false"` + GRPCPort int `envconfig:"GRPC_PORT" default:"10009"` + DefaultRateLimit int `envconfig:"DEFAULT_RATE_LIMIT" default:"10"` + StrictRateLimit int `envconfig:"STRICT_RATE_LIMIT" default:"10"` + BurstRateLimit int `envconfig:"BURST_RATE_LIMIT" default:"1"` + EnablePrometheus bool `envconfig:"ENABLE_PROMETHEUS" default:"false"` + PrometheusPort int `envconfig:"PROMETHEUS_PORT" default:"9092"` + WebhookUrl string `envconfig:"WEBHOOK_URL"` + FeeReserve bool `envconfig:"FEE_RESERVE" default:"false"` + AllowAccountCreation bool `envconfig:"ALLOW_ACCOUNT_CREATION" default:"true"` + MinPasswordEntropy int `envconfig:"MIN_PASSWORD_ENTROPY" default:"0"` + MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"` + MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` + MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` + RabbitMQUri string `envconfig:"RABBITMQ_URI"` + RabbitMQLndhubInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoice"` + RabbitMQLndInvoiceExchange string `envconfig:"RABBITMQ_LND_INVOICE_EXCHANGE" default:"lnd_invoice"` + RabbitMQInvoiceConsumerQueueName string `envconfig:"RABBITMQ_INVOICE_CONSUMER_QUEUE_NAME" default:"lndhub_invoice_consumer"` + SubscriptionConsumerType string `envconfig:"SUBSCRIPTION_CONSUMER_TYPE" default:"grpc"` + Branding BrandingConfig } type BrandingConfig struct { diff --git a/lib/service/rabbitmq.go b/lib/service/rabbitmq.go index 1bb7f4a..0924b40 100644 --- a/lib/service/rabbitmq.go +++ b/lib/service/rabbitmq.go @@ -17,11 +17,6 @@ var bufPool sync.Pool = sync.Pool{ } func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { - // It is recommended that, when possible, publishers and consumers - // use separate connections so that consumers are isolated from potential - // flow control messures that may be applied to publishing connections. - // We therefore start a single publishing connection here instead of storing - // one on the service object. conn, err := amqp.Dial(svc.Config.RabbitMQUri) if err != nil { return err @@ -37,7 +32,7 @@ func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { err = ch.ExchangeDeclare( // For the time being we simply declare a single exchange and start pushing to it. // Towards the future however this might become a more involved setup. - svc.Config.RabbitMQInvoiceExchange, + svc.Config.RabbitMQLndhubInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key "topic", // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain @@ -57,7 +52,7 @@ func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { svc.Logger.Infof("Starting rabbitmq publisher") - incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices() + incomingInvoices, outgoingInvoices, err := svc.SubscribeIncomingOutgoingInvoices() if err != nil { return err } @@ -96,7 +91,7 @@ func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Inv } err = ch.PublishWithContext(ctx, - svc.Config.RabbitMQInvoiceExchange, + svc.Config.RabbitMQLndhubInvoiceExchange, key, false, false, diff --git a/lib/service/service.go b/lib/service/service.go index 06b3238..7fe1048 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "github.com/getAlby/lndhub.go/rabbitmq" "strconv" "github.com/getAlby/lndhub.go/db/models" @@ -20,6 +21,7 @@ type LndhubService struct { Config *Config DB *bun.DB LndClient lnd.LightningClientWrapper + RabbitMQClient rabbitmq.Client Logger *lecho.Logger IdentityPubkey string InvoicePubSub *Pubsub diff --git a/lib/service/webhook.go b/lib/service/webhook.go index 27c4d0e..209b6c7 100644 --- a/lib/service/webhook.go +++ b/lib/service/webhook.go @@ -14,7 +14,7 @@ import ( func (svc *LndhubService) StartWebhookSubscription(ctx context.Context, url string) { svc.Logger.Infof("Starting webhook subscription with webhook url %s", svc.Config.WebhookUrl) - incomingInvoices, outgoingInvoices, err := svc.subscribeIncomingOutgoingInvoices() + incomingInvoices, outgoingInvoices, err := svc.SubscribeIncomingOutgoingInvoices() if err != nil { svc.Logger.Error(err) } @@ -81,7 +81,7 @@ type WebhookInvoicePayload struct { SettledAt time.Time `json:"settled_at"` } -func (svc *LndhubService) subscribeIncomingOutgoingInvoices() (incoming, outgoing chan models.Invoice, err error) { +func (svc *LndhubService) SubscribeIncomingOutgoingInvoices() (incoming, outgoing chan models.Invoice, err error) { incomingInvoices, _, err := svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming) if err != nil { return nil, nil, err diff --git a/main.go b/main.go index 1b5ceab..30914e8 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "embed" "fmt" + "github.com/getAlby/lndhub.go/rabbitmq" "log" "net" "net/http" @@ -157,10 +158,29 @@ func main() { } logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey) + // If not RABBITMQ_URI was provided we will not attempt to create a client + // No rabbitmq features will be available in this case. + var rabbitmqClient rabbitmq.Client + if c.RabbitMQUri != "" { + rabbitmqClient, err = rabbitmq.Dial(c.RabbitMQUri, + rabbitmq.WithLogger(logger), + rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), + rabbitmq.WithLndhubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), + rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName), + ) + if err != nil { + logger.Fatal(err) + } + + // close the connection gently at the end of the runtime + defer rabbitmqClient.Close() + } + svc := &service.LndhubService{ Config: c, DB: dbConn, LndClient: lndClient, + RabbitMQClient: rabbitmqClient, Logger: logger, IdentityPubkey: getInfo.IdentityPubkey, InvoicePubSub: service.NewPubsub(), @@ -186,11 +206,25 @@ func main() { // Subscribe to LND invoice updates in the background backgroundWg.Add(1) go func() { - err = svc.InvoiceUpdateSubscription(backGroundCtx) - if err != nil && err != context.Canceled { - // in case of an error in this routine, we want to restart LNDhub - svc.Logger.Fatal(err) + switch svc.Config.SubscriptionConsumerType { + case "rabbitmq": + err = svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate) + if err != nil && err != context.Canceled { + // in case of an error in this routine, we want to restart LNDhub + svc.Logger.Fatal(err) + } + + case "grpc": + err = svc.InvoiceUpdateSubscription(backGroundCtx) + if err != nil && err != context.Canceled { + // in case of an error in this routine, we want to restart LNDhub + svc.Logger.Fatal(err) + } + + default: + svc.Logger.Fatalf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType) } + svc.Logger.Info("Invoice routine done") backgroundWg.Done() }() @@ -217,15 +251,16 @@ func main() { }() } //Start rabbit publisher - if svc.Config.RabbitMQUri != "" { + if svc.RabbitMQClient != nil { backgroundWg.Add(1) go func() { - err = svc.StartRabbitMqPublisher(backGroundCtx) + err = svc.RabbitMQClient.StartPublishInvoices(backGroundCtx, svc.SubscribeIncomingOutgoingInvoices) if err != nil { svc.Logger.Error(err) sentry.CaptureException(err) } - svc.Logger.Info("Rabbit routine done") + + svc.Logger.Info("Rabbit invoice publisher done") backgroundWg.Done() }() } diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..d465a5d --- /dev/null +++ b/rabbitmq/rabbitmq.go @@ -0,0 +1,239 @@ +package rabbitmq + +import ( + "context" + "encoding/json" + "github.com/getAlby/lndhub.go/db/models" + "github.com/getsentry/sentry-go" + "github.com/labstack/gommon/log" + "github.com/lightningnetwork/lnd/lnrpc" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/ziflex/lecho/v3" + "os" +) + +type Client interface { + SubscribeToLndInvoices(context.Context, InvoiceHandler) error + StartPublishInvoices(context.Context, SubscribeToInvoicesFunc) error + Close() error +} + +type DefaultClient struct { + conn *amqp.Connection + + // It is recommended that, when possible, publishers and consumers + // use separate connections so that consumers are isolated from potential + // flow control measures that may be applied to publishing connections. + consumeChannel *amqp.Channel + produceChannel *amqp.Channel + + logger *lecho.Logger + + lndInvoiceConsumerQueueName string + lndInvoiceExchange string + lndhubInvoiceExchange string +} + +type ClientOption = func(client *DefaultClient) + +func WithLndInvoiceExchange(exchange string) ClientOption { + return func(client *DefaultClient) { + client.lndInvoiceExchange = exchange + } +} + +func WithLndhubInvoiceExchange(exchange string) ClientOption { + return func(client *DefaultClient) { + client.lndhubInvoiceExchange = exchange + } +} + +func WithLndInvoiceConsumerQueueName(name string) ClientOption { + return func(client *DefaultClient) { + client.lndInvoiceConsumerQueueName = name + } +} + +func WithLogger(logger *lecho.Logger) ClientOption { + return func(client *DefaultClient) { + client.logger = logger + } +} + +func Dial(uri string, options ...ClientOption) (Client, error) { + conn, err := amqp.Dial(uri) + if err != nil { + return nil, err + } + + consumeChannel, err := conn.Channel() + if err != nil { + return nil, err + } + + produceChannel, err := conn.Channel() + if err != nil { + return nil, err + } + + client := &DefaultClient{ + conn: conn, + + consumeChannel: consumeChannel, + produceChannel: produceChannel, + + logger: lecho.New( + os.Stdout, + lecho.WithLevel(log.DEBUG), + lecho.WithTimestamp(), + ), + + lndInvoiceConsumerQueueName: "lndhub_invoice_consumer", + lndInvoiceExchange: "lnd_invoice", + lndhubInvoiceExchange: "lndhub_invoice", + } + + for _, opt := range options { + opt(client) + } + + return client, nil +} + +func (client *DefaultClient) Close() error { return client.conn.Close() } + +type InvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error + +func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler InvoiceHandler) error { + queue, err := client.consumeChannel.QueueDeclare( + client.lndInvoiceConsumerQueueName, + // Durable and Non-Auto-Deleted queues will survive server restarts and remain + // declared when there are no remaining bindings. + true, + false, + // None-Exclusive means other consumers can consume from this queue. + // Messages from queues are spread out and load balanced between consumers. + // So multiple lndhub.go instances will spread the load of invoices between them + false, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the queue was created successfully + false, + nil, + ) + if err != nil { + return err + } + + err = client.consumeChannel.QueueBind( + queue.Name, + "#", + client.lndInvoiceExchange, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the queue was created successfully + false, + nil, + ) + if err != nil { + return err + } + + deliveryChan, err := client.consumeChannel.Consume( + queue.Name, + "", + false, + false, + false, + false, + nil, + ) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return context.Canceled + case delivery := <-deliveryChan: + var invoice lnrpc.Invoice + + err := json.Unmarshal(delivery.Body, &invoice) + if err != nil { + client.logger.Error(err) + sentry.CaptureException(err) + + err = delivery.Nack(false, false) + if err != nil { + client.logger.Error(err) + sentry.CaptureException(err) + } + + continue + } + + err = handler(ctx, &invoice) + if err != nil { + client.logger.Error(err) + sentry.CaptureException(err) + + delivery.Nack(false, false) + continue + } + + delivery.Ack(false) + } + } +} + +type SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error) + +func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc) error { + err := client.produceChannel.ExchangeDeclare( + client.lndInvoiceExchange, + // topic is a type of exchange that allows routing messages to different queue's bases on a routing key + "topic", + // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain + // declared when there are no remaining bindings. + true, + false, + // Non-Internal exchange's accept direct publishing + false, + // Nowait: We set this to false as we want to wait for a server response + // to check wether the exchange was created succesfully + false, + nil, + ) + if err != nil { + return err + } + + client.logger.Info("Starting rabbitmq publisher") + + in, out, err := invoicesSubscribeFunc() + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return context.Canceled + case incomingInvoice := <-in: + err = client.publishToLndhubExchange(ctx, incomingInvoice) + if err != nil { + client.logger.Error(err) + sentry.CaptureException(err) + } + case outgoing := <-out: + err = client.publishToLndhubExchange(ctx, outgoing) + if err != nil { + client.logger.Error(err) + sentry.CaptureException(err) + } + } + } +} + +func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice) error { + return nil +}