diff --git a/go.mod b/go.mod index 1c506f2..10f6981 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.4.3 // indirect github.com/golang/glog v1.0.0 // indirect + github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect diff --git a/go.sum b/go.sum index 0c97a24..9195a85 100644 --- a/go.sum +++ b/go.sum @@ -282,6 +282,8 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1043,6 +1045,7 @@ golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= diff --git a/rabbitmq/amqp.go b/rabbitmq/amqp.go new file mode 100644 index 0000000..4911273 --- /dev/null +++ b/rabbitmq/amqp.go @@ -0,0 +1,195 @@ +package rabbitmq + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type AMQPClient interface { + Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) + PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + Close() error +} + +type DefaultAMQPCLient struct { + conn *amqp.Connection + + // It is recommended that, when possible, publishers and consumers + // use separate connections so that consumers are isolated from potential + // flow control measures that may be applied to publishing connections. + consumeChannel *amqp.Channel + publishChannel *amqp.Channel +} + +func (c *DefaultAMQPCLient) Close() error { return c.conn.Close() } + +func (c *DefaultAMQPCLient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { + // TODO: Seperate management channel? Or provide way to select channel? + ch, err := c.conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + + return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args) +} + + +type listenOptions struct { + Durable bool + AutoDelete bool + Internal bool + Wait bool + Exclusive bool + AutoAck bool +} + +type AMQPListenOptions = func(opts listenOptions) listenOptions + +func WithDurable(durable bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.Durable = durable + return opts + } +} + +func WithAutoDelete(autoDelete bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.AutoDelete = autoDelete + return opts + } +} + +func WithInternal(internal bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.Internal = internal + return opts + } +} + +func WithWait(wait bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.Wait = wait + return opts + } +} + +func WithExclusive(exclusive bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.Exclusive = exclusive + return opts + } +} + +func WithAutoAck(autoAck bool) AMQPListenOptions { + return func(opts listenOptions) listenOptions { + opts.AutoAck = autoAck + return opts + } +} + +func (c *DefaultAMQPCLient) Listen(ctx context.Context, exchange string, routingKey string, queueName string, options ...AMQPListenOptions) (<-chan amqp.Delivery, error) { + opts := listenOptions{ + Durable: true, + AutoDelete: false, + Internal: false, + Wait: false, + Exclusive: false, + AutoAck: false, + } + + for _, opt := range options { + opts = opt(opts) + } + + err := c.consumeChannel.ExchangeDeclare( + exchange, + // topic is a type of exchange that allows routing messages to different queue's bases on a routing key + "topic", + // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain + // declared when there are no remaining bindings. + opts.Durable, + opts.AutoDelete, + // Non-Internal exchange's accept direct publishing + opts.Internal, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the exchange was created succesfully + opts.Wait, + nil, + ) + if err != nil { + return nil, err + } + + queue, err := c.consumeChannel.QueueDeclare( + queueName, + // Durable and Non-Auto-Deleted queues will survive server restarts and remain + // declared when there are no remaining bindings. + opts.Durable, + opts.AutoDelete, + // None-Exclusive means other consumers can consume from this queue. + // Messages from queues are spread out and load balanced between consumers. + // So multiple lndhub.go instances will spread the load of invoices between them + opts.Exclusive, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the queue was created successfully + opts.Wait, + nil, + ) + if err != nil { + return nil, err + } + + err = c.consumeChannel.QueueBind( + queue.Name, + routingKey, + exchange, + // Nowait: We set this to false as we want to wait for a server response + // to check whether the queue was created successfully + opts.Wait, + nil, + ) + if err != nil { + return nil, err + } + + return c.consumeChannel.Consume( + queue.Name, + "", + opts.AutoAck, + opts.Exclusive, + false, + opts.Wait, + nil, + ) +} + +func (c *DefaultAMQPCLient) PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error { + return c.publishChannel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg) +} + +func Dial(uri string) (AMQPClient, error) { + conn, err := amqp.Dial(uri) + if err != nil { + return nil, err + } + + consumeChannel, err := conn.Channel() + if err != nil { + return nil, err + } + + publishChannel, err := conn.Channel() + if err != nil { + return nil, err + } + + return &DefaultAMQPCLient{ + conn, + consumeChannel, + publishChannel, + }, nil +} diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index f477eac..cb1dffc 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -53,14 +53,7 @@ type LndHubService interface { } type DefaultClient struct { - conn *amqp.Connection - - // It is recommended that, when possible, publishers and consumers - // use separate connections so that consumers are isolated from potential - // flow control measures that may be applied to publishing connections. - consumeChannel *amqp.Channel - publishChannel *amqp.Channel - + amqpClient AMQPClient logger *lecho.Logger lndInvoiceConsumerQueueName string @@ -109,27 +102,9 @@ func WithLogger(logger *lecho.Logger) ClientOption { } // Dial sets up a connection to rabbitmq with two channels that are ready to produce and consume -func Dial(uri string, options ...ClientOption) (Client, error) { - conn, err := amqp.Dial(uri) - if err != nil { - return nil, err - } - - consumeChannel, err := conn.Channel() - if err != nil { - return nil, err - } - - produceChannel, err := conn.Channel() - if err != nil { - return nil, err - } - +func NewClient(amqpClient AMQPClient, options ...ClientOption) (Client, error) { client := &DefaultClient{ - conn: conn, - - consumeChannel: consumeChannel, - publishChannel: produceChannel, + amqpClient: amqpClient, logger: lecho.New( os.Stdout, @@ -151,70 +126,15 @@ func Dial(uri string, options ...ClientOption) (Client, error) { return client, nil } -func (client *DefaultClient) Close() error { return client.conn.Close() } +func (client *DefaultClient) Close() error { return client.amqpClient.Close() } func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, svc LndHubService) error { - - err := client.publishChannel.ExchangeDeclare( - client.lndPaymentExchange, - // topic is a type of exchange that allows routing messages to different queue's bases on a routing key - "topic", - // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain - // declared when there are no remaining bindings. - true, - false, - // Non-Internal exchange's accept direct publishing - false, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the exchange was created succesfully - false, - nil, - ) - if err != nil { - return err - } - - queue, err := client.consumeChannel.QueueDeclare( - client.lndPaymentConsumerQueueName, - // Durable and Non-Auto-Deleted queues will survive server restarts and remain - // declared when there are no remaining bindings. - true, - false, - // None-Exclusive means other consumers can consume from this queue. - // Messages from queues are spread out and load balanced between consumers. - // So multiple lndhub.go instances will spread the load of invoices between them - false, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the queue was created successfully - false, - nil, - ) - if err != nil { - return err - } - - err = client.consumeChannel.QueueBind( - queue.Name, - "payment.outgoing.#", - client.lndPaymentExchange, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the queue was created successfully - false, - nil, - ) - if err != nil { - return err - } - - deliveryChan, err := client.consumeChannel.Consume( - queue.Name, - "", - false, - false, - false, - false, - nil, - ) + deliveryChan, err := client.amqpClient.Listen( + ctx, + client.lndPaymentExchange, + "payment.outgoing.*", + client.lndPaymentConsumerQueueName, + ) if err != nil { return err } @@ -315,66 +235,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv } func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler IncomingInvoiceHandler) error { - err := client.publishChannel.ExchangeDeclare( - client.lndInvoiceExchange, - // topic is a type of exchange that allows routing messages to different queue's bases on a routing key - "topic", - // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain - // declared when there are no remaining bindings. - true, - false, - // Non-Internal exchange's accept direct publishing - false, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the exchange was created succesfully - false, - nil, - ) - if err != nil { - return err - } - - queue, err := client.consumeChannel.QueueDeclare( - client.lndInvoiceConsumerQueueName, - // Durable and Non-Auto-Deleted queues will survive server restarts and remain - // declared when there are no remaining bindings. - true, - false, - // None-Exclusive means other consumers can consume from this queue. - // Messages from queues are spread out and load balanced between consumers. - // So multiple lndhub.go instances will spread the load of invoices between them - false, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the queue was created successfully - false, - nil, - ) - if err != nil { - return err - } - - err = client.consumeChannel.QueueBind( - queue.Name, - "invoice.incoming.settled", - client.lndInvoiceExchange, - // Nowait: We set this to false as we want to wait for a server response - // to check whether the queue was created successfully - false, - nil, - ) - if err != nil { - return err - } - - deliveryChan, err := client.consumeChannel.Consume( - queue.Name, - "", - false, - false, - false, - false, - nil, - ) + deliveryChan, err := client.amqpClient.Listen(ctx, client.lndInvoiceExchange, "invoice.incoming.settled", client.lndInvoiceConsumerQueueName) if err != nil { return err } @@ -429,7 +290,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler } func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeOutgoingInvoiceFunc) error { - err := client.publishChannel.ExchangeDeclare( + err := client.amqpClient.ExchangeDeclare( client.lndHubInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key "topic", @@ -484,7 +345,7 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic key := fmt.Sprintf("invoice.%s.%s", invoice.Type, invoice.State) - err = client.publishChannel.PublishWithContext(ctx, + err = client.amqpClient.PublishWithContext(ctx, client.lndHubInvoiceExchange, key, false, diff --git a/rabbitmq/rabbitmq_test.go b/rabbitmq/rabbitmq_test.go new file mode 100644 index 0000000..092d502 --- /dev/null +++ b/rabbitmq/rabbitmq_test.go @@ -0,0 +1,11 @@ +package rabbitmq_test + +import ( + "testing" +) + +//go:generate mockgen -destination=./rabbitmqmocks/rabbitmq.go -package rabbitmqmocks github.com/getAlby/lndhub.go/rabbitmq LndHubService + +func TestFinalizedInitializedPayments(t *testing.T) { + t.Parallel() +} diff --git a/rabbitmq/rabbitmqmocks/rabbitmq.go b/rabbitmq/rabbitmqmocks/rabbitmq.go new file mode 100644 index 0000000..7a6a35a --- /dev/null +++ b/rabbitmq/rabbitmqmocks/rabbitmq.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/getAlby/lndhub.go/rabbitmq (interfaces: LndHubService) + +// Package rabbitmqmocks is a generated GoMock package. +package rabbitmqmocks + +import ( + context "context" + reflect "reflect" + + models "github.com/getAlby/lndhub.go/db/models" + gomock "github.com/golang/mock/gomock" +) + +// MockLndHubService is a mock of LndHubService interface. +type MockLndHubService struct { + ctrl *gomock.Controller + recorder *MockLndHubServiceMockRecorder +} + +// MockLndHubServiceMockRecorder is the mock recorder for MockLndHubService. +type MockLndHubServiceMockRecorder struct { + mock *MockLndHubService +} + +// NewMockLndHubService creates a new mock instance. +func NewMockLndHubService(ctrl *gomock.Controller) *MockLndHubService { + mock := &MockLndHubService{ctrl: ctrl} + mock.recorder = &MockLndHubServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLndHubService) EXPECT() *MockLndHubServiceMockRecorder { + return m.recorder +} + +// GetAllPendingPayments mocks base method. +func (m *MockLndHubService) GetAllPendingPayments(arg0 context.Context) ([]models.Invoice, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllPendingPayments", arg0) + ret0, _ := ret[0].([]models.Invoice) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllPendingPayments indicates an expected call of GetAllPendingPayments. +func (mr *MockLndHubServiceMockRecorder) GetAllPendingPayments(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllPendingPayments", reflect.TypeOf((*MockLndHubService)(nil).GetAllPendingPayments), arg0) +} + +// GetTransactionEntryByInvoiceId mocks base method. +func (m *MockLndHubService) GetTransactionEntryByInvoiceId(arg0 context.Context, arg1 int64) (models.TransactionEntry, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransactionEntryByInvoiceId", arg0, arg1) + ret0, _ := ret[0].(models.TransactionEntry) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTransactionEntryByInvoiceId indicates an expected call of GetTransactionEntryByInvoiceId. +func (mr *MockLndHubServiceMockRecorder) GetTransactionEntryByInvoiceId(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransactionEntryByInvoiceId", reflect.TypeOf((*MockLndHubService)(nil).GetTransactionEntryByInvoiceId), arg0, arg1) +} + +// HandleFailedPayment mocks base method. +func (m *MockLndHubService) HandleFailedPayment(arg0 context.Context, arg1 *models.Invoice, arg2 models.TransactionEntry, arg3 error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HandleFailedPayment", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// HandleFailedPayment indicates an expected call of HandleFailedPayment. +func (mr *MockLndHubServiceMockRecorder) HandleFailedPayment(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleFailedPayment", reflect.TypeOf((*MockLndHubService)(nil).HandleFailedPayment), arg0, arg1, arg2, arg3) +} + +// HandleSuccessfulPayment mocks base method. +func (m *MockLndHubService) HandleSuccessfulPayment(arg0 context.Context, arg1 *models.Invoice, arg2 models.TransactionEntry) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HandleSuccessfulPayment", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// HandleSuccessfulPayment indicates an expected call of HandleSuccessfulPayment. +func (mr *MockLndHubServiceMockRecorder) HandleSuccessfulPayment(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleSuccessfulPayment", reflect.TypeOf((*MockLndHubService)(nil).HandleSuccessfulPayment), arg0, arg1, arg2) +}