diff --git a/integration_tests/websocket_test.go b/integration_tests/websocket_test.go index e5d4f68..910a264 100644 --- a/integration_tests/websocket_test.go +++ b/integration_tests/websocket_test.go @@ -271,6 +271,7 @@ func (suite *WebSocketTestSuite) TestWebSocketMissingInvoice() { func (suite *WebSocketTestSuite) TearDownSuite() { suite.invoiceUpdateSubCancelFn() suite.websocketServer.Close() + clearTable(suite.service, "invoices") } func TestWebSocketSuite(t *testing.T) { diff --git a/lib/service/invoicesubscription.go b/lib/service/invoicesubscription.go index e4ae58b..580f900 100644 --- a/lib/service/invoicesubscription.go +++ b/lib/service/invoicesubscription.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/hex" + "fmt" "strings" "time" @@ -122,32 +123,37 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { return err } for { - // receive the next invoice update - rawInvoice, err := invoiceSubscriptionStream.Recv() - if err != nil { - svc.Logger.Errorf("Error processing invoice update subscription: %v", err) - sentry.CaptureException(err) - // TODO: close the stream somehoe before retrying? - // Wait 30 seconds and try to reconnect - // TODO: implement some backoff - time.Sleep(30 * time.Second) - invoiceSubscriptionStream, _ = svc.ConnectInvoiceSubscription(ctx) - continue - } + select { + case <-ctx.Done(): + return fmt.Errorf("Context was canceled") + default: + // receive the next invoice update + rawInvoice, err := invoiceSubscriptionStream.Recv() + if err != nil { + svc.Logger.Errorf("Error processing invoice update subscription: %v", err) + sentry.CaptureException(err) + // TODO: close the stream somehoe before retrying? + // Wait 30 seconds and try to reconnect + // TODO: implement some backoff + time.Sleep(30 * time.Second) + invoiceSubscriptionStream, _ = svc.ConnectInvoiceSubscription(ctx) + continue + } - // Ignore updates for open invoices - // We store the invoice details in the AddInvoice call - // Processing open invoices here could cause a race condition: - // We could get this notification faster than we finish the AddInvoice call - if rawInvoice.State == lnrpc.Invoice_OPEN { - svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", hex.EncodeToString(rawInvoice.RHash)) - continue - } + // Ignore updates for open invoices + // We store the invoice details in the AddInvoice call + // Processing open invoices here could cause a race condition: + // We could get this notification faster than we finish the AddInvoice call + if rawInvoice.State == lnrpc.Invoice_OPEN { + svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", hex.EncodeToString(rawInvoice.RHash)) + continue + } - processingError := svc.ProcessInvoiceUpdate(ctx, rawInvoice) - if processingError != nil { - svc.Logger.Error(processingError) - sentry.CaptureException(processingError) + processingError := svc.ProcessInvoiceUpdate(ctx, rawInvoice) + if processingError != nil { + svc.Logger.Error(processingError) + sentry.CaptureException(processingError) + } } } }