Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
query.NoopSeriesStatsReporter,
)

var ts time.Time
if request.TimeSeconds == 0 {
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}
remoteEndpoints := g.remoteEndpointsCreate(
replicaLabels,
request.EnablePartialResponse,
ts, ts,
)

var qry promql.Query
Expand Down Expand Up @@ -197,9 +204,13 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
query.NoopSeriesStatsReporter,
)

start := time.Unix(request.StartTimeSeconds, 0)
end := time.Unix(request.EndTimeSeconds, 0)
remoteEndpoints := g.remoteEndpointsCreate(
replicaLabels,
request.EnablePartialResponse,
start,
end,
)

var qry promql.Query
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (any, []error, *api.ApiError
remoteEndpoints := qapi.remoteEndpointsCreate(
replicaLabels,
enablePartialResponse,
ts,
ts,
)
queryOpts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
Expand Down Expand Up @@ -656,6 +658,8 @@ func (qapi *QueryAPI) query(r *http.Request) (any, []error, *api.ApiError, func(
remoteEndpoints := qapi.remoteEndpointsCreate(
replicaLabels,
enablePartialResponse,
ts,
ts,
)
queryOpts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
Expand Down Expand Up @@ -836,6 +840,8 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (any, []error, *api.Api
remoteEndpoints := qapi.remoteEndpointsCreate(
replicaLabels,
enablePartialResponse,
start,
end,
)
queryOpts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
Expand Down Expand Up @@ -969,6 +975,8 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (any, []error, *api.ApiError,
remoteEndpoints := qapi.remoteEndpointsCreate(
replicaLabels,
enablePartialResponse,
start,
end,
)
queryOpts := &engine.QueryOpts{
LookbackDeltaParam: lookbackDelta,
Expand Down
36 changes: 34 additions & 2 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
type RemoteEndpointsCreator func(
replicaLabels []string,
partialResponse bool,
start, end time.Time,
) api.RemoteEndpoints

func NewRemoteEndpointsCreator(
Expand All @@ -47,8 +48,9 @@ func NewRemoteEndpointsCreator(
return func(
replicaLabels []string,
partialResponse bool,
start, end time.Time,
) api.RemoteEndpoints {
return NewRemoteEndpoints(logger, getEndpoints, Opts{
return NewRemoteEndpoints(logger, getEndpoints, start, end, Opts{
AutoDownsample: autoDownsample,
PartialResponse: partialResponse,
ReplicaLabels: replicaLabels,
Expand Down Expand Up @@ -94,13 +96,16 @@ func (c Client) LabelSets() []labels.Labels {
type remoteEndpoints struct {
logger log.Logger
getClients func() []Client
start, end time.Time
opts Opts
}

func NewRemoteEndpoints(logger log.Logger, getClients func() []Client, opts Opts) api.RemoteEndpoints {
func NewRemoteEndpoints(logger log.Logger, getClients func() []Client, start, end time.Time, opts Opts) api.RemoteEndpoints {
return remoteEndpoints{
logger: logger,
getClients: getClients,
start: start,
end: end,
opts: opts,
}
}
Expand All @@ -111,6 +116,33 @@ func (r remoteEndpoints) Engines() []api.RemoteEngine {
for i := range clients {
engines[i] = NewRemoteEngine(r.logger, clients[i], r.opts)
}

// NOTE(Aleksandr Krivoshchekov):
// This is a hack for now. I'll find a more elegant approach later.

// Prune TSDBInfos.
start := r.start.UnixMilli()
end := r.end.UnixMilli()
for _, engine := range engines {
e := engine.(*remoteEngine)

tsdbInfos := e.client.tsdbInfos

newClient := e.client
newClient.tsdbInfos = make(infopb.TSDBInfos, 0)

for _, tsdbInfo := range tsdbInfos {
includesStart := tsdbInfo.MinTime <= start && start <= tsdbInfo.MaxTime
includesEnd := tsdbInfo.MinTime <= end && end <= tsdbInfo.MaxTime

if includesStart || includesEnd {
newClient.tsdbInfos = append(newClient.tsdbInfos, tsdbInfo)
}
}

e.client = newClient
}

return engines
}

Expand Down
Loading