From f5e5174045f06c81cfef7871404a5ea4896e7cda Mon Sep 17 00:00:00 2001 From: Nicolas Dorier Date: Fri, 20 Sep 2024 18:54:36 +0900 Subject: [PATCH] Refactor: Add GetMonitoredInvoices to fetch pending invoices or those with pending payments (#6235) --- BTCPayServer.Data/BTCPayServer.Data.csproj | 1 + .../003.RefactorPendingInvoicesPayments.sql | 2 +- .../DBScripts/004.MonitoredInvoices.sql | 22 ++++ .../20240919034505_monitoredinvoices.cs | 15 +++ BTCPayServer.Tests/UnitTest1.cs | 6 ++ BTCPayServer/HostedServices/InvoiceWatcher.cs | 12 ++- .../Payments/Bitcoin/NBXplorerListener.cs | 4 +- .../Payments/Lightning/LightningListener.cs | 102 +++++++++--------- .../Monero/Services/MoneroListener.cs | 2 +- .../Altcoins/Zcash/Services/ZcashListener.cs | 2 +- .../Services/Invoices/InvoiceRepository.cs | 71 ++++++++---- 11 files changed, 162 insertions(+), 77 deletions(-) create mode 100644 BTCPayServer.Data/DBScripts/004.MonitoredInvoices.sql create mode 100644 BTCPayServer.Data/Migrations/20240919034505_monitoredinvoices.cs diff --git a/BTCPayServer.Data/BTCPayServer.Data.csproj b/BTCPayServer.Data/BTCPayServer.Data.csproj index cda071757..0a3c73e52 100644 --- a/BTCPayServer.Data/BTCPayServer.Data.csproj +++ b/BTCPayServer.Data/BTCPayServer.Data.csproj @@ -21,5 +21,6 @@ + diff --git a/BTCPayServer.Data/DBScripts/003.RefactorPendingInvoicesPayments.sql b/BTCPayServer.Data/DBScripts/003.RefactorPendingInvoicesPayments.sql index 68ac2d52b..59d19c384 100644 --- a/BTCPayServer.Data/DBScripts/003.RefactorPendingInvoicesPayments.sql +++ b/BTCPayServer.Data/DBScripts/003.RefactorPendingInvoicesPayments.sql @@ -5,5 +5,5 @@ $$ LANGUAGE sql IMMUTABLE; CREATE INDEX "IX_Invoices_Pending" ON "Invoices"((1)) WHERE is_pending("Status"); CREATE INDEX "IX_Payments_Pending" ON "Payments"((1)) WHERE is_pending("Status"); - DROP TABLE "PendingInvoices"; +ANALYZE "Invoices"; diff --git a/BTCPayServer.Data/DBScripts/004.MonitoredInvoices.sql b/BTCPayServer.Data/DBScripts/004.MonitoredInvoices.sql new file mode 100644 index 000000000..b89f396ff --- /dev/null +++ b/BTCPayServer.Data/DBScripts/004.MonitoredInvoices.sql @@ -0,0 +1,22 @@ +CREATE OR REPLACE FUNCTION get_prompt(invoice_blob JSONB, payment_method_id TEXT) +RETURNS JSONB AS $$ + SELECT invoice_blob->'prompts'->payment_method_id +$$ LANGUAGE sql IMMUTABLE; + + +CREATE OR REPLACE FUNCTION get_monitored_invoices(payment_method_id TEXT) +RETURNS TABLE (invoice_id TEXT, payment_id TEXT) AS $$ +WITH cte AS ( +-- Get all the invoices which are pending. Even if no payments. +SELECT i."Id" invoice_id, p."Id" payment_id FROM "Invoices" i LEFT JOIN "Payments" p ON i."Id" = p."InvoiceDataId" + WHERE is_pending(i."Status") +UNION ALL +-- For invoices not pending, take all of those which have pending payments +SELECT i."Id", p."Id" FROM "Invoices" i INNER JOIN "Payments" p ON i."Id" = p."InvoiceDataId" + WHERE is_pending(p."Status") AND NOT is_pending(i."Status")) +SELECT cte.* FROM cte +LEFT JOIN "Payments" p ON cte.payment_id=p."Id" +LEFT JOIN "Invoices" i ON cte.invoice_id=i."Id" +WHERE (p."Type" IS NOT NULL AND p."Type" = payment_method_id) OR + (p."Type" IS NULL AND get_prompt(i."Blob2", payment_method_id) IS NOT NULL AND (get_prompt(i."Blob2", payment_method_id)->'activated')::BOOLEAN IS NOT FALSE); +$$ LANGUAGE SQL STABLE; diff --git a/BTCPayServer.Data/Migrations/20240919034505_monitoredinvoices.cs b/BTCPayServer.Data/Migrations/20240919034505_monitoredinvoices.cs new file mode 100644 index 000000000..199b9070c --- /dev/null +++ b/BTCPayServer.Data/Migrations/20240919034505_monitoredinvoices.cs @@ -0,0 +1,15 @@ +using BTCPayServer.Data; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace BTCPayServer.Migrations +{ + [DbContext(typeof(ApplicationDbContext))] + [Migration("20240919034505_monitoredinvoices")] + [DBScript("004.MonitoredInvoices.sql")] + public partial class monitoredinvoices : DBScriptsMigration + { + } +} diff --git a/BTCPayServer.Tests/UnitTest1.cs b/BTCPayServer.Tests/UnitTest1.cs index 947010f56..88271c53d 100644 --- a/BTCPayServer.Tests/UnitTest1.cs +++ b/BTCPayServer.Tests/UnitTest1.cs @@ -3157,6 +3157,12 @@ namespace BTCPayServer.Tests var invoiceId = GetInvoiceId(resp); await acc.PayOnChain(invoiceId); + // Quick unrelated test on GetMonitoredInvoices + var invoiceRepo = tester.PayTester.GetService(); + var monitored = Assert.Single(await invoiceRepo.GetMonitoredInvoices(PaymentMethodId.Parse("BTC-CHAIN")), i => i.Id == invoiceId); + Assert.Single(monitored.Payments); + // + app = await client.CreatePointOfSaleApp(acc.StoreId, new PointOfSaleAppRequest { AppName = "Cart", diff --git a/BTCPayServer/HostedServices/InvoiceWatcher.cs b/BTCPayServer/HostedServices/InvoiceWatcher.cs index d4f07aa9c..cc094a534 100644 --- a/BTCPayServer/HostedServices/InvoiceWatcher.cs +++ b/BTCPayServer/HostedServices/InvoiceWatcher.cs @@ -14,6 +14,7 @@ using BTCPayServer.Payments.Bitcoin; using BTCPayServer.Services.Invoices; using BTCPayServer.Services.Notifications; using BTCPayServer.Services.Notifications.Blobs; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using NBitcoin; @@ -255,10 +256,19 @@ namespace BTCPayServer.HostedServices private async Task WaitPendingInvoices() { - await Task.WhenAll((await _invoiceRepository.GetPendingInvoices()) + await Task.WhenAll((await GetPendingInvoices(_Cts.Token)) .Select(i => Wait(i)).ToArray()); } + private async Task GetPendingInvoices(CancellationToken cancellationToken) + { + using var ctx = _invoiceRepository.DbContextFactory.CreateContext(); + var rows = await ctx.Invoices.Where(i => Data.InvoiceData.IsPending(i.Status)) + .Select(o => o).ToArrayAsync(cancellationToken); + var invoices = rows.Select(_invoiceRepository.ToEntity).ToArray(); + return invoices; + } + async Task StartLoop(CancellationToken cancellation) { Logs.PayServer.LogInformation("Start watching invoices"); diff --git a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs index 2c87cf48d..9a39b8518 100644 --- a/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs +++ b/BTCPayServer/Payments/Bitcoin/NBXplorerListener.cs @@ -235,7 +235,7 @@ namespace BTCPayServer.Payments.Bitcoin async Task UpdatePaymentStates(BTCPayWallet wallet) { - var invoices = await _InvoiceRepository.GetInvoicesWithPendingPayments(PaymentTypes.CHAIN.GetPaymentMethodId(wallet.Network.CryptoCode)); + var invoices = await _InvoiceRepository.GetMonitoredInvoices(PaymentTypes.CHAIN.GetPaymentMethodId(wallet.Network.CryptoCode)); await Task.WhenAll(invoices.Select(i => UpdatePaymentStates(wallet, i)).ToArray()); } async Task UpdatePaymentStates(BTCPayWallet wallet, string invoiceId, bool fireEvents = true) @@ -384,7 +384,7 @@ namespace BTCPayServer.Payments.Bitcoin { var handler = _handlers.GetBitcoinHandler(wallet.Network); int totalPayment = 0; - var invoices = await _InvoiceRepository.GetInvoicesWithPendingPayments(PaymentTypes.CHAIN.GetPaymentMethodId(network.CryptoCode), true); + var invoices = await _InvoiceRepository.GetMonitoredInvoices(PaymentTypes.CHAIN.GetPaymentMethodId(network.CryptoCode)); var coinsPerDerivationStrategy = new Dictionary(); foreach (var i in invoices) diff --git a/BTCPayServer/Payments/Lightning/LightningListener.cs b/BTCPayServer/Payments/Lightning/LightningListener.cs index 9cdeed766..15efa0fc7 100644 --- a/BTCPayServer/Payments/Lightning/LightningListener.cs +++ b/BTCPayServer/Payments/Lightning/LightningListener.cs @@ -18,6 +18,7 @@ using BTCPayServer.Payments.Bitcoin; using BTCPayServer.Services; using BTCPayServer.Services.Invoices; using BTCPayServer.Services.Stores; +using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -71,66 +72,69 @@ namespace BTCPayServer.Payments.Lightning bool needCheckOfflinePayments = true; async Task CheckingInvoice(CancellationToken cancellation) { -retry: - try + var pmis = _handlers.Where(h => h is LightningLikePaymentHandler).Select(handler => handler.PaymentMethodId).ToArray(); + foreach (var pmi in pmis) { - Logs.PayServer.LogInformation("Checking if any payment arrived on lightning while the server was offline..."); - foreach (var invoice in await _InvoiceRepository.GetPendingInvoices(cancellationToken: cancellation)) +retry: + try { - if (GetListenedInvoices(invoice).Count > 0) + Logs.PayServer.LogInformation("Checking if any payment arrived on lightning while the server was offline..."); + foreach (var invoice in await _InvoiceRepository.GetMonitoredInvoices(pmi, cancellation)) { - _CheckInvoices.Writer.TryWrite(invoice.Id); - _memoryCache.Set(GetCacheKey(invoice.Id), invoice, GetExpiration(invoice)); - } - } - needCheckOfflinePayments = false; - Logs.PayServer.LogInformation("Processing lightning payments..."); - - - - while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) && - _CheckInvoices.Reader.TryRead(out var invoiceId)) - { - var invoice = await GetInvoice(invoiceId); - - foreach (var listenedInvoice in GetListenedInvoices(invoice)) - { - var store = await GetStore(invoice.StoreId); - var lnConfig = _handlers.GetLightningConfig(store, listenedInvoice.Network); - if (lnConfig is null) - continue; - var connStr = GetLightningUrl(listenedInvoice.Network.CryptoCode, lnConfig); - if (connStr is null) - continue; - var instanceListenerKey = (listenedInvoice.Network.CryptoCode, connStr.ToString()); - lock (_InstanceListeners) + if (GetListenedInvoices(invoice).Count > 0) { - if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener)) - { - instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, _handlers, connStr, _paymentService, Logs); - _InstanceListeners.TryAdd(instanceListenerKey, instanceListener); - } - instanceListener.AddListenedInvoice(listenedInvoice); - _ = instanceListener.PollPayment(listenedInvoice, cancellation); + _CheckInvoices.Writer.TryWrite(invoice.Id); + _memoryCache.Set(GetCacheKey(invoice.Id), invoice, GetExpiration(invoice)); } } + needCheckOfflinePayments = false; + Logs.PayServer.LogInformation("Processing lightning payments..."); - if (_CheckInvoices.Reader.Count is 0) - this.CheckConnections(); + + + while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) && + _CheckInvoices.Reader.TryRead(out var invoiceId)) + { + var invoice = await GetInvoice(invoiceId); + + foreach (var listenedInvoice in GetListenedInvoices(invoice)) + { + var store = await GetStore(invoice.StoreId); + var lnConfig = _handlers.GetLightningConfig(store, listenedInvoice.Network); + if (lnConfig is null) + continue; + var connStr = GetLightningUrl(listenedInvoice.Network.CryptoCode, lnConfig); + if (connStr is null) + continue; + var instanceListenerKey = (listenedInvoice.Network.CryptoCode, connStr.ToString()); + lock (_InstanceListeners) + { + if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener)) + { + instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, _handlers, connStr, _paymentService, Logs); + _InstanceListeners.TryAdd(instanceListenerKey, instanceListener); + } + instanceListener.AddListenedInvoice(listenedInvoice); + _ = instanceListener.PollPayment(listenedInvoice, cancellation); + } + } + + if (_CheckInvoices.Reader.Count is 0) + this.CheckConnections(); + } + } + catch when (cancellation.IsCancellationRequested) + { + } + catch (Exception ex) + { + await Task.Delay(1000, cancellation); + Logs.PayServer.LogWarning(ex, "Unhandled error in the LightningListener"); + goto retry; } } - catch when (cancellation.IsCancellationRequested) - { - } - catch (Exception ex) - { - await Task.Delay(1000, cancellation); - Logs.PayServer.LogWarning(ex, "Unhandled error in the LightningListener"); - goto retry; - } } - private string GetCacheKey(string invoiceId) { return $"{nameof(GetListenedInvoices)}-{invoiceId}"; diff --git a/BTCPayServer/Services/Altcoins/Monero/Services/MoneroListener.cs b/BTCPayServer/Services/Altcoins/Monero/Services/MoneroListener.cs index 9c1e14a25..bc4342d05 100644 --- a/BTCPayServer/Services/Altcoins/Monero/Services/MoneroListener.cs +++ b/BTCPayServer/Services/Altcoins/Monero/Services/MoneroListener.cs @@ -384,7 +384,7 @@ namespace BTCPayServer.Services.Altcoins.Monero.Services private async Task UpdateAnyPendingMoneroLikePayment(string cryptoCode) { var paymentMethodId = PaymentTypes.CHAIN.GetPaymentMethodId(cryptoCode); - var invoices = await _invoiceRepository.GetInvoicesWithPendingPayments(paymentMethodId); + var invoices = await _invoiceRepository.GetMonitoredInvoices(paymentMethodId); if (!invoices.Any()) return; invoices = invoices.Where(entity => entity.GetPaymentPrompt(paymentMethodId)?.Activated is true).ToArray(); diff --git a/BTCPayServer/Services/Altcoins/Zcash/Services/ZcashListener.cs b/BTCPayServer/Services/Altcoins/Zcash/Services/ZcashListener.cs index a9a02a9e4..7ade159c4 100644 --- a/BTCPayServer/Services/Altcoins/Zcash/Services/ZcashListener.cs +++ b/BTCPayServer/Services/Altcoins/Zcash/Services/ZcashListener.cs @@ -374,7 +374,7 @@ namespace BTCPayServer.Services.Altcoins.Zcash.Services private async Task UpdateAnyPendingZcashLikePayment(string cryptoCode) { var paymentMethodId = PaymentTypes.CHAIN.GetPaymentMethodId(cryptoCode); - var invoices = await _invoiceRepository.GetInvoicesWithPendingPayments(paymentMethodId); + var invoices = await _invoiceRepository.GetMonitoredInvoices(paymentMethodId); if (!invoices.Any()) return; invoices = invoices.Where(entity => entity.GetPaymentPrompt(paymentMethodId).Activated).ToArray(); diff --git a/BTCPayServer/Services/Invoices/InvoiceRepository.cs b/BTCPayServer/Services/Invoices/InvoiceRepository.cs index ce3c17c31..b74a8ffcd 100644 --- a/BTCPayServer/Services/Invoices/InvoiceRepository.cs +++ b/BTCPayServer/Services/Invoices/InvoiceRepository.cs @@ -34,7 +34,7 @@ namespace BTCPayServer.Services.Invoices private readonly ApplicationDbContextFactory _applicationDbContextFactory; private readonly EventAggregator _eventAggregator; - + public ApplicationDbContextFactory DbContextFactory => _applicationDbContextFactory; public InvoiceRepository(ApplicationDbContextFactory contextFactory, EventAggregator eventAggregator) { @@ -78,32 +78,59 @@ namespace BTCPayServer.Services.Invoices return row is null ? null : ToEntity(row); } - public async Task GetInvoicesWithPendingPayments(PaymentMethodId paymentMethodId, bool includeAddresses = false) + /// + /// Returns all invoices which either: + /// * Have the activated and are pending + /// * Aren't pending but have a payment from the that is pending + /// is filled with the monitored addresses of the for this invoice. + /// include the payments for this invoice. + /// + /// The payment method id + /// Cancellation token + /// + public async Task GetMonitoredInvoices(PaymentMethodId paymentMethodId, CancellationToken cancellationToken = default) { var pmi = paymentMethodId.ToString(); using var ctx = _applicationDbContextFactory.CreateContext(); - var invoiceIds = (await ctx.Payments.Where(p => PaymentData.IsPending(p.Status) && p.Type == pmi).Select(p => p.InvoiceDataId).ToArrayAsync()).Distinct().ToArray(); - if (invoiceIds.Length is 0) + var conn = ctx.Database.GetDbConnection(); + var rows = await conn.QueryAsync<(string Id, uint xmin, string[] addresses, string[] payments, string invoice)>(new(""" + SELECT + i."Id", + i.xmin, + array_agg(ai."Address") addresses, + COALESCE(array_agg(to_jsonb(p)) FILTER (WHERE p."Id" IS NOT NULL), '{}') as payments, + (array_agg(to_jsonb(i)))[1] as invoice + FROM get_monitored_invoices(@pmi) m + LEFT JOIN "Payments" p ON p."Id" = m.payment_id + LEFT JOIN "Invoices" i ON i."Id" = m.invoice_id + LEFT JOIN "AddressInvoices" ai ON i."Id" = ai."InvoiceDataId" + WHERE ai."PaymentMethodId" = @pmi + GROUP BY i."Id"; + """ + , new { pmi = paymentMethodId.ToString() })); + if (Enumerable.TryGetNonEnumeratedCount(rows, out var c) && c == 0) return Array.Empty(); - return await GetInvoices(new InvoiceQuery() + List invoices = new List(); + foreach (var row in rows) { - InvoiceId = invoiceIds, - IncludeAddresses = true - }); - } - public async Task GetPendingInvoices(CancellationToken cancellationToken = default) - { - using var ctx = _applicationDbContextFactory.CreateContext(); - var rows = await ctx.Invoices.Where(i => InvoiceData.IsPending(i.Status)) - .Include(i => i.Payments) - .Select(o => o).ToArrayAsync(); - return rows.Select(ToEntity).ToArray(); - } - public async Task GetPendingInvoices() - { - using var ctx = _applicationDbContextFactory.CreateContext(); - return (await ctx.Invoices.Where(i => InvoiceData.IsPending(i.Status)).Select(i => i).ToArrayAsync()) - .Select(i => ToEntity(i)).ToArray(); + var jobj = JObject.Parse(row.invoice); + jobj.Remove("Blob"); + jobj["Blob2"] = jobj["Blob2"].ToString(Formatting.None); + var invoiceData = jobj.ToObject(); + invoiceData.XMin = row.xmin; + invoiceData.AddressInvoices = row.addresses.Select((a) => new AddressInvoiceData() { InvoiceDataId = invoiceData.Id, Address = a, PaymentMethodId = paymentMethodId.ToString() }).ToList(); + invoiceData.Payments = new List(); + foreach (var payment in row.payments) + { + jobj = JObject.Parse(payment); + jobj.Remove("Blob"); + jobj["Blob2"] = jobj["Blob2"].ToString(Formatting.None); + var paymentData = jobj.ToObject(); + invoiceData.Payments.Add(paymentData); + } + invoices.Add(ToEntity(invoiceData)); + } + return invoices.ToArray(); } public async Task> GetWebhookDeliveries(string invoiceId)