Merge pull request #385 from getAlby/refactor/main

refactor: make main.go smaller
This commit is contained in:
kiwiidb
2023-06-22 09:42:28 +02:00
committed by GitHub
3 changed files with 175 additions and 145 deletions

37
background_routines.go Normal file
View File

@@ -0,0 +1,37 @@
package main
import (
"context"
"fmt"
"github.com/getAlby/lndhub.go/lib/service"
)
func StartInvoiceRoutine(svc *service.LndhubService, backGroundCtx context.Context) (err error) {
switch svc.Config.SubscriptionConsumerType {
case "rabbitmq":
err = svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate)
if err != nil && err != context.Canceled {
return err
}
case "grpc":
err = svc.InvoiceUpdateSubscription(backGroundCtx)
if err != nil && err != context.Canceled {
return err
}
default:
return fmt.Errorf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType)
}
return nil
}
func StartPendingPaymentRoutine(svc *service.LndhubService, backGroundCtx context.Context) (err error) {
switch svc.Config.FinalizePendingPaymentsWith {
case "rabbitmq":
return svc.RabbitMQClient.FinalizeInitializedPayments(backGroundCtx, svc)
default:
return svc.CheckAllPendingOutgoingPayments(backGroundCtx)
}
}

119
init_echo.go Normal file
View File

@@ -0,0 +1,119 @@
package main
import (
"fmt"
"log"
"strconv"
"time"
cache "github.com/SporkHubr/echo-http-cache"
"github.com/SporkHubr/echo-http-cache/adapter/memory"
"github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/responses"
"github.com/getAlby/lndhub.go/lib/service"
sentryecho "github.com/getsentry/sentry-go/echo"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo-contrib/prometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rs/zerolog"
"github.com/ziflex/lecho/v3"
"golang.org/x/time/rate"
ddEcho "gopkg.in/DataDog/dd-trace-go.v1/contrib/labstack/echo.v4"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
func initEcho(c *service.Config, logger *lecho.Logger) (e *echo.Echo) {
// New Echo app
e = echo.New()
e.HideBanner = true
e.HTTPErrorHandler = responses.HTTPErrorHandler
e.Validator = &lib.CustomValidator{Validator: validator.New()}
//if Datadog is configured, add datadog middleware
if c.DatadogAgentUrl != "" {
tracer.Start(tracer.WithAgentAddr(c.DatadogAgentUrl))
defer tracer.Stop()
e.Use(ddEcho.Middleware(ddEcho.WithServiceName("lndhub.go")))
}
e.Use(middleware.Recover())
e.Use(middleware.BodyLimit("250K"))
// set the default rate limit defining the overal max requests/second
e.Use(middleware.RateLimiter(middleware.NewRateLimiterMemoryStore(rate.Limit(c.DefaultRateLimit))))
e.Logger = logger
e.Use(middleware.RequestID())
// Setup exception tracking with Sentry if configured
// sentry init needs to happen before the echo middlewares are added
if c.SentryDSN != "" {
e.Use(sentryecho.New(sentryecho.Options{}))
}
return e
}
func createLoggingMiddleware(logger *lecho.Logger) echo.MiddlewareFunc {
return lecho.Middleware(lecho.Config{
Logger: logger,
Enricher: func(c echo.Context, logger zerolog.Context) zerolog.Context {
return logger.Interface("UserID", c.Get("UserID"))
},
})
}
func createRateLimitMiddleware(requestsPerSecond int, burst int) echo.MiddlewareFunc {
config := middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStoreWithConfig(
middleware.RateLimiterMemoryStoreConfig{Rate: rate.Limit(requestsPerSecond), Burst: burst},
),
IdentifierExtractor: func(ctx echo.Context) (string, error) {
userId := ctx.Get("UserID")
id := ctx.RealIP()
if userId != nil {
userIdAsInt64 := ctx.Get("UserID").(int64)
id = strconv.FormatInt(userIdAsInt64, 10)
}
return id, nil
},
}
return middleware.RateLimiterWithConfig(config)
}
func createCacheClient() *cache.Client {
memcached, err := memory.NewAdapter(
memory.AdapterWithAlgorithm(memory.LRU),
memory.AdapterWithCapacity(10000000),
)
if err != nil {
log.Fatalf("Error creating cache client memory adapter: %v", err)
}
cacheClient, err := cache.NewClient(
cache.ClientWithAdapter(memcached),
cache.ClientWithTTL(10*time.Minute),
cache.ClientWithRefreshKey("opn"),
)
if err != nil {
log.Fatalf("Error creating cache client: %v", err)
}
return cacheClient
}
func startPrometheusEcho(logger *lecho.Logger, svc *service.LndhubService, e *echo.Echo) {
// Create Prometheus server and Middleware
echoPrometheus := echo.New()
echoPrometheus.HideBanner = true
prom := prometheus.NewPrometheus("echo", nil)
// Scrape metrics from Main Server
e.Use(prom.HandlerFunc)
// Setup metrics endpoint at another server
prom.SetMetricsPath(echoPrometheus)
echoPrometheus.Logger = logger
echoPrometheus.Logger.Infof("Starting prometheus on port %d", svc.Config.PrometheusPort)
echoPrometheus.Logger.Fatal(echoPrometheus.Start(fmt.Sprintf(":%d", svc.Config.PrometheusPort)))
}

164
main.go
View File

@@ -8,38 +8,25 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"time"
"github.com/getAlby/lndhub.go/rabbitmq"
"github.com/rs/zerolog"
cache "github.com/SporkHubr/echo-http-cache"
"github.com/SporkHubr/echo-http-cache/adapter/memory"
"github.com/getAlby/lndhub.go/db"
"github.com/getAlby/lndhub.go/db/migrations"
"github.com/getAlby/lndhub.go/docs"
"github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/responses"
"github.com/getAlby/lndhub.go/lib/service"
"github.com/getAlby/lndhub.go/lib/tokens"
"github.com/getAlby/lndhub.go/lnd"
"github.com/getsentry/sentry-go"
sentryecho "github.com/getsentry/sentry-go/echo"
"github.com/go-playground/validator/v10"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/labstack/echo-contrib/prometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/lightningnetwork/lnd/lnrpc"
echoSwagger "github.com/swaggo/echo-swagger"
"github.com/uptrace/bun/migrate"
"github.com/ziflex/lecho/v3"
"golang.org/x/time/rate"
ddEcho "gopkg.in/DataDog/dd-trace-go.v1/contrib/labstack/echo.v4"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
//go:embed templates/index.html
@@ -111,34 +98,6 @@ func main() {
logger.Errorf("sentry init error: %v", err)
}
}
// New Echo app
e := echo.New()
e.HideBanner = true
e.HTTPErrorHandler = responses.HTTPErrorHandler
e.Validator = &lib.CustomValidator{Validator: validator.New()}
//if Datadog is configured, add datadog middleware
if c.DatadogAgentUrl != "" {
tracer.Start(tracer.WithAgentAddr(c.DatadogAgentUrl))
defer tracer.Stop()
e.Use(ddEcho.Middleware(ddEcho.WithServiceName("lndhub.go")))
}
e.Use(middleware.Recover())
e.Use(middleware.BodyLimit("250K"))
// set the default rate limit defining the overal max requests/second
e.Use(middleware.RateLimiter(middleware.NewRateLimiterMemoryStore(rate.Limit(c.DefaultRateLimit))))
e.Logger = logger
e.Use(middleware.RequestID())
// Setup exception tracking with Sentry if configured
// sentry init needs to happen before the echo middlewares are added
if c.SentryDSN != "" {
e.Use(sentryecho.New(sentryecho.Options{}))
}
// Init new LND client
lndClient, err := lnd.NewLNDclient(lnd.LNDoptions{
Address: c.LNDAddress,
@@ -148,11 +107,11 @@ func main() {
CertHex: c.LNDCertHex,
})
if err != nil {
e.Logger.Fatalf("Error initializing the LND connection: %v", err)
logger.Fatalf("Error initializing the LND connection: %v", err)
}
getInfo, err := lndClient.GetInfo(startupCtx, &lnrpc.GetInfoRequest{})
if err != nil {
e.Logger.Fatalf("Error getting node info: %v", err)
logger.Fatalf("Error getting node info: %v", err)
}
logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey)
@@ -160,20 +119,20 @@ func main() {
// No rabbitmq features will be available in this case.
var rabbitmqClient rabbitmq.Client
if c.RabbitMQUri != "" {
amqpClient, err := rabbitmq.DialAMQP(c.RabbitMQUri)
amqpClient, err := rabbitmq.DialAMQP(c.RabbitMQUri)
if err != nil {
logger.Fatal(err)
}
defer amqpClient.Close()
defer amqpClient.Close()
rabbitmqClient, err = rabbitmq.NewClient(amqpClient,
rabbitmq.WithLogger(logger),
rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange),
rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange),
rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName),
rabbitmq.WithLndPaymentExchange(c.RabbitMQLndPaymentExchange),
rabbitmq.WithLndPaymentConsumerQueueName(c.RabbitMQPaymentConsumerQueueName),
rabbitmq.WithLndPaymentExchange(c.RabbitMQLndPaymentExchange),
rabbitmq.WithLndPaymentConsumerQueueName(c.RabbitMQPaymentConsumerQueueName),
)
if err != nil {
logger.Fatal(err)
@@ -193,6 +152,8 @@ func main() {
RabbitMQClient: rabbitmqClient,
}
//init echo server
e := initEcho(c, logger)
logMw := createLoggingMiddleware(logger)
// strict rate limit for requests for sending payments
strictRateLimitMiddleware := createRateLimitMiddleware(c.StrictRateLimit, c.BurstRateLimit)
@@ -211,48 +172,24 @@ func main() {
// Subscribe to LND invoice updates in the background
backgroundWg.Add(1)
go func() {
switch svc.Config.SubscriptionConsumerType {
case "rabbitmq":
err = svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate)
if err != nil && err != context.Canceled {
// in case of an error in this routine, we want to restart LNDhub
sentry.CaptureException(err)
svc.Logger.Fatal(err)
}
case "grpc":
err = svc.InvoiceUpdateSubscription(backGroundCtx)
if err != nil && err != context.Canceled {
// in case of an error in this routine, we want to restart LNDhub
svc.Logger.Fatal(err)
}
default:
svc.Logger.Fatalf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType)
err = StartInvoiceRoutine(svc, backGroundCtx)
if err != nil {
sentry.CaptureException(err)
//we want to restart in case of an error here
svc.Logger.Fatal(err)
}
svc.Logger.Info("Invoice routine done")
backgroundWg.Done()
}()
// Check the status of all pending outgoing payments
// A goroutine will be spawned for each one
backgroundWg.Add(1)
go func() {
switch svc.Config.FinalizePendingPaymentsWith {
case "rabbitmq":
err = svc.RabbitMQClient.FinalizeInitializedPayments(backGroundCtx, svc)
if err != nil {
sentry.CaptureException(err)
svc.Logger.Error(err)
}
default:
err = svc.CheckAllPendingOutgoingPayments(backGroundCtx)
if err != nil {
sentry.CaptureException(err)
svc.Logger.Error(err)
}
err = StartPendingPaymentRoutine(svc, backGroundCtx)
if err != nil {
sentry.CaptureException(err)
//in case of an error here no restart is necessary
svc.Logger.Error(err)
}
svc.Logger.Info("Pending payment check routines done")
@@ -289,19 +226,7 @@ func main() {
//Start Prometheus server if necessary
var echoPrometheus *echo.Echo
if svc.Config.EnablePrometheus {
// Create Prometheus server and Middleware
echoPrometheus = echo.New()
echoPrometheus.HideBanner = true
prom := prometheus.NewPrometheus("echo", nil)
// Scrape metrics from Main Server
e.Use(prom.HandlerFunc)
// Setup metrics endpoint at another server
prom.SetMetricsPath(echoPrometheus)
go func() {
echoPrometheus.Logger = logger
echoPrometheus.Logger.Infof("Starting prometheus on port %d", svc.Config.PrometheusPort)
echoPrometheus.Logger.Fatal(echoPrometheus.Start(fmt.Sprintf(":%d", svc.Config.PrometheusPort)))
}()
go startPrometheusEcho(logger, svc, e)
}
// Start server
@@ -326,54 +251,3 @@ func main() {
backgroundWg.Wait()
svc.Logger.Info("LNDhub exiting gracefully. Goodbye.")
}
func createLoggingMiddleware(logger *lecho.Logger) echo.MiddlewareFunc {
return lecho.Middleware(lecho.Config{
Logger: logger,
Enricher: func(c echo.Context, logger zerolog.Context) zerolog.Context {
return logger.Interface("UserID", c.Get("UserID"))
},
})
}
func createRateLimitMiddleware(requestsPerSecond int, burst int) echo.MiddlewareFunc {
config := middleware.RateLimiterConfig{
Store: middleware.NewRateLimiterMemoryStoreWithConfig(
middleware.RateLimiterMemoryStoreConfig{Rate: rate.Limit(requestsPerSecond), Burst: burst},
),
IdentifierExtractor: func(ctx echo.Context) (string, error) {
userId := ctx.Get("UserID")
id := ctx.RealIP()
if userId != nil {
userIdAsInt64 := ctx.Get("UserID").(int64)
id = strconv.FormatInt(userIdAsInt64, 10)
}
return id, nil
},
}
return middleware.RateLimiterWithConfig(config)
}
func createCacheClient() *cache.Client {
memcached, err := memory.NewAdapter(
memory.AdapterWithAlgorithm(memory.LRU),
memory.AdapterWithCapacity(10000000),
)
if err != nil {
log.Fatalf("Error creating cache client memory adapter: %v", err)
}
cacheClient, err := cache.NewClient(
cache.ClientWithAdapter(memcached),
cache.ClientWithTTL(10*time.Minute),
cache.ClientWithRefreshKey("opn"),
)
if err != nil {
log.Fatalf("Error creating cache client: %v", err)
}
return cacheClient
}