From 7d489c2ae97f58a1d5c558451bc51f6c03bc8f4e Mon Sep 17 00:00:00 2001 From: Lucas Rouckhout Date: Sat, 1 Apr 2023 13:49:32 +0200 Subject: [PATCH] WIP SAVE --- docker-compose.yml | 42 ++++++++ integration_tests/rabbitmq_test.go | 164 +++++++++++++++++++++++++++++ rabbitmq/rabbitmq.go | 24 ++++- 3 files changed, 227 insertions(+), 3 deletions(-) create mode 100644 docker-compose.yml create mode 100644 integration_tests/rabbitmq_test.go diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..bf163a8 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,42 @@ +services: + lndhub: + image: lndhub:latest + environment: + DATABASE_URI: postgresql://user:password@db:5432/lndhub?sslmode=disable + RABBITMQ_URI: amqp://root:password@rabbitmq:5672 + SUBSCRIPTION_CONSUMER_TYPE: rabbitmq + JWT_SECRET: superSecret + LND_ADDRESS: rpc.lnd3.regtest.getalby.com:443 + LND_MACAROON_HEX: 0201036c6e6402f801030a10dd10fe7764ab4115317571e4ccf747d21201301a160a0761646472657373120472656164120577726974651a130a04696e666f120472656164120577726974651a170a08696e766f69636573120472656164120577726974651a210a086d616361726f6f6e120867656e6572617465120472656164120577726974651a160a076d657373616765120472656164120577726974651a170a086f6666636861696e120472656164120577726974651a160a076f6e636861696e120472656164120577726974651a140a057065657273120472656164120577726974651a180a067369676e6572120867656e6572617465120472656164000006208cfab2edeaa289f5dfed9a5edc6388e824227ed5109fa85c67cf66bf7f134f13 + ports: + - 3000:3000 + + db: + image: postgres:14.2 + environment: + POSTGRES_DB: lndhub + POSTGRES_USER: user + POSTGRES_PASSWORD: password + ports: + - 5432:5432 + + rabbitmq: + image: rabbitmq:3.11.8-management + ports: + - 5672:5672 + - 15672:15672 + environment: + RABBITMQ_DEFAULT_USER: root + RABBITMQ_DEFAULT_PASS: password + + ln-invoice-proxy: + image: ghcr.io/getalby/ln-event-publisher:main-ed993c4 + volumes: + - /Users/lucas.rouckhout/Documents/code/scripts/albykube/admin.macaroon:/admin.macaroon + environment: + RABBITMQ_URI: amqp://root:password@rabbitmq:5672 + LND_ADDRESS: rpc.lnd3.regtest.getalby.com:443 + LND_MACAROON_FILE: /admin.macaroon + LND_MACAROON_HEX: 0201036c6e6402f801030a10dd10fe7764ab4115317571e4ccf747d21201301a160a0761646472657373120472656164120577726974651a130a04696e666f120472656164120577726974651a170a08696e766f69636573120472656164120577726974651a210a086d616361726f6f6e120867656e6572617465120472656164120577726974651a160a076d657373616765120472656164120577726974651a170a086f6666636861696e120472656164120577726974651a160a076f6e636861696e120472656164120577726974651a140a057065657273120472656164120577726974651a180a067369676e6572120867656e6572617465120472656164000006208cfab2edeaa289f5dfed9a5edc6388e824227ed5109fa85c67cf66bf7f134f13 + DATABASE_URI: postgresql://user:password@db:5432/lnproxy?sslmode=disable + diff --git a/integration_tests/rabbitmq_test.go b/integration_tests/rabbitmq_test.go new file mode 100644 index 0000000..50076c4 --- /dev/null +++ b/integration_tests/rabbitmq_test.go @@ -0,0 +1,164 @@ +package integration_tests + +import ( + "bytes" + "context" + "encoding/json" + "log" + "testing" + + "github.com/getAlby/lndhub.go/common" + "github.com/getAlby/lndhub.go/controllers" + "github.com/getAlby/lndhub.go/db/models" + "github.com/getAlby/lndhub.go/lib" + "github.com/getAlby/lndhub.go/lib/responses" + "github.com/getAlby/lndhub.go/lib/service" + "github.com/getAlby/lndhub.go/lib/tokens" + "github.com/go-playground/validator/v10" + "github.com/labstack/echo/v4" + "github.com/lightningnetwork/lnd/lnrpc" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type RabbitMQTestSuite struct { + TestSuite + mlnd *MockLND + externalLnd *MockLND + invoiceUpdateSubCancelFn context.CancelFunc + userToken string + svc *service.LndhubService + testQueueName string +} + +func (suite *RabbitMQTestSuite) SetupSuite() { + mlnd := newDefaultMockLND() + //needs different pubkey + //to allow for "external" payments + externalLnd, err := NewMockLND("1234567890abcdef1234", 0, make(chan (*lnrpc.Invoice))) + assert.NoError(suite.T(), err) + svc, err := LndHubTestServiceInit(mlnd) + if err != nil { + log.Fatalf("could not initialize test service: %v", err) + } + + suite.mlnd = mlnd + suite.externalLnd = externalLnd + suite.testQueueName = "test_invoice" + + _, userTokens, err := createUsers(svc, 1) + if err != nil { + log.Fatalf("error creating test users: %v", err) + } + suite.userToken = userTokens[0] + + ctx, cancel := context.WithCancel(context.Background()) + suite.invoiceUpdateSubCancelFn = cancel + go svc.InvoiceUpdateSubscription(ctx) + suite.svc = svc + + e := echo.New() + e.HTTPErrorHandler = responses.HTTPErrorHandler + e.Validator = &lib.CustomValidator{Validator: validator.New()} + + suite.echo = e + suite.echo.Use(tokens.Middleware(suite.svc.Config.JWTSecret)) + suite.echo.POST("/addinvoice", controllers.NewAddInvoiceController(suite.svc).AddInvoice) + suite.echo.POST("/payinvoice", controllers.NewPayInvoiceController(suite.svc).PayInvoice) + go func() { + err = svc.StartRabbitMqPublisher(ctx) + assert.NoError(suite.T(), err) + }() +} + +func (suite *RabbitMQTestSuite) TestPublishInvoice() { + conn, err := amqp.Dial(suite.svc.Config.RabbitMQUri) + assert.NoError(suite.T(), err) + defer conn.Close() + + ch, err := conn.Channel() + assert.NoError(suite.T(), err) + defer ch.Close() + + q, err := ch.QueueDeclare( + suite.testQueueName, + true, + false, + false, + false, + nil, + ) + assert.NoError(suite.T(), err) + + err = ch.QueueBind(q.Name, "#", suite.svc.Config.RabbitMQInvoiceExchange, false, nil) + assert.NoError(suite.T(), err) + + invoice := suite.createAddInvoiceReq(1000, "integration test rabbitmq", suite.userToken) + err = suite.mlnd.mockPaidInvoice(invoice, 0, false, nil) + assert.NoError(suite.T(), err) + + m, err := ch.Consume( + q.Name, + "invoice.*.*", + true, + false, + false, + false, + nil, + ) + assert.NoError(suite.T(), err) + + msg := <-m + + var receivedInvoice models.Invoice + r := bytes.NewReader(msg.Body) + err = json.NewDecoder(r).Decode(&receivedInvoice) + assert.NoError(suite.T(), err) + + assert.Equal(suite.T(), invoice.RHash, receivedInvoice.RHash) + assert.Equal(suite.T(), common.InvoiceTypeIncoming, receivedInvoice.Type) + + //check if outgoing invoices also get published + outgoingInvoiceValue := 500 + outgoingInvoiceDescription := "test rabbit outgoing invoice" + preimage, err := makePreimageHex() + assert.NoError(suite.T(), err) + outgoingInv, err := suite.externalLnd.AddInvoice(context.Background(), &lnrpc.Invoice{Value: int64(outgoingInvoiceValue), Memo: outgoingInvoiceDescription, RPreimage: preimage}) + assert.NoError(suite.T(), err) + //pay invoice + suite.createPayInvoiceReq(&ExpectedPayInvoiceRequestBody{ + Invoice: outgoingInv.PaymentRequest, + }, suite.userToken) + msg = <-m + + var receivedPayment models.Invoice + r = bytes.NewReader(msg.Body) + err = json.NewDecoder(r).Decode(&receivedPayment) + assert.NoError(suite.T(), err) + + assert.Equal(suite.T(), common.InvoiceTypeOutgoing, receivedPayment.Type) + assert.Equal(suite.T(), int64(outgoingInvoiceValue), receivedPayment.Amount) + assert.Equal(suite.T(), outgoingInvoiceDescription, receivedPayment.Memo) + +} + +func (suite *RabbitMQTestSuite) TearDownSuite() { + conn, err := amqp.Dial(suite.svc.Config.RabbitMQUri) + assert.NoError(suite.T(), err) + defer conn.Close() + + ch, err := conn.Channel() + assert.NoError(suite.T(), err) + defer ch.Close() + + _, err = ch.QueueDelete(suite.testQueueName, false, false, false) + assert.NoError(suite.T(), err) + + err = ch.ExchangeDelete(suite.svc.Config.RabbitMQInvoiceExchange, true, false) + assert.NoError(suite.T(), err) +} + +func TestRabbitMQTestSuite(t *testing.T) { + suite.Run(t, new(RabbitMQTestSuite)) +} diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index 7324829..3eb093f 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -35,8 +35,6 @@ type ( EncodeWebhookInvoicePayloadFunc = func(ctx context.Context, w io.Writer, invoice models.Invoice) error ) -// InvoiceHandler is a closure that defined what to do with an invoice once it's been consumed from the rabbitmq topic - type Client interface { SubscribeToLndInvoices(context.Context, InvoiceHandler) error StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeWebhookInvoicePayloadFunc) error @@ -130,6 +128,25 @@ func Dial(uri string, options ...ClientOption) (Client, error) { func (client *DefaultClient) Close() error { return client.conn.Close() } func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler InvoiceHandler) error { + err := client.publishChannel.ExchangeDeclare( + client.lndInvoiceExchange, + // topic is a type of exchange that allows routing messages to different queue's bases on a routing key + "topic", + // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain + // declared when there are no remaining bindings. + true, + false, + // Non-Internal exchange's accept direct publishing + false, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the exchange was created succesfully + false, + nil, + ) + if err != nil { + return err + } + queue, err := client.consumeChannel.QueueDeclare( client.lndInvoiceConsumerQueueName, // Durable and Non-Auto-Deleted queues will survive server restarts and remain @@ -222,7 +239,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeWebhookInvoicePayloadFunc) error { err := client.publishChannel.ExchangeDeclare( - client.lndInvoiceExchange, + client.lndHubInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key "topic", // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain @@ -287,6 +304,7 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic }, ) if err != nil { + captureErr(client.logger, err) return err }