mirror of
https://github.com/getAlby/lndhub.go.git
synced 2026-02-23 05:44:23 +01:00
Split up process invoice updates
This commit is contained in:
@@ -12,6 +12,87 @@ import (
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *lnrpc.Invoice) error {
|
||||
var invoice models.Invoice
|
||||
rHashStr := hex.EncodeToString(rawInvoice.RHash)
|
||||
|
||||
svc.Logger.Infof("Invoice update: r_hash:%s state:%v", rHashStr, rawInvoice.State.String())
|
||||
|
||||
// Search for an incoming invoice with the r_hash that is NOT settled in our DB
|
||||
err := svc.DB.NewSelect().Model(&invoice).Where("type = ? AND r_hash = ? AND state <> ? ", "incoming", rHashStr, "settled").Limit(1).Scan(context.TODO())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the DB entry of the invoice
|
||||
// If the invoice is settled we save the settle date and the status otherwise we just store the lnd status
|
||||
// Additionally to the invoice update we create a transaction entry from the user's incoming account to the user's current account
|
||||
// This transaction entry makes the balance available for the user
|
||||
svc.Logger.Infof("Invoice update: invoice_id:%v settled:%v value:%v state:%v", invoice.ID, rawInvoice.Settled, rawInvoice.AmtPaidSat, rawInvoice.State)
|
||||
|
||||
// Get the user's current account for the transaction entry
|
||||
creditAccount, err := svc.AccountFor(ctx, "current", invoice.UserID)
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Could not find current account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID)
|
||||
return err
|
||||
}
|
||||
// Get the user's incoming account for the transaction entry
|
||||
debitAccount, err := svc.AccountFor(ctx, "incoming", invoice.UserID)
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Could not find incoming account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
// Process any update in a DB transaction
|
||||
tx, err := svc.DB.BeginTx(ctx, &sql.TxOptions{})
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Failed to update the invoice invoice_id:%v r_hash:%s %v", invoice.ID, rHashStr, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// if the invoice is NOT settled we just update the invoice state
|
||||
if !rawInvoice.Settled {
|
||||
svc.Logger.Infof("Invoice not settled invoice_id:%v state:", invoice.ID, rawInvoice.State.String())
|
||||
invoice.State = strings.ToLower(rawInvoice.State.String())
|
||||
|
||||
} else {
|
||||
// if the invoice is settled we update the state and create an transaction entry to the current account
|
||||
invoice.SettledAt = bun.NullTime{Time: time.Unix(rawInvoice.SettleDate, 0)}
|
||||
invoice.State = "settled"
|
||||
_, err = tx.NewUpdate().Model(&invoice).WherePK().Exec(context.TODO())
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
svc.Logger.Errorf("Could not update invoice invoice_id:%v", invoice.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
// Transfer the amount from the user's incoming account to the user's current account
|
||||
entry := models.TransactionEntry{
|
||||
UserID: invoice.UserID,
|
||||
InvoiceID: invoice.ID,
|
||||
CreditAccountID: creditAccount.ID,
|
||||
DebitAccountID: debitAccount.ID,
|
||||
Amount: invoice.Amount,
|
||||
}
|
||||
|
||||
// Save the transaction entry
|
||||
_, err = tx.NewInsert().Model(&entry).Exec(context.TODO())
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
svc.Logger.Errorf("Could not create incoming->current transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Commit the DB transaction. Done, everything worked
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error {
|
||||
invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{}
|
||||
invoiceSubscriptionStream, err := svc.LndClient.SubscribeInvoices(context.Background(), &invoiceSubscriptionOptions)
|
||||
@@ -19,6 +100,7 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
svc.Logger.Info("Subscribed to invoice updates starting from index: ")
|
||||
|
||||
for {
|
||||
// receive the next invoice update
|
||||
rawInvoice, err := invoiceSubscriptionStream.Recv()
|
||||
@@ -27,98 +109,20 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error {
|
||||
svc.Logger.Errorf("Error processing invoice update subscription: %v", err)
|
||||
continue
|
||||
}
|
||||
var invoice models.Invoice
|
||||
rHashStr := hex.EncodeToString(rawInvoice.RHash)
|
||||
|
||||
svc.Logger.Infof("Invoice update: r_hash:%s state:%v", rHashStr, rawInvoice.State)
|
||||
|
||||
// Ignore updates for open invoices
|
||||
// We store the invoice details in the AddInvoice call
|
||||
// This could cause a race condition here where we get this notification faster than we finish the AddInvoice call
|
||||
// Processing open invoices here could cause a race condition:
|
||||
// We could get this notification faster than we finish the AddInvoice call
|
||||
if rawInvoice.State == lnrpc.Invoice_OPEN {
|
||||
svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", rHashStr)
|
||||
svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", hex.EncodeToString(rawInvoice.RHash))
|
||||
continue
|
||||
}
|
||||
|
||||
// Search for the invoice in our DB
|
||||
err = svc.DB.NewSelect().Model(&invoice).Where("type = ? AND r_hash = ? AND state <> ? ", "incoming", rHashStr, "settled").Limit(1).Scan(context.TODO())
|
||||
if err != nil {
|
||||
// TODO: sentry notification
|
||||
svc.Logger.Errorf("Could not find invoice: r_hash:%s payment_request:%s", rHashStr, rawInvoice.PaymentRequest)
|
||||
continue
|
||||
processingError := svc.ProcessInvoiceUpdate(ctx, rawInvoice)
|
||||
if processingError != nil {
|
||||
svc.Logger.Error(err)
|
||||
// TODO sentry notification
|
||||
}
|
||||
|
||||
// Update the DB entry of the invoice
|
||||
// If the invoice is settled we save the settle date and the status otherwise we just store the lnd status
|
||||
//
|
||||
// Additionally to the invoice update we create a transaction entry from the incoming account to the user's current account
|
||||
svc.Logger.Infof("Invoice update: invoice_id:%v settled:%v value:%v state:%v", invoice.ID, rawInvoice.Settled, rawInvoice.AmtPaidSat, rawInvoice.State)
|
||||
|
||||
// Get the user's current and incoming account for the transaction entry
|
||||
creditAccount, err := svc.AccountFor(ctx, "current", invoice.UserID)
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Could not find current account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID)
|
||||
// TODO: sentry notification
|
||||
continue
|
||||
}
|
||||
debitAccount, err := svc.AccountFor(ctx, "incoming", invoice.UserID)
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Could not find incoming account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID)
|
||||
// TODO: sentry notification
|
||||
continue
|
||||
}
|
||||
|
||||
tx, err := svc.DB.BeginTx(ctx, &sql.TxOptions{})
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Failed to update the invoice invoice_id:%v r_hash:%s %v", invoice.ID, rHashStr, err)
|
||||
// TODO: notify sentry
|
||||
continue
|
||||
}
|
||||
|
||||
// if the invoice is NOT settled we just update the invoice state
|
||||
if !rawInvoice.Settled {
|
||||
svc.Logger.Infof("Invoice not settled invoice_id:%v", invoice.ID)
|
||||
invoice.State = strings.ToLower(rawInvoice.State.String())
|
||||
|
||||
// if the invoice is settled we update the state and create an transaction entry to the current account
|
||||
} else {
|
||||
invoice.SettledAt = bun.NullTime{Time: time.Unix(rawInvoice.SettleDate, 0)}
|
||||
invoice.State = "settled"
|
||||
_, err = tx.NewUpdate().Model(&invoice).WherePK().Exec(context.TODO())
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
svc.Logger.Errorf("Could not update invoice invoice_id:%v", invoice.ID)
|
||||
// TODO: sentry notification
|
||||
continue
|
||||
}
|
||||
|
||||
// Transfer the amount from the incoming account to the current account
|
||||
entry := models.TransactionEntry{
|
||||
UserID: invoice.UserID,
|
||||
InvoiceID: invoice.ID,
|
||||
CreditAccountID: creditAccount.ID,
|
||||
DebitAccountID: debitAccount.ID,
|
||||
Amount: invoice.Amount,
|
||||
}
|
||||
// The DB constraints make sure the user actually has enough balance for the transaction
|
||||
// If the user does not have enough balance this call fails
|
||||
_, err = tx.NewInsert().Model(&entry).Exec(context.TODO())
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
svc.Logger.Errorf("Could not create incoming->current transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
|
||||
// TODO: sentry notification
|
||||
tx.Rollback()
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Commit the DB transaction. Done, everything worked
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err)
|
||||
// TODO: sentry notification
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user