diff --git a/lib/service/invoicesubscription.go b/lib/service/invoicesubscription.go index 33bff9a..725a09e 100644 --- a/lib/service/invoicesubscription.go +++ b/lib/service/invoicesubscription.go @@ -93,6 +93,9 @@ func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice * if rawInvoice.IsKeysend { err := svc.HandleKeysendPayment(ctx, rawInvoice) if err != nil { + if err == AlreadyProcessedKeysendError { + return nil + } return err } } diff --git a/rabbitmq/rabbitmq.go b/rabbitmq/rabbitmq.go index d250d4e..ef4d43f 100644 --- a/rabbitmq/rabbitmq.go +++ b/rabbitmq/rabbitmq.go @@ -31,14 +31,14 @@ const ( ) type ( - InvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error - SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error) - EncodeInvoicePayloadFunc = func(ctx context.Context, w io.Writer, invoice models.Invoice) error + IncomingInvoiceHandler = func(ctx context.Context, invoice *lnrpc.Invoice) error + SubscribeToInvoicesFunc = func() (in chan models.Invoice, out chan models.Invoice, err error) + EncodeOutgoingInvoiceFunc = func(ctx context.Context, w io.Writer, invoice models.Invoice) error ) type Client interface { - SubscribeToLndInvoices(context.Context, InvoiceHandler) error - StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeInvoicePayloadFunc) error + SubscribeToLndInvoices(context.Context, IncomingInvoiceHandler) error + StartPublishInvoices(context.Context, SubscribeToInvoicesFunc, EncodeOutgoingInvoiceFunc) error // Close will close all connections to rabbitmq Close() error } @@ -128,7 +128,7 @@ func Dial(uri string, options ...ClientOption) (Client, error) { func (client *DefaultClient) Close() error { return client.conn.Close() } -func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler InvoiceHandler) error { +func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler IncomingInvoiceHandler) error { err := client.publishChannel.ExchangeDeclare( client.lndInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key @@ -169,7 +169,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler err = client.consumeChannel.QueueBind( queue.Name, - "#", + "invoice.incoming.settled", client.lndInvoiceExchange, // Nowait: We set this to false as we want to wait for a server response // to check whether the queue was created successfully @@ -239,7 +239,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler } } -func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeInvoicePayloadFunc) error { +func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesSubscribeFunc SubscribeToInvoicesFunc, payloadFunc EncodeOutgoingInvoiceFunc) error { err := client.publishChannel.ExchangeDeclare( client.lndHubInvoiceExchange, // topic is a type of exchange that allows routing messages to different queue's bases on a routing key @@ -286,7 +286,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS } } -func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeInvoicePayloadFunc) error { +func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoice models.Invoice, payloadFunc EncodeOutgoingInvoiceFunc) error { payload := bufPool.Get().(*bytes.Buffer) err := payloadFunc(ctx, payload, invoice) if err != nil {