Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions comp/logs/agent/impl/status_templates/logsagentHTML.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
{{- if .Backpressure.Reason }}
<strong>Reason:</strong> {{ .Backpressure.Reason }}<br>
{{- end }}
{{- if .Backpressure.Suggestion }}
<strong>Suggestion:</strong> {{ .Backpressure.Suggestion }}<br>
{{- end }}
<table style="border-collapse:collapse;margin-top:6px;font-family:monospace;font-size:12px">
<tr style="text-align:left;border-bottom:1px solid #ccc">
<th style="padding:2px 8px">Component</th>
Expand Down
33 changes: 27 additions & 6 deletions pkg/logs/status/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ func (b *Builder) getComponentUtilization() []ComponentUtilization {
return result
}

// backpressureSuggestion returns a brief corrective hint naming the logs_config settings
// most relevant to the saturated stage.
func backpressureSuggestion(component string) string {
switch {
case component == logsMetrics.ProcessorTlmName:
return "Processing is the bottleneck: drop noisy logs with logs_config.processing_rules (exclude_at_match), or raise logs_config.pipelines to parallelize processing."
case component == logsMetrics.StrategyTlmName:
return "Batching/compression is the bottleneck: lower logs_config.compression_level, or raise logs_config.pipelines."
case component == logsMetrics.WorkerTlmName || strings.HasPrefix(component, "destination"):
return "Sending is the bottleneck: raise logs_config.batch_max_concurrent_send, and verify endpoint reachability (logs_config.logs_dd_url)."
default:
return "Pipeline is saturated: raise logs_config.pipelines to add throughput."
}
}

// getBackpressureStatus returns SATURATED (saturated in last 1m), WARNING (last 30m only), or HEALTHY.
func (b *Builder) getBackpressureStatus(utils []ComponentUtilization) BackpressureStatus {
// SATURATED signal: among currently-saturated components, surface the one with the highest EWMA.
Expand Down Expand Up @@ -183,23 +198,26 @@ func (b *Builder) getBackpressureStatus(utils []ComponentUtilization) Backpressu
if hasCurrSat {
dur30m := time.Duration(currSat30m) * time.Second
return BackpressureStatus{
State: "SATURATED",
Reason: fmt.Sprintf("%s pipeline %s is currently saturated (saturated for %s in the last 30m)", currSatName, currSatInst, fmtDuration(dur30m)),
State: "SATURATED",
Reason: fmt.Sprintf("%s pipeline %s is currently saturated (saturated for %s in the last 30m)", currSatName, currSatInst, fmtDuration(dur30m)),
Suggestion: backpressureSuggestion(currSatName),
}
}
// WARNING: saturation occurred in the last 1m or 30m but no component is currently at threshold.
if maxSat1m > 0 {
dur30m := time.Duration(sat30mForMaxSat1m) * time.Second
return BackpressureStatus{
State: "WARNING",
Reason: fmt.Sprintf("%s pipeline %s is not currently saturated but was saturated for %s in the last 30m", satName1m, satInst1m, fmtDuration(dur30m)),
State: "WARNING",
Reason: fmt.Sprintf("%s pipeline %s is not currently saturated but was saturated for %s in the last 30m", satName1m, satInst1m, fmtDuration(dur30m)),
Suggestion: backpressureSuggestion(satName1m),
}
}
if maxSat30m > 0 {
dur30m := time.Duration(maxSat30m) * time.Second
return BackpressureStatus{
State: "WARNING",
Reason: fmt.Sprintf("%s pipeline %s is not currently saturated but was saturated for %s in the last 30m", satName30m, satInst30m, fmtDuration(dur30m)),
State: "WARNING",
Reason: fmt.Sprintf("%s pipeline %s is not currently saturated but was saturated for %s in the last 30m", satName30m, satInst30m, fmtDuration(dur30m)),
Suggestion: backpressureSuggestion(satName30m),
}
}
return BackpressureStatus{State: "HEALTHY"}
Expand All @@ -218,6 +236,9 @@ func (b *Builder) formatBackpressureSection(utils []ComponentUtilization, bp Bac
if bp.Reason != "" {
sb.WriteString(fmt.Sprintf(" Reason: %s\n", bp.Reason))
}
if bp.Suggestion != "" {
sb.WriteString(fmt.Sprintf(" Suggestion: %s\n", bp.Suggestion))
}
sb.WriteString("\n")

// Size columns to the widest name/instance so the table stays aligned.
Expand Down
2 changes: 2 additions & 0 deletions pkg/logs/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type ComponentUtilization struct {
type BackpressureStatus struct {
State string `json:"state"`
Reason string `json:"reason"`
// Suggestion is a brief, component-specific corrective hint with a doc link. Empty when HEALTHY.
Suggestion string `json:"suggestion,omitempty"`
}

// Status provides some information about logs-agent.
Expand Down
28 changes: 28 additions & 0 deletions pkg/logs/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,31 @@ func TestGetBackpressureStatus_WarningPicksHighestSat1m(t *testing.T) {
assert.Equal(t, "WARNING", bp.State)
assert.Contains(t, bp.Reason, "sender", "component with highest Saturated1mSeconds must appear in reason")
}

// TestGetBackpressureStatus_SuggestionByComponent checks the corrective suggestion names the
// logs_config setting relevant to the saturated stage.
func TestGetBackpressureStatus_SuggestionByComponent(t *testing.T) {
b := &Builder{}
wantSetting := map[string]string{
"processor": "logs_config.processing_rules",
"strategy": "logs_config.compression_level",
"worker": "logs_config.batch_max_concurrent_send",
"destination_http": "logs_config.batch_max_concurrent_send",
}
for name, want := range wantSetting {
utils := []ComponentUtilization{
{Name: name, Instance: "0", AvgRatio: 0.95, CurrentlySaturated: true, Saturated30mSeconds: 30},
}
bp := b.getBackpressureStatus(utils)
assert.Equal(t, "SATURATED", bp.State)
assert.Contains(t, bp.Suggestion, want, "suggestion must name the setting relevant to the %s stage", name)
}
}

// TestGetBackpressureStatus_HealthyNoSuggestion checks HEALTHY carries no suggestion.
func TestGetBackpressureStatus_HealthyNoSuggestion(t *testing.T) {
b := &Builder{}
bp := b.getBackpressureStatus([]ComponentUtilization{{Name: "processor", Instance: "0", AvgRatio: 0.2}})
assert.Equal(t, "HEALTHY", bp.State)
assert.Empty(t, bp.Suggestion)
}
Loading