Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions comp/logs-library/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//comp/core/telemetry/impl",
"//pkg/util/log",
"@com_github_benbjohnson_clock//:clock",
"@org_uber_go_atomic//:atomic",
],
)

Expand Down
5 changes: 2 additions & 3 deletions comp/logs-library/metrics/pipeline_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ func TestTelemetryPipelineMonitor_TickerSamplesRegisteredMonitor(t *testing.T) {

require.Eventually(t, func() bool {
clk.Add(time.Second)
um.mu.Lock()
defer um.mu.Unlock()
return um.avg > 0
// avg is published atomically, so a subscriber can poll it without locking the hot path.
return um.avg.Load() > 0
}, 2*time.Second, 5*time.Millisecond,
"the pipeline monitor's ticker must sample its registered utilization monitor")
}
Expand Down
170 changes: 81 additions & 89 deletions comp/logs-library/metrics/utilization_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
package metrics

import (
"sync"
"time"

"github.com/benbjohnson/clock"
"go.uber.org/atomic"

log "github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -35,132 +35,114 @@ func (n *NoopUtilizationMonitor) Start() {}
// Stop does nothing.
func (n *NoopUtilizationMonitor) Stop() {}

// TelemetryUtilizationMonitor is a UtilizationMonitor that reports utilization metrics as telemetry.
// TelemetryUtilizationMonitor reports component utilization as telemetry. Start/Stop run lock-free
// on the hot path; the sampler goroutine owns all derived state.
type TelemetryUtilizationMonitor struct {
// mu guards all mutable state: Start/Stop run on the component goroutine, sample on the ticker.
mu sync.Mutex

inUse time.Duration
idle time.Duration
startIdle time.Time
startInUse time.Time
lastSample time.Time
sampleRate time.Duration
avg float64 // EWMA utilization (N=15, α≈0.125)
history *rollingHistory
name string
instance string
started bool
clock clock.Clock
// registry, when non-nil, receives utilization snapshots and supplies the capacity figures
// used in saturation logs. It is owned by the pipeline monitor; standalone monitors leave it nil.
registry *snapshotRegistry

// Saturation episode tracking for log emission.
// Hot-path accumulators in ns; effective busy = cumulativeBusyNanos + open interval while started.
cumulativeBusyNanos atomic.Int64
startInUseNanos atomic.Int64
started atomic.Bool

// EWMA utilization (N=15, α≈0.125); sampler writes, subscribers read.
avg atomic.Float64

name string
instance string
sampleRate time.Duration
clock clock.Clock
history *rollingHistory
lastSample time.Time
lastEffectiveBusy int64
registry *snapshotRegistry // nil for standalone monitors

isSaturated bool
saturatedSince time.Time
lastThrottleLog time.Time
episodeMaxUtil float64
episodeMaxItems int64
episodeMaxBytes int64
pendingRecoverySince time.Time // non-zero while EWMA is below threshold but debounce not yet met
}

// NewTelemetryUtilizationMonitor creates a new TelemetryUtilizationMonitor that reports to telemetry only.
func NewTelemetryUtilizationMonitor(name, instance string) *TelemetryUtilizationMonitor {
return newTelemetryUtilizationMonitorWithSampleRateAndClock(name, instance, 1*time.Second, clock.New(), nil)
pendingRecoverySince time.Time
}

func newTelemetryUtilizationMonitorWithSampleRateAndClock(name, instance string, sampleRate time.Duration, clock clock.Clock, registry *snapshotRegistry) *TelemetryUtilizationMonitor {
return &TelemetryUtilizationMonitor{
name: name,
instance: instance,
startIdle: clock.Now(),
startInUse: clock.Now(),
lastSample: clock.Now(),
sampleRate: sampleRate,
avg: 0,
history: newRollingHistory(),
started: false,
clock: clock,
history: newRollingHistory(),
lastSample: clock.Now(),
registry: registry,
}
}

// Start tracks a start event in the utilization tracker.
// Start marks the component in-use.
func (u *TelemetryUtilizationMonitor) Start() {
u.mu.Lock()
defer u.mu.Unlock()
if u.started {
if u.started.Load() {
return
}
u.started = true
now := u.clock.Now()
u.idle += now.Sub(u.startIdle)
u.startInUse = now
u.reportIfNeededLocked(now)
// Store the start before flipping started, so the sampler never sees started with a stale start.
u.startInUseNanos.Store(u.clock.Now().UnixNano())
u.started.Store(true)
}

// Stop tracks a finish event in the utilization tracker.
// Stop marks the component idle and credits the elapsed in-use interval.
func (u *TelemetryUtilizationMonitor) Stop() {
u.mu.Lock()
defer u.mu.Unlock()
if !u.started {
if !u.started.Load() {
return
}
u.started = false
now := u.clock.Now()
u.inUse += now.Sub(u.startInUse)
u.startIdle = now
u.reportIfNeededLocked(now)
}

// sample is driven by the ticker so a component blocked mid-operation is observed, not frozen at its last EWMA.
func (u *TelemetryUtilizationMonitor) sample(now time.Time) {
u.mu.Lock()
defer u.mu.Unlock()
u.settleLocked(now)
u.reportIfNeededLocked(now)
}

// settleLocked credits the open interval up to now and advances its start so Start/Stop won't double-count it. Caller holds u.mu.
func (u *TelemetryUtilizationMonitor) settleLocked(now time.Time) {
if u.started {
u.inUse += now.Sub(u.startInUse)
u.startInUse = now
} else {
u.idle += now.Sub(u.startIdle)
u.startIdle = now
// Credit the interval before clearing started, so a sampler seeing started=false also sees it.
if busy := u.clock.Now().UnixNano() - u.startInUseNanos.Load(); busy > 0 {
u.cumulativeBusyNanos.Add(busy)
}
u.started.Store(false)
}

// reportIfNeededLocked publishes a sample if sampleRate has elapsed; now is passed for a consistent instant. Caller holds u.mu.
func (u *TelemetryUtilizationMonitor) reportIfNeededLocked(now time.Time) {
// sample is driven by the ticker so a component blocked mid-operation is still observed.
func (u *TelemetryUtilizationMonitor) sample(now time.Time) {
if now.Sub(u.lastSample) < u.sampleRate {
return
}

// A torn read against the hot path can over- or under-count one interval; clamp01 bounds it and
// the next tick self-corrects.
effBusy := u.effectiveBusyNanos(now)
windowBusy := effBusy - u.lastEffectiveBusy
windowElapsed := now.UnixNano() - u.lastSample.UnixNano()

rawRatio := 0.0
if total := u.idle + u.inUse; total > 0 {
rawRatio = float64(u.inUse) / float64(total)
if windowElapsed > 0 {
rawRatio = clamp01(float64(windowBusy) / float64(windowElapsed))
}
u.avg = ewma(rawRatio, u.avg)

u.history.add(now, u.avg)
avg := ewma(rawRatio, u.avg.Load())
u.avg.Store(avg)
u.history.add(now, avg)

TlmUtilizationRatio.Set(u.avg, u.name, u.instance)
TlmUtilizationRatio.Set(avg, u.name, u.instance)
if u.registry != nil {
u.registry.setUtilization(u.name, u.instance, u.avg, rawRatio, u.history)
u.registry.setUtilization(u.name, u.instance, avg, rawRatio, u.history)
}
u.idle = 0
u.inUse = 0

u.lastEffectiveBusy = effBusy
u.lastSample = now

u.updateSaturationState(now)
u.updateSaturationState(now, avg)
}

func (u *TelemetryUtilizationMonitor) effectiveBusyNanos(now time.Time) int64 {
busy := u.cumulativeBusyNanos.Load()
if u.started.Load() {
if open := now.UnixNano() - u.startInUseNanos.Load(); open > 0 {
busy += open
}
}
return busy
}

// updateSaturationState drives the saturation state machine, emitting transition and throttled logs. Caller holds u.mu.
func (u *TelemetryUtilizationMonitor) updateSaturationState(now time.Time) {
currentlySaturated := u.avg >= SaturationThreshold
// updateSaturationState drives the saturation state machine, emitting transition and throttled logs.
func (u *TelemetryUtilizationMonitor) updateSaturationState(now time.Time, avg float64) {
currentlySaturated := avg >= SaturationThreshold

if currentlySaturated {
u.pendingRecoverySince = time.Time{}
Expand All @@ -174,15 +156,15 @@ func (u *TelemetryUtilizationMonitor) updateSaturationState(now time.Time) {
u.isSaturated = true
u.saturatedSince = now
u.lastThrottleLog = now
u.episodeMaxUtil = u.avg
u.episodeMaxUtil = avg
u.episodeMaxItems = snap.RawItems
u.episodeMaxBytes = snap.RawBytes
// max_items/max_bytes are omitted at onset (capacity snapshot may not have ticked yet).
log.Warnf("Logs Agent pipeline component saturated component=%s instance=%s utilization=%.0f%%",
u.name, u.instance, u.avg*100)
u.name, u.instance, avg*100)
} else {
if u.avg > u.episodeMaxUtil {
u.episodeMaxUtil = u.avg
if avg > u.episodeMaxUtil {
u.episodeMaxUtil = avg
}
if snap.RawItems > u.episodeMaxItems {
u.episodeMaxItems = snap.RawItems
Expand All @@ -193,7 +175,7 @@ func (u *TelemetryUtilizationMonitor) updateSaturationState(now time.Time) {

if now.Sub(u.lastThrottleLog) >= saturationThrottleDuration {
log.Warnf("Logs Agent pipeline component saturated component=%s instance=%s utilization=%.0f%% duration=%s max_utilization=%.0f%% max_items=%d max_bytes=%d",
u.name, u.instance, u.avg*100, now.Sub(u.saturatedSince), u.episodeMaxUtil*100, u.episodeMaxItems, u.episodeMaxBytes)
u.name, u.instance, avg*100, now.Sub(u.saturatedSince), u.episodeMaxUtil*100, u.episodeMaxItems, u.episodeMaxBytes)
u.lastThrottleLog = now
}
}
Expand All @@ -212,3 +194,13 @@ func (u *TelemetryUtilizationMonitor) updateSaturationState(now time.Time) {
}
}
}

func clamp01(v float64) float64 {
if v < 0 {
return 0
}
if v > 1 {
return 1
}
return v
}
Loading
Loading