Skip to content
7 changes: 7 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
},
decodingConcurrency: decodingConcurrency,
selectorBatchSize: selectorBatchSize,
maxSamplesPerQuery: opts.MaxSamples,
}
}

Expand Down Expand Up @@ -227,6 +228,7 @@ type Engine struct {
selectorBatchSize int64
enableAnalysis bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
maxSamplesPerQuery int
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
Expand Down Expand Up @@ -445,6 +447,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}

if e.maxSamplesPerQuery > 0 {
res.SampleTracker = query.NewSampleTracker(e.maxSamplesPerQuery)
}

if opts == nil {
return res
}
Expand Down
150 changes: 150 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4794,6 +4794,156 @@ func TestQueryTimeout(t *testing.T) {
testutil.Equals(t, context.DeadlineExceeded, res.Err)
}

func TestMaxSamples(t *testing.T) {
t.Parallel()

t.Run("max_samples with rate function", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// Create 1000 series with samples every 15s for 5 minutes
for i := range 1000 {
for ts := int64(0); ts <= 300; ts += 15 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

// With 1000 series and a 2m window, rate() will keep ~8 samples per series in memory
// = ~8000 samples total
query := `rate(test_metric[2m])`
start := time.Unix(120, 0)
end := time.Unix(300, 0)
step := 30 * time.Second

t.Run("exceeds limit", func(t *testing.T) {
ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 5000, // Lower than ~8000 expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err, "expected max_samples error")
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("within limit", func(t *testing.T) {
ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 50000, // Higher than ~8000 expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.NoError(t, res.Err)
})
})

t.Run("max_samples with vector selector", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// 10000 series, each step will have 10000 samples in memory
for i := range 10000 {
for ts := int64(0); ts <= 300; ts += 30 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

query := `test_metric`
start := time.Unix(0, 0)
end := time.Unix(60, 0)
step := 30 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 5000, // Lower than 10000 series per step
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err)
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("max_samples with subquery", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
// 1000 series with subquery that accumulates samples
for i := range 1000 {
for ts := int64(0); ts <= 600; ts += 15 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

// Subquery with 2m range and 30s step = 5 steps per evaluation
// With 1000 series, that's ~5000 samples in ring buffer
query := `sum_over_time(test_metric[2m:30s])`
start := time.Unix(120, 0)
end := time.Unix(300, 0)
step := 60 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 3000, // Lower than expected
},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.Error(t, res.Err)
require.Contains(t, res.Err.Error(), "query processing would load too many samples into memory")
})

t.Run("max_samples disabled by default", func(t *testing.T) {
t.Parallel()
storage := teststorage.New(t)
defer storage.Close()

app := storage.Appender(context.Background())
for i := range 100 {
for ts := int64(0); ts < 300; ts += 30 {
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "test_metric", "series", strconv.Itoa(i)), ts*1000, float64(ts))
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())

query := `rate(test_metric[1m])`
start := time.Unix(0, 0)
end := time.Unix(300, 0)
step := 30 * time.Second

ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour},
})
q, err := ng.NewRangeQuery(context.Background(), storage, nil, query, start, end, step)
require.NoError(t, err)
res := q.Exec(context.Background())
require.NoError(t, res.Err)
})
}

type hintRecordingQuerier struct {
storage.Querier
mux sync.Mutex
Expand Down
36 changes: 36 additions & 0 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ type subqueryOperator struct {
paramBuf []model.StepVector
param2Buf []model.StepVector
tempBuf []model.StepVector

lastTrackedSamples int
}

const maxSamplesCheckIntervalSteps = 100

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we have a way to infer a good sample check interval dynamically. Cardinality and number of total steps are known when we call Next(). So ideally we can check more frequently if the query has high cardinality?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved the checkSampleLimit to inner loop as noted from the run on beta cell, the instant subQueries were getting checked too late. Also added dynamic interval check.
Thanks.


func NewSubqueryOperator(next, paramOp, paramOp2 model.VectorOperator, opts *query.Options, funcExpr *logicalplan.FunctionCall, subQuery *logicalplan.Subquery) (model.VectorOperator, error) {
call, err := ringbuffer.NewRangeVectorFunc(funcExpr.Func.Name)
if err != nil {
Expand Down Expand Up @@ -205,6 +209,38 @@ func (o *subqueryOperator) Next(ctx context.Context, buf []model.StepVector) (in
}
n++
o.currentStep += o.step

if o.opts.SampleTracker != nil && (i+1)%maxSamplesCheckIntervalSteps == 0 {
totalSamplesInBatch := 0
for _, b := range o.buffers {
totalSamplesInBatch += b.SampleCount()
}
if totalSamplesInBatch > o.lastTrackedSamples {
o.opts.SampleTracker.Add(totalSamplesInBatch - o.lastTrackedSamples)
} else if totalSamplesInBatch < o.lastTrackedSamples {
o.opts.SampleTracker.Remove(o.lastTrackedSamples - totalSamplesInBatch)
}
o.lastTrackedSamples = totalSamplesInBatch
if err := o.opts.SampleTracker.CheckLimit(); err != nil {
return 0, err
}
}
}

if o.opts.SampleTracker != nil {
totalSamplesInBatch := 0
for _, b := range o.buffers {
totalSamplesInBatch += b.SampleCount()
}
if totalSamplesInBatch > o.lastTrackedSamples {
o.opts.SampleTracker.Add(totalSamplesInBatch - o.lastTrackedSamples)
} else if totalSamplesInBatch < o.lastTrackedSamples {
o.opts.SampleTracker.Remove(o.lastTrackedSamples - totalSamplesInBatch)
}
o.lastTrackedSamples = totalSamplesInBatch
if err := o.opts.SampleTracker.CheckLimit(); err != nil {
return 0, err
}
}

return n, nil
Expand Down
1 change: 1 addition & 0 deletions query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Options struct {
NoStepSubqueryIntervalFn func(time.Duration) time.Duration
EnableAnalysis bool
DecodingConcurrency int
SampleTracker *SampleTracker // Tracks current samples in memory
}

// TotalSteps returns the total number of steps in the query, regardless of batching.
Expand Down
48 changes: 48 additions & 0 deletions query/sample_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.

package query

import (
"fmt"
"sync/atomic"
)

type SampleTracker struct {
current atomic.Int64
limit int64
}

func NewSampleTracker(maxSamples int) *SampleTracker {
return &SampleTracker{
limit: int64(maxSamples),
}
}

func (st *SampleTracker) Add(count int) {
st.current.Add(int64(count))
}

func (st *SampleTracker) Remove(count int) {
st.current.Add(-int64(count))
}

func (st *SampleTracker) CheckLimit() error {
if st.limit <= 0 {
return nil
}
current := st.current.Load()
if current > st.limit {
return ErrMaxSamplesExceeded{Current: current, Limit: st.limit}
}
return nil
}

type ErrMaxSamplesExceeded struct {
Current int64
Limit int64
}

func (e ErrMaxSamplesExceeded) Error() string {
return fmt.Sprintf("query processing would load too many samples into memory: current=%d, limit=%d", e.Current, e.Limit)
}
38 changes: 38 additions & 0 deletions storage/prometheus/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ type matrixSelector struct {

nonCounterMetric string
hasFloats bool

lastTrackedSamples int
}

var ErrNativeHistogramsNotSupported = errors.New("native histograms are not supported in extended range functions")

const maxSamplesCheckIntervalSeries = 1000

// NewMatrixSelector creates operator which selects vector of series over time.
func NewMatrixSelector(
selector SeriesSelector,
Expand Down Expand Up @@ -217,7 +221,40 @@ func (o *matrixSelector) Next(ctx context.Context, buf []model.StepVector) (int,
o.telemetry.IncrementSamplesAtTimestamp(scanner.buffer.SampleCount(), seriesTs)
seriesTs += o.step
}

if o.opts.SampleTracker != nil && (o.currentSeries+1-firstSeries)%maxSamplesCheckIntervalSeries == 0 {
totalSamplesInBatch := 0
for i := range o.scanners {
totalSamplesInBatch += o.scanners[i].buffer.SampleCount()
}

@yeya24 yeya24 Feb 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we are counting sample count from all series not just the current series? It seems that we can just maintain a running sum for processed series instead of counting all processed series again

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that would make more sense.
Updated now to count sample for only current series.
Thanks.

if totalSamplesInBatch > o.lastTrackedSamples {
o.opts.SampleTracker.Add(totalSamplesInBatch - o.lastTrackedSamples)
} else if totalSamplesInBatch < o.lastTrackedSamples {
o.opts.SampleTracker.Remove(o.lastTrackedSamples - totalSamplesInBatch)
}
o.lastTrackedSamples = totalSamplesInBatch
if err := o.opts.SampleTracker.CheckLimit(); err != nil {
return 0, err
}
}
}

if o.opts.SampleTracker != nil {
totalSamplesInBatch := 0
for i := range o.scanners {
totalSamplesInBatch += o.scanners[i].buffer.SampleCount()
}
if totalSamplesInBatch > o.lastTrackedSamples {
o.opts.SampleTracker.Add(totalSamplesInBatch - o.lastTrackedSamples)
} else if totalSamplesInBatch < o.lastTrackedSamples {
o.opts.SampleTracker.Remove(o.lastTrackedSamples - totalSamplesInBatch)
}
o.lastTrackedSamples = totalSamplesInBatch
if err := o.opts.SampleTracker.CheckLimit(); err != nil {
return 0, err
}
}

if o.currentSeries == int64(len(o.scanners)) {
o.currentStep += o.step * int64(n)
o.currentSeries = 0
Expand Down Expand Up @@ -311,6 +348,7 @@ func (o *matrixSelector) newBuffer(ctx context.Context) ringbuffer.Buffer {
return ringbuffer.NewWithExtLookback(ctx, 8, o.selectRange, o.offset, o.opts.ExtLookbackDelta.Milliseconds()-1, o.call)
}
return ringbuffer.New(ctx, 8, o.selectRange, o.offset, o.call)

}

func (o *matrixSelector) String() string {
Expand Down
Loading
Loading