Increase notitifcation timeout to 1 minute, make sure that BackgroundJobScheduler is correctly cancelling tasks

This commit is contained in:
nicolas.dorier
2019-04-05 14:58:25 +09:00
parent ea8196b532
commit c767a49f2d
4 changed files with 45 additions and 25 deletions

View File

@@ -61,10 +61,10 @@ namespace BTCPayServer.HostedServices
{ {
class BackgroundJob class BackgroundJob
{ {
public Func<Task> Action; public Func<CancellationToken, Task> Action;
public TimeSpan Delay; public TimeSpan Delay;
public IDelay DelayImplementation; public IDelay DelayImplementation;
public BackgroundJob(Func<Task> action, TimeSpan delay, IDelay delayImplementation) public BackgroundJob(Func<CancellationToken, Task> action, TimeSpan delay, IDelay delayImplementation)
{ {
this.Action = action; this.Action = action;
this.Delay = delay; this.Delay = delay;
@@ -74,7 +74,7 @@ namespace BTCPayServer.HostedServices
public async Task Run(CancellationToken cancellationToken) public async Task Run(CancellationToken cancellationToken)
{ {
await DelayImplementation.Wait(Delay, cancellationToken); await DelayImplementation.Wait(Delay, cancellationToken);
await Action(); await Action(cancellationToken);
} }
} }
@@ -89,9 +89,9 @@ namespace BTCPayServer.HostedServices
private Channel<BackgroundJob> _Jobs = Channel.CreateUnbounded<BackgroundJob>(); private Channel<BackgroundJob> _Jobs = Channel.CreateUnbounded<BackgroundJob>();
HashSet<Task> _Processing = new HashSet<Task>(); HashSet<Task> _Processing = new HashSet<Task>();
public void Schedule(Func<Task> action, TimeSpan delay) public void Schedule(Func<CancellationToken, Task> act, TimeSpan scheduledIn)
{ {
_Jobs.Writer.TryWrite(new BackgroundJob(action, delay, Delay)); _Jobs.Writer.TryWrite(new BackgroundJob(act, scheduledIn, Delay));
} }
public async Task WaitAllRunning(CancellationToken cancellationToken) public async Task WaitAllRunning(CancellationToken cancellationToken)
@@ -124,8 +124,7 @@ namespace BTCPayServer.HostedServices
{ {
_Processing.Add(processing); _Processing.Add(processing);
} }
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed _ = processing.ContinueWith(t =>
processing.ContinueWith(t =>
{ {
if (t.IsFaulted) if (t.IsFaulted)
{ {
@@ -136,7 +135,6 @@ namespace BTCPayServer.HostedServices
_Processing.Remove(processing); _Processing.Remove(processing);
} }
}, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
} }
} }
} }

View File

@@ -137,23 +137,29 @@ namespace BTCPayServer.HostedServices
return; return;
var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Notification = notification }); var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Notification = notification });
if (!string.IsNullOrEmpty(invoice.NotificationURL)) if (!string.IsNullOrEmpty(invoice.NotificationURL))
_JobClient.Schedule(() => NotifyHttp(invoiceStr), TimeSpan.Zero); _JobClient.Schedule((cancellation) => NotifyHttp(invoiceStr, cancellation), TimeSpan.Zero);
} }
public async Task NotifyHttp(string invoiceData) public async Task NotifyHttp(string invoiceData, CancellationToken cancellationToken)
{ {
var job = NBitcoin.JsonConverters.Serializer.ToObject<ScheduledJob>(invoiceData); var job = NBitcoin.JsonConverters.Serializer.ToObject<ScheduledJob>(invoiceData);
bool reschedule = false; bool reschedule = false;
var aggregatorEvent = new InvoiceIPNEvent(job.Notification.Data.Id, job.Notification.Event.Code, job.Notification.Event.Name); var aggregatorEvent = new InvoiceIPNEvent(job.Notification.Data.Id, job.Notification.Event.Code, job.Notification.Event.Name);
CancellationTokenSource cts = new CancellationTokenSource(10000);
try try
{ {
HttpResponseMessage response = await SendNotification(job.Notification, cts.Token); HttpResponseMessage response = await SendNotification(job.Notification, cancellationToken);
reschedule = !response.IsSuccessStatusCode; reschedule = !response.IsSuccessStatusCode;
aggregatorEvent.Error = reschedule ? $"Unexpected return code: {(int)response.StatusCode}" : null; aggregatorEvent.Error = reschedule ? $"Unexpected return code: {(int)response.StatusCode}" : null;
_EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent); _EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent);
} }
catch (OperationCanceledException) when (cts.IsCancellationRequested) catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// When the JobClient will be persistent, this will reschedule the job for after reboot
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
return;
}
catch (OperationCanceledException)
{ {
aggregatorEvent.Error = "Timeout"; aggregatorEvent.Error = "Timeout";
_EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent); _EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent);
@@ -174,14 +180,13 @@ namespace BTCPayServer.HostedServices
aggregatorEvent.Error = $"Unexpected error: {message}"; aggregatorEvent.Error = $"Unexpected error: {message}";
_EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent); _EventAggregator.Publish<InvoiceIPNEvent>(aggregatorEvent);
} }
finally { cts?.Dispose(); }
job.TryCount++; job.TryCount++;
if (job.TryCount < MaxTry && reschedule) if (job.TryCount < MaxTry && reschedule)
{ {
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job); invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule(() => NotifyHttp(invoiceData), TimeSpan.FromMinutes(10.0)); _JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
} }
} }
@@ -205,7 +210,7 @@ namespace BTCPayServer.HostedServices
} }
Encoding UTF8 = new UTF8Encoding(false); Encoding UTF8 = new UTF8Encoding(false);
private async Task<HttpResponseMessage> SendNotification(InvoicePaymentNotificationEventWrapper notification, CancellationToken cancellation) private async Task<HttpResponseMessage> SendNotification(InvoicePaymentNotificationEventWrapper notification, CancellationToken cancellationToken)
{ {
var request = new HttpRequestMessage(); var request = new HttpRequestMessage();
request.Method = HttpMethod.Post; request.Method = HttpMethod.Post;
@@ -226,7 +231,14 @@ namespace BTCPayServer.HostedServices
request.RequestUri = new Uri(notification.NotificationURL, UriKind.Absolute); request.RequestUri = new Uri(notification.NotificationURL, UriKind.Absolute);
request.Content = new StringContent(notificationString, UTF8, "application/json"); request.Content = new StringContent(notificationString, UTF8, "application/json");
var response = await Enqueue(notification.Data.Id, async () => await _Client.SendAsync(request, cancellation)); var response = await Enqueue(notification.Data.Id, async () =>
{
using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
cts.CancelAfter(TimeSpan.FromMinutes(1.0));
return await _Client.SendAsync(request, cts.Token);
}
});
return response; return response;
} }

View File

@@ -1,12 +1,13 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace BTCPayServer.Services namespace BTCPayServer.Services
{ {
public interface IBackgroundJobClient public interface IBackgroundJobClient
{ {
void Schedule(Func<Task> act, TimeSpan zero); void Schedule(Func<CancellationToken, Task> act, TimeSpan scheduledIn);
} }
} }

View File

@@ -1,4 +1,5 @@
using BTCPayServer.Logging; using BTCPayServer.Logging;
using NBitcoin;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Net.Mail; using System.Net.Mail;
@@ -17,7 +18,7 @@ namespace BTCPayServer.Services.Mails
public void SendEmail(string email, string subject, string message) public void SendEmail(string email, string subject, string message)
{ {
_JobClient.Schedule(async () => _JobClient.Schedule(async (cancellationToken) =>
{ {
var emailSettings = await GetEmailSettings(); var emailSettings = await GetEmailSettings();
if (emailSettings?.IsComplete() != true) if (emailSettings?.IsComplete() != true)
@@ -25,13 +26,21 @@ namespace BTCPayServer.Services.Mails
Logs.Configuration.LogWarning("Should have sent email, but email settings are not configured"); Logs.Configuration.LogWarning("Should have sent email, but email settings are not configured");
return; return;
} }
var smtp = emailSettings.CreateSmtpClient(); using (var smtp = emailSettings.CreateSmtpClient())
var mail = new MailMessage(emailSettings.From, email, subject, message)
{ {
IsBodyHtml = true var mail = new MailMessage(emailSettings.From, email, subject, message)
}; {
await smtp.SendMailAsync(mail); IsBodyHtml = true
};
try
{
await smtp.SendMailAsync(mail).WithCancellation(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
smtp.SendAsyncCancel();
}
}
}, TimeSpan.Zero); }, TimeSpan.Zero);
} }