diff --git a/db/models/invoice.go b/db/models/invoice.go index 17b705c..0456d5a 100644 --- a/db/models/invoice.go +++ b/db/models/invoice.go @@ -26,6 +26,7 @@ type Invoice struct { State string `json:"state" bun:",default:'initialized'"` AddIndex uint64 `json:"add_index" bun:",nullzero"` CreatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp"` + ExpiresAt bun.NullTime `bun:",nullzero"` UpdatedAt bun.NullTime `json:"updated_at"` SettledAt bun.NullTime `json:"settled_at"` } diff --git a/lib/service/invoices.go b/lib/service/invoices.go index 33ddf23..6a5ab24 100644 --- a/lib/service/invoices.go +++ b/lib/service/invoices.go @@ -37,7 +37,7 @@ func (svc *LndhubService) SendInternalPayment(tx *bun.Tx, invoice *models.Invoic //SendInternalPayment() // find invoice var incomingInvoice models.Invoice - err := svc.DB.NewSelect().Model(&incomingInvoice).Where("type = ? AND payment_request = ? AND state = ? ", "incoming", invoice.PaymentRequest, "created").Limit(1).Scan(context.TODO()) + err := svc.DB.NewSelect().Model(&incomingInvoice).Where("type = ? AND payment_request = ? AND state = ? ", "incoming", invoice.PaymentRequest, "open").Limit(1).Scan(context.TODO()) if err != nil { // invoice not found or already settled // TODO: logging @@ -197,6 +197,7 @@ func (svc *LndhubService) PayInvoice(invoice *models.Invoice) (*models.Transacti func (svc *LndhubService) AddOutgoingInvoice(userID int64, paymentRequest string, decodedInvoice zpay32.Invoice) (*models.Invoice, error) { // Initialize new DB invoice destinationPubkeyHex := hex.EncodeToString(decodedInvoice.Destination.SerializeCompressed()) + expiresAt := decodedInvoice.Timestamp.Add(decodedInvoice.Expiry()) invoice := models.Invoice{ Type: "outgoing", UserID: userID, @@ -204,6 +205,7 @@ func (svc *LndhubService) AddOutgoingInvoice(userID int64, paymentRequest string PaymentRequest: paymentRequest, State: "initialized", DestinationPubkeyHex: destinationPubkeyHex, + ExpiresAt: bun.NullTime{Time: expiresAt}, } if decodedInvoice.DescriptionHash != nil { dh := *decodedInvoice.DescriptionHash @@ -228,6 +230,7 @@ func (svc *LndhubService) AddOutgoingInvoice(userID int64, paymentRequest string func (svc *LndhubService) AddIncomingInvoice(userID int64, amount int64, memo, descriptionHashStr string) (*models.Invoice, error) { preimage := makePreimageHex() + expiry := time.Hour * 24 // invoice expires in 24h // Initialize new DB invoice invoice := models.Invoice{ Type: "incoming", @@ -236,6 +239,7 @@ func (svc *LndhubService) AddIncomingInvoice(userID int64, amount int64, memo, d Memo: memo, DescriptionHash: descriptionHashStr, State: "initialized", + ExpiresAt: bun.NullTime{Time: time.Now().Add(expiry)}, } // Save invoice - we save the invoice early to have a record in case the LN call fails @@ -254,7 +258,7 @@ func (svc *LndhubService) AddIncomingInvoice(userID int64, amount int64, memo, d DescriptionHash: descriptionHash, Value: amount, RPreimage: preimage, - Expiry: 3600 * 24, // 24h // TODO: move to config + Expiry: int64(expiry.Seconds()), } // Call LND lnInvoiceResult, err := svc.LndClient.AddInvoice(context.TODO(), &lnInvoice) @@ -268,7 +272,7 @@ func (svc *LndhubService) AddIncomingInvoice(userID int64, amount int64, memo, d invoice.Preimage = hex.EncodeToString(preimage) invoice.AddIndex = lnInvoiceResult.AddIndex invoice.DestinationPubkeyHex = svc.GetIdentPubKeyHex() // Our node pubkey for incoming invoices - invoice.State = "created" + invoice.State = "open" _, err = svc.DB.NewUpdate().Model(&invoice).WherePK().Exec(context.TODO()) if err != nil { diff --git a/lib/service/invoicesubscription.go b/lib/service/invoicesubscription.go new file mode 100644 index 0000000..20be2cc --- /dev/null +++ b/lib/service/invoicesubscription.go @@ -0,0 +1,143 @@ +package service + +import ( + "context" + "database/sql" + "encoding/hex" + "strings" + "time" + + "github.com/getAlby/lndhub.go/db/models" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/uptrace/bun" +) + +func (svc *LndhubService) ProcessInvoiceUpdate(ctx context.Context, rawInvoice *lnrpc.Invoice) error { + var invoice models.Invoice + rHashStr := hex.EncodeToString(rawInvoice.RHash) + + svc.Logger.Infof("Invoice update: r_hash:%s state:%v", rHashStr, rawInvoice.State.String()) + + // Search for an incoming invoice with the r_hash that is NOT settled in our DB + err := svc.DB.NewSelect().Model(&invoice).Where("type = ? AND r_hash = ? AND state <> ? AND expires_at > NOW()", "incoming", rHashStr, "settled").Limit(1).Scan(ctx) + if err != nil { + return err + } + + // Update the DB entry of the invoice + // If the invoice is settled we save the settle date and the status otherwise we just store the lnd status + // Additionally to the invoice update we create a transaction entry from the user's incoming account to the user's current account + // This transaction entry makes the balance available for the user + svc.Logger.Infof("Invoice update: invoice_id:%v settled:%v value:%v state:%v", invoice.ID, rawInvoice.Settled, rawInvoice.AmtPaidSat, rawInvoice.State) + + // Get the user's current account for the transaction entry + creditAccount, err := svc.AccountFor(ctx, "current", invoice.UserID) + if err != nil { + svc.Logger.Errorf("Could not find current account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) + return err + } + // Get the user's incoming account for the transaction entry + debitAccount, err := svc.AccountFor(ctx, "incoming", invoice.UserID) + if err != nil { + svc.Logger.Errorf("Could not find incoming account user_id:%v invoice_id:%v", invoice.UserID, invoice.ID) + return err + } + + // Process any update in a DB transaction + tx, err := svc.DB.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + svc.Logger.Errorf("Failed to update the invoice invoice_id:%v r_hash:%s %v", invoice.ID, rHashStr, err) + return err + } + + // if the invoice is NOT settled we just update the invoice state + if !rawInvoice.Settled { + svc.Logger.Infof("Invoice not settled invoice_id:%v state:", invoice.ID, rawInvoice.State.String()) + invoice.State = strings.ToLower(rawInvoice.State.String()) + + } else { + // if the invoice is settled we update the state and create an transaction entry to the current account + invoice.SettledAt = bun.NullTime{Time: time.Unix(rawInvoice.SettleDate, 0)} + invoice.State = "settled" + _, err = tx.NewUpdate().Model(&invoice).WherePK().Exec(ctx) + if err != nil { + tx.Rollback() + svc.Logger.Errorf("Could not update invoice invoice_id:%v", invoice.ID) + return err + } + + // Transfer the amount from the user's incoming account to the user's current account + entry := models.TransactionEntry{ + UserID: invoice.UserID, + InvoiceID: invoice.ID, + CreditAccountID: creditAccount.ID, + DebitAccountID: debitAccount.ID, + Amount: invoice.Amount, + } + + // Save the transaction entry + _, err = tx.NewInsert().Model(&entry).Exec(ctx) + if err != nil { + tx.Rollback() + svc.Logger.Errorf("Could not create incoming->current transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) + return err + } + } + // Commit the DB transaction. Done, everything worked + err = tx.Commit() + if err != nil { + svc.Logger.Errorf("Failed to commit DB transaction user_id:%v invoice_id:%v %v", invoice.UserID, invoice.ID, err) + return err + } + + return nil +} + +func (svc *LndhubService) ConnectInvoiceSubscription(ctx context.Context) (lnrpc.Lightning_SubscribeInvoicesClient, error) { + var invoice models.Invoice + invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{} + // Find the oldest NOT settled invoice with an add_index + err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx) + // IF we found an invoice we use that index to start the subscription + if err == nil { + invoiceSubscriptionOptions = lnrpc.InvoiceSubscription{AddIndex: invoice.AddIndex - 1} // -1 because we want updates for that invoice already + } + svc.Logger.Infof("Starting invoice subscription from index: %v", invoiceSubscriptionOptions.AddIndex) + return svc.LndClient.SubscribeInvoices(ctx, &invoiceSubscriptionOptions) +} + +func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { + invoiceSubscriptionStream, err := svc.ConnectInvoiceSubscription(ctx) + if err != nil { + return err + } + for { + // receive the next invoice update + rawInvoice, err := invoiceSubscriptionStream.Recv() + if err != nil { + // TODO: sentry notification + svc.Logger.Errorf("Error processing invoice update subscription: %v", err) + // TODO: close the stream somehoe before retrying? + // Wait 30 seconds and try to reconnect + // TODO: implement some backoff + time.Sleep(30 * time.Second) + invoiceSubscriptionStream, _ = svc.ConnectInvoiceSubscription(ctx) + continue + } + + // Ignore updates for open invoices + // We store the invoice details in the AddInvoice call + // Processing open invoices here could cause a race condition: + // We could get this notification faster than we finish the AddInvoice call + if rawInvoice.State == lnrpc.Invoice_OPEN { + svc.Logger.Infof("Invoice state is open. Ignoring update. r_hash:%v", hex.EncodeToString(rawInvoice.RHash)) + continue + } + + processingError := svc.ProcessInvoiceUpdate(ctx, rawInvoice) + if processingError != nil { + svc.Logger.Error(err) + // TODO sentry notification + } + } +} diff --git a/lib/service/service.go b/lib/service/service.go index c68cd1f..4af9a3d 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -11,6 +11,7 @@ import ( "github.com/labstack/gommon/random" "github.com/lightningnetwork/lnd/lnrpc" "github.com/uptrace/bun" + "github.com/ziflex/lecho/v3" "golang.org/x/crypto/bcrypt" ) @@ -20,6 +21,7 @@ type LndhubService struct { Config *Config DB *bun.DB LndClient lnrpc.LightningClient + Logger *lecho.Logger IdentityPubkey *btcec.PublicKey } diff --git a/main.go b/main.go index 962bd1c..f6754c5 100644 --- a/main.go +++ b/main.go @@ -118,6 +118,7 @@ func main() { Config: c, DB: dbConn, LndClient: lndClient, + Logger: logger, IdentityPubkey: identityPubKey, } @@ -141,6 +142,9 @@ func main() { secured.GET("/getpending", blankController.GetPending) e.GET("/", blankController.Home) + // Subscribe to LND invoice updates in the background + go svc.InvoiceUpdateSubscription(context.Background()) + // Start server go func() { if err := e.Start(":3000"); err != nil && err != http.ErrServerClosed {