using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using Microsoft.Extensions.Logging;

namespace BTCPayServer
{
    /// <summary>
    /// Handle returned by the <c>Subscribe</c> methods of <see cref="EventAggregator"/>.
    /// Disposing the handle is equivalent to calling <see cref="Unsubscribe"/>.
    /// </summary>
    public interface IEventAggregatorSubscription : IDisposable
    {
        /// <summary>Removes the handler from the aggregator so it stops receiving events.</summary>
        void Unsubscribe();

        /// <summary>Registers the handler with the aggregator again, for the same event type.</summary>
        void Resubscribe();
    }

    /// <summary>
    /// In-process publish/subscribe hub. Handlers are registered per event <see cref="Type"/>
    /// and are invoked synchronously by <see cref="Publish(object, Type)"/>.
    /// </summary>
    public class EventAggregator : IDisposable
    {
        /// <summary>
        /// One registered handler. It is used as the key of the per-type handler dictionary,
        /// so removing it only requires the instance itself.
        /// </summary>
        class Subscription : IEventAggregatorSubscription
        {
            private readonly EventAggregator aggregator;
            readonly Type t;

            public Subscription(EventAggregator aggregator, Type t)
            {
                this.aggregator = aggregator;
                this.t = t;
            }

            /// <summary>The untyped callback stored in the aggregator for this subscription.</summary>
            public Action<object> Act { get; set; }

            // True once the subscription has been removed; reset by Resubscribe().
            bool _Disposed;

            /// <summary>
            /// Removes this subscription from the aggregator. Calling it several times is harmless.
            /// </summary>
            public void Dispose()
            {
                lock (aggregator._Subscriptions)
                {
                    if (_Disposed)
                        return;
                    _Disposed = true;

                    if (aggregator._Subscriptions.TryGetValue(t, out Dictionary<Subscription, Action<object>> actions) &&
                        actions.Remove(this) &&
                        actions.Count == 0)
                    {
                        // Drop the now-empty per-type dictionary so the map does not keep empty entries.
                        aggregator._Subscriptions.Remove(t);
                    }
                }
            }

            /// <summary>
            /// Registers this subscription again. The disposed flag is cleared so that a later
            /// <see cref="Unsubscribe"/> / <see cref="Dispose"/> actually removes the handler.
            /// </summary>
            public void Resubscribe()
            {
                // Monitor locks are re-entrant, so the nested lock taken by Subscribe is fine.
                lock (aggregator._Subscriptions)
                {
                    _Disposed = false;
                    aggregator.Subscribe(t, this);
                }
            }

            /// <summary>Same as <see cref="Dispose"/>.</summary>
            public void Unsubscribe()
            {
                Dispose();
            }
        }

        // Event type -> (subscription -> callback). Every access is guarded by lock (_Subscriptions).
        readonly Dictionary<Type, Dictionary<Subscription, Action<object>>> _Subscriptions =
            new Dictionary<Type, Dictionary<Subscription, Action<object>>>();

        /// <summary>
        /// Waits for the next event published with type <typeparamref name="T"/>.
        /// </summary>
        /// <param name="cancellation">Cancels the wait; the returned task is then canceled.</param>
        /// <returns>The first matching event.</returns>
        public Task<T> WaitNext<T>(CancellationToken cancellation = default(CancellationToken))
        {
            return WaitNext<T>(o => true, cancellation);
        }

        /// <summary>
        /// Waits for the next event published with type <typeparamref name="T"/> for which
        /// <paramref name="predicate"/> returns <c>true</c>.
        /// </summary>
        /// <param name="predicate">Filter evaluated, inside the publisher's call, for each event.</param>
        /// <param name="cancellation">Cancels the wait; the returned task is then canceled.</param>
        /// <returns>The first event accepted by <paramref name="predicate"/>.</returns>
        public async Task<T> WaitNext<T>(Func<T, bool> predicate, CancellationToken cancellation = default(CancellationToken))
        {
            // RunContinuationsAsynchronously: without it the awaiting caller's continuation would run
            // inline inside Publish, on the publisher's thread, before the remaining handlers are called.
            var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            using var subscription = Subscribe<T>((a, b) =>
            {
                if (predicate(b))
                {
                    tcs.TrySetResult(b);
                    a.Unsubscribe();
                }
            });
            using (cancellation.Register(() => { tcs.TrySetCanceled(cancellation); }))
            {
                return await tcs.Task.ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Publishes <paramref name="evt"/> to the handlers registered for <c>typeof(T)</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="evt"/> is <c>null</c>.</exception>
        public void Publish<T>(T evt) where T : class
        {
            Publish(evt, typeof(T));
        }

        /// <summary>
        /// Publishes <paramref name="evt"/> to the handlers registered for exactly
        /// <paramref name="evtType"/> (the lookup is by that key only).
        /// Handlers run synchronously on the calling thread, after the lock has been released;
        /// an exception thrown by a handler is logged and does not stop the remaining handlers.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evt"/> or <paramref name="evtType"/> is <c>null</c>.
        /// </exception>
        public void Publish(object evt, Type evtType)
        {
            if (evt == null)
                throw new ArgumentNullException(nameof(evt));
            if (evtType == null)
                throw new ArgumentNullException(nameof(evtType));

            // Snapshot the handlers under the lock, then invoke them outside of it so that a handler
            // may subscribe/unsubscribe without mutating the collection being enumerated.
            Action<object>[] handlers = Array.Empty<Action<object>>();
            lock (_Subscriptions)
            {
                if (_Subscriptions.TryGetValue(evtType, out Dictionary<Subscription, Action<object>> actions))
                {
                    handlers = actions.Values.ToArray();
                }
            }

            var log = evt.ToString();
            if (!string.IsNullOrEmpty(log))
                Logs.Events.LogInformation(log);

            foreach (var handler in handlers)
            {
                try
                {
                    handler(evt);
                }
                catch (Exception ex)
                {
                    Logs.Events.LogError(ex, "Error while calling event handler");
                }
            }
        }

        /// <summary>
        /// Subscribes a handler that also receives its own subscription handle,
        /// which lets it unsubscribe itself from inside the callback.
        /// </summary>
        public IEventAggregatorSubscription Subscribe<T>(Action<IEventAggregatorSubscription, T> subscription)
        {
            var eventType = typeof(T);
            var s = new Subscription(this, eventType);
            s.Act = (o) => subscription(s, (T)o);
            return Subscribe(eventType, s);
        }

        /// <summary>
        /// Untyped variant: subscribes a handler for <paramref name="eventType"/>; the event is
        /// passed to the handler as <see cref="object"/>.
        /// </summary>
        public IEventAggregatorSubscription Subscribe(Type eventType, Action<IEventAggregatorSubscription, object> subscription)
        {
            var s = new Subscription(this, eventType);
            s.Act = (o) => subscription(s, o);
            return Subscribe(eventType, s);
        }

        /// <summary>Registers <paramref name="subscription"/> in the handler map for <paramref name="eventType"/>.</summary>
        private IEventAggregatorSubscription Subscribe(Type eventType, Subscription subscription)
        {
            lock (_Subscriptions)
            {
                if (!_Subscriptions.TryGetValue(eventType, out Dictionary<Subscription, Action<object>> actions))
                {
                    actions = new Dictionary<Subscription, Action<object>>();
                    _Subscriptions.Add(eventType, actions);
                }
                // Indexer instead of Add: Resubscribe() on a subscription that is still registered
                // must be a no-op rather than throw ArgumentException for a duplicate key.
                actions[subscription] = subscription.Act;
            }
            return subscription;
        }

        /// <summary>Subscribes a function handler; its return value is ignored.</summary>
        public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<T, TReturn> subscription)
        {
            return Subscribe(new Action<T>((t) => subscription(t)));
        }

        /// <summary>
        /// Subscribes a function handler that also receives its subscription handle;
        /// its return value is ignored.
        /// </summary>
        public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<IEventAggregatorSubscription, T, TReturn> subscription)
        {
            return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(sub, t)));
        }

        /// <summary>
        /// Subscribes an asynchronous handler. The returned task is started but not awaited:
        /// <see cref="Publish(object, Type)"/> does not wait for it, and a fault of that task
        /// is not observed or logged here.
        /// </summary>
        public IEventAggregatorSubscription SubscribeAsync<T>(Func<T, Task> subscription)
        {
            return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => _ = subscription(t)));
        }

        /// <summary>Subscribes a handler that receives only the event.</summary>
        public IEventAggregatorSubscription Subscribe<T>(Action<T> subscription)
        {
            return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(t)));
        }

        /// <summary>Removes every registered subscription.</summary>
        public void Dispose()
        {
            lock (_Subscriptions)
            {
                _Subscriptions.Clear();
            }
        }
    }
}