diff --git a/lib/service/config.go b/lib/service/config.go index c61650b..03a37c6 100644 --- a/lib/service/config.go +++ b/lib/service/config.go @@ -47,8 +47,6 @@ type Config struct { RabbitMQLndPaymentExchange string `envconfig:"RABBITMQ_LND_PAYMENT_EXCHANGE" default:"lnd_payment"` RabbitMQInvoiceConsumerQueueName string `envconfig:"RABBITMQ_INVOICE_CONSUMER_QUEUE_NAME" default:"lnd_invoice_consumer"` RabbitMQPaymentConsumerQueueName string `envconfig:"RABBITMQ_PAYMENT_CONSUMER_QUEUE_NAME" default:"lnd_payment_consumer"` - SubscriptionConsumerType string `envconfig:"SUBSCRIPTION_CONSUMER_TYPE" default:"grpc"` - FinalizePendingPaymentsWith string `envconfig:"FINALIZE_PAYMENTS_WITH" default:"grpc"` Branding BrandingConfig } diff --git a/main.go b/main.go index 9ce167b..dfba987 100644 --- a/main.go +++ b/main.go @@ -160,20 +160,20 @@ func main() { // No rabbitmq features will be available in this case. var rabbitmqClient rabbitmq.Client if c.RabbitMQUri != "" { - amqpClient, err := rabbitmq.DialAMQP(c.RabbitMQUri) + amqpClient, err := rabbitmq.DialAMQP(c.RabbitMQUri) if err != nil { logger.Fatal(err) } - defer amqpClient.Close() + defer amqpClient.Close() rabbitmqClient, err = rabbitmq.NewClient(amqpClient, rabbitmq.WithLogger(logger), rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName), - rabbitmq.WithLndPaymentExchange(c.RabbitMQLndPaymentExchange), - rabbitmq.WithLndPaymentConsumerQueueName(c.RabbitMQPaymentConsumerQueueName), + rabbitmq.WithLndPaymentExchange(c.RabbitMQLndPaymentExchange), + rabbitmq.WithLndPaymentConsumerQueueName(c.RabbitMQPaymentConsumerQueueName), ) if err != nil { logger.Fatal(err) @@ -211,24 +211,19 @@ func main() { // Subscribe to LND invoice updates in the background backgroundWg.Add(1) go func() { - switch svc.Config.SubscriptionConsumerType { - case "rabbitmq": + if svc.RabbitMQClient != nil { err = svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate) if err != nil && err != context.Canceled { // in case of an error in this routine, we want to restart LNDhub sentry.CaptureException(err) svc.Logger.Fatal(err) } - - case "grpc": + } else { err = svc.InvoiceUpdateSubscription(backGroundCtx) if err != nil && err != context.Canceled { // in case of an error in this routine, we want to restart LNDhub svc.Logger.Fatal(err) } - - default: - svc.Logger.Fatalf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType) } svc.Logger.Info("Invoice routine done") @@ -239,15 +234,13 @@ func main() { // A goroutine will be spawned for each one backgroundWg.Add(1) go func() { - switch svc.Config.FinalizePendingPaymentsWith { - case "rabbitmq": + if svc.RabbitMQClient != nil { err = svc.RabbitMQClient.FinalizeInitializedPayments(backGroundCtx, svc) if err != nil { sentry.CaptureException(err) svc.Logger.Error(err) } - - default: + } else { err = svc.CheckAllPendingOutgoingPayments(backGroundCtx) if err != nil { sentry.CaptureException(err) @@ -259,15 +252,6 @@ func main() { backgroundWg.Done() }() - //Start webhook subscription - if svc.Config.WebhookUrl != "" { - backgroundWg.Add(1) - go func() { - svc.StartWebhookSubscription(backGroundCtx, svc.Config.WebhookUrl) - svc.Logger.Info("Webhook routine done") - backgroundWg.Done() - }() - } //Start rabbit publisher if svc.RabbitMQClient != nil { backgroundWg.Add(1) @@ -284,6 +268,13 @@ func main() { svc.Logger.Info("Rabbit invoice publisher done") backgroundWg.Done() }() + } else { + backgroundWg.Add(1) + go func() { + svc.StartWebhookSubscription(backGroundCtx, svc.Config.WebhookUrl) + svc.Logger.Info("Webhook routine done") + backgroundWg.Done() + }() } //Start Prometheus server if necessary diff --git a/rabbitmq/amqp.go b/rabbitmq/amqp.go index 20a88a0..4d7026c 100644 --- a/rabbitmq/amqp.go +++ b/rabbitmq/amqp.go @@ -16,8 +16,13 @@ import ( const ( defaultHeartbeat = 10 * time.Second defaultLocale = "en_US" + + msgReconnect = "RECONNECT_DONE" + msgClose = "CLOSE" ) +type listenerMsg = string + type AMQPClient interface { Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error @@ -37,7 +42,7 @@ type defaultAMQPCLient struct { notifyCloseChan chan *amqp.Error - listeners []chan interface{} + listeners []chan listenerMsg reconFlag atomic.Bool logger *lecho.Logger @@ -90,7 +95,7 @@ func (c *defaultAMQPCLient) connect() error { c.publishChannel = publishChannel c.notifyCloseChan = notifyCloseChan - c.listeners = []chan interface{}{} + c.listeners = []chan listenerMsg{} return nil } @@ -111,6 +116,7 @@ func (c *defaultAMQPCLient) reconnectionLoop() error { expontentialBackoff.MaxElapsedTime = time.Minute c.reconFlag.Store(true) + err := backoff.Retry(func() error { c.logger.Info("amqp: trying to reconnect...") @@ -145,14 +151,19 @@ func (c *defaultAMQPCLient) reconnectionLoop() error { return nil }, expontentialBackoff) + c.reconFlag.Store(false) if err != nil { + for _, listener := range c.listeners { + listener <- msgClose + } + return err } for _, listener := range c.listeners { - listener <- "DONE" + listener <- msgReconnect } } } @@ -236,13 +247,13 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing clientChannel := make(chan amqp.Delivery) - notifyReconnectChan := make(chan interface{}, 2) + notifyReconnectChan := make(chan listenerMsg, 2) c.listeners = append(c.listeners, notifyReconnectChan) // This routine functions as a wrapper arround the "raw" delivery channel. // The happy-path of the select statement, i.e. the last one, is to simply // pass on the message we get from the actual amqp channel. If however, a - // message is passed on the notifyReconnectChan it means the recoonection + // message is passed on the notifyReconnectChan it means the reconnection // loop was successful in reconnecting. Which means the listener should // get a new deliveries channel from the new amqp channels that were made. go func() { @@ -251,18 +262,25 @@ func (c *defaultAMQPCLient) Listen(ctx context.Context, exchange string, routing case <-ctx.Done(): return - case <-notifyReconnectChan: - d, err := c.consume(ctx, exchange, routingKey, queueName, options...) - if err != nil { - c.logger.Error(err) + case msg := <-notifyReconnectChan: + switch msg { + case msgReconnect: + d, err := c.consume(ctx, exchange, routingKey, queueName, options...) + if err != nil { + c.logger.Error(err) - return + return + } + + c.logger.Info("succesfully consuming from new deliveries channel") + deliveries = d + + case msgClose: + close(clientChannel) + default: + c.logger.Warnf("amqp: unrecognized message send to listener: %s", msg) } - c.logger.Info("succesfully consuming from new deliveries channel") - - deliveries = d - case delivery, ok := <-deliveries: if ok { clientChannel <- delivery diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index b7dac1b..da6598a 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -196,7 +196,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv case delivery, ok := <-deliveryChan: if !ok { - return err + return fmt.Errorf("Disconnected from RabbitMQ") } payment := lnrpc.Payment{}