Fix: Potential server issue with Blink listener

This commit is contained in:
nicolas.dorier
2025-06-27 10:23:39 +09:00
parent 32bffc4bf6
commit e7c29d01fd
2 changed files with 57 additions and 88 deletions

View File

@@ -3,6 +3,7 @@ using System;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Net.Http;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -481,117 +482,85 @@ expiresIn = (int)createInvoiceRequest.Expiry.TotalMinutes
public class BlinkListener : ILightningInvoiceListener
{
private volatile int _WebSocketErrorCount = 0;
private readonly BlinkLightningClient _lightningClient;
private readonly Channel<LightningInvoice> _invoices = Channel.CreateUnbounded<LightningInvoice>();
private readonly Channel<string> _invoices = Channel.CreateUnbounded<string>();
private readonly IDisposable _subscription;
private readonly ILogger _logger;
public BlinkListener(GraphQLHttpClient httpClient, BlinkLightningClient lightningClient, ILogger logger)
{
try
_logger = logger;
_lightningClient = lightningClient;
var stream = httpClient.CreateSubscriptionStream<JObject>(new GraphQLRequest()
{
_logger = logger;
_lightningClient = lightningClient;
var stream = httpClient.CreateSubscriptionStream<JObject>(new GraphQLRequest()
{
Query = @"subscription myUpdates {
myUpdates {
update {
... on LnUpdate {
transaction {
initiationVia {
... on InitiationViaLn {
paymentHash
}
}
direction
Query = @"subscription myUpdates {
myUpdates {
update {
... on LnUpdate {
transaction {
initiationVia {
... on InitiationViaLn {
paymentHash
}
}
direction
}
}
}
}
}
", OperationName = "myUpdates"
});
_subscription = stream.Subscribe(async response =>
{
try
{
if(response.Data is null)
return;
if (response.Data.SelectToken("myUpdates.update.transaction.direction")?.Value<string>() != "RECEIVE")
return;
var invoiceId = response.Data
.SelectToken("myUpdates.update.transaction.initiationVia.paymentHash")?.Value<string>();
if (invoiceId is null)
return;
if (await _lightningClient.GetInvoice(invoiceId) is LightningInvoice inv)
{
_invoices.Writer.TryWrite(inv);
}
}
catch (Exception e)
{
_logger.LogError(e, "Error while processing detecting lightning invoice payment");
}
});
_wsSubscriptionDisposable = httpClient.WebsocketConnectionState.Subscribe(state =>
{
if (state == GraphQLWebsocketConnectionState.Disconnected)
{
streamEnded.TrySetResult();
}
});
}
catch (Exception e)
}, webSocketExceptionHandler: (wse) =>
{
logger.LogError(e, "Error while creating lightning invoice listener");
}
_logger.LogWarning(wse, $"Websocket error to Blink... ({_WebSocketErrorCount})");
if (Interlocked.Increment(ref _WebSocketErrorCount) == 10)
{
_invoices.Writer.TryComplete(wse);
_logger.LogError(wse, "Connection to Blink WebSocket closed.");
}
});
_subscription = stream.Subscribe(response =>
{
_WebSocketErrorCount = 0;
try
{
if(response?.Data is null)
return;
if (response.Data.SelectToken("myUpdates.update.transaction.direction")?.Value<string>() != "RECEIVE")
return;
var invoiceId = response.Data
.SelectToken("myUpdates.update.transaction.initiationVia.paymentHash")?.Value<string>();
if (invoiceId is null)
return;
_invoices.Writer.TryWrite(invoiceId);
}
catch (Exception e)
{
_logger.LogError(e, "Error while processing detecting lightning invoice payment");
}
},
onError: (e) =>
{
_invoices.Writer.TryComplete(e);
});
}
public void Dispose()
{
_subscription.Dispose();
_invoices.Writer.TryComplete();
_wsSubscriptionDisposable.Dispose();
streamEnded.TrySetResult();
_invoices.Writer.TryComplete(new ObjectDisposedException(nameof(BlinkListener)));
}
private TaskCompletionSource streamEnded = new();
private readonly IDisposable _wsSubscriptionDisposable;
public async Task<LightningInvoice> WaitInvoice(CancellationToken cancellation)
{
var resultz = await Task.WhenAny(streamEnded.Task, _invoices.Reader.ReadAsync(cancellation).AsTask());
if (resultz is Task<LightningInvoice> res)
await foreach (var id in _invoices.Reader.ReadAllAsync(cancellation))
{
return await res;
var invoice = await _lightningClient.GetInvoice(id, cancellation);
if (invoice is not null)
return invoice;
}
// Enhanced logging to identify which task completed and its details
var taskType = resultz.GetType();
var typeName = taskType.Name.Contains('`') ? taskType.Name.Split('`')[0] : taskType.Name;
var genericArgs = taskType.GenericTypeArguments;
var genericTypeInfo = genericArgs.Length > 0
? $"<{string.Join(", ", genericArgs.Select(t => t.Name))}>"
: "";
_logger.LogInformation("WaitInvoice completed - Task Type: {TaskType}{GenericArgs}, " +
"Status: {Status}, IsCompletedSuccessfully: {IsCompletedSuccessfully}, " +
"IsFaulted: {IsFaulted}",
typeName,
genericTypeInfo,
resultz.Status,
resultz.IsCompletedSuccessfully,
resultz.IsFaulted);
if (resultz.IsFaulted && resultz.Exception != null)
{
_logger.LogError("Task completed with fault: {Exception}", resultz.Exception);
}
return new LightningInvoice { Id = Guid.NewGuid().ToString() }; // Return a dummy invoice so calling listening logic exits
throw new ChannelClosedException();
}
}
public async Task<(Network Network, string DefaultWalletId, BlinkCurrency DefaultWalletCurrency)> FetchNetworkAndDefaultWallet(CancellationToken cancellation =default)

View File

@@ -98,6 +98,6 @@ public class BlinkLightningConnectionStringHandler : ILightningConnectionStringH
}
}
return new BlinkLightningClient(apiKey, uri, walletId, currency, network, client, _loggerFactory.CreateLogger($"{nameof(BlinkLightningClient)}:{walletId}"));
return new BlinkLightningClient(apiKey, uri, walletId, currency, network, client, _loggerFactory.CreateLogger(nameof(BlinkLightningClient)));
}
}