Initial offline integration tests

This commit is contained in:
Lucas Rouckhout
2023-05-30 10:38:36 +02:00
parent 220985bf3c
commit af8e7f2376
5 changed files with 258 additions and 56 deletions

View File

@@ -45,6 +45,14 @@ type Client interface {
Close() error
}
type ClientConfig struct {
lndInvoiceConsumerQueueName string
lndPaymentConsumerQueueName string
lndInvoiceExchange string
lndPaymentExchange string
lndHubInvoiceExchange string
}
type LndHubService interface {
HandleFailedPayment(context.Context, *models.Invoice, models.TransactionEntry, error) error
HandleSuccessfulPayment(context.Context, *models.Invoice, models.TransactionEntry) error
@@ -53,45 +61,41 @@ type LndHubService interface {
}
type DefaultClient struct {
amqpClient AMQPClient
logger *lecho.Logger
amqpClient AMQPClient
logger *lecho.Logger
lndInvoiceConsumerQueueName string
lndPaymentConsumerQueueName string
lndInvoiceExchange string
lndPaymentExchange string
lndHubInvoiceExchange string
config ClientConfig
}
type ClientOption = func(client *DefaultClient)
func WithLndInvoiceExchange(exchange string) ClientOption {
return func(client *DefaultClient) {
client.lndInvoiceExchange = exchange
client.config.lndInvoiceExchange = exchange
}
}
func WithLndHubInvoiceExchange(exchange string) ClientOption {
return func(client *DefaultClient) {
client.lndHubInvoiceExchange = exchange
client.config.lndHubInvoiceExchange = exchange
}
}
func WithLndInvoiceConsumerQueueName(name string) ClientOption {
return func(client *DefaultClient) {
client.lndInvoiceConsumerQueueName = name
client.config.lndInvoiceConsumerQueueName = name
}
}
func WithLndPaymentConsumerQueueName(name string) ClientOption {
return func(client *DefaultClient) {
client.lndPaymentConsumerQueueName = name
client.config.lndPaymentConsumerQueueName = name
}
}
func WithLndPaymentExchange(exchange string) ClientOption {
return func(client *DefaultClient) {
client.lndPaymentExchange = exchange
client.config.lndPaymentExchange = exchange
}
}
@@ -104,7 +108,7 @@ func WithLogger(logger *lecho.Logger) ClientOption {
// Dial sets up a connection to rabbitmq with two channels that are ready to produce and consume
func NewClient(amqpClient AMQPClient, options ...ClientOption) (Client, error) {
client := &DefaultClient{
amqpClient: amqpClient,
amqpClient: amqpClient,
logger: lecho.New(
os.Stdout,
@@ -112,11 +116,13 @@ func NewClient(amqpClient AMQPClient, options ...ClientOption) (Client, error) {
lecho.WithTimestamp(),
),
lndInvoiceConsumerQueueName: "lnd_invoice_consumer",
lndPaymentConsumerQueueName: "lnd_payment_consumer",
lndInvoiceExchange: "lnd_invoice",
lndPaymentExchange: "lnd_payment",
lndHubInvoiceExchange: "lndhub_invoice",
config: ClientConfig{
lndInvoiceConsumerQueueName: "lnd_invoice_consumer",
lndPaymentConsumerQueueName: "lnd_payment_consumer",
lndInvoiceExchange: "lnd_invoice",
lndPaymentExchange: "lnd_payment",
lndHubInvoiceExchange: "lndhub_invoice",
},
}
for _, opt := range options {
@@ -130,11 +136,11 @@ func (client *DefaultClient) Close() error { return client.amqpClient.Close() }
func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, svc LndHubService) error {
deliveryChan, err := client.amqpClient.Listen(
ctx,
client.lndPaymentExchange,
"payment.outgoing.*",
client.lndPaymentConsumerQueueName,
)
ctx,
client.config.lndPaymentExchange,
"payment.outgoing.*",
client.config.lndPaymentConsumerQueueName,
)
if err != nil {
return err
}
@@ -157,30 +163,37 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
if err != nil {
return err
}
client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
client.logger.Info("Starting payment finalizer rabbitmq consumer")
for {
// Shortcircuit if no pending invoices are left
if len(pendingInvoices) == 0 {
client.logger.Info("Payment finalizer: Resolved all pending payments, exiting payment finalizer routine")
return nil
}
select {
case <-ctx.Done():
return context.Canceled
case <-ticker.C:
invoices, err := getInvoicesTable(ctx)
pendingInvoices = invoices
client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))
if err != nil {
return err
}
case delivery, ok := <-deliveryChan:
// Shortcircuit if no pending invoices are left
if len(pendingInvoices) == 0 {
client.logger.Info("Payment finalizer: Resolved all pending payments, exiting payment finalizer routine")
return nil
}
pendingInvoices = invoices
client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))
case delivery, ok := <-deliveryChan:
if !ok {
return err
}
@@ -215,6 +228,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
continue
}
client.logger.Infof("Payment finalizer: updated successful payment with hash: %s", payment.PaymentHash)
delete(pendingInvoices, payment.PaymentHash)
@@ -225,17 +239,18 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
continue
}
client.logger.Infof("Payment finalizer: updated failed payment with hash: %s", payment.PaymentHash)
delete(pendingInvoices, payment.PaymentHash)
}
}
delivery.Ack(false)
}
}
}
}
func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler IncomingInvoiceHandler) error {
deliveryChan, err := client.amqpClient.Listen(ctx, client.lndInvoiceExchange, "invoice.incoming.settled", client.lndInvoiceConsumerQueueName)
deliveryChan, err := client.amqpClient.Listen(ctx, client.config.lndInvoiceExchange, "invoice.incoming.settled", client.config.lndInvoiceConsumerQueueName)
if err != nil {
return err
}
@@ -291,7 +306,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeOutgoingInvoiceFunc) error {
err := client.amqpClient.ExchangeDeclare(
client.lndHubInvoiceExchange,
client.config.lndHubInvoiceExchange,
// topic is a type of exchange that allows routing messages to different queue's bases on a routing key
"topic",
// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
@@ -346,7 +361,7 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic
key := fmt.Sprintf("invoice.%s.%s", invoice.Type, invoice.State)
err = client.amqpClient.PublishWithContext(ctx,
client.lndHubInvoiceExchange,
client.config.lndHubInvoiceExchange,
key,
false,
false,