mirror of
https://github.com/getAlby/lndhub.go.git
synced 2026-02-23 13:54:25 +01:00
Merge pull request #316 from getAlby/catch-add-index-error
Invoice subscription: safety improvements
This commit is contained in:
5
db/db.go
5
db/db.go
@@ -18,7 +18,10 @@ func Open(config *service.Config) (*bun.DB, error) {
|
||||
dsn := config.DatabaseUri
|
||||
switch {
|
||||
case strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") || strings.HasPrefix(dsn, "unix://"):
|
||||
dbConn := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
|
||||
dbConn := sql.OpenDB(
|
||||
pgdriver.NewConnector(
|
||||
pgdriver.WithDSN(dsn),
|
||||
pgdriver.WithTimeout(time.Duration(config.DatabaseTimeout)*time.Second)))
|
||||
db = bun.NewDB(dbConn, pgdialect.New())
|
||||
db.SetMaxOpenConns(config.DatabaseMaxConns)
|
||||
db.SetMaxIdleConns(config.DatabaseMaxIdleConns)
|
||||
|
||||
@@ -125,7 +125,7 @@ func (mock *lndSubscriptionStartMockClient) SubscribeInvoices(ctx context.Contex
|
||||
return mock, nil
|
||||
}
|
||||
|
||||
//wait forever
|
||||
// wait forever
|
||||
func (mock *lndSubscriptionStartMockClient) Recv() (*lnrpc.Invoice, error) {
|
||||
select {}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ type Config struct {
|
||||
DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"`
|
||||
DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"`
|
||||
DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes
|
||||
DatabaseTimeout int `envconfig:"DATABASE_TIMEOUT" default:"60"` // 60 seconds
|
||||
SentryDSN string `envconfig:"SENTRY_DSN"`
|
||||
SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"`
|
||||
LogFilePath string `envconfig:"LOG_FILE_PATH"`
|
||||
|
||||
@@ -220,12 +220,19 @@ func (svc *LndhubService) ConnectInvoiceSubscription(ctx context.Context) (lnd.S
|
||||
var invoice models.Invoice
|
||||
invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{}
|
||||
// Find the oldest NOT settled AND NOT expired invoice with an add_index
|
||||
// Note: expired invoices will not be settled anymore, so we don't care about those
|
||||
err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= now()").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx)
|
||||
// Build in a safety buffer of 14h to account for lndhub downtime
|
||||
// Note: expired invoices will not be settled anymore, so we don't care about those
|
||||
err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= (now() - interval '14 hours')").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx)
|
||||
// IF we found an invoice we use that index to start the subscription
|
||||
if err == nil {
|
||||
invoiceSubscriptionOptions = lnrpc.InvoiceSubscription{AddIndex: invoice.AddIndex - 1} // -1 because we want updates for that invoice already
|
||||
// if we get an error there might be a serious issue here
|
||||
// and we are at risk of missing paid invoices, so we should not continue
|
||||
// if we just didn't find any unsettled invoices that's allright though
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
sentry.CaptureException(err)
|
||||
return nil, err
|
||||
}
|
||||
// subtract 1 (read invoiceSubscriptionOptions.Addindex docs)
|
||||
invoiceSubscriptionOptions.AddIndex = invoice.AddIndex - 1
|
||||
svc.Logger.Infof("Starting invoice subscription from index: %v", invoiceSubscriptionOptions.AddIndex)
|
||||
return svc.LndClient.SubscribeInvoices(ctx, &invoiceSubscriptionOptions)
|
||||
}
|
||||
@@ -239,14 +246,16 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("Context was canceled")
|
||||
return context.Canceled
|
||||
default:
|
||||
// receive the next invoice update
|
||||
rawInvoice, err := invoiceSubscriptionStream.Recv()
|
||||
// in case of an error, we want to return and restart LNDhub
|
||||
// in order to try and reconnect the gRPC subscription
|
||||
if err != nil {
|
||||
svc.Logger.Errorf("Error processing invoice update subscription: %v", err)
|
||||
sentry.CaptureException(err)
|
||||
continue
|
||||
return err
|
||||
}
|
||||
|
||||
// Ignore updates for open invoices
|
||||
|
||||
5
main.go
5
main.go
@@ -179,8 +179,9 @@ func main() {
|
||||
backgroundWg.Add(1)
|
||||
go func() {
|
||||
err = svc.InvoiceUpdateSubscription(backGroundCtx)
|
||||
if err != nil {
|
||||
svc.Logger.Error(err)
|
||||
if err != nil && err != context.Canceled {
|
||||
// in case of an error in this routine, we want to restart LNDhub
|
||||
svc.Logger.Fatal(err)
|
||||
}
|
||||
svc.Logger.Info("Invoice routine done")
|
||||
backgroundWg.Done()
|
||||
|
||||
Reference in New Issue
Block a user