diff --git a/main.go b/main.go index 3bded76..7b83427 100644 --- a/main.go +++ b/main.go @@ -98,9 +98,10 @@ func main() { forwardingStore := postgresql.NewForwardingEventStore(pool) notificationsStore := postgresql.NewNotificationsStore(pool) lsps2Store := postgresql.NewLsps2Store(pool) - notificationService := notifications.NewNotificationService(notificationsStore) ctx, cancel := context.WithCancel(context.Background()) + notificationService := notifications.NewNotificationService(notificationsStore) + go notificationService.Start(ctx) openingService := common.NewOpeningService(openingStore, nodesService) cleanupService := lsps2.NewCleanupService(lsps2Store) go cleanupService.Start(ctx) diff --git a/notifications/notification_service.go b/notifications/notification_service.go index 22d37b3..19cdc41 100644 --- a/notifications/notification_service.go +++ b/notifications/notification_service.go @@ -6,15 +6,41 @@ import ( "encoding/json" "log" "net/http" + "sync" + "time" ) +var minNotificationInterval time.Duration = time.Second * 30 +var cleanInterval time.Duration = time.Minute * 2 + type NotificationService struct { - store Store + mtx sync.Mutex + recentNotifications map[string]time.Time + store Store } func NewNotificationService(store Store) *NotificationService { return &NotificationService{ - store: store, + store: store, + recentNotifications: make(map[string]time.Time), + } +} + +func (s *NotificationService) Start(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(cleanInterval): + } + + s.mtx.Lock() + for id, lastTime := range s.recentNotifications { + if lastTime.Add(minNotificationInterval).Before(time.Now()) { + delete(s.recentNotifications, id) + } + } + s.mtx.Unlock() } } @@ -29,6 +55,14 @@ func (s *NotificationService) Notify( pubkey string, paymenthash string, ) (bool, error) { + s.mtx.Lock() + lastTime, ok := s.recentNotifications[paymentIdentifier(pubkey, paymenthash)] + s.mtx.Unlock() + if ok && lastTime.Add(minNotificationInterval).After(time.Now()) { + // Treat as if we notified if the notification was recent. + return true, nil + } + registrations, err := s.store.GetRegistrations(context.Background(), pubkey) if err != nil { log.Printf("Failed to get notification registrations for %s: %v", pubkey, err) @@ -69,5 +103,15 @@ func (s *NotificationService) Notify( notified = true } + if notified { + s.mtx.Lock() + s.recentNotifications[paymentIdentifier(pubkey, paymenthash)] = time.Now() + s.mtx.Unlock() + } + return notified, nil } + +func paymentIdentifier(pubkey string, paymentHash string) string { + return pubkey + "|" + paymentHash +}