Generalize the request type passed down the framework plugins: rename LLM->Inference#2673
RyanRosario wants to merge 1 commit into kubernetes-sigs:main from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull request has been approved by: RyanRosario. The full list of commands accepted by this bot can be found here. Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
✅ Deploy Preview for gateway-api-inference-extension ready!
Hi @RyanRosario. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with

Tip: We noticed you've done this a few times! Consider joining the org to skip this step and gain

Once the patch is verified, the new status will be reflected by the

I understand the commands that are listed here. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
1cce76e to f6b5ec6
@zetxqx For now, I've had to commit some other files into this PR to get Prow to pass. I'm not sure what the issue is, but I want to keep the ball rolling.
.github/workflows/kal.yml
  uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # tag=v5.5.0
  with:
-   go-version-file: 'go.work'
+   go-version-file: 'go.mod'
Correct. I committed this to start the review process and get Prow to pass. This needs to be removed, but I need to fix my local environment first. Thanks.
Can we also rename the whole package from handlers to requesthandling? We can do it in a separate PR.
pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/requestcontrol_hooks_test.go
pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/latencypredictor_helper.go
pkg/epp/framework/plugins/scheduling/scorer/predictedlatency/scorer_test.go
pkg/epp/framework/plugins/scheduling/scorer/runningrequests/running_test.go
@zetxqx I still have a few comments to address but wanted to handle the rest in this PR.
+1 to @kaushikmitr. Can we do a rebase, @RyanRosario?
/retest
0846f3e to 863a894
/retest
@zetxqx @kaushikmitr Rebase complete. All tests pass. Ready for review.
// buildTrainingEntry constructs a training entry from actual latency measurements.
// If endpointRoleLabel is configured, it extracts the role from the endpoint's labels and
// populates the PodType field, enabling role-specific model training.
func buildTrainingEntry(
+1, we should focus the refactoring on using the new type. Since the PR is already large, any other refactoring, even if reasonable, can be very confusing for reviewers.
) {
	logger := log.FromContext(ctx)
	targetName := predictedLatencyCtx.targetMetadata.NamespacedName.Name
	if m := predictedLatencyCtx.prefillTargetMetadata; m != nil {
Same here: this logic is needed to support disaggregated serving.
	ctx context.Context,
	predictor latencypredictor.PredictorInterface,
	streamingMode bool,
	endpointRoleLabel string,
	prefixCacheScore float64,
) {
	logger := log.FromContext(ctx)
	entry := buildTrainingEntry(
Yes, this method is also needed.
	logger := log.FromContext(ctx)
	targetName := predictedLatencyCtx.targetMetadata.NamespacedName.Name
	if storedPred, ok := predictedLatencyCtx.predictionsForScheduling[targetName]; ok {
		logger.V(logutil.DEBUG).Info("first TPOT from stored prediction", "value_ms", storedPred.TPOT)
Same; I think some of this code was removed inadvertently?
)
in := latencypredictor.PredictionRequest{
	KVCachePercentage: m.KVCacheUsagePercent,
	InputTokenLength:  len(strings.Fields(predictedLatencyCtx.schedulingRequest.Body.Completions.Prompt.PlainText())),
This won't work for other APIs like chat completions. Use the same approach as before.
@kaushikmitr Thanks Kaushik. These will all be restored. Some of this was removed inadvertently when I made other changes.
@RyanRosario I think the rebase resolved conflicts by keeping the PR's old code and discarding main's improvements. Just focusing on scorer/predictedlatency/: I think we should redo the rebase, accepting main's versions of all functions in scorer/predictedlatency/ and then applying only the rename (LLMRequest → InferenceRequest) on top.
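For illustration, here is a self-contained sketch of that redo strategy in a scratch repo (file and branch names are hypothetical; note the common gotcha that during a rebase, git's `--ours` refers to the upstream/main side, not the PR's side):

```shell
set -e
tmp=$(mktemp -d) && cd "$tmp"
git init -q -b main
git config user.email demo@example.com && git config user.name demo

echo 'func Score(r *LLMRequest) int { return 0 }' > scorer.go
git add scorer.go && git commit -qm base

git checkout -qb pr                    # the PR branch edits the same line
echo 'func Score(r *LLMRequest) int { return 1 }' > scorer.go
git commit -qam "pr edit"

git checkout -q main                   # main also improved the same line
echo 'func Score(r *LLMRequest) int { return 2 } // main improvement' > scorer.go
git commit -qam "main improvement"

git checkout -q pr
git rebase main || true                # stops on the expected conflict
git checkout --ours -- scorer.go       # --ours == main's side during a rebase
git add scorer.go
GIT_EDITOR=true git rebase --continue || git rebase --skip

# Re-apply only the rename on top of main's version.
sed -i 's/LLMRequest/InferenceRequest/g' scorer.go
cat scorer.go
```

The same idea scales to the real PR: resolve conflicts in scorer/predictedlatency/ toward main, then run the mechanical rename as a final commit.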
// ParsedBody contains the unmarshaled request payload.
// Note: Because this handles multiple protocols, this field is strictly expected
// to be either a map[string]any (for HTTP/JSON) or a proto.Message (for gRPC).
ParsedBody any `json:"-"`
The ParsedBody is strongly typed and renamed to Payload now; can you move the strong type here as well?
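For context, a hedged sketch of what moving the strong type here could look like (the interface and variant names below are assumptions; the real PayloadMap/PayloadProto/RawPayload types live in the framework scheduling package):

```go
package main

import "fmt"

// Payload is a hypothetical sealed interface standing in for the strongly
// typed field; the real variants live in the framework package.
type Payload interface{ isPayload() }

// PayloadMap holds an unmarshaled HTTP/JSON body.
type PayloadMap map[string]any

func (PayloadMap) isPayload() {}

// RawPayload holds bytes that were not (or could not be) parsed.
type RawPayload []byte

func (RawPayload) isPayload() {}

// RequestBody carries the typed Payload instead of an untyped `any`,
// so consumers can switch on the known variants.
type RequestBody struct {
	Payload Payload
}

func describe(rb RequestBody) string {
	switch p := rb.Payload.(type) {
	case PayloadMap:
		return fmt.Sprintf("map with %d key(s)", len(p))
	case RawPayload:
		return fmt.Sprintf("raw %d byte(s)", len(p))
	default:
		return "unsupported payload"
	}
}

func main() {
	fmt.Println(describe(RequestBody{Payload: PayloadMap{"model": "m", "prompt": "p"}}))
	fmt.Println(describe(RequestBody{Payload: RawPayload([]byte("{}"))}))
}
```

Compared with `ParsedBody any`, the sealed interface lets the compiler reject payload kinds the switch does not handle.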
It seems a unit test, TestPrompt_PlainText, was dropped. Can we make sure all the unit tests (https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/eaaa9469efdf847656b0c0ce7ecb5c5928d84e2f/pkg/epp/framework/interface/scheduling/types_test.go) are moved here?
// It is populated by external tokenization plugins (e.g., via a PrepareData plugin)
// and consumed by scheduling plugins that benefit from actual token data
// (e.g., prefix cache scoring, latency prediction).
type TokenizedPrompt struct {
These fields should not be dropped.
}
if plugin == nil {
	t.Fatalf("New() returned nil plugin without error")
	return
Super nit: the return is not needed since Fatalf already ends the test.
}

// ParseResponse extracts usage metadata from the provider's response.
pkg/epp/handlers/server.go
	body = []byte{}

-	reqCtx, err = s.director.HandleRequest(ctx, reqCtx)
+	parsedBody, processErr := s.director.ProcessRequestBody(ctx, reqCtx, s.parser)
Here we can just use s.parser to parse the request instead of using the director's newly added method.
pkg/epp/requestcontrol/director.go
	// TODO: to extend fallback functionality, handle cases where target pod is unavailable
	// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
-	d.runResponseHeaderPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
+	d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
We should change it back to runResponseHeaderPlugins.
pkg/epp/requestcontrol/director.go
func (d *Director) ProcessRequestBody(ctx context.Context, reqCtx *handlers.RequestContext, parser fwkrh.Parser) (*fwkrh.RequestBody, error) {
	requestBody, err := parser.ParseRequest(ctx, reqCtx.Request.RawBody, reqCtx.Request.Headers)
	if err != nil {
		return nil, errcommon.Error{Code: errcommon.BadRequest, Msg: err.Error()}
	}

-	switch v := llmRequestBody.Payload.(type) {
-	case fwksched.PayloadProto:
-		// Protos are not currently mutated, return as-is.
-		reqCtx.RequestSize = len(reqCtx.Request.RawBody)
-	case fwksched.PayloadMap:
+	switch v := requestBody.ParsedBody.(type) {
+	case map[string]any:
		if err := d.mutateAndRepackage(ctx, reqCtx, v); err != nil {
			return nil, err
		}
-	case fwksched.RawPayload:
-		reqCtx.RequestSize = len(reqCtx.Request.RawBody)
	default:
-		return nil, errcommon.Error{Code: errcommon.BadRequest, Msg: "Unsupported llmRequest parsedBody"}
+		// For other types (like gRPC, custom structs) or nil, we just set the request size.
+		reqCtx.RequestSize = len(reqCtx.Request.RawBody)
	}
-	return llmRequestBody, nil
+	return requestBody, nil
}
Instead of exposing ProcessRequestBody, we can just move the logic into handlers/server.go.
The concurrency detector has been moved out of here, so we can drop this.
Can you rebase again? I don't think this file needs to be modified.

Thank you for your patience. That seems to be the issue.
@RyanRosario: The following tests failed, say

Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR. Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What type of PR is this?
/kind feature
What this PR does / why we need it:
Enables the framework plugins to work with various GenAI request formats, not only the OpenAI format, without rewriting the core admission, mutation, or scheduling flows. Pluggable parsers can now intercept raw request bytes and construct a generic InferenceRequest up front, giving the EPP the flexibility to route, process, and score payloads transparently regardless of the original protocol.
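As an illustrative sketch of that flow (all type and method names here are assumptions, not the PR's actual API), a pluggable parser turns raw bytes into a protocol-agnostic request:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InferenceRequest is a hypothetical stand-in for the generic request
// type; the real field set in the PR differs.
type InferenceRequest struct {
	Model   string
	Payload map[string]any
}

// Parser converts raw request bytes into an InferenceRequest. Concrete
// implementations could target OpenAI-style JSON, gRPC protos, etc.
type Parser interface {
	Parse(raw []byte) (*InferenceRequest, error)
}

// openAIJSONParser is one pluggable implementation for JSON bodies.
type openAIJSONParser struct{}

func (openAIJSONParser) Parse(raw []byte) (*InferenceRequest, error) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	model, _ := m["model"].(string)
	return &InferenceRequest{Model: model, Payload: m}, nil
}

func main() {
	var p Parser = openAIJSONParser{}
	req, err := p.Parse([]byte(`{"model":"llama-3","prompt":"hi"}`))
	if err != nil {
		panic(err)
	}
	// Scheduling and scoring can now operate on InferenceRequest
	// regardless of the wire protocol that produced it.
	fmt.Println(req.Model)
}
```

Supporting a new protocol then means registering another Parser implementation rather than touching the scheduling core.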
Which issue(s) this PR fixes:
Fixes #2447
Does this PR introduce a user-facing change?: