diff --git a/main.go b/main.go index 1e01b7a..87d4360 100644 --- a/main.go +++ b/main.go @@ -161,14 +161,14 @@ func main() { } logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey) - // If not RABBITMQ_URI was provided we will not attempt to create a client + // If no RABBITMQ_URI was provided we will not attempt to create a client // No rabbitmq features will be available in this case. var rabbitmqClient rabbitmq.Client if c.RabbitMQUri != "" { rabbitmqClient, err = rabbitmq.Dial(c.RabbitMQUri, rabbitmq.WithLogger(logger), rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), - rabbitmq.WithLndhubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), + rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName), ) if err != nil { diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index 65c84a4..7324829 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -16,6 +16,11 @@ import ( "sync" ) +// bufPool is a classic buffer pool pattern that allows more clever reuse of heap memory. +// Instead of allocating new memory everytime we need to encode the invoices we +// reuse buffers from this buffer pool. If we consume events sequentially there will +// only be one buffer in this pool at all times, but when scaling to multiple go +// routines this memory pool will scale with it. var bufPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } @@ -24,9 +29,18 @@ const ( contentTypeJSON = "application/json" ) +type ( + InvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error + SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error) + EncodeWebhookInvoicePayloadFunc = func(ctx context.Context, w io.Writer, invoice models.Invoice) error +) + +// InvoiceHandler is a closure that defined what to do with an invoice once it's been consumed from the rabbitmq topic + type Client interface { SubscribeToLndInvoices(context.Context, InvoiceHandler) error StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeWebhookInvoicePayloadFunc) error + // Close will close all connections to rabbitmq Close() error } @@ -43,7 +57,7 @@ type DefaultClient struct { lndInvoiceConsumerQueueName string lndInvoiceExchange string - lndhubInvoiceExchange string + lndHubInvoiceExchange string } type ClientOption = func(client *DefaultClient) @@ -54,9 +68,9 @@ func WithLndInvoiceExchange(exchange string) ClientOption { } } -func WithLndhubInvoiceExchange(exchange string) ClientOption { +func WithLndHubInvoiceExchange(exchange string) ClientOption { return func(client *DefaultClient) { - client.lndhubInvoiceExchange = exchange + client.lndHubInvoiceExchange = exchange } } @@ -72,6 +86,7 @@ func WithLogger(logger *lecho.Logger) ClientOption { } } +// Dial sets up a connection to rabbitmq with two channels that are ready to produce and consume func Dial(uri string, options ...ClientOption) (Client, error) { conn, err := amqp.Dial(uri) if err != nil { @@ -102,7 +117,7 @@ func Dial(uri string, options ...ClientOption) (Client, error) { lndInvoiceConsumerQueueName: "lndhub_invoice_consumer", lndInvoiceExchange: "lnd_invoice", - lndhubInvoiceExchange: "lndhub_invoice", + lndHubInvoiceExchange: "lndhub_invoice", } for _, opt := range options { @@ -114,8 +129,6 @@ func Dial(uri string, options ...ClientOption) (Client, error) { func (client *DefaultClient) Close() error { return client.conn.Close() } -type InvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error - func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler InvoiceHandler) error { queue, err := client.consumeChannel.QueueDeclare( client.lndInvoiceConsumerQueueName, @@ -171,13 +184,14 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler err := json.Unmarshal(delivery.Body, &invoice) if err != nil { - client.logger.Error(err) - sentry.CaptureException(err) + captureErr(client.logger, err) + // If we can't even Unmarshall the message we are dealing with + // badly formatted events. In that case we simply Nack the message + // and explicitly do not requeue it. err = delivery.Nack(false, false) if err != nil { - client.logger.Error(err) - sentry.CaptureException(err) + captureErr(client.logger, err) } continue @@ -185,21 +199,27 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler err = handler(ctx, &invoice) if err != nil { - client.logger.Error(err) - sentry.CaptureException(err) + captureErr(client.logger, err) + + // If for some reason we can't handle the message we instruct rabbitmq to + // requeue the message in hopes of finding another consumer that can deal + // with this message. + err := delivery.Nack(false, true) + if err != nil { + captureErr(client.logger, err) + } - delivery.Nack(false, false) continue } - delivery.Ack(false) + err = delivery.Ack(false) + if err != nil { + captureErr(client.logger, err) + } } } } -type SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error) -type EncodeWebhookInvoicePayloadFunc = func(ctx context.Context, w io.Writer, invoice models.Invoice) error - func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeWebhookInvoicePayloadFunc) error { err := client.publishChannel.ExchangeDeclare( client.lndInvoiceExchange, @@ -212,7 +232,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS // Non-Internal exchange's accept direct publishing false, // Nowait: We set this to false as we want to wait for a server response - // to check wether the exchange was created succesfully + // to check whether the exchange was created succesfully false, nil, ) @@ -233,15 +253,15 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS return context.Canceled case incomingInvoice := <-in: err = client.publishToLndhubExchange(ctx, incomingInvoice, payloadFunc) + if err != nil { - client.logger.Error(err) - sentry.CaptureException(err) + captureErr(client.logger, err) } case outgoing := <-out: err = client.publishToLndhubExchange(ctx, outgoing, payloadFunc) + if err != nil { - client.logger.Error(err) - sentry.CaptureException(err) + captureErr(client.logger, err) } } } @@ -257,7 +277,7 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic key := fmt.Sprintf("invoice.%s.%s", invoice.Type, invoice.State) err = client.publishChannel.PublishWithContext(ctx, - client.lndhubInvoiceExchange, + client.lndHubInvoiceExchange, key, false, false, @@ -274,3 +294,8 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic return nil } + +func captureErr(logger *lecho.Logger, err error) { + logger.Error(err) + sentry.CaptureException(err) +}