Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package api
import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -16,10 +18,46 @@ type RemoteQuery interface {
fmt.Stringer
}

// Deprecated: RemoteEndpoints will be replaced with
// RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change.
type RemoteEndpoints interface {
Engines() []RemoteEngine
}

// RemoteEndpointsV2 describes endpoints that accept pruning hints when
// selecting remote engines.
//
// For example implementations may use the hints to prune the TSDBInfos, but
// also may safely ignore them and return all available remote engines.
//
// NOTE(Aleksandr Krivoshchekov):
// We add a new interface as a temporary backward compatibility.
// RemoteEndpoints will be replaced with it in a future breaking change.
type RemoteEndpointsV2 interface {
EnginesV2(mint, maxt int64) []RemoteEngine
}

type RemoteEndpointsQuery struct {
MinT int64
MaxT int64
}

// RemoteEndpointsV3 describes endpoints that accept pruning hints when
// selecting remote engines.
//
// For example implementations may use the hints to prune the TSDBInfos, but
// also may safely ignore them and return all available remote engines.
//
// NOTE(Aleksandr Krivoshchekov):
// We add a new interface as a temporary backward compatibility.
// RemoteEndpoints will be replaced with it in a future breaking change.
//
// Unlike RemoteEndpointsV2, this interface can be extended with more hints
// in the future, without making any breaking changes.
type RemoteEndpointsV3 interface {
EnginesV3(query RemoteEndpointsQuery) []RemoteEngine
Comment thread
SuperPaintman marked this conversation as resolved.
Outdated
}

type RemoteEngine interface {
MaxT() int64
MinT() int64
Expand All @@ -44,6 +82,71 @@ func (m staticEndpoints) Engines() []RemoteEngine {
return m.engines
}

func (m staticEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine {
return m.engines

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we apply filtering here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but since pruning is optional, I decided to not change DistributedExecutionOptimizer implementation, because it assumes that some engines might not have the requested range and fall back.

Some some unit tests check for that, and if we apply filter in static endpoints, they fail.

But now when I'm thinking about that again, I'm not sure that's an issue anymore.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes, the API contract suggest that we only should return intersecting engines.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting something like this?

func (m staticEndpoints) Engines(mint, maxt int64) []RemoteEngine {
	var engines []RemoteEngine
	for _, e := range m.engines {
		if e.MaxT() < mint || e.MinT() > maxt {
			continue
		}
		engines = append(engines, e)
	}
	return engines
}

@SuperPaintman SuperPaintman Jan 21, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, I'm a bit hesitant about filtering engines themselves here (Maybe I don't fully understand the original implementation).

The optimizers seem to expect all engines to be available, especially for handling absent() (in DistributedExecutionOptimizer.distributeAbsent). If the engine is removed from query (e.g. if it has a data gap for the requested range), dist engine will return incorrect results.

Also, filtering engines breaks TestDistributedExecutionWithLongSelectorRanges/skip_distributing_queries_with_timestamps_outside_of_the_range_of_an_engine:

--- Expected
+++ Actual
@@ -1 +1 @@
-sum(sum by (region) (metric @ 18000.000))
+sum(dedup(remote(sum by (region) (metric @ 18000.000))))

Let me know if I'm mistaken, but it seems like an invalid result.

The current "real" implementation also assumes all engines are return.

The intent of the mint/maxt is to prune internal metadata (like TSDBInfos) within each engine, to reduce unnecessary computations later.

Does that make sense, or am I missing something?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, this indeed makes sense, especially since you brought up absent. I think we can leave this as it is and optimize later.

}

func (m staticEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine {
return m.engines
}

func NewStaticEndpoints(engines []RemoteEngine) RemoteEndpoints {
return &staticEndpoints{engines: engines}
}

type cachedEndpoints struct {
endpoints RemoteEndpoints

enginesOnce sync.Once
engines []RemoteEngine
}

func (l *cachedEndpoints) Engines() []RemoteEngine {
return l.EnginesV3(RemoteEndpointsQuery{
MaxT: math.MinInt64,
MinT: math.MaxInt64,
})
}

func (l *cachedEndpoints) EnginesV2(mint, maxt int64) []RemoteEngine {
return l.EnginesV3(RemoteEndpointsQuery{
MaxT: maxt,
MinT: mint,
})
}

func (l *cachedEndpoints) EnginesV3(query RemoteEndpointsQuery) []RemoteEngine {
l.enginesOnce.Do(func() {
l.engines = getEngines(l.endpoints, query)
})
return l.engines
}

func getEngines(endpoints RemoteEndpoints, query RemoteEndpointsQuery) []RemoteEngine {
if v3, ok := endpoints.(RemoteEndpointsV3); ok {
return v3.EnginesV3(query)
}

if v2, ok := endpoints.(RemoteEndpointsV2); ok {
return v2.EnginesV2(query.MinT, query.MaxT)
}

return endpoints.Engines()
}

// NewCachedEndpoints returns an endpoints wrapper that
// resolves and caches engines on first access.
//
// All subsequent Engines calls return cached engines, ignoring any query
// parameters.
func NewCachedEndpoints(endpoints RemoteEndpoints) RemoteEndpoints {
if endpoints == nil {
panic("api.NewCachedEndpoints: endpoints is nil")
}

if le, ok := endpoints.(*cachedEndpoints); ok {
return le
}

return &cachedEndpoints{endpoints: endpoints}
}
24 changes: 24 additions & 0 deletions engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,17 @@
}
}

func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {

Check failure on line 66 in engine/distributed.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

SA1019: api.RemoteEndpoints is deprecated: RemoteEndpoints will be replaced with RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change. (staticcheck)
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -77,13 +83,19 @@
return l.engine.MakeInstantQueryFromPlan(ctx, q, qOpts, plan, ts)
}

func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {

Check failure on line 86 in engine/distributed.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

SA1019: api.RemoteEndpoints is deprecated: RemoteEndpoints will be replaced with RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change. (staticcheck)
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -93,11 +105,17 @@
return l.engine.MakeRangeQueryFromPlan(ctx, q, qOpts, plan, start, end, interval)
}

func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, e api.RemoteEndpoints, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {

Check failure on line 108 in engine/distributed.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

SA1019: api.RemoteEndpoints is deprecated: RemoteEndpoints will be replaced with RemoteEndpointsV2 / RemoteEndpointsV3 in a future breaking change. (staticcheck)
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand All @@ -114,6 +132,12 @@
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

// Cache engines to give optimizers a consistent view of Engines().
// Some RemoteEndpoints implementations also compute and cache
// MinT() / MaxT() / LabelSets() on the fly, so the cache prevents
// recomputing those fields in each optimizer.
e = api.NewCachedEndpoints(e)

qOpts := fromPromQLOpts(opts)
qOpts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: e},
Expand Down
19 changes: 18 additions & 1 deletion logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type DistributedExecutionOptimizer struct {
}

func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
engines := m.Endpoints.Engines()
engines := getRemoteEngines(m.Endpoints, plan, opts)
sort.Slice(engines, func(i, j int) bool {
return engines[i].MinT() < engines[j].MinT()
})
Expand Down Expand Up @@ -858,3 +858,20 @@ func maxDuration(a, b time.Duration) time.Duration {
}
return b
}

func getRemoteEngines(endpoints api.RemoteEndpoints, plan Node, opts *query.Options) []api.RemoteEngine {
if v3, ok := endpoints.(api.RemoteEndpointsV3); ok {
mint, maxt := MinMaxTime(plan, opts)
return v3.EnginesV3(api.RemoteEndpointsQuery{
MinT: mint,
MaxT: maxt,
})
}

if v2, ok := endpoints.(api.RemoteEndpointsV2); ok {
mint, maxt := MinMaxTime(plan, opts)
return v2.EnginesV2(mint, maxt)
}

return endpoints.Engines()
}
39 changes: 22 additions & 17 deletions logicalplan/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,45 @@ func matchingEngineTime(e api.RemoteEngine, opts *query.Options) bool {
}

func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, annotations.Annotations) {
engines := m.Endpoints.Engines()
if len(engines) == 1 {
if !matchingEngineTime(engines[0], opts) {
return plan, nil
}
return RemoteExecution{
Engine: engines[0],
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
}, nil
}

engines := getRemoteEngines(m.Endpoints, plan, opts)
if len(engines) == 0 {
return plan, nil
}

matchingLabelsEngines := make([]api.RemoteEngine, 0, len(engines))
var (
hasSelector bool
matchingEngines int
firstMatchingEngine api.RemoteEngine
)
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if vs, ok := (*current).(*VectorSelector); ok {
hasSelector = true

for _, e := range engines {
if !labelSetsMatch(vs.LabelMatchers, e.LabelSets()...) {
continue
}

matchingLabelsEngines = append(matchingLabelsEngines, e)
matchingEngines++
if matchingEngines > 1 {
return true
}

firstMatchingEngine = e
}
}
return false
})

if len(matchingLabelsEngines) == 1 && matchingEngineTime(matchingLabelsEngines[0], opts) {
// Fallback to all engines.
if !hasSelector && matchingEngines == 0 {
matchingEngines = len(engines)
firstMatchingEngine = engines[0]
}

if matchingEngines == 1 && matchingEngineTime(firstMatchingEngine, opts) {
return RemoteExecution{
Engine: matchingLabelsEngines[0],
Engine: firstMatchingEngine,
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
Expand Down
20 changes: 8 additions & 12 deletions logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ var DefaultOptimizers = []Optimizer{
type Plan interface {
Optimize([]Optimizer) (Plan, annotations.Annotations)
Root() Node
MinMaxTime(*query.Options) (int64, int64)
}

type Optimizer interface {
Expand Down Expand Up @@ -152,15 +151,19 @@ func extractFuncFromPath(p []*Node) string {
return extractFuncFromPath(p[:len(p)-1])
}

func (p *plan) MinMaxTime(qOpts *query.Options) (int64, int64) {
func (p *plan) Root() Node {
return p.expr
}

// MinMaxTime returns the min and max timestamp that any selector in the query
// can read.
func MinMaxTime(root Node, qOpts *query.Options) (int64, int64) {
var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
// the variable.
var evalRange time.Duration

root := p.Root()

TraverseWithParents(nil, &root, func(parents []*Node, node *Node) {
switch n := (*node).(type) {
case *VectorSelector:
Expand Down Expand Up @@ -205,10 +208,6 @@ func (p *plan) Optimize(optimizers []Optimizer) (Plan, annotations.Annotations)
return &plan{expr: expr, opts: p.opts}, *annos
}

func (p *plan) Root() Node {
return p.expr
}

func Traverse(expr *Node, transform func(*Node)) {
children := (*expr).Children()
transform(expr)
Expand All @@ -230,10 +229,7 @@ func TraverseBottomUp(parent *Node, current *Node, transform func(parent *Node,
for _, c := range (*current).Children() {
stop = TraverseBottomUp(current, c, transform) || stop
}
if stop {
return stop
}
return transform(parent, current)
return stop || transform(parent, current)
}

func replacePrometheusNodes(plan parser.Expr) Node {
Expand Down
2 changes: 1 addition & 1 deletion storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Scanners) Close() error {
func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (*Scanners, error) {
var min, max int64
if lplan != nil {
min, max = lplan.MinMaxTime(qOpts)
min, max = logicalplan.MinMaxTime(lplan.Root(), qOpts)
} else {
min, max = qOpts.Start.UnixMilli(), qOpts.End.UnixMilli()
}
Expand Down
2 changes: 1 addition & 1 deletion storage/prometheus/scanners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestScannersMinMaxTime(t *testing.T) {

plan, _ := logicalplan.NewFromAST(p, qOpts, logicalplan.PlanOptions{})

min, max := plan.MinMaxTime(qOpts)
min, max := logicalplan.MinMaxTime(plan.Root(), qOpts)

require.Equal(t, tcase.min, min)
require.Equal(t, tcase.max, max)
Expand Down
Loading