Make ChargeListener use only one websocket connection per url

This commit is contained in:
nicolas.dorier
2018-02-26 13:29:23 +09:00
parent c8923af573
commit ef431f688f
4 changed files with 255 additions and 40 deletions

View File

@@ -71,8 +71,12 @@ namespace BTCPayServer.Tests.Logging
public void LogInformation(string msg)
{
if (msg != null)
try
{
_Helper.WriteLine(DateTimeOffset.UtcNow + " :" + Name + ": " + msg);
}
catch { }
}
}
public class Logs
{

View File

@@ -98,12 +98,34 @@ namespace BTCPayServer.Tests
ExplorerNode.Generate(433 - blockCount.Result);
}
// If the channel is not created, let's do it
if (channels.Result.Length == 0)
{
await CustomerEclair.RPC.OpenAsync(clightning, Money.Satoshis(16777215));
while ((await CustomerEclair.RPC.ChannelsAsync())[0].State != "NORMAL")
var c = (await CustomerEclair.RPC.ChannelsAsync());
bool generated = false;
bool createdChannel = false;
CancellationTokenSource timeout = new CancellationTokenSource();
timeout.CancelAfter(10000);
while (c.Length == 0 || c[0].State != "NORMAL")
{
ExplorerNode.Generate(1);
if(timeout.IsCancellationRequested)
{
timeout = new CancellationTokenSource();
timeout.CancelAfter(10000);
createdChannel = false;
generated = false;
}
if(!createdChannel)
{
await CustomerEclair.RPC.OpenAsync(clightning, Money.Satoshis(16777215));
createdChannel = true;
}
if (!generated && c.Length != 0 && c[0].State == "WAIT_FOR_FUNDING_CONFIRMED")
{
ExplorerNode.Generate(6);
generated = true;
}
c = (await CustomerEclair.RPC.ChannelsAsync());
}
}
}

View File

@@ -356,9 +356,34 @@ namespace BTCPayServer.Tests
Assert.Equal("complete", localInvoice.Status);
Assert.Equal("False", localInvoice.ExceptionStatus.ToString());
});
Task.WaitAll(Enumerable.Range(0, 5)
.Select(_ => CanSendLightningPaymentCore(tester, user))
.ToArray());
}
}
async Task CanSendLightningPaymentCore(ServerTester tester, TestAccount user)
{
await Task.Delay(TimeSpan.FromSeconds(RandomUtils.GetUInt32() % 5));
var invoice = await user.BitPay.CreateInvoiceAsync(new Invoice()
{
Price = 0.01,
Currency = "USD",
PosData = "posData",
OrderId = "orderId",
ItemDesc = "Some description"
});
await tester.SendLightningPaymentAsync(invoice);
await EventuallyAsync(async () =>
{
var localInvoice = await user.BitPay.GetInvoiceAsync(invoice.Id);
Assert.Equal("complete", localInvoice.Status);
Assert.Equal("False", localInvoice.ExceptionStatus.ToString());
});
}
[Fact]
public void CanUseServerInitiatedPairingCode()
{
@@ -895,7 +920,7 @@ namespace BTCPayServer.Tests
private void Eventually(Action act)
{
CancellationTokenSource cts = new CancellationTokenSource(200000);
CancellationTokenSource cts = new CancellationTokenSource(20000);
while (true)
{
try
@@ -909,5 +934,22 @@ namespace BTCPayServer.Tests
}
}
}
private async Task EventuallyAsync(Func<Task> act)
{
CancellationTokenSource cts = new CancellationTokenSource(20000);
while (true)
{
try
{
await act();
break;
}
catch (XunitException) when (!cts.Token.IsCancellationRequested)
{
await Task.Delay(500);
}
}
}
}
}

View File

@@ -1,9 +1,11 @@
using System;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BTCPayServer.Events;
using BTCPayServer.Logging;
using BTCPayServer.Payments.Lightning.CLightning;
using BTCPayServer.Services.Invoices;
using Microsoft.Extensions.Hosting;
@@ -13,6 +15,16 @@ namespace BTCPayServer.Payments.Lightning
{
public class ChargeListener : IHostedService
{
class ListenedInvoice
{
public LightningLikePaymentMethodDetails PaymentMethodDetails { get; set; }
public LightningSupportedPaymentMethod SupportedPaymentMethod { get; set; }
public PaymentMethod PaymentMethod { get; set; }
public string Uri { get; internal set; }
public BTCPayNetwork Network { get; internal set; }
public string InvoiceId { get; internal set; }
}
EventAggregator _Aggregator;
InvoiceRepository _InvoiceRepository;
BTCPayNetworkProvider _NetworkProvider;
@@ -32,57 +44,192 @@ namespace BTCPayServer.Payments.Lightning
{
if (inv.Name == "invoice_created")
{
var invoice = await _InvoiceRepository.GetInvoice(null, inv.InvoiceId);
await Task.WhenAll(invoice.GetPaymentMethods(_NetworkProvider)
.Where(c => c.GetId().PaymentType == PaymentTypes.LightningLike)
.Select(s => Listen(invoice, s, _NetworkProvider.GetNetwork(s.GetId().CryptoCode)))).ConfigureAwait(false);
await EnsureListening(inv.InvoiceId);
}
}));
_ListenPoller = new Timer(async s =>
{
await Task.WhenAll((await _InvoiceRepository.GetPendingInvoices())
.Select(async invoiceId => await EnsureListening(invoiceId))
.ToArray());
}, null, 0, (int)PollInterval.TotalMilliseconds);
leases.Add(_ListenPoller);
return Task.CompletedTask;
}
//MultiValueDictionary<string,string>
private async Task Listen(InvoiceEntity invoice, PaymentMethod paymentMethod, BTCPayNetwork network)
private async Task EnsureListening(string invoiceId)
{
if (Listening(invoiceId))
return;
var invoice = await _InvoiceRepository.GetInvoice(null, invoiceId);
foreach (var paymentMethod in invoice.GetPaymentMethods(_NetworkProvider)
.Where(c => c.GetId().PaymentType == PaymentTypes.LightningLike))
{
var lightningMethod = paymentMethod.GetPaymentMethodDetails() as LightningLikePaymentMethodDetails;
if (lightningMethod == null)
return;
continue;
var lightningSupportedMethod = invoice.GetSupportedPaymentMethod<LightningSupportedPaymentMethod>(_NetworkProvider)
.FirstOrDefault(c => c.CryptoCode == network.CryptoCode);
.FirstOrDefault(c => c.CryptoCode == paymentMethod.GetId().CryptoCode);
if (lightningSupportedMethod == null)
return;
continue;
var network = _NetworkProvider.GetNetwork(paymentMethod.GetId().CryptoCode);
var listenedInvoice = new ListenedInvoice()
{
Uri = lightningSupportedMethod.GetLightningChargeUrl(false).AbsoluteUri,
PaymentMethodDetails = lightningMethod,
SupportedPaymentMethod = lightningSupportedMethod,
PaymentMethod = paymentMethod,
Network = network,
InvoiceId = invoice.Id
};
StartListening(listenedInvoice);
}
}
var charge = new ChargeClient(lightningSupportedMethod.GetLightningChargeUrl(true), network.NBitcoinNetwork);
var session = await charge.Listen();
TimeSpan _PollInterval = TimeSpan.FromMinutes(1.0);
public TimeSpan PollInterval
{
get
{
return _PollInterval;
}
set
{
_PollInterval = value;
if (_ListenPoller != null)
{
_ListenPoller.Change(0, (int)value.TotalMilliseconds);
}
}
}
CancellationTokenSource _Cts = new CancellationTokenSource();
private async Task Listen(LightningSupportedPaymentMethod supportedPaymentMethod, BTCPayNetwork network)
{
try
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Start listening {supportedPaymentMethod.GetLightningChargeUrl(false)}");
var charge = new ChargeClient(supportedPaymentMethod.GetLightningChargeUrl(true), network.NBitcoinNetwork);
var session = await charge.Listen(_Cts.Token);
while (true)
{
var notification = await session.NextEvent();
if (notification.Id == lightningMethod.InvoiceId &&
notification.PaymentRequest == lightningMethod.BOLT11)
var notification = await session.NextEvent(_Cts.Token);
ListenedInvoice listenedInvoice = GetListenedInvoice(notification.Id);
if (listenedInvoice == null)
continue;
if (notification.Id == listenedInvoice.PaymentMethodDetails.InvoiceId &&
notification.PaymentRequest == listenedInvoice.PaymentMethodDetails.BOLT11)
{
if (notification.Status == "paid" && notification.PaidAt.HasValue)
{
await _InvoiceRepository.AddPayment(invoice.Id, notification.PaidAt.Value, new LightningLikePaymentData()
await _InvoiceRepository.AddPayment(listenedInvoice.InvoiceId, notification.PaidAt.Value, new LightningLikePaymentData()
{
BOLT11 = notification.PaymentRequest,
Amount = notification.MilliSatoshi
}, network.CryptoCode, accounted: true);
_Aggregator.Publish(new InvoiceEvent(invoice.Id, 1002, "invoice_receivedPayment"));
_Aggregator.Publish(new InvoiceEvent(listenedInvoice.InvoiceId, 1002, "invoice_receivedPayment"));
if (DoneListening(listenedInvoice))
break;
}
if (notification.Status == "expired")
{
if (DoneListening(listenedInvoice))
break;
}
}
}
}
catch when (_Cts.IsCancellationRequested)
{
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, $"{supportedPaymentMethod.CryptoCode} (Lightning): Error while contacting {supportedPaymentMethod.GetLightningChargeUrl(false)}");
}
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Stop listening {supportedPaymentMethod.GetLightningChargeUrl(false)}");
}
List<Task> _ListeningLightning = new List<Task>();
MultiValueDictionary<string, ListenedInvoice> _ListenedInvoiceByLightningUrl = new MultiValueDictionary<string, ListenedInvoice>();
Dictionary<string, ListenedInvoice> _ListenedInvoiceByChargeInvoiceId = new Dictionary<string, ListenedInvoice>();
HashSet<string> _InvoiceIds = new HashSet<string>();
private Timer _ListenPoller;
public Task StopAsync(CancellationToken cancellationToken)
/// <summary>
/// Stop listening an invoice
/// </summary>
/// <param name="listenedInvoice">The invoice to stop listening</param>
/// <returns>true if still need to listen the lightning instance</returns>
bool DoneListening(ListenedInvoice listenedInvoice)
{
lock (_ListenedInvoiceByLightningUrl)
{
_ListenedInvoiceByChargeInvoiceId.Remove(listenedInvoice.PaymentMethodDetails.InvoiceId);
_ListenedInvoiceByLightningUrl.Remove(listenedInvoice.Uri, listenedInvoice);
_InvoiceIds.Remove(listenedInvoice.InvoiceId);
if (!_ListenedInvoiceByLightningUrl.ContainsKey(listenedInvoice.Uri))
{
return true;
}
}
return false;
}
bool Listening(string invoiceId)
{
lock(_ListenedInvoiceByLightningUrl)
{
return _InvoiceIds.Contains(invoiceId);
}
}
private ListenedInvoice GetListenedInvoice(string chargeInvoiceId)
{
ListenedInvoice listenedInvoice = null;
lock (_ListenedInvoiceByLightningUrl)
{
_ListenedInvoiceByChargeInvoiceId.TryGetValue(chargeInvoiceId, out listenedInvoice);
}
return listenedInvoice;
}
bool StartListening(ListenedInvoice listenedInvoice)
{
lock (_ListenedInvoiceByLightningUrl)
{
if (_InvoiceIds.Contains(listenedInvoice.InvoiceId))
return false;
if (!_ListenedInvoiceByLightningUrl.ContainsKey(listenedInvoice.Uri))
{
var listen = Listen(listenedInvoice.SupportedPaymentMethod, listenedInvoice.Network);
_ListeningLightning.Add(listen);
listen.ContinueWith(_ =>
{
lock (_ListenedInvoiceByLightningUrl)
{
_ListeningLightning.Remove(listen);
}
}, TaskScheduler.Default);
}
_ListenedInvoiceByLightningUrl.Add(listenedInvoice.Uri, listenedInvoice);
_ListenedInvoiceByChargeInvoiceId.Add(listenedInvoice.PaymentMethodDetails.InvoiceId, listenedInvoice);
_InvoiceIds.Add(listenedInvoice.InvoiceId);
}
return true;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
return Task.CompletedTask;
_Cts.Cancel();
Task[] listening = null;
lock (_ListenedInvoiceByLightningUrl)
{
listening = _ListeningLightning.ToArray();
}
await Task.WhenAll(listening);
}
}
}