diff --git a/db/db.go b/db/db.go index 7f0a437..cc4d8c3 100644 --- a/db/db.go +++ b/db/db.go @@ -18,7 +18,10 @@ func Open(config *service.Config) (*bun.DB, error) { dsn := config.DatabaseUri switch { case strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") || strings.HasPrefix(dsn, "unix://"): - dbConn := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn))) + dbConn := sql.OpenDB( + pgdriver.NewConnector( + pgdriver.WithDSN(dsn), + pgdriver.WithTimeout(time.Duration(config.DatabaseTimeout)*time.Second))) db = bun.NewDB(dbConn, pgdialect.New()) db.SetMaxOpenConns(config.DatabaseMaxConns) db.SetMaxIdleConns(config.DatabaseMaxIdleConns) diff --git a/integration_tests/subscription_start_test.go b/integration_tests/subscription_start_test.go index 25d3e6e..01683dd 100644 --- a/integration_tests/subscription_start_test.go +++ b/integration_tests/subscription_start_test.go @@ -125,7 +125,7 @@ func (mock *lndSubscriptionStartMockClient) SubscribeInvoices(ctx context.Contex return mock, nil } -//wait forever +// wait forever func (mock *lndSubscriptionStartMockClient) Recv() (*lnrpc.Invoice, error) { select {} } diff --git a/lib/service/config.go b/lib/service/config.go index 082bdaf..7fcb5f1 100644 --- a/lib/service/config.go +++ b/lib/service/config.go @@ -10,6 +10,7 @@ type Config struct { DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"` DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"` DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes + DatabaseTimeout int `envconfig:"DATABASE_TIMEOUT" default:"60"` // 60 seconds SentryDSN string `envconfig:"SENTRY_DSN"` SentryTracesSampleRate float64 `envconfig:"SENTRY_TRACES_SAMPLE_RATE"` LogFilePath string `envconfig:"LOG_FILE_PATH"` diff --git a/lib/service/invoicesubscription.go b/lib/service/invoicesubscription.go index 3d8fd6a..33bff9a 100644 --- a/lib/service/invoicesubscription.go +++ b/lib/service/invoicesubscription.go @@ -220,12 +220,19 @@ func (svc *LndhubService) ConnectInvoiceSubscription(ctx context.Context) (lnd.S var invoice models.Invoice invoiceSubscriptionOptions := lnrpc.InvoiceSubscription{} // Find the oldest NOT settled AND NOT expired invoice with an add_index - // Note: expired invoices will not be settled anymore, so we don't care about those - err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= now()").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx) + // Build in a safety buffer of 14h to account for lndhub downtime + // Note: expired invoices will not be settled anymore, so we don't care about those + err := svc.DB.NewSelect().Model(&invoice).Where("invoice.settled_at IS NULL AND invoice.add_index IS NOT NULL AND invoice.expires_at >= (now() - interval '14 hours')").OrderExpr("invoice.id ASC").Limit(1).Scan(ctx) // IF we found an invoice we use that index to start the subscription - if err == nil { - invoiceSubscriptionOptions = lnrpc.InvoiceSubscription{AddIndex: invoice.AddIndex - 1} // -1 because we want updates for that invoice already + // if we get an error there might be a serious issue here + // and we are at risk of missing paid invoices, so we should not continue + // if we just didn't find any unsettled invoices that's allright though + if err != nil && err != sql.ErrNoRows { + sentry.CaptureException(err) + return nil, err } + // subtract 1 (read invoiceSubscriptionOptions.Addindex docs) + invoiceSubscriptionOptions.AddIndex = invoice.AddIndex - 1 svc.Logger.Infof("Starting invoice subscription from index: %v", invoiceSubscriptionOptions.AddIndex) return svc.LndClient.SubscribeInvoices(ctx, &invoiceSubscriptionOptions) } @@ -239,14 +246,16 @@ func (svc *LndhubService) InvoiceUpdateSubscription(ctx context.Context) error { for { select { case <-ctx.Done(): - return fmt.Errorf("Context was canceled") + return context.Canceled default: // receive the next invoice update rawInvoice, err := invoiceSubscriptionStream.Recv() + // in case of an error, we want to return and restart LNDhub + // in order to try and reconnect the gRPC subscription if err != nil { svc.Logger.Errorf("Error processing invoice update subscription: %v", err) sentry.CaptureException(err) - continue + return err } // Ignore updates for open invoices diff --git a/main.go b/main.go index 90aebaf..e1b10fd 100644 --- a/main.go +++ b/main.go @@ -179,8 +179,9 @@ func main() { backgroundWg.Add(1) go func() { err = svc.InvoiceUpdateSubscription(backGroundCtx) - if err != nil { - svc.Logger.Error(err) + if err != nil && err != context.Canceled { + // in case of an error in this routine, we want to restart LNDhub + svc.Logger.Fatal(err) } svc.Logger.Info("Invoice routine done") backgroundWg.Done()