Refactor: Add GetMonitoredInvoices to fetch pending invoices or those with pending payments (#6235)

This commit is contained in:
Nicolas Dorier
2024-09-20 18:54:36 +09:00
committed by GitHub
parent ba2301ebfe
commit f5e5174045
11 changed files with 162 additions and 77 deletions

View File

@@ -21,5 +21,6 @@
<None Remove="DBScripts\001.InvoiceFunctions.sql" />
<None Remove="DBScripts\002.RefactorPayouts.sql" />
<None Remove="DBScripts\003.RefactorPendingInvoicesPayments.sql" />
<None Remove="DBScripts\004.MonitoredInvoices.sql" />
</ItemGroup>
</Project>

View File

@@ -5,5 +5,5 @@ $$ LANGUAGE sql IMMUTABLE;
CREATE INDEX "IX_Invoices_Pending" ON "Invoices"((1)) WHERE is_pending("Status");
CREATE INDEX "IX_Payments_Pending" ON "Payments"((1)) WHERE is_pending("Status");
DROP TABLE "PendingInvoices";
ANALYZE "Invoices";

View File

@@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION get_prompt(invoice_blob JSONB, payment_method_id TEXT)
RETURNS JSONB AS $$
SELECT invoice_blob->'prompts'->payment_method_id
$$ LANGUAGE sql IMMUTABLE;
CREATE OR REPLACE FUNCTION get_monitored_invoices(payment_method_id TEXT)
RETURNS TABLE (invoice_id TEXT, payment_id TEXT) AS $$
WITH cte AS (
-- Get all the invoices which are pending. Even if no payments.
SELECT i."Id" invoice_id, p."Id" payment_id FROM "Invoices" i LEFT JOIN "Payments" p ON i."Id" = p."InvoiceDataId"
WHERE is_pending(i."Status")
UNION ALL
-- For invoices not pending, take all of those which have pending payments
SELECT i."Id", p."Id" FROM "Invoices" i INNER JOIN "Payments" p ON i."Id" = p."InvoiceDataId"
WHERE is_pending(p."Status") AND NOT is_pending(i."Status"))
SELECT cte.* FROM cte
LEFT JOIN "Payments" p ON cte.payment_id=p."Id"
LEFT JOIN "Invoices" i ON cte.invoice_id=i."Id"
WHERE (p."Type" IS NOT NULL AND p."Type" = payment_method_id) OR
(p."Type" IS NULL AND get_prompt(i."Blob2", payment_method_id) IS NOT NULL AND (get_prompt(i."Blob2", payment_method_id)->'activated')::BOOLEAN IS NOT FALSE);
$$ LANGUAGE SQL STABLE;

View File

@@ -0,0 +1,15 @@
using BTCPayServer.Data;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace BTCPayServer.Migrations
{
[DbContext(typeof(ApplicationDbContext))]
[Migration("20240919034505_monitoredinvoices")]
[DBScript("004.MonitoredInvoices.sql")]
public partial class monitoredinvoices : DBScriptsMigration
{
}
}

View File

@@ -3157,6 +3157,12 @@ namespace BTCPayServer.Tests
var invoiceId = GetInvoiceId(resp);
await acc.PayOnChain(invoiceId);
// Quick unrelated test on GetMonitoredInvoices
var invoiceRepo = tester.PayTester.GetService<InvoiceRepository>();
var monitored = Assert.Single(await invoiceRepo.GetMonitoredInvoices(PaymentMethodId.Parse("BTC-CHAIN")), i => i.Id == invoiceId);
Assert.Single(monitored.Payments);
//
app = await client.CreatePointOfSaleApp(acc.StoreId, new PointOfSaleAppRequest
{
AppName = "Cart",

View File

@@ -14,6 +14,7 @@ using BTCPayServer.Payments.Bitcoin;
using BTCPayServer.Services.Invoices;
using BTCPayServer.Services.Notifications;
using BTCPayServer.Services.Notifications.Blobs;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NBitcoin;
@@ -255,10 +256,19 @@ namespace BTCPayServer.HostedServices
private async Task WaitPendingInvoices()
{
await Task.WhenAll((await _invoiceRepository.GetPendingInvoices())
await Task.WhenAll((await GetPendingInvoices(_Cts.Token))
.Select(i => Wait(i)).ToArray());
}
private async Task<InvoiceEntity[]> GetPendingInvoices(CancellationToken cancellationToken)
{
using var ctx = _invoiceRepository.DbContextFactory.CreateContext();
var rows = await ctx.Invoices.Where(i => Data.InvoiceData.IsPending(i.Status))
.Select(o => o).ToArrayAsync(cancellationToken);
var invoices = rows.Select(_invoiceRepository.ToEntity).ToArray();
return invoices;
}
async Task StartLoop(CancellationToken cancellation)
{
Logs.PayServer.LogInformation("Start watching invoices");

View File

@@ -235,7 +235,7 @@ namespace BTCPayServer.Payments.Bitcoin
async Task UpdatePaymentStates(BTCPayWallet wallet)
{
var invoices = await _InvoiceRepository.GetInvoicesWithPendingPayments(PaymentTypes.CHAIN.GetPaymentMethodId(wallet.Network.CryptoCode));
var invoices = await _InvoiceRepository.GetMonitoredInvoices(PaymentTypes.CHAIN.GetPaymentMethodId(wallet.Network.CryptoCode));
await Task.WhenAll(invoices.Select(i => UpdatePaymentStates(wallet, i)).ToArray());
}
async Task<InvoiceEntity> UpdatePaymentStates(BTCPayWallet wallet, string invoiceId, bool fireEvents = true)
@@ -384,7 +384,7 @@ namespace BTCPayServer.Payments.Bitcoin
{
var handler = _handlers.GetBitcoinHandler(wallet.Network);
int totalPayment = 0;
var invoices = await _InvoiceRepository.GetInvoicesWithPendingPayments(PaymentTypes.CHAIN.GetPaymentMethodId(network.CryptoCode), true);
var invoices = await _InvoiceRepository.GetMonitoredInvoices(PaymentTypes.CHAIN.GetPaymentMethodId(network.CryptoCode));
var coinsPerDerivationStrategy =
new Dictionary<DerivationStrategyBase, ReceivedCoin[]>();
foreach (var i in invoices)

View File

@@ -18,6 +18,7 @@ using BTCPayServer.Payments.Bitcoin;
using BTCPayServer.Services;
using BTCPayServer.Services.Invoices;
using BTCPayServer.Services.Stores;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -71,66 +72,69 @@ namespace BTCPayServer.Payments.Lightning
bool needCheckOfflinePayments = true;
async Task CheckingInvoice(CancellationToken cancellation)
{
retry:
try
var pmis = _handlers.Where(h => h is LightningLikePaymentHandler).Select(handler => handler.PaymentMethodId).ToArray();
foreach (var pmi in pmis)
{
Logs.PayServer.LogInformation("Checking if any payment arrived on lightning while the server was offline...");
foreach (var invoice in await _InvoiceRepository.GetPendingInvoices(cancellationToken: cancellation))
retry:
try
{
if (GetListenedInvoices(invoice).Count > 0)
Logs.PayServer.LogInformation("Checking if any payment arrived on lightning while the server was offline...");
foreach (var invoice in await _InvoiceRepository.GetMonitoredInvoices(pmi, cancellation))
{
_CheckInvoices.Writer.TryWrite(invoice.Id);
_memoryCache.Set(GetCacheKey(invoice.Id), invoice, GetExpiration(invoice));
}
}
needCheckOfflinePayments = false;
Logs.PayServer.LogInformation("Processing lightning payments...");
while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) &&
_CheckInvoices.Reader.TryRead(out var invoiceId))
{
var invoice = await GetInvoice(invoiceId);
foreach (var listenedInvoice in GetListenedInvoices(invoice))
{
var store = await GetStore(invoice.StoreId);
var lnConfig = _handlers.GetLightningConfig(store, listenedInvoice.Network);
if (lnConfig is null)
continue;
var connStr = GetLightningUrl(listenedInvoice.Network.CryptoCode, lnConfig);
if (connStr is null)
continue;
var instanceListenerKey = (listenedInvoice.Network.CryptoCode, connStr.ToString());
lock (_InstanceListeners)
if (GetListenedInvoices(invoice).Count > 0)
{
if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener))
{
instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, _handlers, connStr, _paymentService, Logs);
_InstanceListeners.TryAdd(instanceListenerKey, instanceListener);
}
instanceListener.AddListenedInvoice(listenedInvoice);
_ = instanceListener.PollPayment(listenedInvoice, cancellation);
_CheckInvoices.Writer.TryWrite(invoice.Id);
_memoryCache.Set(GetCacheKey(invoice.Id), invoice, GetExpiration(invoice));
}
}
needCheckOfflinePayments = false;
Logs.PayServer.LogInformation("Processing lightning payments...");
if (_CheckInvoices.Reader.Count is 0)
this.CheckConnections();
while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) &&
_CheckInvoices.Reader.TryRead(out var invoiceId))
{
var invoice = await GetInvoice(invoiceId);
foreach (var listenedInvoice in GetListenedInvoices(invoice))
{
var store = await GetStore(invoice.StoreId);
var lnConfig = _handlers.GetLightningConfig(store, listenedInvoice.Network);
if (lnConfig is null)
continue;
var connStr = GetLightningUrl(listenedInvoice.Network.CryptoCode, lnConfig);
if (connStr is null)
continue;
var instanceListenerKey = (listenedInvoice.Network.CryptoCode, connStr.ToString());
lock (_InstanceListeners)
{
if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener))
{
instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, _handlers, connStr, _paymentService, Logs);
_InstanceListeners.TryAdd(instanceListenerKey, instanceListener);
}
instanceListener.AddListenedInvoice(listenedInvoice);
_ = instanceListener.PollPayment(listenedInvoice, cancellation);
}
}
if (_CheckInvoices.Reader.Count is 0)
this.CheckConnections();
}
}
catch when (cancellation.IsCancellationRequested)
{
}
catch (Exception ex)
{
await Task.Delay(1000, cancellation);
Logs.PayServer.LogWarning(ex, "Unhandled error in the LightningListener");
goto retry;
}
}
catch when (cancellation.IsCancellationRequested)
{
}
catch (Exception ex)
{
await Task.Delay(1000, cancellation);
Logs.PayServer.LogWarning(ex, "Unhandled error in the LightningListener");
goto retry;
}
}
private string GetCacheKey(string invoiceId)
{
return $"{nameof(GetListenedInvoices)}-{invoiceId}";

View File

@@ -384,7 +384,7 @@ namespace BTCPayServer.Services.Altcoins.Monero.Services
private async Task UpdateAnyPendingMoneroLikePayment(string cryptoCode)
{
var paymentMethodId = PaymentTypes.CHAIN.GetPaymentMethodId(cryptoCode);
var invoices = await _invoiceRepository.GetInvoicesWithPendingPayments(paymentMethodId);
var invoices = await _invoiceRepository.GetMonitoredInvoices(paymentMethodId);
if (!invoices.Any())
return;
invoices = invoices.Where(entity => entity.GetPaymentPrompt(paymentMethodId)?.Activated is true).ToArray();

View File

@@ -374,7 +374,7 @@ namespace BTCPayServer.Services.Altcoins.Zcash.Services
private async Task UpdateAnyPendingZcashLikePayment(string cryptoCode)
{
var paymentMethodId = PaymentTypes.CHAIN.GetPaymentMethodId(cryptoCode);
var invoices = await _invoiceRepository.GetInvoicesWithPendingPayments(paymentMethodId);
var invoices = await _invoiceRepository.GetMonitoredInvoices(paymentMethodId);
if (!invoices.Any())
return;
invoices = invoices.Where(entity => entity.GetPaymentPrompt(paymentMethodId).Activated).ToArray();

View File

@@ -34,7 +34,7 @@ namespace BTCPayServer.Services.Invoices
private readonly ApplicationDbContextFactory _applicationDbContextFactory;
private readonly EventAggregator _eventAggregator;
public ApplicationDbContextFactory DbContextFactory => _applicationDbContextFactory;
public InvoiceRepository(ApplicationDbContextFactory contextFactory,
EventAggregator eventAggregator)
{
@@ -78,32 +78,59 @@ namespace BTCPayServer.Services.Invoices
return row is null ? null : ToEntity(row);
}
public async Task<InvoiceEntity[]> GetInvoicesWithPendingPayments(PaymentMethodId paymentMethodId, bool includeAddresses = false)
/// <summary>
/// Returns all invoices which either:
/// * Have the <paramref name="paymentMethodId"/> activated and are pending
/// * Aren't pending but have a payment from the <paramref name="paymentMethodId"/> that is pending
/// <see cref="InvoiceData.AddressInvoices"/> is filled with the monitored addresses of the <paramref name="paymentMethodId"/> for this invoice.
/// <see cref="InvoiceData.Payments"/> include the <paramref name="paymentMethodId"/> payments for this invoice.
/// </summary>
/// <param name="paymentMethodId">The payment method id</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
public async Task<InvoiceEntity[]> GetMonitoredInvoices(PaymentMethodId paymentMethodId, CancellationToken cancellationToken = default)
{
var pmi = paymentMethodId.ToString();
using var ctx = _applicationDbContextFactory.CreateContext();
var invoiceIds = (await ctx.Payments.Where(p => PaymentData.IsPending(p.Status) && p.Type == pmi).Select(p => p.InvoiceDataId).ToArrayAsync()).Distinct().ToArray();
if (invoiceIds.Length is 0)
var conn = ctx.Database.GetDbConnection();
var rows = await conn.QueryAsync<(string Id, uint xmin, string[] addresses, string[] payments, string invoice)>(new("""
SELECT
i."Id",
i.xmin,
array_agg(ai."Address") addresses,
COALESCE(array_agg(to_jsonb(p)) FILTER (WHERE p."Id" IS NOT NULL), '{}') as payments,
(array_agg(to_jsonb(i)))[1] as invoice
FROM get_monitored_invoices(@pmi) m
LEFT JOIN "Payments" p ON p."Id" = m.payment_id
LEFT JOIN "Invoices" i ON i."Id" = m.invoice_id
LEFT JOIN "AddressInvoices" ai ON i."Id" = ai."InvoiceDataId"
WHERE ai."PaymentMethodId" = @pmi
GROUP BY i."Id";
"""
, new { pmi = paymentMethodId.ToString() }));
if (Enumerable.TryGetNonEnumeratedCount(rows, out var c) && c == 0)
return Array.Empty<InvoiceEntity>();
return await GetInvoices(new InvoiceQuery()
List<InvoiceEntity> invoices = new List<InvoiceEntity>();
foreach (var row in rows)
{
InvoiceId = invoiceIds,
IncludeAddresses = true
});
}
public async Task<InvoiceEntity[]> GetPendingInvoices(CancellationToken cancellationToken = default)
{
using var ctx = _applicationDbContextFactory.CreateContext();
var rows = await ctx.Invoices.Where(i => InvoiceData.IsPending(i.Status))
.Include(i => i.Payments)
.Select(o => o).ToArrayAsync();
return rows.Select(ToEntity).ToArray();
}
public async Task<InvoiceEntity[]> GetPendingInvoices()
{
using var ctx = _applicationDbContextFactory.CreateContext();
return (await ctx.Invoices.Where(i => InvoiceData.IsPending(i.Status)).Select(i => i).ToArrayAsync())
.Select(i => ToEntity(i)).ToArray();
var jobj = JObject.Parse(row.invoice);
jobj.Remove("Blob");
jobj["Blob2"] = jobj["Blob2"].ToString(Formatting.None);
var invoiceData = jobj.ToObject<InvoiceData>();
invoiceData.XMin = row.xmin;
invoiceData.AddressInvoices = row.addresses.Select((a) => new AddressInvoiceData() { InvoiceDataId = invoiceData.Id, Address = a, PaymentMethodId = paymentMethodId.ToString() }).ToList();
invoiceData.Payments = new List<PaymentData>();
foreach (var payment in row.payments)
{
jobj = JObject.Parse(payment);
jobj.Remove("Blob");
jobj["Blob2"] = jobj["Blob2"].ToString(Formatting.None);
var paymentData = jobj.ToObject<PaymentData>();
invoiceData.Payments.Add(paymentData);
}
invoices.Add(ToEntity(invoiceData));
}
return invoices.ToArray();
}
public async Task<List<Data.WebhookDeliveryData>> GetWebhookDeliveries(string invoiceId)