Use the MultiProcessingQueue in IPNSender

This commit is contained in:
nicolas.dorier
2022-01-11 14:48:45 +09:00
parent 6999abe1ca
commit ba101015f6

View File

@@ -39,6 +39,7 @@ namespace BTCPayServer.HostedServices
}
}
MultiProcessingQueue _Queue = new MultiProcessingQueue();
readonly IBackgroundJobClient _JobClient;
readonly EventAggregator _EventAggregator;
readonly InvoiceRepository _InvoiceRepository;
@@ -140,14 +141,12 @@ namespace BTCPayServer.HostedServices
if (invoice.NotificationURL != null)
{
var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Notification = notification });
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceStr, cancellation), TimeSpan.Zero);
_Queue.Enqueue(invoice.Id, (cancellationToken) => NotifyHttp(new ScheduledJob() { TryCount = 0, Notification = notification }, cancellationToken));
}
}
public async Task NotifyHttp(string invoiceData, CancellationToken cancellationToken)
public async Task NotifyHttp(ScheduledJob job, CancellationToken cancellationToken)
{
var job = NBitcoin.JsonConverters.Serializer.ToObject<ScheduledJob>(invoiceData);
bool reschedule = false;
var aggregatorEvent = new InvoiceIPNEvent(job.Notification.Data.Id, job.Notification.Event.Code, job.Notification.Event.Name);
try
@@ -159,9 +158,7 @@ namespace BTCPayServer.HostedServices
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// When the JobClient will be persistent, this will reschedule the job for after reboot
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
_JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0));
return;
}
catch (OperationCanceledException)
@@ -190,8 +187,7 @@ namespace BTCPayServer.HostedServices
if (job.TryCount < MaxTry && reschedule)
{
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
_JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0));
}
}
@@ -307,6 +303,7 @@ namespace BTCPayServer.HostedServices
readonly int MaxTry = 6;
readonly CompositeDisposable leases = new CompositeDisposable();
public Task StartAsync(CancellationToken cancellationToken)
{
leases.Add(_EventAggregator.SubscribeAsync<InvoiceEvent>(async e =>
@@ -347,10 +344,10 @@ namespace BTCPayServer.HostedServices
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
return Task.CompletedTask;
await _Queue.Abort(cancellationToken);
}
}
}