using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Events;
using BTCPayServer.Logging;
using Microsoft.Extensions.Logging;

namespace BTCPayServer
{
    /// <summary>
    /// Handle returned by the subscribe methods of <see cref="EventAggregator"/>.
    /// </summary>
    public interface IEventAggregatorSubscription : IDisposable
    {
        /// <summary>
        /// Stops delivering events to this subscription.
        /// </summary>
        void Unsubscribe();
    }

    /// <summary>
    /// In-process publish/subscribe hub. Handlers are registered per event <see cref="Type"/>,
    /// either for exactly that type or for that type and everything assignable to it.
    /// </summary>
    // NOTE(review): the generic type arguments in this file were reconstructed from how each
    // member is used (they were missing from the reviewed text) - confirm against source control.
    public class EventAggregator : IDisposable
    {
        /// <summary>
        /// Creates an aggregator that logs through <paramref name="logs"/>.
        /// </summary>
        /// <param name="logs">Logger container; its <c>Events</c> logger is used by <see cref="Publish(object, Type)"/>.</param>
        public EventAggregator(Logs logs)
        {
            Logs = logs;
        }

        /// <summary>
        /// Registration of one handler for one event type. Disposing it removes the handler
        /// from the owning aggregator.
        /// </summary>
        class Subscription : IEventAggregatorSubscription
        {
            private readonly EventAggregator aggregator;
            readonly Type t;

            public Subscription(EventAggregator aggregator, Type t)
            {
                this.aggregator = aggregator;
                this.t = t;
            }

            /// <summary>Handler invoked with the published event.</summary>
            public Action<object> Act { get; set; }

            /// <summary>
            /// When true the subscription lives in the "any" map (matched by assignability);
            /// otherwise it lives in the exact-type map.
            /// </summary>
            public bool Any { get; set; }

            bool _Disposed;

            /// <summary>
            /// Removes this subscription from the aggregator. Calls after the first one are no-ops.
            /// </summary>
            public void Dispose()
            {
                if (_Disposed)
                {
                    return;
                }
                _Disposed = true;

                var dict = Any ? aggregator._SubscriptionsAny : aggregator._Subscriptions;
                lock (dict)
                {
                    if (dict.TryGetValue(t, out Dictionary<Subscription, Action<object>> actions)
                        && actions.Remove(this)
                        && actions.Count == 0)
                    {
                        // Last handler for this type: drop the now-empty bucket as well.
                        dict.Remove(t);
                    }
                }
            }

            /// <inheritdoc />
            public void Unsubscribe()
            {
                Dispose();
            }
        }

        /// <summary>
        /// Waits for the next published event of exactly type <typeparamref name="T"/>.
        /// </summary>
        /// <param name="cancellation">Cancels the wait.</param>
        /// <returns>The first matching event.</returns>
        public Task<T> WaitNext<T>(CancellationToken cancellation = default(CancellationToken))
        {
            return WaitNext<T>(o => true, cancellation);
        }

        /// <summary>
        /// Waits for the next published event of exactly type <typeparamref name="T"/>
        /// for which <paramref name="predicate"/> returns true.
        /// </summary>
        /// <param name="predicate">Filter applied to each published event.</param>
        /// <param name="cancellation">Cancels the wait; the returned task is then canceled.</param>
        /// <returns>The first event accepted by <paramref name="predicate"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is null.</exception>
        public async Task<T> WaitNext<T>(Func<T, bool> predicate, CancellationToken cancellation = default(CancellationToken))
        {
            // Without this guard a null predicate throws inside the handler on every publish
            // (where it is caught and logged) and the returned task never completes.
            ArgumentNullException.ThrowIfNull(predicate);

            var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            using var subscription = Subscribe<T>((a, b) =>
            {
                if (predicate(b))
                {
                    tcs.TrySetResult(b);
                    a.Unsubscribe();
                }
            });
            await using var reg = cancellation.Register(() => tcs.TrySetCanceled(cancellation));
            return await tcs.Task.ConfigureAwait(false);
        }

        /// <summary>
        /// Publishes <paramref name="evt"/> using the static type <typeparamref name="T"/> as the event type.
        /// </summary>
        /// <param name="evt">The event to publish.</param>
        public void Publish<T>(T evt) where T : class
        {
            Publish(evt, typeof(T));
        }

        /// <summary>
        /// Publishes <paramref name="evt"/> to every handler subscribed to exactly
        /// <paramref name="evtType"/> and to every "any" handler whose subscribed type is
        /// assignable from <paramref name="evtType"/>.
        /// </summary>
        /// <remarks>
        /// The handler list is copied while holding the locks and invoked afterwards.
        /// An exception thrown by a handler is logged and the remaining handlers still run.
        /// </remarks>
        /// <param name="evt">The event to publish.</param>
        /// <param name="evtType">The type used to look up subscriptions.</param>
        /// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
        public void Publish(object evt, Type evtType)
        {
            ArgumentNullException.ThrowIfNull(evt);

            List<Action<object>> actionList = new List<Action<object>>();
            lock (_Subscriptions)
            {
                if (_Subscriptions.TryGetValue(evtType, out Dictionary<Subscription, Action<object>> actions))
                {
                    actionList = actions.Values.ToList();
                }
            }
            lock (_SubscriptionsAny)
            {
                foreach (var kv in _SubscriptionsAny)
                {
                    if (kv.Key.IsAssignableFrom(evtType))
                    {
                        actionList.AddRange(kv.Value.Values);
                    }
                }
            }

            if (Logs.Events.IsEnabled(LogLevel.Information))
            {
                // evt was null-checked above, so ToString() is called once and reused.
                var description = evt.ToString();
                Logs.Events.LogInformation("{0}", string.IsNullOrEmpty(description) ? evtType.Name : description);
            }

            foreach (var sub in actionList)
            {
                try
                {
                    sub(evt);
                }
                catch (Exception ex)
                {
                    Logs.Events.LogError(ex, "Error while calling event handler");
                }
            }
        }

        /// <summary>
        /// Subscribe to any event of exactly type T
        /// </summary>
        /// <typeparam name="T">The event type.</typeparam>
        /// <param name="subscription">Handler receiving the subscription handle and the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription Subscribe<T>(Action<IEventAggregatorSubscription, T> subscription)
        {
            var eventType = typeof(T);
            var s = new Subscription(this, eventType);
            s.Act = (o) => subscription(s, (T)o);
            return Subscribe(eventType, s);
        }

        /// <summary>
        /// Subscribe to any event of type T or any of its derived type
        /// </summary>
        /// <typeparam name="T">The base event type.</typeparam>
        /// <param name="subscription">Handler receiving the subscription handle and the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription SubscribeAny<T>(Action<IEventAggregatorSubscription, T> subscription)
        {
            var eventType = typeof(T);
            var s = new Subscription(this, eventType) { Any = true };
            s.Act = (o) => subscription(s, (T)o);
            return Subscribe(eventType, s);
        }

        /// <summary>
        /// Subscribe to any event published with exactly <paramref name="eventType"/>.
        /// </summary>
        /// <param name="eventType">The event type to match.</param>
        /// <param name="subscription">Handler receiving the subscription handle and the untyped event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription Subscribe(Type eventType, Action<IEventAggregatorSubscription, object> subscription)
        {
            var s = new Subscription(this, eventType);
            s.Act = (o) => subscription(s, o);
            return Subscribe(eventType, s);
        }

        /// <summary>
        /// Adds <paramref name="subscription"/> to the exact-type or "any" map, depending on
        /// its <c>Any</c> flag, creating the per-type bucket on first use.
        /// </summary>
        private IEventAggregatorSubscription Subscribe(Type eventType, Subscription subscription)
        {
            var subscriptions = subscription.Any ? _SubscriptionsAny : _Subscriptions;
            lock (subscriptions)
            {
                if (!subscriptions.TryGetValue(eventType, out Dictionary<Subscription, Action<object>> actions))
                {
                    actions = new Dictionary<Subscription, Action<object>>();
                    subscriptions.Add(eventType, actions);
                }
                actions.Add(subscription, subscription.Act);
            }
            return subscription;
        }

        // Handlers matched on the exact published type. Each map is also its own lock object.
        readonly Dictionary<Type, Dictionary<Subscription, Action<object>>> _Subscriptions = new Dictionary<Type, Dictionary<Subscription, Action<object>>>();

        // Handlers matched when their subscribed type is assignable from the published type.
        readonly Dictionary<Type, Dictionary<Subscription, Action<object>>> _SubscriptionsAny = new Dictionary<Type, Dictionary<Subscription, Action<object>>>();

        /// <summary>Logger container supplied at construction.</summary>
        public Logs Logs { get; }

        /// <summary>
        /// Subscribe to events of exactly type T with a handler whose return value is ignored.
        /// </summary>
        /// <param name="subscription">Handler receiving the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        // NOTE(review): the name of the second type parameter is a reconstruction - confirm.
        public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<T, TReturn> subscription)
        {
            return Subscribe(new Action<T>((t) => subscription(t)));
        }

        /// <summary>
        /// Subscribe to events of exactly type T with a handler that also receives the
        /// subscription handle and whose return value is ignored.
        /// </summary>
        /// <param name="subscription">Handler receiving the subscription handle and the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        // NOTE(review): the name of the second type parameter is a reconstruction - confirm.
        public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<IEventAggregatorSubscription, T, TReturn> subscription)
        {
            return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(sub, t)));
        }

        /// <summary>
        /// Subscription whose events are queued on a channel and handed to an asynchronous
        /// handler one at a time by a read loop started in the constructor.
        /// </summary>
        class ChannelSubscription<T> : IEventAggregatorSubscription
        {
            private readonly Channel<T> _evts;
            private readonly IEventAggregatorSubscription _innerSubscription;
            private readonly Func<T, Task> _act;
            private readonly Logs _logs;

            public ChannelSubscription(Channel<T> evts, IEventAggregatorSubscription innerSubscription, Func<T, Task> act, Logs logs)
            {
                _evts = evts;
                _innerSubscription = innerSubscription;
                _act = act;
                _logs = logs;
                // Fire-and-forget: the loop ends when the channel writer is completed.
                _ = Listen();
            }

            /// <summary>
            /// Reads queued events and awaits the handler for each; handler exceptions are
            /// logged and the loop continues with the next event.
            /// </summary>
            private async Task Listen()
            {
                await foreach (var item in _evts.Reader.ReadAllAsync())
                {
                    try
                    {
                        await _act(item);
                    }
                    catch (Exception ex)
                    {
                        _logs.Events.LogError(ex, "Error while calling event async handler");
                    }
                }
            }

            public void Dispose()
            {
                Unsubscribe();
            }

            /// <summary>
            /// Stops feeding the channel and completes its writer.
            /// </summary>
            public void Unsubscribe()
            {
                _innerSubscription.Unsubscribe();
                _evts.Writer.TryComplete();
            }
        }

        /// <summary>
        /// Subscribe to events of exactly type T with an asynchronous handler. Events are
        /// buffered in an unbounded channel and the handler is awaited for each in turn.
        /// </summary>
        /// <param name="subscription">Asynchronous handler receiving the event.</param>
        /// <returns>A handle that removes the handler and completes the channel when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription SubscribeAsync<T>(Func<T, Task> subscription)
        {
            Channel<T> evts = Channel.CreateUnbounded<T>();
            var innerSubscription = Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => evts.Writer.TryWrite(t)));
            return new ChannelSubscription<T>(evts, innerSubscription, subscription, Logs);
        }

        /// <summary>
        /// Subscribe to any event of exactly type T
        /// </summary>
        /// <param name="subscription">Handler receiving the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription Subscribe<T>(Action<T> subscription)
        {
            return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(t)));
        }

        /// <summary>
        /// Subscribe to any event of type T or any of its derived type
        /// </summary>
        /// <param name="subscription">Handler receiving the event.</param>
        /// <returns>A handle that removes the handler when unsubscribed or disposed.</returns>
        public IEventAggregatorSubscription SubscribeAny<T>(Action<T> subscription)
        {
            return SubscribeAny(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(t)));
        }

        /// <summary>
        /// Removes every registered subscription from both maps.
        /// </summary>
        public void Dispose()
        {
            lock (_Subscriptions)
            {
                _Subscriptions.Clear();
            }
            lock (_SubscriptionsAny)
            {
                _SubscriptionsAny.Clear();
            }
        }
    }
}