Skip to content
7 changes: 7 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
},
decodingConcurrency: decodingConcurrency,
selectorBatchSize: selectorBatchSize,
maxSamplesPerQuery: opts.MaxSamples,
}
}

Expand Down Expand Up @@ -227,6 +228,7 @@ type Engine struct {
selectorBatchSize int64
enableAnalysis bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
maxSamplesPerQuery int
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down Expand Up @@ -445,6 +447,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}

if e.maxSamplesPerQuery > 0 {
res.SampleTracker = query.NewSampleTracker(e.maxSamplesPerQuery)
}

if opts == nil {
return res
}
Expand Down
150 changes: 150 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4794,6 +4794,156 @@ func TestQueryTimeout(t *testing.T) {
testutil.Equals(t, context.DeadlineExceeded, res.Err)
}

func TestMaxSamples(t *testing.T) {
t.Parallel()

t.Run("max_samples with rate function", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// Create 1000 series with samples every 15s for 5 minutes
for i := range 1000 {
for ts := int64(0); ts <= 300; ts += 15 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

// With 1000 series and a 2m window, rate() will keep ~8 samples per series in memory
// = ~8000 samples total
query := `rate(test_metric[2m])`
start := time.Unix(120, 0)
end := time.Unix(300, 0)
step := 30 * time.Second

t.Run("exceeds limit", func(t *testing.T) {
ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 5000, // Lower than ~8000 expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err, "expected max_samples error")
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("within limit", func(t *testing.T) {
ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 50000, // Higher than ~8000 expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.NoError(t, res.Err)
})
})

t.Run("max_samples with vector selector", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// 10000 series, each step will have 10000 samples in memory
for i := range 10000 {
for ts := int64(0); ts <= 300; ts += 30 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

query := `test_metric`
start := time.Unix(0, 0)
end := time.Unix(60, 0)
step := 30 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 5000, // Lower than 10000 series per step
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err)
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("max_samples with subquery", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// 1000 series with subquery that accumulates samples
for i := range 1000 {
for ts := int64(0); ts <= 600; ts += 15 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

// Subquery with 2m range and 30s step = 5 steps per evaluation
// With 1000 series, that's ~5000 samples in ring buffer
query := `sum_over_time(test_metric[2m:30s])`
start := time.Unix(120, 0)
end := time.Unix(300, 0)
step := 60 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1000, // Lower than expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err)
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("max_samples disabled by default", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
for i := range 100 {
for ts := int64(0); ts < 300; ts += 30 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

query := `rate(test_metric[1m])`
start := time.Unix(0, 0)
end := time.Unix(300, 0)
step := 30 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.NoError(t, res.Err)
})
}

type hintRecordingQuerier struct {
storage.Querier
mux sync.Mutex
Expand Down
51 changes: 51 additions & 0 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

const sampleLimitCheckPercentage = 0.05

type subqueryOperator struct {
next model.VectorOperator
paramOp model.VectorOperator
Expand Down Expand Up @@ -52,6 +54,9 @@ type subqueryOperator struct {
paramBuf []model.StepVector
param2Buf []model.StepVector
tempBuf []model.StepVector

currentTrackedSamples int
lastTrackedSamples int
}

func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
Expand Down Expand Up @@ -150,6 +155,9 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
for _, b := range o.buffers {
b.Reset(mint, maxt+o.subQuery.Offset.Milliseconds())
}
o.currentTrackedSamples = 0
o.lastTrackedSamples = 0
checkSampleLimitCounter := 0
if len(o.lastVectors) > 0 {
for _, v := range o.lastVectors[o.lastCollected+1:] {
if v.T > maxt {
Expand Down Expand Up @@ -184,6 +192,20 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
o.collect(vector, mint)
}

if o.opts.SampleTracker != nil {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it strange that we apply this limit in subqueries. They operate on existing samples and do not produce new ones.

@PaurushGarg PaurushGarg Feb 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Selectors reuse the same buffer each next, while subqueries accumulate samples in ring buffers that persist across the evaluation window.
For rate(http_requests[10m])[2d:1m], the inner selector loads samples and releases them, but the subquery keeps accumulating those in buffers for the full 2d window.

checkSampleLimitCounter++
if o.shouldCheckSampleLimit(checkSampleLimitCounter) {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
checkSampleLimitCounter = 0
}
}
}
if o.opts.SampleTracker != nil && checkSampleLimitCounter > 0 {
if err := o.checkSampleLimit(); err != nil {
return 0, err
}
}

buf[n].Reset(o.currentStep)
Expand All @@ -210,6 +232,15 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
return n, nil
}

func (o *subqueryOperator) checkSampleLimit() error {
delta := o.currentTrackedSamples - o.lastTrackedSamples
if delta > 0 {
o.opts.SampleTracker.Add(delta)
}
o.lastTrackedSamples = o.currentTrackedSamples
return o.opts.SampleTracker.CheckLimit()
}

func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
if v.T < mint {
return
Expand All @@ -220,6 +251,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
continue
}
buffer.Push(v.T, ringbuffer.Value{F: s})
o.currentTrackedSamples++
}
for i, s := range v.Histograms {
buffer := o.buffers[v.HistogramIDs[i]]
Expand All @@ -245,6 +277,7 @@ func (o *subqueryOperator) collect(v model.StepVector, mint int64) {
s.CounterResetHint = histogram.UnknownCounterReset
}
buffer.Push(v.T, ringbuffer.Value{H: s})
o.currentTrackedSamples += telemetry.CalculateHistogramSampleCount(s)
}

}
Expand Down Expand Up @@ -291,3 +324,21 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
})
return err
}

func (o *subqueryOperator) shouldCheckSampleLimit(checkSampleLimitCounter int) bool {
if len(o.series) == 0 {
return checkSampleLimitCounter >= 1
}

limit := o.opts.SampleTracker.Limit()
targetSamplesPerCheck := int(float64(limit) * sampleLimitCheckPercentage)

maxSamplesPerCall := len(o.series) * o.stepsBatch
if maxSamplesPerCall == 0 {
return checkSampleLimitCounter >= 1
}

interval := max(targetSamplesPerCheck/maxSamplesPerCall, 1)

return checkSampleLimitCounter >= interval
}
1 change: 1 addition & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Options struct {
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
EnableAnalysis bool
DecodingConcurrency int
SampleTracker *SampleTracker // Tracks current samples in memory
}

// TotalSteps returns the total number of steps in the query, regardless of batching.
Expand Down
52 changes: 52 additions & 0 deletions query/sample_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package query

import (
"fmt"
"sync/atomic"
)

type SampleTracker struct {
current atomic.Int64
limit int64
}

func NewSampleTracker(maxSamples int) *SampleTracker {
return &SampleTracker{
limit: int64(maxSamples),
}
}

func (st *SampleTracker) Add(count int) {
st.current.Add(int64(count))
}

func (st *SampleTracker) Remove(count int) {
st.current.Add(-int64(count))
}

func (st *SampleTracker) CheckLimit() error {
if st.limit <= 0 {
return nil
}
current := st.current.Load()
if current > st.limit {
return ErrMaxSamplesExceeded{Current: current, Limit: st.limit}
}
return nil
}

func (st *SampleTracker) Limit() int64 {
return st.limit
}

type ErrMaxSamplesExceeded struct {
Current int64
Limit int64
}

func (e ErrMaxSamplesExceeded) Error() string {
return fmt.Sprintf("query processing would load too many samples into memory: current=%d, limit=%d", e.Current, e.Limit)
}
Loading