From 71eabae4432b4669f4b3be103ddc13a71ccd76ba Mon Sep 17 00:00:00 2001 From: Michael Bumann Date: Fri, 21 Jan 2022 21:29:16 +0100 Subject: [PATCH] Split up process invoice updates --- lib/service/invoicesubscription.go | 174 +++++++++++++++-------------- 1 file changed, 89 insertions(+), 85 deletions(-) diff --git a/lib/service/invoicesubscription.go b/lib/service/invoicesubscription.go index 9757d8d..5e65a42 100644 --- a/lib/service/invoicesubscription.go +++ b/lib/service/invoicesubscription.go @@ -12,6 +12,87 @@ import ( "github.com/uptrace/bun" ) +func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *lnrpc.Invoice) error { + var invoice models.Invoice + rHashStr := hex.EncodeToString(rawInvoice.RHash) + + svc.Logger.Infof("Invoice update: r_hash:%s state:%v", rHashStr, rawInvoice.State.String()) + + // Search for an incoming invoice with the r_hash that is NOT settled in our DB + err := svc.DB.NewSelect().Model(&invoice).Where("type = ? AND r_hash = ? AND state <> ? ", "incoming", rHashStr, "settled").Limit(1).Scan(context.TODO()) + if err != nil { + return err + } + + // Update the DB entry of the invoice + // If the invoice is settled we save the settle date and the status otherwise we just store the lnd status + // Additionally to the invoice update we create a transaction entry from the user's incoming account to the user's current account + // This transaction entry makes the balance available for the user + svc.Logger.Infof("Invoice update: invoice_id:%v settled:%v value:%v state:%v", invoice.ID, rawInvoice.Settled, rawInvoice.AmtPaidSat, rawInvoice.State) + + // Get the user's current account for the transaction entry + creditAccount, err := svc.AccountFor(ctx, "current", invoice.UserID) + if err != nil { + svc.Logger.Errorf("Could not find current account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) + return err + } + // Get the user's incoming account for the transaction entry + debitAccount, err := svc.AccountFor(ctx, "incoming", invoice.UserID) + if err != nil { + svc.Logger.Errorf("Could not find incoming account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) + return err + } + + // Process any update in a DB transaction + tx, err := svc.DB.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + svc.Logger.Errorf("Failed to update the invoice invoice_id:%v r_hash:%s %v", invoice.ID, rHashStr, err) + return err + } + + // if the invoice is NOT settled we just update the invoice state + if !rawInvoice.Settled { + svc.Logger.Infof("Invoice not settled invoice_id:%v state:", invoice.ID, rawInvoice.State.String()) + invoice.State = strings.ToLower(rawInvoice.State.String()) + + } else { + // if the invoice is settled we update the state and create an transaction entry to the current account + invoice.SettledAt = bun.NullTime{Time: time.Unix(rawInvoice.SettleDate, 0)} + invoice.State = "settled" + _, err = tx.NewUpdate().Model(&invoice).WherePK().Exec(context.TODO()) + if err != nil { + tx.Rollback() + svc.Logger.Errorf("Could not update invoice invoice_id:%v", invoice.ID) + return err + } + + // Transfer the amount from the user's incoming account to the user's current account + entry := models.TransactionEntry{ + UserID: invoice.UserID, + InvoiceID: invoice.ID, + CreditAccountID: creditAccount.ID, + DebitAccountID: debitAccount.ID, + Amount: invoice.Amount, + } + + // Save the transaction entry + _, err = tx.NewInsert().Model(&entry).Exec(context.TODO()) + if err != nil { + tx.Rollback() + svc.Logger.Errorf("Could not create incoming->current transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) + return err + } + } + // Commit the DB transaction. Done, everything worked + err = tx.Commit() + if err != nil { + svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) + return err + } + + return nil +} + func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{} invoiceSubscriptionStream, err := svc.LndClient.SubscribeInvoices(context.Background(), &invoiceSubscriptionOptions) @@ -19,6 +100,7 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { return err } svc.Logger.Info("Subscribed to invoice updates starting from index: ") + for { // receive the next invoice update rawInvoice, err := invoiceSubscriptionStream.Recv() @@ -27,98 +109,20 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { svc.Logger.Errorf("Error processing invoice update subscription: %v", err) continue } - var invoice models.Invoice - rHashStr := hex.EncodeToString(rawInvoice.RHash) - - svc.Logger.Infof("Invoice update: r_hash:%s state:%v", rHashStr, rawInvoice.State) // Ignore updates for open invoices // We store the invoice details in the AddInvoice call - // This could cause a race condition here where we get this notification faster than we finish the AddInvoice call + // Processing open invoices here could cause a race condition: + // We could get this notification faster than we finish the AddInvoice call if rawInvoice.State == lnrpc.Invoice_OPEN { - svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", rHashStr) + svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", hex.EncodeToString(rawInvoice.RHash)) continue } - // Search for the invoice in our DB - err = svc.DB.NewSelect().Model(&invoice).Where("type = ? AND r_hash = ? AND state <> ? ", "incoming", rHashStr, "settled").Limit(1).Scan(context.TODO()) - if err != nil { - // TODO: sentry notification - svc.Logger.Errorf("Could not find invoice: r_hash:%s payment_request:%s", rHashStr, rawInvoice.PaymentRequest) - continue + processingError := svc.ProcessInvoiceUpdate(ctx, rawInvoice) + if processingError != nil { + svc.Logger.Error(err) + // TODO sentry notification } - - // Update the DB entry of the invoice - // If the invoice is settled we save the settle date and the status otherwise we just store the lnd status - // - // Additionally to the invoice update we create a transaction entry from the incoming account to the user's current account - svc.Logger.Infof("Invoice update: invoice_id:%v settled:%v value:%v state:%v", invoice.ID, rawInvoice.Settled, rawInvoice.AmtPaidSat, rawInvoice.State) - - // Get the user's current and incoming account for the transaction entry - creditAccount, err := svc.AccountFor(ctx, "current", invoice.UserID) - if err != nil { - svc.Logger.Errorf("Could not find current account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) - // TODO: sentry notification - continue - } - debitAccount, err := svc.AccountFor(ctx, "incoming", invoice.UserID) - if err != nil { - svc.Logger.Errorf("Could not find incoming account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) - // TODO: sentry notification - continue - } - - tx, err := svc.DB.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - svc.Logger.Errorf("Failed to update the invoice invoice_id:%v r_hash:%s %v", invoice.ID, rHashStr, err) - // TODO: notify sentry - continue - } - - // if the invoice is NOT settled we just update the invoice state - if !rawInvoice.Settled { - svc.Logger.Infof("Invoice not settled invoice_id:%v", invoice.ID) - invoice.State = strings.ToLower(rawInvoice.State.String()) - - // if the invoice is settled we update the state and create an transaction entry to the current account - } else { - invoice.SettledAt = bun.NullTime{Time: time.Unix(rawInvoice.SettleDate, 0)} - invoice.State = "settled" - _, err = tx.NewUpdate().Model(&invoice).WherePK().Exec(context.TODO()) - if err != nil { - tx.Rollback() - svc.Logger.Errorf("Could not update invoice invoice_id:%v", invoice.ID) - // TODO: sentry notification - continue - } - - // Transfer the amount from the incoming account to the current account - entry := models.TransactionEntry{ - UserID: invoice.UserID, - InvoiceID: invoice.ID, - CreditAccountID: creditAccount.ID, - DebitAccountID: debitAccount.ID, - Amount: invoice.Amount, - } - // The DB constraints make sure the user actually has enough balance for the transaction - // If the user does not have enough balance this call fails - _, err = tx.NewInsert().Model(&entry).Exec(context.TODO()) - if err != nil { - tx.Rollback() - svc.Logger.Errorf("Could not create incoming->current transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) - // TODO: sentry notification - tx.Rollback() - continue - } - } - // Commit the DB transaction. Done, everything worked - err = tx.Commit() - if err != nil { - svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) - // TODO: sentry notification - continue - } - } - return nil }