diff --git a/integration_tests/rabbitmq_test.go b/integration_tests/rabbitmq_test.go index 54a8476..f14d1dc 100644 --- a/integration_tests/rabbitmq_test.go +++ b/integration_tests/rabbitmq_test.go @@ -3,9 +3,11 @@ package integration_tests import ( "bytes" "context" + "encoding/hex" "encoding/json" "log" "testing" + "time" "github.com/getAlby/lndhub.go/common" "github.com/getAlby/lndhub.go/controllers" @@ -55,7 +57,6 @@ func (suite *RabbitMQTestSuite) SetupSuite() { ctx, cancel := context.WithCancel(context.Background()) suite.invoiceUpdateSubCancelFn = cancel - go svc.InvoiceUpdateSubscription(ctx) suite.svc = svc e := echo.New() @@ -72,6 +73,92 @@ func (suite *RabbitMQTestSuite) SetupSuite() { }() } +func (suite *RabbitMQTestSuite) TestConsumeAndPublishInvoice() { + conn, err := amqp.Dial(suite.svc.Config.RabbitMQUri) + assert.NoError(suite.T(), err) + defer conn.Close() + + ch, err := conn.Channel() + assert.NoError(suite.T(), err) + + //listen to outgoing lndhub invoice channel to test e2e + q, err := ch.QueueDeclare( + suite.testQueueName, + true, + false, + false, + false, + nil, + ) + assert.NoError(suite.T(), err) + + err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQLndhubInvoiceExchange, false, nil) + assert.NoError(suite.T(), err) + defer ch.Close() + err = ch.ExchangeDeclare( + suite.svc.Config.RabbitMQLndInvoiceExchange, + // topic is a type of exchange that allows routing messages to different queue's bases on a routing key + "topic", + // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain + // declared when there are no remaining bindings. + true, + false, + // Non-Internal exchange's accept direct publishing + false, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the exchange was created succesfully + false, + nil, + ) + assert.NoError(suite.T(), err) + + go func() { + err = suite.svc.RabbitMQClient.SubscribeToLndInvoices(context.Background(), suite.svc.ProcessInvoiceUpdate) + assert.NoError(suite.T(), err) + }() + time.Sleep(100 * time.Millisecond) + + //create payload + invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken) + hash, err := hex.DecodeString(invoice.RHash) + assert.NoError(suite.T(), err) + payload := &lnrpc.Invoice{ + RHash: hash, + AmtPaidSat: 1000, + Settled: true, + SettleDate: time.Now().Unix(), + } + payloadBytes := new(bytes.Buffer) + err = json.NewEncoder(payloadBytes).Encode(payload) + assert.NoError(suite.T(), err) + err = ch.Publish(suite.svc.Config.RabbitMQLndInvoiceExchange, "invoice.incoming.settled", false, false, amqp.Publishing{ + ContentType: "application/json", + Body: payloadBytes.Bytes(), + }) + assert.NoError(suite.T(), err) + + m, err := ch.Consume( + q.Name, + "invoice.*.*", + true, + false, + false, + false, + nil, + ) + assert.NoError(suite.T(), err) + + msg := <-m + + var receivedInvoice models.Invoice + r := bytes.NewReader(msg.Body) + err = json.NewDecoder(r).Decode(&receivedInvoice) + assert.NoError(suite.T(), err) + + assert.Equal(suite.T(), invoice.RHash, receivedInvoice.RHash) + assert.Equal(suite.T(), common.InvoiceTypeIncoming, receivedInvoice.Type) +} + func (suite *RabbitMQTestSuite) TestPublishInvoice() { conn, err := amqp.Dial(suite.svc.Config.RabbitMQUri) assert.NoError(suite.T(), err) @@ -94,6 +181,8 @@ func (suite *RabbitMQTestSuite) TestPublishInvoice() { err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQLndhubInvoiceExchange, false, nil) assert.NoError(suite.T(), err) + go suite.svc.InvoiceUpdateSubscription(context.Background()) + time.Sleep(100 * time.Microsecond) invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken) err = suite.mlnd.mockPaidInvoice(invoice, 0, false, nil) assert.NoError(suite.T(), err) diff --git a/integration_tests/util.go b/integration_tests/util.go index b2851fc..8ab688b 100644 --- a/integration_tests/util.go +++ b/integration_tests/util.go @@ -64,6 +64,7 @@ func LndHubTestServiceInit(lndClientMock lnd.LightningClientWrapper) (svc *servi if ok { c.RabbitMQUri = rabbitmqUri c.RabbitMQLndhubInvoiceExchange = "test_lndhub_invoices" + c.RabbitMQLndInvoiceExchange = "test_lnd_invoices" rabbitmqClient, err = rabbitmq.Dial(c.RabbitMQUri, rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange), rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange), diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index d53dbad..d250d4e 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -193,6 +193,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler return err } + client.logger.Info("Starting RabbitMQ consumer loop") for { select { case <-ctx.Done():