fix: comment punctuation and shutdown of ticker

This commit is contained in:
djkazic
2025-05-21 11:28:34 -04:00
parent 7233c25bce
commit eab9d755b6
4 changed files with 28 additions and 18 deletions

View File

@@ -151,7 +151,7 @@ func run() error {
errChan := make(chan error) errChan := make(chan error)
a := NewAperture(cfg) a := NewAperture(cfg)
if err := a.Start(errChan); err != nil { if err := a.Start(errChan, interceptor.ShutdownChannel()); err != nil {
return fmt.Errorf("unable to start aperture: %v", err) return fmt.Errorf("unable to start aperture: %v", err)
} }
@@ -192,9 +192,9 @@ func NewAperture(cfg *Config) *Aperture {
} }
// Start sets up the proxy server and starts it. // Start sets up the proxy server and starts it.
func (a *Aperture) Start(errChan chan error) error { func (a *Aperture) Start(errChan chan error, shutdown <-chan struct{}) error {
// Start the prometheus exporter. // Start the prometheus exporter.
err := StartPrometheusExporter(a.cfg.Prometheus) err := StartPrometheusExporter(a.cfg.Prometheus, shutdown)
if err != nil { if err != nil {
return fmt.Errorf("unable to start the prometheus "+ return fmt.Errorf("unable to start the prometheus "+
"exporter: %v", err) "exporter: %v", err)

View File

@@ -809,9 +809,9 @@ func (sa *streamActivity) Record(baseID string) {
// ClassifyAndReset categorizes each tracked stream based on its recent read // ClassifyAndReset categorizes each tracked stream based on its recent read
// rate and returns aggregate counts of active, standby, and in-use sessions. // rate and returns aggregate counts of active, standby, and in-use sessions.
// A stream is classified as: // A stream is classified as:
// - In-use: if read rate ≥ 0.5 reads/sec // - In-use: if read rate ≥ 0.5 reads/sec.
// - Standby: if 0 < read rate < 0.5 reads/sec // - Standby: if 0 < read rate < 0.5 reads/sec.
// - Active: if read rate > 0 (includes standby and in-use) // - Active: if read rate > 0 (includes standby and in-use).
func (sa *streamActivity) ClassifyAndReset() (active, standby, inuse int) { func (sa *streamActivity) ClassifyAndReset() (active, standby, inuse int) {
sa.Lock() sa.Lock()
defer sa.Unlock() defer sa.Unlock()

View File

@@ -186,7 +186,8 @@ func setupAperture(t *testing.T) {
} }
aperture := NewAperture(apertureCfg) aperture := NewAperture(apertureCfg)
errChan := make(chan error) errChan := make(chan error)
require.NoError(t, aperture.Start(errChan)) shutdown := make(chan struct{})
require.NoError(t, aperture.Start(errChan, shutdown))
// Any error while starting? // Any error while starting?
select { select {

View File

@@ -16,21 +16,21 @@ var (
Name: "mailbox_count", Name: "mailbox_count",
}) })
// activeSessions tracks the active session count for mailbox // activeSessions tracks the active session count for mailbox.
activeSessions = prometheus.NewGauge(prometheus.GaugeOpts{ activeSessions = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "hashmail", Namespace: "hashmail",
Name: "mailbox_active_sessions", Name: "mailbox_active_sessions",
Help: "Number of active sessions", Help: "Number of active sessions",
}) })
// standbySessions tracks the standby session count for mailbox // standbySessions tracks the standby session count for mailbox.
standbySessions = prometheus.NewGauge(prometheus.GaugeOpts{ standbySessions = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "hashmail", Namespace: "hashmail",
Name: "mailbox_standby_sessions", Name: "mailbox_standby_sessions",
Help: "Number of standby sessions", Help: "Number of standby sessions",
}) })
// inUseSessions tracks the in-use session count for mailbox // inUseSessions tracks the in-use session count for mailbox.
inUseSessions = prometheus.NewGauge(prometheus.GaugeOpts{ inUseSessions = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "hashmail", Namespace: "hashmail",
Name: "mailbox_inuse_sessions", Name: "mailbox_inuse_sessions",
@@ -38,7 +38,7 @@ var (
}) })
) )
// streamActivityTracker handles the calculation of session statistics // streamActivityTracker handles the calculation of session statistics.
var streamActivityTracker = newStreamActivity() var streamActivityTracker = newStreamActivity()
// PrometheusConfig is the set of configuration data that specifies if // PrometheusConfig is the set of configuration data that specifies if
@@ -56,7 +56,9 @@ type PrometheusConfig struct {
// StartPrometheusExporter registers all relevant metrics with the Prometheus // StartPrometheusExporter registers all relevant metrics with the Prometheus
// library, then launches the HTTP server that Prometheus will hit to scrape // library, then launches the HTTP server that Prometheus will hit to scrape
// our metrics. // our metrics.
func StartPrometheusExporter(cfg *PrometheusConfig) error { func StartPrometheusExporter(cfg *PrometheusConfig,
shutdown <-chan struct{}) error {
// If we're not active, then there's nothing more to do. // If we're not active, then there's nothing more to do.
if !cfg.Enabled { if !cfg.Enabled {
return nil return nil
@@ -68,16 +70,23 @@ func StartPrometheusExporter(cfg *PrometheusConfig) error {
prometheus.MustRegister(standbySessions) prometheus.MustRegister(standbySessions)
prometheus.MustRegister(inUseSessions) prometheus.MustRegister(inUseSessions)
// Periodically update session classification metrics from internal tracker // Periodically update session classification metrics from internal tracker.
go func() { go func() {
ticker := time.NewTicker(10 * time.Second) ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for {
active, standby, inuse := streamActivityTracker.ClassifyAndReset() select {
activeSessions.Set(float64(active)) case <-ticker.C:
standbySessions.Set(float64(standby)) active, standby, inuse :=
inUseSessions.Set(float64(inuse)) streamActivityTracker.ClassifyAndReset()
activeSessions.Set(float64(active))
standbySessions.Set(float64(standby))
inUseSessions.Set(float64(inuse))
case <-shutdown:
log.Infof("Shutting down Prometheus session metrics updater")
return
}
} }
}() }()