diff --git a/go.mod b/go.mod index e12909c..322a246 100644 --- a/go.mod +++ b/go.mod @@ -131,6 +131,7 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.39.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/rabbitmq/amqp091-go v1.6.1 // indirect github.com/rogpeppe/fastuuid v1.2.0 // indirect github.com/rs/zerolog v1.28.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect diff --git a/go.sum b/go.sum index 244f5b3..e4135ad 100644 --- a/go.sum +++ b/go.sum @@ -1014,6 +1014,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.6.1 h1:r6HybD9gOdWeUTP9TKIKdcAuFl4Va4p3OmWUUoeICAU= +github.com/rabbitmq/amqp091-go v1.6.1/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1334,6 +1336,7 @@ go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/lib/service/config.go b/lib/service/config.go index cd41902..74b6333 100644 --- a/lib/service/config.go +++ b/lib/service/config.go @@ -6,37 +6,39 @@ import ( ) type Config struct { - DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` - SentryDSN string `envconfig:"SENTRY_DSN"` - SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"` - LogFilePath string `envconfig:"LOG_FILE_PATH"` - JWTSecret []byte `envconfig:"JWT_SECRET" required:"true"` - AdminToken string `envconfig:"ADMIN_TOKEN"` - JWTRefreshTokenExpiry int `envconfig:"JWT_REFRESH_EXPIRY" default:"604800"` // in seconds, default 7 days - JWTAccessTokenExpiry int `envconfig:"JWT_ACCESS_EXPIRY" default:"172800"` // in seconds, default 2 days - LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` - LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` - LNDCertFile string `envconfig:"LND_CERT_FILE"` - LNDMacaroonHex string `envconfig:"LND_MACAROON_HEX"` - LNDCertHex string `envconfig:"LND_CERT_HEX"` - CustomName string `envconfig:"CUSTOM_NAME"` - Host string `envconfig:"HOST" default:"localhost:3000"` - Port int `envconfig:"PORT" default:"3000"` - EnableGRPC bool `envconfig:"ENABLE_GRPC" default:"false"` - GRPCPort int `envconfig:"GRPC_PORT" default:"10009"` - DefaultRateLimit int `envconfig:"DEFAULT_RATE_LIMIT" default:"10"` - StrictRateLimit int `envconfig:"STRICT_RATE_LIMIT" default:"10"` - BurstRateLimit int `envconfig:"BURST_RATE_LIMIT" default:"1"` - EnablePrometheus bool `envconfig:"ENABLE_PROMETHEUS" default:"false"` - PrometheusPort int `envconfig:"PROMETHEUS_PORT" default:"9092"` - WebhookUrl string `envconfig:"WEBHOOK_URL"` - FeeReserve bool `envconfig:"FEE_RESERVE" default:"false"` - AllowAccountCreation bool `envconfig:"ALLOW_ACCOUNT_CREATION" default:"true"` - MinPasswordEntropy int `envconfig:"MIN_PASSWORD_ENTROPY" default:"0"` - MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"` - MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` - MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` - Branding BrandingConfig + DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` + SentryDSN string `envconfig:"SENTRY_DSN"` + SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"` + LogFilePath string `envconfig:"LOG_FILE_PATH"` + JWTSecret []byte `envconfig:"JWT_SECRET" required:"true"` + AdminToken string `envconfig:"ADMIN_TOKEN"` + JWTRefreshTokenExpiry int `envconfig:"JWT_REFRESH_EXPIRY" default:"604800"` // in seconds, default 7 days + JWTAccessTokenExpiry int `envconfig:"JWT_ACCESS_EXPIRY" default:"172800"` // in seconds, default 2 days + LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` + LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` + LNDCertFile string `envconfig:"LND_CERT_FILE"` + LNDMacaroonHex string `envconfig:"LND_MACAROON_HEX"` + LNDCertHex string `envconfig:"LND_CERT_HEX"` + CustomName string `envconfig:"CUSTOM_NAME"` + Host string `envconfig:"HOST" default:"localhost:3000"` + Port int `envconfig:"PORT" default:"3000"` + EnableGRPC bool `envconfig:"ENABLE_GRPC" default:"false"` + GRPCPort int `envconfig:"GRPC_PORT" default:"10009"` + DefaultRateLimit int `envconfig:"DEFAULT_RATE_LIMIT" default:"10"` + StrictRateLimit int `envconfig:"STRICT_RATE_LIMIT" default:"10"` + BurstRateLimit int `envconfig:"BURST_RATE_LIMIT" default:"1"` + EnablePrometheus bool `envconfig:"ENABLE_PROMETHEUS" default:"false"` + PrometheusPort int `envconfig:"PROMETHEUS_PORT" default:"9092"` + WebhookUrl string `envconfig:"WEBHOOK_URL"` + FeeReserve bool `envconfig:"FEE_RESERVE" default:"false"` + AllowAccountCreation bool `envconfig:"ALLOW_ACCOUNT_CREATION" default:"true"` + MinPasswordEntropy int `envconfig:"MIN_PASSWORD_ENTROPY" default:"0"` + MaxReceiveAmount int64 `envconfig:"MAX_RECEIVE_AMOUNT" default:"0"` + MaxSendAmount int64 `envconfig:"MAX_SEND_AMOUNT" default:"0"` + MaxAccountBalance int64 `envconfig:"MAX_ACCOUNT_BALANCE" default:"0"` + RabbitMQUri string `envconfig:"RABBITMQ_URI"` + RabbitMQInvoiceExchange string `envconfig:"RABBITMQ_INVOICE_EXCHANGE" default:"lndhub_invoices"` + Branding BrandingConfig } type BrandingConfig struct { diff --git a/lib/service/rabbitmq.go b/lib/service/rabbitmq.go new file mode 100644 index 0000000..8f39ee4 --- /dev/null +++ b/lib/service/rabbitmq.go @@ -0,0 +1,115 @@ +package service + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + + "github.com/getAlby/lndhub.go/common" + "github.com/getAlby/lndhub.go/db/models" + amqp "github.com/rabbitmq/amqp091-go" +) + +func (svc *LndhubService) StartRabbitMqPublisher(ctx context.Context) error { + conn, err := amqp.Dial(svc.Config.RabbitMQUri) + if err != nil { + return err + } + svc.RabbitMqConn = conn + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + err = ch.ExchangeDeclare( + //TODO: review exchange config + svc.Config.RabbitMQInvoiceExchange, + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return err + } + + svc.Logger.Infof("Starting rabbitmq publisher") + incomingInvoices := make(chan models.Invoice) + outgoingInvoices := make(chan models.Invoice) + _, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeIncoming, incomingInvoices) + if err != nil { + svc.Logger.Error(err.Error()) + } + _, err = svc.InvoicePubSub.Subscribe(common.InvoiceTypeOutgoing, outgoingInvoices) + if err != nil { + svc.Logger.Error(err.Error()) + } + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled") + case incoming := <-incomingInvoices: + svc.publishInvoice(ctx, incoming, ch) + case outgoing := <-outgoingInvoices: + svc.publishInvoice(ctx, outgoing, ch) + } + } +} + +func (svc *LndhubService) publishInvoice(ctx context.Context, invoice models.Invoice, ch *amqp.Channel) { + key := fmt.Sprintf("%s.%s.invoice", invoice.Type, invoice.State) + + //Look up the user's login to add it to the invoice + user, err := svc.FindUser(context.Background(), invoice.UserID) + if err != nil { + svc.Logger.Error(err) + return + } + + payload := new(bytes.Buffer) + err = json.NewEncoder(payload).Encode(WebhookInvoicePayload{ + ID: invoice.ID, + Type: invoice.Type, + UserLogin: user.Login, + Amount: invoice.Amount, + Fee: invoice.Fee, + Memo: invoice.Memo, + DescriptionHash: invoice.DescriptionHash, + PaymentRequest: invoice.PaymentRequest, + DestinationPubkeyHex: invoice.DestinationPubkeyHex, + DestinationCustomRecords: invoice.DestinationCustomRecords, + RHash: invoice.RHash, + Preimage: invoice.Preimage, + Keysend: invoice.Keysend, + State: invoice.State, + ErrorMessage: invoice.ErrorMessage, + CreatedAt: invoice.CreatedAt, + ExpiresAt: invoice.ExpiresAt.Time, + UpdatedAt: invoice.UpdatedAt.Time, + SettledAt: invoice.SettledAt.Time, + }) + if err != nil { + svc.Logger.Error(err) + return + } + + err = ch.PublishWithContext(ctx, + svc.Config.RabbitMQInvoiceExchange, + key, + false, + false, + amqp.Publishing{ + ContentType: "application/json", + Body: payload.Bytes(), + }, + ) + if err != nil { + svc.Logger.Error(err) + return + } + svc.Logger.Debugf("Succesfully published %s", payload.String()) +} diff --git a/lib/service/service.go b/lib/service/service.go index 06b3238..72e02e5 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -12,6 +12,8 @@ import ( "github.com/uptrace/bun" "github.com/ziflex/lecho/v3" "golang.org/x/crypto/bcrypt" + + amqp "github.com/rabbitmq/amqp091-go" ) const alphaNumBytes = random.Alphanumeric @@ -23,6 +25,7 @@ type LndhubService struct { Logger *lecho.Logger IdentityPubkey string InvoicePubSub *Pubsub + RabbitMqConn *amqp.Connection } func (svc *LndhubService) GenerateToken(ctx context.Context, login, password, inRefreshToken string) (accessToken, refreshToken string, err error) { diff --git a/main.go b/main.go index 5bbc7f4..7ebdb3d 100644 --- a/main.go +++ b/main.go @@ -185,6 +185,12 @@ func main() { go svc.StartWebhookSubscribtion(webhookCtx, svc.Config.WebhookUrl) defer cancelWebhook() } + //Start rabbit publisher + if svc.Config.RabbitMQUri != "" { + rabbitCtx, cancelRabbit := context.WithCancel(context.Background()) + go svc.StartRabbitMqPublisher(rabbitCtx) + defer cancelRabbit() + } if svc.Config.EnableGRPC { //start grpc server