Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (r *Runner) setup(ctx context.Context, cfg *rest.Config, opts *runserver.Op
admissionController = requestcontrol.NewLegacyAdmissionController(eppConfig.SaturationDetector, endpointCandidates)
}

director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, r.parser, endpointCandidates, r.requestControlConfig)
director := requestcontrol.NewDirectorWithConfig(ds, scheduler, admissionController, endpointCandidates, r.requestControlConfig)

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: opts.GRPCPort,
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/config/loader/configloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func (m *mockScorer) Category() framework.ScorerCategory {
return framework.Distribution
}

func (m *mockScorer) Score(context.Context, *framework.CycleState, *framework.LLMRequest, []framework.Endpoint) map[framework.Endpoint]float64 {
func (m *mockScorer) Score(context.Context, *framework.CycleState, *framework.InferenceRequest, []framework.Endpoint) map[framework.Endpoint]float64 {
return nil
}

Expand All @@ -660,11 +660,11 @@ type mockHandler struct{ mockPlugin }
// compile-time type assertion
var _ framework.ProfileHandler = &mockHandler{}

func (m *mockHandler) Pick(context.Context, *framework.CycleState, *framework.LLMRequest, map[string]framework.SchedulerProfile,
func (m *mockHandler) Pick(context.Context, *framework.CycleState, *framework.InferenceRequest, map[string]framework.SchedulerProfile,
map[string]*framework.ProfileRunResult) map[string]framework.SchedulerProfile {
return nil
}
func (m *mockHandler) ProcessResults(context.Context, *framework.CycleState, *framework.LLMRequest,
func (m *mockHandler) ProcessResults(context.Context, *framework.CycleState, *framework.InferenceRequest,
map[string]*framework.ProfileRunResult) (*framework.SchedulingResult, error) {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/datalayer/data_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *mockPrepareRequestDataP) Consumes() map[string]any {
return m.consumes
}

func (m *mockPrepareRequestDataP) PrepareRequestData(ctx context.Context, request *fwksch.LLMRequest, endpoints []fwksch.Endpoint) error {
func (m *mockPrepareRequestDataP) PrepareRequestData(ctx context.Context, request *fwksch.InferenceRequest, endpoints []fwksch.Endpoint) error {
endpoints[0].Put(mockProducedDataKey, &mockProducedDataType{value: 42})
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/epp/flowcontrol/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ type benchRequest struct {
}

// --- stubs required by FlowControlRequest interface ---
func (r *benchRequest) FlowKey() flowcontrol.FlowKey { return r.key }
func (r *benchRequest) ByteSize() uint64 { return r.byteSize }
func (r *benchRequest) InitialEffectiveTTL() time.Duration { return 5 * time.Minute }
func (r *benchRequest) ID() string { return "bench-req" }
func (r *benchRequest) GetMetadata() map[string]any { return nil }
func (r *benchRequest) InferencePoolName() string { return "bench-pool" }
func (r *benchRequest) ModelName() string { return "bench-model" }
func (r *benchRequest) TargetModelName() string { return "bench-target" }
func (r *benchRequest) InferenceRequest() *scheduling.LLMRequest { return nil }
func (r *benchRequest) ReceivedTimestamp() time.Time { return time.Now() }
func (r *benchRequest) FlowKey() flowcontrol.FlowKey { return r.key }
func (r *benchRequest) ByteSize() uint64 { return r.byteSize }
func (r *benchRequest) InitialEffectiveTTL() time.Duration { return 5 * time.Minute }
func (r *benchRequest) ID() string { return "bench-req" }
func (r *benchRequest) GetMetadata() map[string]any { return nil }
func (r *benchRequest) InferencePoolName() string { return "bench-pool" }
func (r *benchRequest) ModelName() string { return "bench-model" }
func (r *benchRequest) TargetModelName() string { return "bench-target" }
func (r *benchRequest) InferenceRequest() *scheduling.InferenceRequest { return nil }
func (r *benchRequest) ReceivedTimestamp() time.Time { return time.Now() }

// setupRegistry provisions the concrete FlowRegistry.
func setupRegistry(
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/eviction/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *Plugin) TypedName() plugin.TypedName {
// It tracks the request and, if the filter policy accepts it, adds it to the eviction queue.
func (p *Plugin) PreRequest(
ctx context.Context,
request *scheduling.LLMRequest,
request *scheduling.InferenceRequest,
result *scheduling.SchedulingResult,
) {
if request == nil || result == nil || len(result.ProfileResults) == 0 {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (p *Plugin) PreRequest(
// On the final call (EndOfStream == true), it removes the request from tracking and the eviction queue.
func (p *Plugin) ResponseBody(
ctx context.Context,
request *scheduling.LLMRequest,
request *scheduling.InferenceRequest,
response *requestcontrol.Response,
targetEndpoint *datalayer.EndpointMetadata,
) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/framework/interface/flowcontrol/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type EvictionItem struct {
// TargetURL is the base URL of the model server serving this request.
TargetURL string
// Request is the original scheduling request.
Request *scheduling.LLMRequest
Request *scheduling.InferenceRequest
// TargetEndpoint is the metadata of the endpoint serving this request.
TargetEndpoint *datalayer.EndpointMetadata
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/framework/interface/flowcontrol/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type MockFlowControlRequest struct {
FlowKeyV flowcontrol.FlowKey
ByteSizeV uint64
InferenceRequestV *scheduling.LLMRequest
InferenceRequestV *scheduling.InferenceRequest
ReceivedTimestampV time.Time
InitialEffectiveTTLV time.Duration
IDV string
Expand Down Expand Up @@ -65,7 +65,7 @@ func NewMockFlowControlRequest(

func (m *MockFlowControlRequest) FlowKey() flowcontrol.FlowKey { return m.FlowKeyV }
func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV }
func (m *MockFlowControlRequest) InferenceRequest() *scheduling.LLMRequest {
func (m *MockFlowControlRequest) InferenceRequest() *scheduling.InferenceRequest {
return m.InferenceRequestV
}
func (m *MockFlowControlRequest) ReceivedTimestamp() time.Time { return m.ReceivedTimestampV }
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/framework/interface/flowcontrol/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type FlowControlRequest interface {
ByteSize() uint64

// InferenceRequest returns the inference request passed to the scheduling layer.
InferenceRequest() *scheduling.LLMRequest
InferenceRequest() *scheduling.InferenceRequest

// ReceivedTimestamp returns the timestamp when the request was received by the server.
ReceivedTimestamp() time.Time
Expand Down
10 changes: 5 additions & 5 deletions pkg/epp/framework/interface/requestcontrol/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ const (
// before a request is sent to the selected model server.
type PreRequest interface {
plugin.Plugin
PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult)
PreRequest(ctx context.Context, request *types.InferenceRequest, schedulingResult *types.SchedulingResult)
}

// ResponseHeader is called by the director after the response headers are successfully received
// which indicates the beginning of the response handling by the model server.
// The given pod argument is the pod that served the request.
type ResponseHeader interface {
plugin.Plugin
ResponseHeader(ctx context.Context, request *types.LLMRequest, response *Response, targetEndpoint *datalayer.EndpointMetadata)
ResponseHeader(ctx context.Context, request *types.InferenceRequest, response *Response, targetEndpoint *datalayer.EndpointMetadata)
}

// ResponseBody is the primary hook for processing response data.
Expand All @@ -61,15 +61,15 @@ type ResponseHeader interface {
// between success, errors, and disconnects.
type ResponseBody interface {
plugin.Plugin
ResponseBody(ctx context.Context, request *types.LLMRequest, response *Response, targetEndpoint *datalayer.EndpointMetadata)
ResponseBody(ctx context.Context, request *types.InferenceRequest, response *Response, targetEndpoint *datalayer.EndpointMetadata)
}

// PrepareRequestData is called by the director before scheduling requests.
// PrepareDataPlugin plugin is implemented by data producers which produce data from different sources.
type PrepareDataPlugin interface {
plugin.ProducerPlugin
plugin.ConsumerPlugin
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Endpoint) error
PrepareRequestData(ctx context.Context, request *types.InferenceRequest, pods []types.Endpoint) error
}

// AdmissionPlugin is called by the director after the prepare data phase and before scheduling.
Expand All @@ -79,5 +79,5 @@ type AdmissionPlugin interface {
plugin.Plugin
// AdmitRequest returns the denial reason, wrapped as error if the request is denied.
// If the request is allowed, it returns nil.
AdmitRequest(ctx context.Context, request *types.LLMRequest, pods []types.Endpoint) error
AdmitRequest(ctx context.Context, request *types.InferenceRequest, pods []types.Endpoint) error
}
15 changes: 3 additions & 12 deletions pkg/epp/framework/interface/requestcontrol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package requestcontrol

import (
"google.golang.org/protobuf/types/known/structpb"

requesthandle "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requesthandling"
)

// Response contains information from the response received to be passed to the Response requestcontrol plugins
Expand All @@ -33,19 +35,8 @@ type Response struct {
// Currently, this is only used by conformance test.
ReqMetadata map[string]any
// Token usage counts parsed from the response body.
Usage Usage
Usage requesthandle.Usage
// DynamicMetadata is a map of metadata that can be passed to the Envoy. It is populated into the dynamic
// metadata when processing ProcessingResponse_RequestHeaders.
DynamicMetadata *structpb.Struct
}

type Usage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
}

type PromptTokenDetails struct {
CachedTokens int `json:"cached_tokens"`
}
7 changes: 2 additions & 5 deletions pkg/epp/framework/interface/requesthandling/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ import (
"context"

fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
)

// Parser defines the interface for parsing payload(requests and responses).
type Parser interface {
fwkplugin.Plugin
// ParseRequest parses the request body and headers and returns a map representation.
ParseRequest(ctx context.Context, body []byte, headers map[string]string) (*scheduling.LLMRequestBody, error)
ParseRequest(ctx context.Context, body []byte, headers map[string]string) (*InferenceRequestBody, error)

// ParseResponse parses the response payload.
// For streaming responses , this method is invoked multiple times (once per chunk),
Expand All @@ -41,5 +38,5 @@ type Parser interface {

type ParsedResponse struct {
// Usage is only populate when the raw response has usage.
Usage *requestcontrol.Usage
Usage *Usage
}
Loading
Loading