Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller/internal"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
fwkrequest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/common/request"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
)
Expand Down Expand Up @@ -218,21 +219,27 @@ func (fc *FlowController) EnqueueAndWait(
flowKey := req.FlowKey()
priority := strconv.Itoa(flowKey.Priority)
reqBytes := req.ByteSize()
sloClass := metrics.ClassifySLO(extractHeader(req, fwkrequest.TTFTSLOMsHeaderKey))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm feeling hesitant about adding this use-case specific bucketing/labeling (see other comment).

We want to let the user define the buckets and associate it that way. A more robust solution would be adding to the InferenceObjective object, and making the inf objective string the label.

Which would probably be worth a longer discussion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @kfswain! I agree.

I was feeling uncomfortable with the choices of interval (as mentioned in the PR description) because I felt it restricts the possibilities. It's a good indication it should be defined by the user.

It's definitely better to add it to the InferenceObjective (as SLO attainment is already a planned expansion). How do you think we should add it? One label per InferenceObjective? And as many InferenceObjectives as we need, to represent all the buckets? Would it be correlated to the priority? Thank you!

metrics.RecordFlowControlSLOIncomingRequest(sloClass, req.InferencePoolName())
metrics.IncFlowControlQueueSize(
flowKey.ID, priority,
req.InferencePoolName(),
sloClass,
req.ModelName(), req.TargetModelName())
defer metrics.DecFlowControlQueueSize(
flowKey.ID, priority,
req.InferencePoolName(),
sloClass,
req.ModelName(), req.TargetModelName())
metrics.AddFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
sloClass,
req.ModelName(), req.TargetModelName(), reqBytes)
defer metrics.SubFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
sloClass,
req.ModelName(), req.TargetModelName(), reqBytes)

// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
Expand Down Expand Up @@ -297,6 +304,14 @@ func (fc *FlowController) EnqueueAndWait(
return finalOutcome, err
}

// extractHeader returns the value of the named header from the request's
// underlying inference request, using the framework's case-insensitive lookup.
// It returns "" when the request carries no inference request or header map.
func extractHeader(req flowcontrol.FlowControlRequest, name string) string {
	if infReq := req.InferenceRequest(); infReq != nil && infReq.Headers != nil {
		return fwkrequest.GetHeader(infReq.Headers, name)
	}
	return ""
}

var errNoShards = errors.New("no viable active shards available")

// tryDistribution handles a single attempt to select a shard and submit a request.
Expand Down
17 changes: 16 additions & 1 deletion pkg/epp/flowcontrol/controller/internal/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/common/request"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
)
Expand Down Expand Up @@ -159,16 +160,30 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) {

duration := time.Since(fi.enqueueTime)
flowKey := fi.originalRequest.FlowKey()
outcomeStr := outcome.String()
metrics.RecordFlowControlRequestQueueDuration(
flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(),
flowKey.ID, strconv.Itoa(flowKey.Priority), outcomeStr,
fi.originalRequest.InferencePoolName(),
fi.OriginalRequest().ModelName(), fi.OriginalRequest().TargetModelName(),
duration)

sloClass := metrics.ClassifySLO(extractHeader(fi.originalRequest, request.TTFTSLOMsHeaderKey))
metrics.RecordFlowControlSLORequestQueueDuration(
sloClass, outcomeStr, fi.originalRequest.InferencePoolName(),
duration)

fi.done <- finalState
close(fi.done)
}

// extractHeader returns the value of the named header from the request's
// underlying inference request, using the framework's case-insensitive lookup.
// It returns "" when the request carries no inference request or header map.
func extractHeader(req flowcontrol.FlowControlRequest, name string) string {
	if infReq := req.InferenceRequest(); infReq != nil && infReq.Headers != nil {
		return request.GetHeader(infReq.Headers, name)
	}
	return ""
}

// inferOutcome determines the correct QueueOutcome and Error based on the cause of finalization and whether the item
// was already admitted to a queue.
func inferOutcome(cause error, isQueued bool) (types.QueueOutcome, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/epp/framework/common/request/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ package request

import "strings"

// Header keys used to carry per-request latency SLO targets, expressed in
// milliseconds. Lookups should go through GetHeader, which is case-insensitive.
const (
	// TTFTSLOMsHeaderKey is the request header name for SLO time-to-first-token in milliseconds.
	TTFTSLOMsHeaderKey = "x-slo-ttft-ms"
	// TPOTSLOMsHeaderKey is the request header name for SLO time-per-output-token in milliseconds.
	TPOTSLOMsHeaderKey = "x-slo-tpot-ms"
)

// GetHeader returns the value for key from headers, with case-insensitive lookup.
func GetHeader(headers map[string]string, key string) string {
if v, ok := headers[key]; ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/framework/common/request/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestGetHeader(t *testing.T) {
headers := map[string]string{"X-SLO-TTFT-MS": "42", "Other": "x"}
assert.Equal(t, "42", GetHeader(headers, "X-SLO-TTFT-MS"))
assert.Equal(t, "42", GetHeader(headers, "x-slo-ttft-ms"))
assert.Equal(t, "42", GetHeader(headers, TTFTSLOMsHeaderKey))
assert.Equal(t, "", GetHeader(headers, "missing"))
assert.Equal(t, "", GetHeader(nil, "k"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ const (
// Requests without a valid x-slo-ttft-ms header are treated as having no deadline and are scheduled after SLO-bound requests,
// with FCFS as a tie-breaker.
SLODeadlineOrderingPolicyType = "slo-deadline-ordering-policy"

// sloTtftHeader is the request header name for SLO time-to-first-token in milliseconds.
sloTtftHeader = "x-slo-ttft-ms"
)

func SLODeadlineOrderingPolicyFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
Expand Down Expand Up @@ -92,11 +89,11 @@ func calculateSLODeadline(item flowcontrol.QueueItemAccessor) time.Time {
if infReq == nil || infReq.Headers == nil {
return sloMaxDeadlineTime
}
sloTtft := request.GetHeader(infReq.Headers, sloTtftHeader)
if sloTtft == "" {
sloTTFT := request.GetHeader(infReq.Headers, request.TTFTSLOMsHeaderKey)
if sloTTFT == "" {
return sloMaxDeadlineTime
}
ms, err := strconv.ParseInt(strings.TrimSpace(sloTtft), 10, 64)
ms, err := strconv.ParseInt(strings.TrimSpace(sloTTFT), 10, 64)
if err != nil || ms < 0 {
return sloMaxDeadlineTime
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

fwkrequest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/common/request"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestSLODeadlinePolicy_RequiredQueueCapabilities(t *testing.T) {
func makeSLOItem(id string, received time.Time, sloTTFTMs string) flowcontrol.QueueItemAccessor {
req := mocks.NewMockFlowControlRequest(10, id, testFlowKey)
req.ReceivedTimestampV = received
req.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: sloTTFTMs}}
req.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{fwkrequest.TTFTSLOMsHeaderKey: sloTTFTMs}}
return &mocks.MockQueueItemAccessor{
EffectiveTTLV: 0,
OriginalRequestV: req,
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestCalculateSLODeadline(t *testing.T) {
// Valid header
reqValid := mocks.NewMockFlowControlRequest(1, "valid", testFlowKey)
reqValid.ReceivedTimestampV = now
reqValid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: "200"}}
reqValid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{fwkrequest.TTFTSLOMsHeaderKey: "200"}}
accValid := &mocks.MockQueueItemAccessor{OriginalRequestV: reqValid}
deadline := calculateSLODeadline(accValid)
assert.Equal(t, now.Add(200*time.Millisecond), deadline)
Expand All @@ -130,7 +131,7 @@ func TestCalculateSLODeadline(t *testing.T) {

// Invalid value
reqInvalid := mocks.NewMockFlowControlRequest(3, "inv", testFlowKey)
reqInvalid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: "x"}}
reqInvalid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{fwkrequest.TTFTSLOMsHeaderKey: "x"}}
accInvalid := &mocks.MockQueueItemAccessor{OriginalRequestV: reqInvalid}
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accInvalid))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

errcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
fwkrequest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/common/request"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
)

Expand All @@ -34,14 +35,14 @@ func (s *PredictedLatency) parseSLOHeaders(ctx context.Context, request *schedul
var err error

// Get Request SLOs from request header
predictedLatencyCtx.ttftSLO, err = parseFloatHeader(*request, ttftSLOHeaderKey)
predictedLatencyCtx.ttftSLO, err = parseFloatHeader(*request, fwkrequest.TTFTSLOMsHeaderKey)
if err != nil {
logger.V(logutil.DEBUG).Error(errcommon.Error{Code: errcommon.BadRequest, Msg: fmt.Sprintf("%v must be a float: %v", ttftSLOHeaderKey, err)}, "PredictedLatency: Error parsing TTFT SLO from header")
logger.V(logutil.DEBUG).Error(errcommon.Error{Code: errcommon.BadRequest, Msg: fmt.Sprintf("%v must be a float: %v", fwkrequest.TTFTSLOMsHeaderKey, err)}, "PredictedLatency: Error parsing TTFT SLO from header")
}

predictedLatencyCtx.avgTPOTSLO, err = parseFloatHeader(*request, tpotSLOHeaderKey)
predictedLatencyCtx.avgTPOTSLO, err = parseFloatHeader(*request, fwkrequest.TPOTSLOMsHeaderKey)
if err != nil {
logger.V(logutil.DEBUG).Error(errcommon.Error{Code: errcommon.BadRequest, Msg: fmt.Sprintf("%v must be a float: %v", tpotSLOHeaderKey, err)}, "PredictedLatency: Error parsing TPOT SLO from header")
logger.V(logutil.DEBUG).Error(errcommon.Error{Code: errcommon.BadRequest, Msg: fmt.Sprintf("%v must be a float: %v", fwkrequest.TPOTSLOMsHeaderKey, err)}, "PredictedLatency: Error parsing TPOT SLO from header")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ const (
headroomStrategyCompositeLeast headroomStrategy = "composite-least"
headroomStrategyCompositeMost headroomStrategy = "composite-most"
headroomStrategyCompositeOnly headroomStrategy = "composite-only"

// TTFT header string
ttftSLOHeaderKey = "x-slo-ttft-ms"
// TPOT header string
tpotSLOHeaderKey = "x-slo-tpot-ms"
)

const (
Expand Down
98 changes: 88 additions & 10 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,27 @@ var (
append([]string{"fairness_id", "priority", "outcome", "inference_pool"}, modelLabels...),
)

// flowControlSLORequestQueueDuration is a histogram of total time spent in the
// flow control layer, partitioned by slo_class (bounded bucket label), outcome,
// and inference_pool.
flowControlSLORequestQueueDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: inferenceExtension,
Name: "flow_control_slo_request_queue_duration_seconds",
Help: metricsutil.HelpMsgWithStability("Distribution of the total time requests spend in the EPP flow control layer, partitioned by SLO class.", compbasemetrics.ALPHA),
Buckets: []float64{
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
},
},
[]string{"slo_class", "outcome", "inference_pool"},
)

// flowControlSLOIncomingRequestsTotal counts requests entering the flow control
// layer via EnqueueAndWait, partitioned by slo_class and inference_pool.
flowControlSLOIncomingRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: inferenceExtension,
Name: "flow_control_slo_incoming_requests_total",
Help: metricsutil.HelpMsgWithStability("Total number of requests that entered the EPP flow control layer via EnqueueAndWait.", compbasemetrics.ALPHA),
},
[]string{"slo_class", "inference_pool"},
)

flowControlDispatchCycleDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: inferenceExtension,
Expand Down Expand Up @@ -431,7 +452,7 @@ var (
Name: "flow_control_queue_size",
Help: metricsutil.HelpMsgWithStability("Current number of requests actively held in the Flow Control queue.", compbasemetrics.ALPHA),
},
append([]string{"fairness_id", "priority", "inference_pool"}, modelLabels...),
append([]string{"fairness_id", "priority", "inference_pool", "slo_class"}, modelLabels...),
)

flowControlQueueBytes = prometheus.NewGaugeVec(
Expand All @@ -440,7 +461,7 @@ var (
Name: "flow_control_queue_bytes",
Help: metricsutil.HelpMsgWithStability("Current total size in bytes of requests actively held in the Flow Control queue.", compbasemetrics.ALPHA),
},
append([]string{"fairness_id", "priority", "inference_pool"}, modelLabels...),
append([]string{"fairness_id", "priority", "inference_pool", "slo_class"}, modelLabels...),
)

flowControlPoolSaturation = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -505,6 +526,8 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(prefixCacheHitRatio)
metrics.Registry.MustRegister(prefixCacheHitLength)
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
metrics.Registry.MustRegister(flowControlSLORequestQueueDuration)
metrics.Registry.MustRegister(flowControlSLOIncomingRequestsTotal)
metrics.Registry.MustRegister(flowControlDispatchCycleDuration)
metrics.Registry.MustRegister(flowControlQueueSize)
metrics.Registry.MustRegister(flowControlQueueBytes)
Expand Down Expand Up @@ -556,6 +579,8 @@ func Reset() {
prefixCacheHitRatio.Reset()
prefixCacheHitLength.Reset()
flowControlRequestQueueDuration.Reset()
flowControlSLORequestQueueDuration.Reset()
flowControlSLOIncomingRequestsTotal.Reset()
flowControlQueueSize.Reset()
flowControlQueueBytes.Reset()
flowControlPoolSaturation.Reset()
Expand Down Expand Up @@ -858,6 +883,59 @@ func RecordFlowControlRequestQueueDuration(
).Observe(duration.Seconds())
}

// SLO class label values for flow control SLO metrics. The raw TTFT SLO header
// value (in milliseconds) is mapped into one of these bounded buckets so that
// the metric's label cardinality stays fixed.
const (
	SLOClassNone        = "none"
	SLOClassBelowMS200  = "below_ms_200"
	SLOClassMS200to399  = "ms_200_399"
	SLOClassMS400to599  = "ms_400_599"
	SLOClassMS600to799  = "ms_600_799"
	SLOClassMS800to1000 = "ms_800_1000"
	SLOClassAboveMS1000 = "above_ms_1000"
)

// ClassifySLO maps a raw SLO header value (in milliseconds) to a bounded SLO class label.
// Returns SLOClassNone when the header is absent or unparseable (strconv.ParseInt rejects
// any surrounding whitespace) or when the parsed value is negative.
func ClassifySLO(rawHeaderValue string) string {
	if rawHeaderValue == "" {
		return SLOClassNone
	}
	ms, err := strconv.ParseInt(rawHeaderValue, 10, 64)
	if err != nil || ms < 0 {
		return SLOClassNone
	}
	// Bucket upper bounds are exclusive, except the 800–1000 bucket which
	// includes 1000 itself (hence the 1001 limit below).
	buckets := []struct {
		limit int64
		label string
	}{
		{200, SLOClassBelowMS200},
		{400, SLOClassMS200to399},
		{600, SLOClassMS400to599},
		{800, SLOClassMS600to799},
		{1001, SLOClassMS800to1000},
	}
	for _, b := range buckets {
		if ms < b.limit {
			return b.label
		}
	}
	return SLOClassAboveMS1000
}

// RecordFlowControlSLORequestQueueDuration records the queue duration for a request partitioned by its
// SLO class (derived from the TTFT SLO header "x-slo-ttft-ms").
func RecordFlowControlSLORequestQueueDuration(
	sloClass, outcome, inferencePool string,
	duration time.Duration,
) {
	observer := flowControlSLORequestQueueDuration.WithLabelValues(sloClass, outcome, inferencePool)
	observer.Observe(duration.Seconds())
}

// RecordFlowControlSLOIncomingRequest increments the count of requests that entered flow control at EnqueueAndWait.
func RecordFlowControlSLOIncomingRequest(sloClass, inferencePool string) {
	counter := flowControlSLOIncomingRequestsTotal.WithLabelValues(sloClass, inferencePool)
	counter.Inc()
}

// RecordFlowControlDispatchCycleDuration records the duration of a dispatch cycle in the Flow Control layer.
func RecordFlowControlDispatchCycleDuration(duration time.Duration) {
flowControlDispatchCycleDuration.WithLabelValues().Observe(duration.Seconds())
Expand All @@ -874,23 +952,23 @@ func RecordFlowControlRequestEnqueueDuration(
}

// IncFlowControlQueueSize increments the Flow Control queue size gauge.
func IncFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) {
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Inc()
func IncFlowControlQueueSize(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName string) {
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName).Inc()
}

// DecFlowControlQueueSize decrements the Flow Control queue size gauge.
func DecFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) {
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Dec()
func DecFlowControlQueueSize(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName string) {
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName).Dec()
}

// AddFlowControlQueueBytes increments the Flow Control queue bytes gauge.
func AddFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, bytes uint64) {
flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Add(float64(bytes))
func AddFlowControlQueueBytes(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName string, bytes uint64) {
flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName).Add(float64(bytes))
}

// SubFlowControlQueueBytes decrements the Flow Control queue bytes gauge.
func SubFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, bytes uint64) {
flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Sub(float64(bytes))
func SubFlowControlQueueBytes(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName string, bytes uint64) {
flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, sloClass, modelName, targetModelName).Sub(float64(bytes))
}

// RecordFlowControlPoolSaturation records the current saturation level for an inference pool.
Expand Down
Loading
Loading