diff --git a/BTCPayServer.Tests/UnitTest1.cs b/BTCPayServer.Tests/UnitTest1.cs index adc490c22..06cc3c311 100644 --- a/BTCPayServer.Tests/UnitTest1.cs +++ b/BTCPayServer.Tests/UnitTest1.cs @@ -569,7 +569,7 @@ namespace BTCPayServer.Tests bool paid = false; bool confirmed = false; bool completed = false; - while (!completed || !confirmed) + while (!completed || !confirmed || !receivedPayment) { var request = await callbackServer.GetNextRequest(); if (request.ContainsKey("event")) @@ -584,7 +584,9 @@ namespace BTCPayServer.Tests receivedPayment = true; break; case InvoiceEvent.PaidInFull: - Assert.True(receivedPayment); + // TODO, we should check that ReceivedPayment is sent after PaidInFull + // for now, we can't ensure this because the ReceivedPayment events isn't sent by the + // InvoiceWatcher, contrary to all other events tester.ExplorerNode.Generate(6); paid = true; break; diff --git a/BTCPayServer/EventAggregator.cs b/BTCPayServer/EventAggregator.cs index ad7641346..6816f42d1 100644 --- a/BTCPayServer/EventAggregator.cs +++ b/BTCPayServer/EventAggregator.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using BTCPayServer.Logging; using Microsoft.Extensions.Logging; @@ -11,7 +12,6 @@ namespace BTCPayServer public interface IEventAggregatorSubscription : IDisposable { void Unsubscribe(); - void Resubscribe(); } public class EventAggregator : IDisposable { @@ -50,11 +50,6 @@ namespace BTCPayServer } } - public void Resubscribe() - { - aggregator.Subscribe(t, this); - } - public void Unsubscribe() { Dispose(); @@ -149,10 +144,53 @@ namespace BTCPayServer { return Subscribe(new Action((sub, t) => subscription(sub, t))); } + class ChannelSubscription : IEventAggregatorSubscription + { + private Channel _evts; + private IEventAggregatorSubscription _innerSubscription; + private Func _act; + private Logs _logs; + public ChannelSubscription(Channel evts, IEventAggregatorSubscription innerSubscription, Func act, Logs logs) + { + _evts = evts; + _innerSubscription = innerSubscription; + _act = act; + _logs = logs; + _ = Listen(); + } + + private async Task Listen() + { + await foreach (var item in _evts.Reader.ReadAllAsync()) + { + try + { + await _act(item); + } + catch (Exception ex) + { + _logs.Events.LogError(ex, $"Error while calling event async handler"); + } + } + } + + public void Dispose() + { + Unsubscribe(); + } + + public void Unsubscribe() + { + _innerSubscription.Unsubscribe(); + _evts.Writer.TryComplete(); + } + } public IEventAggregatorSubscription SubscribeAsync(Func subscription) { - return Subscribe(new Action((sub, t) => _ = subscription(t))); + Channel evts = Channel.CreateUnbounded(); + var innerSubscription = Subscribe(new Action((sub, t) => evts.Writer.TryWrite(t))); + return new ChannelSubscription(evts, innerSubscription, subscription, Logs); } public IEventAggregatorSubscription Subscribe(Action subscription) { diff --git a/BTCPayServer/HostedServices/BitpayIPNSender.cs b/BTCPayServer/HostedServices/BitpayIPNSender.cs index 0850b860c..d1edf63db 100644 --- a/BTCPayServer/HostedServices/BitpayIPNSender.cs +++ b/BTCPayServer/HostedServices/BitpayIPNSender.cs @@ -270,20 +270,20 @@ namespace BTCPayServer.HostedServices e.Name == InvoiceEvent.ExpiredPaidPartial ) { - _ = Notify(invoice, e, false, sendMail); + await Notify(invoice, e, false, sendMail); sendMail = false; } } if (e.Name == InvoiceEvent.Confirmed) { - _ = Notify(invoice, e, false, sendMail); + await Notify(invoice, e, false, sendMail); sendMail = false; } if (invoice.ExtendedNotifications) { - _ = Notify(invoice, e, true, sendMail); + await Notify(invoice, e, true, sendMail); sendMail = false; } })); diff --git a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs index 82bedd269..9e341d324 100644 --- a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs +++ b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs @@ -80,25 +80,25 @@ namespace BTCPayServer.Payments.Bitcoin { _RunningTask = new TaskCompletionSource(); _Cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - leases.Add(_Aggregator.SubscribeAsync(async nbxplorerEvent => + leases.Add(_Aggregator.Subscribe(nbxplorerEvent => { if (nbxplorerEvent.NewState == NBXplorerState.Ready) { var wallet = _Wallets.GetWallet(nbxplorerEvent.Network); if (_Wallets.IsAvailable(wallet.Network)) { - await Listen(wallet); + _ = Listen(wallet); } } })); - _ListenPoller = new Timer(async s => + _ListenPoller = new Timer(s => { foreach (var wallet in _Wallets.GetWallets()) { if (_Wallets.IsAvailable(wallet.Network)) { - await Listen(wallet); + _ = Listen(wallet); } } }, null, 0, (int)PollInterval.TotalMilliseconds);