From e7c29d01fdfc804d2d0d8a9ac34076137974bc90 Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Fri, 27 Jun 2025 10:23:39 +0900 Subject: [PATCH] Fix: Potential server issue with Blink listener --- .../BlinkLightningClient.cs | 143 +++++++----------- .../BlinkLightningConnectionStringHandler.cs | 2 +- 2 files changed, 57 insertions(+), 88 deletions(-) diff --git a/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningClient.cs b/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningClient.cs index 2d7f594..aea7422 100644 --- a/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningClient.cs +++ b/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningClient.cs @@ -3,6 +3,7 @@ using System; using System.ComponentModel.DataAnnotations; using System.Linq; using System.Net.Http; +using System.Reactive.Disposables; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -481,117 +482,85 @@ expiresIn = (int)createInvoiceRequest.Expiry.TotalMinutes public class BlinkListener : ILightningInvoiceListener { + private volatile int _WebSocketErrorCount = 0; private readonly BlinkLightningClient _lightningClient; - private readonly Channel _invoices = Channel.CreateUnbounded(); + private readonly Channel _invoices = Channel.CreateUnbounded(); private readonly IDisposable _subscription; private readonly ILogger _logger; public BlinkListener(GraphQLHttpClient httpClient, BlinkLightningClient lightningClient, ILogger logger) { - try + _logger = logger; + _lightningClient = lightningClient; + var stream = httpClient.CreateSubscriptionStream(new GraphQLRequest() { - _logger = logger; - _lightningClient = lightningClient; - var stream = httpClient.CreateSubscriptionStream(new GraphQLRequest() - { - Query = @"subscription myUpdates { - myUpdates { - update { - ... on LnUpdate { - transaction { - initiationVia { - ... on InitiationViaLn { - paymentHash - } - } - direction + Query = @"subscription myUpdates { +myUpdates { +update { + ... on LnUpdate { + transaction { + initiationVia { + ... on InitiationViaLn { + paymentHash } } + direction } } } +} +} ", OperationName = "myUpdates" - }); - - _subscription = stream.Subscribe(async response => - { - try - { - if(response.Data is null) - return; - if (response.Data.SelectToken("myUpdates.update.transaction.direction")?.Value() != "RECEIVE") - return; - var invoiceId = response.Data - .SelectToken("myUpdates.update.transaction.initiationVia.paymentHash")?.Value(); - if (invoiceId is null) - return; - if (await _lightningClient.GetInvoice(invoiceId) is LightningInvoice inv) - { - _invoices.Writer.TryWrite(inv); - } - } - catch (Exception e) - { - _logger.LogError(e, "Error while processing detecting lightning invoice payment"); - } - - }); - _wsSubscriptionDisposable = httpClient.WebsocketConnectionState.Subscribe(state => - { - if (state == GraphQLWebsocketConnectionState.Disconnected) - { - streamEnded.TrySetResult(); - } - }); - - } - catch (Exception e) + }, webSocketExceptionHandler: (wse) => { - logger.LogError(e, "Error while creating lightning invoice listener"); - } + _logger.LogWarning(wse, $"Websocket error to Blink... ({_WebSocketErrorCount})"); + if (Interlocked.Increment(ref _WebSocketErrorCount) == 10) + { + _invoices.Writer.TryComplete(wse); + _logger.LogError(wse, "Connection to Blink WebSocket closed."); + } + }); + + _subscription = stream.Subscribe(response => + { + _WebSocketErrorCount = 0; + try + { + if(response?.Data is null) + return; + if (response.Data.SelectToken("myUpdates.update.transaction.direction")?.Value() != "RECEIVE") + return; + var invoiceId = response.Data + .SelectToken("myUpdates.update.transaction.initiationVia.paymentHash")?.Value(); + if (invoiceId is null) + return; + _invoices.Writer.TryWrite(invoiceId); + } + catch (Exception e) + { + _logger.LogError(e, "Error while processing detecting lightning invoice payment"); + } + }, + onError: (e) => + { + _invoices.Writer.TryComplete(e); + }); } public void Dispose() { _subscription.Dispose(); - _invoices.Writer.TryComplete(); - _wsSubscriptionDisposable.Dispose(); - streamEnded.TrySetResult(); + _invoices.Writer.TryComplete(new ObjectDisposedException(nameof(BlinkListener))); } - private TaskCompletionSource streamEnded = new(); - private readonly IDisposable _wsSubscriptionDisposable; - public async Task WaitInvoice(CancellationToken cancellation) { - var resultz = await Task.WhenAny(streamEnded.Task, _invoices.Reader.ReadAsync(cancellation).AsTask()); - if (resultz is Task res) + await foreach (var id in _invoices.Reader.ReadAllAsync(cancellation)) { - return await res; + var invoice = await _lightningClient.GetInvoice(id, cancellation); + if (invoice is not null) + return invoice; } - - // Enhanced logging to identify which task completed and its details - var taskType = resultz.GetType(); - var typeName = taskType.Name.Contains('`') ? taskType.Name.Split('`')[0] : taskType.Name; - var genericArgs = taskType.GenericTypeArguments; - var genericTypeInfo = genericArgs.Length > 0 - ? $"<{string.Join(", ", genericArgs.Select(t => t.Name))}>" - : ""; - - _logger.LogInformation("WaitInvoice completed - Task Type: {TaskType}{GenericArgs}, " + - "Status: {Status}, IsCompletedSuccessfully: {IsCompletedSuccessfully}, " + - "IsFaulted: {IsFaulted}", - typeName, - genericTypeInfo, - resultz.Status, - resultz.IsCompletedSuccessfully, - resultz.IsFaulted); - - if (resultz.IsFaulted && resultz.Exception != null) - { - _logger.LogError("Task completed with fault: {Exception}", resultz.Exception); - } - - return new LightningInvoice { Id = Guid.NewGuid().ToString() }; // Return a dummy invoice so calling listening logic exits + throw new ChannelClosedException(); } } public async Task<(Network Network, string DefaultWalletId, BlinkCurrency DefaultWalletCurrency)> FetchNetworkAndDefaultWallet(CancellationToken cancellation =default) diff --git a/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningConnectionStringHandler.cs b/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningConnectionStringHandler.cs index c64962e..481db1d 100644 --- a/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningConnectionStringHandler.cs +++ b/Plugins/BTCPayServer.Plugins.Blink/BlinkLightningConnectionStringHandler.cs @@ -98,6 +98,6 @@ public class BlinkLightningConnectionStringHandler : ILightningConnectionStringH } } - return new BlinkLightningClient(apiKey, uri, walletId, currency, network, client, _loggerFactory.CreateLogger($"{nameof(BlinkLightningClient)}:{walletId}")); + return new BlinkLightningClient(apiKey, uri, walletId, currency, network, client, _loggerFactory.CreateLogger(nameof(BlinkLightningClient))); } } \ No newline at end of file