Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions comp/anomalydetection/logssource/impl/ad_source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (m *adSourceManager) AddSource(src *sources.LogSource) {
if isContainerSource(src) && m.sp.isAgentContainerID(src.Config.Identifier) {
return
}
disableAdaptiveSampling(src.Config)
m.logSources.AddSource(src)
if isContainerSource(src) {
m.sp.suppressIdentifier(src.Config.Identifier)
Expand Down Expand Up @@ -88,3 +89,16 @@ func isContainerSource(src *sources.LogSource) bool {
}
return false
}

// disableAdaptiveSampling stamps an explicit Enabled=false override on cfg so that
// the decoder always picks NoopSampler for logssource sources, regardless of the
// global logs_config.experimental_adaptive_sampling.enabled flag. The observer
// pipeline must receive an unsampled stream; dropping logs here would cause the
// anomaly detection engine to miss anomalies hidden in suppressed patterns.
func disableAdaptiveSampling(cfg *logsconfig.LogsConfig) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Disable sampling for kubelet sources too

In kubelet+systemd builds, registerKubeletJournaldSource creates and adds the kubelet journald source directly (comp/anomalydetection/logssource/impl/kubelet_source.go:20-26) and never calls this helper or sets ExperimentalAdaptiveSampling. When logs_config.experimental_adaptive_sampling.enabled is true, the decoder falls back to that global flag for a nil source option, so the observer pipeline still samples kubelet logs even though this override is intended to make logssource streams unsampled. Please apply the same override when creating the kubelet source as well.

Useful? React with 👍 / 👎.

disabled := false
if cfg.ExperimentalAdaptiveSampling == nil {
cfg.ExperimentalAdaptiveSampling = &logsconfig.SourceAdaptiveSamplingOptions{}
}
cfg.ExperimentalAdaptiveSampling.Enabled = &disabled
Comment on lines +98 to +103

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Apply sampler override to kubelet source

When anomaly_detection.logs.kubelet.enabled is on, registerKubeletJournaldSource adds the kubelet journald LogSource directly to logSources without calling this helper, so with global logs_config.experimental_adaptive_sampling.enabled the kubelet.service logs are still sampled before they reach the observer. This leaves one observer logssource path with the old behavior despite the new unsampled-stream requirement; apply the same override when constructing the kubelet source too.

Useful? React with 👍 / 👎.

}
19 changes: 19 additions & 0 deletions comp/anomalydetection/logssource/impl/ad_source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func TestADSourceManagerAddSourceKeepsNonAgentContainer(t *testing.T) {
assert.True(t, isSuppressed(sp, "app-container"), "active AD source should still suppress generic fallback collection")
}

// TestADSourceManagerAddSource_DisablesAdaptiveSampling checks that a container
// source added through the AD source manager shows up in the shared log sources
// with an explicit adaptive-sampling override whose Enabled field points to
// false.
func TestADSourceManagerAddSource_DisablesAdaptiveSampling(t *testing.T) {
	// Make the container known to workloadmeta before the manager sees it.
	wm := newWMetaMock(t)
	wm.Set(runningContainer("app-container", "nginx"))

	ls := sources.NewLogSources()
	srcProvider := newSourceProvider(wm, ls, nil)
	adMgr := newADSourceManager(ls, service.NewServices(), srcProvider)

	adSrc := newContainerADSource("app-container")
	adMgr.AddSource(adSrc)

	registered := ls.GetSources()
	require.Len(t, registered, 1)

	// Both the options struct and its Enabled pointer must be populated before
	// the pointer can be dereferenced safely.
	override := registered[0].Config.ExperimentalAdaptiveSampling
	require.NotNil(t, override, "ExperimentalAdaptiveSampling must be set")
	require.NotNil(t, override.Enabled, "ExperimentalAdaptiveSampling.Enabled must be set")
	assert.False(t, *override.Enabled, "adaptive sampling must be disabled on logssource AD sources")
}

func TestADSourceManagerAddSourceKeepsUnknownContainer(t *testing.T) {
wmeta := newWMetaMock(t)

Expand Down
6 changes: 4 additions & 2 deletions comp/anomalydetection/logssource/impl/source_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ func (sp *sourceProvider) handleSet(c *workloadmeta.Container) {
sp.mu.Unlock()
return // an AD source already owns this container; skip generic source
}
src := sources.NewLogSource(c.EntityID.ID, &logsconfig.LogsConfig{
cfg := &logsconfig.LogsConfig{
Type: string(c.Runtime),
Identifier: c.EntityID.ID,
})
}
disableAdaptiveSampling(cfg)
src := sources.NewLogSource(c.EntityID.ID, cfg)
sp.activeSources[c.EntityID.ID] = src
sp.mu.Unlock()

Expand Down
11 changes: 11 additions & 0 deletions comp/anomalydetection/logssource/impl/source_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ func TestHandleSet_AddsRunningContainer(t *testing.T) {
assert.Len(t, ls.GetSources(), 1)
}

// TestHandleSet_DisablesAdaptiveSampling checks that the source handleSet
// registers for a running container carries an explicit adaptive-sampling
// override whose Enabled field points to false.
func TestHandleSet_DisablesAdaptiveSampling(t *testing.T) {
	provider, logSources := newTestSourceProvider()
	provider.handleSet(runningContainer("abc", "nginx"))

	registered := logSources.GetSources()
	require.Len(t, registered, 1)

	// Guard each pointer level before dereferencing Enabled.
	override := registered[0].Config.ExperimentalAdaptiveSampling
	require.NotNil(t, override, "ExperimentalAdaptiveSampling must be set")
	require.NotNil(t, override.Enabled, "ExperimentalAdaptiveSampling.Enabled must be set")
	assert.False(t, *override.Enabled, "adaptive sampling must be disabled on logssource sources")
}

func TestHandleSet_SkipsNonRunning(t *testing.T) {
sp, ls := newTestSourceProvider()
c := runningContainer("abc", "nginx")
Expand Down
132 changes: 132 additions & 0 deletions pkg/logs/internal/decoder/logssource_sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package decoder

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/internal/framer"
"github.com/DataDog/datadog-agent/pkg/logs/internal/parsers/noop"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
)

// newDecoderForSource builds a decoder for src with UTF8 newline framing and a
// noop parser, starts it, and returns it. Stopping the decoder is left to the
// caller.
func newDecoderForSource(t *testing.T, src *sources.LogSource) Decoder {
	t.Helper()

	// Build the collaborators in the same order the constructor receives them.
	replaceable := sources.NewReplaceableSource(src)
	lineParser := noop.New()
	infoRegistry := status.NewInfoRegistry()

	dec := NewDecoderWithFraming(replaceable, lineParser, framer.UTF8Newline, nil, infoRegistry)
	dec.Start()
	return dec
}

// sendAndCount pushes n copies of the same newline-terminated line into d and
// reports how many messages came out of its output channel.
//
// The input and output channels are unbuffered, so the writes happen on a
// separate goroutine while this goroutine drains the output; doing both on one
// goroutine would deadlock. If the output channel is not closed within five
// seconds, the test is marked failed and the count so far is returned.
func sendAndCount(t *testing.T, d Decoder, n int) int {
	t.Helper()
	payload := []byte("INFO something repetitive happened\n")

	// Producer: feed every line, then stop the decoder so that the pipeline
	// flushes and the output channel gets closed.
	feed := func() {
		for i := 0; i < n; i++ {
			d.InputChan() <- NewInput(payload)
		}
		d.Stop()
	}
	go feed()

	received := 0
	deadline := time.After(5 * time.Second)
	for {
		select {
		case <-deadline:
			t.Error("timed out draining output channel")
			return received
		case _, open := <-d.OutputChan():
			if open {
				received++
				continue
			}
			// Output channel closed: everything has been drained.
			return received
		}
	}
}

// TestLogssourceDecoderSeesMoreLogsThanMainAgentWhenSamplingEnabled compares
// two decoders fed the same repeated line: one whose source has no
// ExperimentalAdaptiveSampling override (standing in for a main-agent source)
// and one whose source carries Enabled=false (the override that
// disableAdaptiveSampling stamps on logssource sources).
//
// Expectations:
//   - global adaptive sampling enabled: the source without an override is
//     rate-limited and delivers fewer than N messages, while the source with
//     the explicit override delivers all N;
//   - global adaptive sampling disabled: both sources deliver all N messages.
func TestLogssourceDecoderSeesMoreLogsThanMainAgentWhenSamplingEnabled(t *testing.T) {
	const N = 20

	mockCfg := configmock.New(t)

	// burst=1 with rate=0 lets exactly one log through per pattern; every
	// later identical message is dropped no matter how much time passes.
	tuning := []struct {
		key   string
		value any
	}{
		{"logs_config.experimental_adaptive_sampling.burst_size", 1.0},
		{"logs_config.experimental_adaptive_sampling.rate_limit", 0.0},
		{"logs_config.experimental_adaptive_sampling.max_patterns", 10},
		{"logs_config.experimental_adaptive_sampling.match_threshold", 0.8},
	}
	for _, setting := range tuning {
		mockCfg.Set(setting.key, setting.value, pkgconfigmodel.SourceAgentRuntime)
	}

	samplingOff := false

	// No override on this source, so it follows the global flag.
	inheritingSrc := sources.NewLogSource("main-agent", &config.LogsConfig{})

	// Same shape as the config produced by disableAdaptiveSampling.
	overriddenSrc := sources.NewLogSource("logssource", &config.LogsConfig{
		ExperimentalAdaptiveSampling: &config.SourceAdaptiveSamplingOptions{
			Enabled: &samplingOff,
		},
	})

	t.Run("sampling enabled: logssource sees all logs, main agent drops most", func(t *testing.T) {
		mockCfg.Set("logs_config.experimental_adaptive_sampling.enabled", true, pkgconfigmodel.SourceAgentRuntime)

		fromMain := sendAndCount(t, newDecoderForSource(t, inheritingSrc), N)
		fromLogssource := sendAndCount(t, newDecoderForSource(t, overriddenSrc), N)

		assert.Equal(t, N, fromLogssource,
			"logssource decoder must pass all %d logs through (NoopSampler)", N)
		require.Less(t, fromMain, N,
			"main agent decoder must drop some logs (AdaptiveSampler with burst=1)")
		assert.Greater(t, fromLogssource, fromMain,
			"logssource must receive more logs than the main agent pipeline")
	})

	t.Run("sampling disabled: both pipelines receive all logs", func(t *testing.T) {
		mockCfg.Set("logs_config.experimental_adaptive_sampling.enabled", false, pkgconfigmodel.SourceAgentRuntime)

		fromMain := sendAndCount(t, newDecoderForSource(t, inheritingSrc), N)
		fromLogssource := sendAndCount(t, newDecoderForSource(t, overriddenSrc), N)

		assert.Equal(t, N, fromMain,
			"main agent decoder must pass all logs when sampling is disabled")
		assert.Equal(t, N, fromLogssource,
			"logssource decoder must pass all logs when sampling is disabled")
	})
}
Loading