From 7233c25bce5eabd15a033ca9a34a9b7a434a86ed Mon Sep 17 00:00:00 2001
From: djkazic
Date: Tue, 6 May 2025 11:11:49 -0400
Subject: [PATCH] aperture: internally aggregate session statistics to reduce cardinality

---
 hashmail_server.go | 106 +++++++++++++++++++++++++++++++++++++++++++--
 prometheus.go      |  56 +++++++++++++++++-------
 2 files changed, 142 insertions(+), 20 deletions(-)

diff --git a/hashmail_server.go b/hashmail_server.go
index f6b3275..69822d3 100644
--- a/hashmail_server.go
+++ b/hashmail_server.go
@@ -11,7 +11,6 @@ import (
 	"github.com/btcsuite/btclog/v2"
 	"github.com/lightninglabs/lightning-node-connect/hashmailrpc"
 	"github.com/lightningnetwork/lnd/tlv"
-	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -35,6 +34,11 @@ const (
 	// DefaultBufSize is the default number of bytes that are read in a
 	// single operation.
 	DefaultBufSize = 4096
+
+	// streamTTL is the amount of time that a stream needs to exist without
+	// reads for it to be considered for pruning. Otherwise, memory will grow
+	// unbounded.
+	streamTTL = 24 * time.Hour
 )
 
 // streamID is the identifier of a stream.
@@ -747,9 +751,7 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
 		streamID := newStreamID(desc.StreamId)
 		if streamID.isOdd() {
 			baseID := streamID.baseID()
-			mailboxReadCount.With(prometheus.Labels{
-				streamIDLabel: fmt.Sprintf("%x", baseID),
-			}).Inc()
+			streamActivityTracker.Record(fmt.Sprintf("%x", baseID))
 		}
 
 		err = reader.Send(&hashmailrpc.CipherBox{
@@ -766,6 +768,102 @@ func (h *hashMailServer) RecvStream(desc *hashmailrpc.CipherBoxDesc,
 
 var _ hashmailrpc.HashMailServer = (*hashMailServer)(nil)
 
+// streamActivity tracks per-session read activity for classifying mailbox
+// sessions as active, standby, or in-use. It maintains an in-memory map
+// of stream IDs to counters and timestamps.
+type streamActivity struct {
+	sync.Mutex
+	streams map[string]*activityEntry
+
+	// windowStart is when the current measurement window began: either
+	// the creation of the tracker or the previous ClassifyAndReset call.
+	windowStart time.Time
+}
+
+// activityEntry holds the read count and last read time for a single mailbox
+// session.
+type activityEntry struct {
+	// count is the number of reads recorded in the current window.
+	count uint64
+
+	// lastUpdate is the time of the most recent read. ClassifyAndReset
+	// never modifies it, so an idle stream can age past streamTTL.
+	lastUpdate time.Time
+}
+
+// newStreamActivity creates a new streamActivity tracker used to monitor
+// mailbox read activity per stream ID.
+func newStreamActivity() *streamActivity {
+	return &streamActivity{
+		streams:     make(map[string]*activityEntry),
+		windowStart: time.Now(),
+	}
+}
+
+// Record logs a read event for the given base stream ID.
+// It increments the read count and updates the last activity timestamp.
+func (sa *streamActivity) Record(baseID string) {
+	sa.Lock()
+	defer sa.Unlock()
+
+	entry, ok := sa.streams[baseID]
+	if !ok {
+		entry = &activityEntry{}
+		sa.streams[baseID] = entry
+	}
+	entry.count++
+	entry.lastUpdate = time.Now()
+}
+
+// ClassifyAndReset categorizes each tracked stream based on its read rate
+// over the window since the previous call and returns aggregate counts of
+// active, standby, and in-use sessions. Streams that had no reads for longer
+// than streamTTL are removed from the tracker.
+// A stream is classified as:
+//   - In-use: if read rate ≥ 0.5 reads/sec
+//   - Standby: if 0 < read rate < 0.5 reads/sec
+//   - Active: if read rate > 0 (includes standby and in-use)
+func (sa *streamActivity) ClassifyAndReset() (active, standby, inuse int) {
+	sa.Lock()
+	defer sa.Unlock()
+
+	now := time.Now()
+
+	// Every stream is measured over the same window, so the rate does not
+	// depend on how recently a stream happened to be read.
+	elapsed := now.Sub(sa.windowStart).Seconds()
+	if elapsed <= 0 {
+		// Prevent divide-by-zero, treat as 1s interval.
+		elapsed = 1
+	}
+	sa.windowStart = now
+
+	for baseID, e := range sa.streams {
+		if e.count == 0 {
+			// Prune if idle for longer than streamTTL.
+			if now.Sub(e.lastUpdate) > streamTTL {
+				delete(sa.streams, baseID)
+			}
+			continue
+		}
+
+		// count > 0 here, so the rate is strictly positive and the
+		// stream is active; the threshold splits in-use from standby.
+		rate := float64(e.count) / elapsed
+		if rate >= 0.5 {
+			inuse++
+		} else {
+			standby++
+		}
+		active++
+
+		// Reset the counter for the next window.
+		e.count = 0
+	}
+
+	return active, standby, inuse
+}
+
 // streamStatus keeps track of the occupancy status of a stream's read and
 // write sub-streams. It is initialised with callback functions to call on the
 // event of the streams being occupied (either or both of the streams are
diff --git a/prometheus.go b/prometheus.go
index 1585b3f..353d63e 100644
--- a/prometheus.go
+++ b/prometheus.go
@@ -3,13 +3,12 @@ package aperture
 import (
 	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
-const streamIDLabel = "streamID"
-
 var (
 	// mailboxCount tracks the current number of active mailboxes.
 	mailboxCount = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -17,21 +16,31 @@ var (
 		Name:      "mailbox_count",
 	})
 
-	// mailboxReadCount counts each time a mailbox pair is being used.
-	// A session consists of a bidirectional stream each using a mailbox
-	// with an ID that overlaps for the first 63 bytes and differ for the
-	// last bit. So in order to obtain accurate data about a specific
-	// mailbox session, the stream ID that will be recorded is the first
-	// 16 bytes of the session ID and we will only record the odd stream's
-	// reads so that we don't duplicate the data.
-	mailboxReadCount = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Namespace: "hashmail",
-			Name:      "mailbox_read_count",
-		}, []string{streamIDLabel},
-	)
+	// activeSessions tracks the number of active mailbox sessions.
+	activeSessions = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "hashmail",
+		Name:      "mailbox_active_sessions",
+		Help:      "Number of active sessions",
+	})
+
+	// standbySessions tracks the number of standby mailbox sessions.
+	standbySessions = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "hashmail",
+		Name:      "mailbox_standby_sessions",
+		Help:      "Number of standby sessions",
+	})
+
+	// inUseSessions tracks the number of in-use mailbox sessions.
+	inUseSessions = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "hashmail",
+		Name:      "mailbox_inuse_sessions",
+		Help:      "Number of in-use sessions",
+	})
 )
 
+// streamActivityTracker handles the calculation of session statistics.
+var streamActivityTracker = newStreamActivity()
+
 // PrometheusConfig is the set of configuration data that specifies if
 // Prometheus metric exporting is activated, and if so the listening address of
 // the Prometheus server.
@@ -55,7 +64,22 @@ func StartPrometheusExporter(cfg *PrometheusConfig) error {
 
 	// Next, we'll register all our metrics.
 	prometheus.MustRegister(mailboxCount)
-	prometheus.MustRegister(mailboxReadCount)
+	prometheus.MustRegister(activeSessions)
+	prometheus.MustRegister(standbySessions)
+	prometheus.MustRegister(inUseSessions)
+
+	// Periodically refresh the session gauges from the internal tracker.
+	go func() {
+		ticker := time.NewTicker(10 * time.Second)
+		defer ticker.Stop()
+
+		for range ticker.C {
+			active, standby, inuse := streamActivityTracker.ClassifyAndReset()
+			activeSessions.Set(float64(active))
+			standbySessions.Set(float64(standby))
+			inUseSessions.Set(float64(inuse))
+		}
+	}()
 
 	// Finally, we'll launch the HTTP server that Prometheus will use to
 	// scape our metrics.