From 5d9827fb6032a2716eb87ecdc3ce62d53c74e055 Mon Sep 17 00:00:00 2001 From: Igor Rylko Date: Tue, 27 Oct 2020 21:09:53 +0200 Subject: [PATCH] Add thread limit for updating payment states for payment invoices in NBXplorerListener --- .../Payments/Bitcoin/NBXplorerListener.cs | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs index c192683e0..90754858d 100644 --- a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs +++ b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs @@ -4,6 +4,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using BTCPayServer; using BTCPayServer.Events; using BTCPayServer.HostedServices; @@ -26,6 +27,8 @@ namespace BTCPayServer.Payments.Bitcoin /// public class NBXplorerListener : IHostedService { + private const int UpdatePaymentStatesMaxDegreeOfParallelism = 32; + readonly EventAggregator _Aggregator; private readonly PayJoinRepository _payJoinRepository; readonly ExplorerClientProvider _ExplorerClients; @@ -138,9 +141,7 @@ namespace BTCPayServer.Payments.Bitcoin switch (newEvent) { case NBXplorer.Models.NewBlockEvent evt: - await Task.WhenAll((await _InvoiceRepository.GetPendingInvoices()) - .Select(invoiceId => UpdatePaymentStates(wallet, invoiceId)) - .ToArray()); + await UpdatePaymentStatesForPendingInvoices(wallet); _Aggregator.Publish(new Events.NewBlockEvent() { CryptoCode = evt.CryptoCode }); break; case NBXplorer.Models.NewTransactionEvent evt: @@ -206,6 +207,27 @@ namespace BTCPayServer.Payments.Bitcoin } } + private async Task UpdatePaymentStatesForPendingInvoices(BTCPayWallet wallet) + { + var pendingInvoices = await _InvoiceRepository.GetPendingInvoices(); + var actionBlock = new ActionBlock(invoiceId => UpdatePaymentStates(wallet, invoiceId), + new ExecutionDataflowBlockOptions + { + CancellationToken = _Cts.Token, + EnsureOrdered = false, + SingleProducerConstrained = true, + MaxDegreeOfParallelism = UpdatePaymentStatesMaxDegreeOfParallelism + }); + + foreach (var invoiceId in pendingInvoices) + { + await actionBlock.SendAsync(invoiceId); + } + + actionBlock.Complete(); + await actionBlock.Completion; + } + async Task UpdatePaymentStates(BTCPayWallet wallet, string invoiceId) { var invoice = await _InvoiceRepository.GetInvoice(invoiceId, false); @@ -278,7 +300,7 @@ namespace BTCPayServer.Payments.Bitcoin pj.CoinjoinTransactionHash == tx.TransactionHash) { // This payment is a coinjoin, so the value of - // the payment output is different from the real value of the payment + // the payment output is different from the real value of the payment paymentData.Value = pj.CoinjoinValue; payment.SetCryptoPaymentData(paymentData); }