Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions comp/anomalydetection/logssource/impl/ad_source_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (m *adSourceManager) AddSource(src *sources.LogSource) {
if isContainerSource(src) && m.sp.isAgentContainerID(src.Config.Identifier) {
return
}
disableAdaptiveSampling(src.Config)
m.logSources.AddSource(src)
if isContainerSource(src) {
m.sp.suppressIdentifier(src.Config.Identifier)
Expand Down Expand Up @@ -88,3 +89,16 @@ func isContainerSource(src *sources.LogSource) bool {
}
return false
}

// disableAdaptiveSampling stamps an explicit Enabled=false override on cfg so that
// the decoder always picks NoopSampler for logssource sources, regardless of the
// global logs_config.experimental_adaptive_sampling.enabled flag. The observer
// pipeline must receive an unsampled stream; dropping logs here would cause the
// anomaly detection engine to miss anomalies hidden in suppressed patterns.
func disableAdaptiveSampling(cfg *logsconfig.LogsConfig) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Disable sampling for kubelet sources too

In kubelet+systemd builds, registerKubeletJournaldSource creates and adds the kubelet journald source directly (comp/anomalydetection/logssource/impl/kubelet_source.go:20-26) and never calls this helper or sets ExperimentalAdaptiveSampling. When logs_config.experimental_adaptive_sampling.enabled is true, the decoder falls back to that global flag for a nil source option, so the observer pipeline still samples kubelet logs even though this override is intended to make logssource streams unsampled. Please apply the same override when creating the kubelet source as well.

Useful? React with 👍 / 👎.

disabled := false
if cfg.ExperimentalAdaptiveSampling == nil {
cfg.ExperimentalAdaptiveSampling = &logsconfig.SourceAdaptiveSamplingOptions{}
}
cfg.ExperimentalAdaptiveSampling.Enabled = &disabled
Comment on lines +98 to +103

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Apply sampler override to kubelet source

When anomaly_detection.logs.kubelet.enabled is on, registerKubeletJournaldSource adds the kubelet journald LogSource directly to logSources without calling this helper, so with global logs_config.experimental_adaptive_sampling.enabled the kubelet.service logs are still sampled before they reach the observer. This leaves one observer logssource path with the old behavior despite the new unsampled-stream requirement; apply the same override when constructing the kubelet source too.

Useful? React with 👍 / 👎.

}
19 changes: 19 additions & 0 deletions comp/anomalydetection/logssource/impl/ad_source_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func TestADSourceManagerAddSourceKeepsNonAgentContainer(t *testing.T) {
assert.True(t, isSuppressed(sp, "app-container"), "active AD source should still suppress generic fallback collection")
}

func TestADSourceManagerAddSource_DisablesAdaptiveSampling(t *testing.T) {
wmeta := newWMetaMock(t)
wmeta.Set(runningContainer("app-container", "nginx"))

logSources := sources.NewLogSources()
sp := newSourceProvider(wmeta, logSources, nil)
mgr := newADSourceManager(logSources, service.NewServices(), sp)

src := newContainerADSource("app-container")
mgr.AddSource(src)

got := logSources.GetSources()
require.Len(t, got, 1)
as := got[0].Config.ExperimentalAdaptiveSampling
require.NotNil(t, as, "ExperimentalAdaptiveSampling must be set")
require.NotNil(t, as.Enabled, "ExperimentalAdaptiveSampling.Enabled must be set")
assert.False(t, *as.Enabled, "adaptive sampling must be disabled on logssource AD sources")
}

func TestADSourceManagerAddSourceKeepsUnknownContainer(t *testing.T) {
wmeta := newWMetaMock(t)

Expand Down
6 changes: 4 additions & 2 deletions comp/anomalydetection/logssource/impl/source_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ func (sp *sourceProvider) handleSet(c *workloadmeta.Container) {
sp.mu.Unlock()
return // an AD source already owns this container; skip generic source
}
src := sources.NewLogSource(c.EntityID.ID, &logsconfig.LogsConfig{
cfg := &logsconfig.LogsConfig{
Type: string(c.Runtime),
Identifier: c.EntityID.ID,
})
}
disableAdaptiveSampling(cfg)
src := sources.NewLogSource(c.EntityID.ID, cfg)
sp.activeSources[c.EntityID.ID] = src
sp.mu.Unlock()

Expand Down
11 changes: 11 additions & 0 deletions comp/anomalydetection/logssource/impl/source_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ func TestHandleSet_AddsRunningContainer(t *testing.T) {
assert.Len(t, ls.GetSources(), 1)
}

func TestHandleSet_DisablesAdaptiveSampling(t *testing.T) {
sp, ls := newTestSourceProvider()
sp.handleSet(runningContainer("abc", "nginx"))
srcs := ls.GetSources()
require.Len(t, srcs, 1)
as := srcs[0].Config.ExperimentalAdaptiveSampling
require.NotNil(t, as, "ExperimentalAdaptiveSampling must be set")
require.NotNil(t, as.Enabled, "ExperimentalAdaptiveSampling.Enabled must be set")
assert.False(t, *as.Enabled, "adaptive sampling must be disabled on logssource sources")
}

func TestHandleSet_SkipsNonRunning(t *testing.T) {
sp, ls := newTestSourceProvider()
c := runningContainer("abc", "nginx")
Expand Down
Loading