Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/bbr/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy"
envoyhandlers "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/handlers"
reqenvoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/request"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
Response: &eppb.CommonResponse{
ClearRouteCache: true,
HeaderMutation: &eppb.HeaderMutation{
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
SetHeaders: reqenvoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
},
},
Expand All @@ -105,7 +106,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
response := &eppb.CommonResponse{
ClearRouteCache: true,
HeaderMutation: &eppb.HeaderMutation{
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
SetHeaders: reqenvoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
},
}
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *Server) runRequestPlugins(ctx context.Context, request *framework.Infer
}

func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBytes []byte) []*eppb.ProcessingResponse {
commonResponses := envoy.BuildChunkedBodyResponses(requestBodyBytes, true)
commonResponses := envoyhandlers.BuildChunkedBodyResponses(requestBodyBytes, true)
for _, commonResp := range commonResponses {
responses = append(responses, &eppb.ProcessingResponse{
Response: &eppb.ProcessingResponse_RequestBody{
Expand All @@ -165,7 +166,7 @@ func (s *Server) HandleRequestHeaders(reqCtx *RequestContext, headers *eppb.Http

if headers != nil && headers.Headers != nil {
for _, header := range headers.Headers.Headers {
reqCtx.Request.Headers[header.Key] = envoy.GetHeaderValue(header)
reqCtx.Request.Headers[header.Key] = reqenvoy.GetHeaderValue(header)
}
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/bbr/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy"
envoyhandlers "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/handlers"
reqenvoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/request"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
)

Expand All @@ -37,7 +38,7 @@ import (
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, headers *eppb.HttpHeaders) ([]*eppb.ProcessingResponse, error) {
if headers != nil && headers.Headers != nil {
for _, header := range headers.Headers.Headers {
reqCtx.Response.Headers[header.Key] = envoy.GetHeaderValue(header)
reqCtx.Response.Headers[header.Key] = reqenvoy.GetHeaderValue(header)
}
}

Expand Down Expand Up @@ -97,25 +98,25 @@ func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext,
Response: &eppb.CommonResponse{
ClearRouteCache: true,
HeaderMutation: &eppb.HeaderMutation{
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Response.MutatedHeaders()),
SetHeaders: reqenvoy.GenerateHeadersMutation(reqCtx.Response.MutatedHeaders()),
RemoveHeaders: reqCtx.Response.RemovedHeaders(),
},
},
},
},
})
if bodyMutated {
ret = envoy.AddStreamedResponseBody(ret, mutatedBytes)
ret = envoyhandlers.AddStreamedResponseBody(ret, mutatedBytes)
} else {
ret = envoy.AddStreamedResponseBody(ret, responseBodyBytes)
ret = envoyhandlers.AddStreamedResponseBody(ret, responseBodyBytes)
}
return ret, nil
}

response := &eppb.CommonResponse{
ClearRouteCache: true,
HeaderMutation: &eppb.HeaderMutation{
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Response.MutatedHeaders()),
SetHeaders: reqenvoy.GenerateHeadersMutation(reqCtx.Response.MutatedHeaders()),
RemoveHeaders: reqCtx.Response.RemovedHeaders(),
},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bbr/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy"
reqenvoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/request"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
reqcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/request"
"sigs.k8s.io/gateway-api-inference-extension/version"
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
// If streaming and the body is not empty, then headers are handled when processing request body.
loggerVerbose.Info("Received headers, passing off header processing until body arrives...")
} else {
if requestId := envoy.ExtractHeaderValue(v, reqcommon.RequestIdHeaderKey); len(requestId) > 0 {
if requestId := reqenvoy.ExtractHeaderValue(v, reqcommon.RequestIdHeaderKey); len(requestId) > 0 {
logger = logger.WithValues(reqcommon.RequestIdHeaderKey, requestId)
loggerVerbose = logger.V(logutil.VERBOSE)
ctx = log.IntoContext(ctx, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package envoy
package handlers

import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package envoy
package handlers

import (
"crypto/rand"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package envoy
package handlers

import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
Expand Down
55 changes: 55 additions & 0 deletions pkg/common/envoy/handlers/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package handlers

import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)

// AddStreamedResponseBody splits responseBodyBytes into chunked body responses
// and appends them as ResponseBody ProcessingResponses, mirroring
// GenerateRequestBodyResponses for the request path.
func AddStreamedResponseBody(responses []*extProcPb.ProcessingResponse, responseBodyBytes []byte) []*extProcPb.ProcessingResponse {
commonResponses := BuildChunkedBodyResponses(responseBodyBytes, true)
for _, commonResp := range commonResponses {
responses = append(responses, &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
Response: commonResp,
},
},
})
}
return responses
}

// GenerateResponseBodyResponses generate the ResponseBody messages for a response
func GenerateResponseBodyResponses(responseBodyBytes []byte, setEoS bool) []*extProcPb.ProcessingResponse {
commonResponses := BuildChunkedBodyResponses(responseBodyBytes, setEoS)
responses := make([]*extProcPb.ProcessingResponse, 0, len(commonResponses))
for _, commonResp := range commonResponses {
resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
Response: commonResp,
},
},
}
responses = append(responses, resp)
}
return responses
}
File renamed without changes.
File renamed without changes.
38 changes: 0 additions & 38 deletions pkg/common/envoy/response.go

This file was deleted.

12 changes: 6 additions & 6 deletions pkg/epp/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
"go.opentelemetry.io/otel/propagation"
"google.golang.org/protobuf/types/known/structpb"

envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy"
reqenvoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/request"
errcommon "sigs.k8s.io/gateway-api-inference-extension/pkg/common/error"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
)

func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
func (s *Server) HandleRequestHeaders(ctx context.Context, reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
reqCtx.RequestReceivedTimestamp = time.Now()

// an EoS in the request headers means this request has no body or trailers.
Expand All @@ -54,7 +54,7 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
}

for _, header := range req.RequestHeaders.Headers.Headers {
reqCtx.Request.Headers[header.Key] = envoy.GetHeaderValue(header)
reqCtx.Request.Headers[header.Key] = reqenvoy.GetHeaderValue(header)
switch header.Key {
case metadata.FlowFairnessIDKey:
reqCtx.FairnessID = reqCtx.Request.Headers[header.Key]
Expand All @@ -72,7 +72,7 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
return nil
}

func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext) *extProcPb.ProcessingResponse {
func (s *Server) generateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext) *extProcPb.ProcessingResponse {
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
// options for gateway providers.
Expand All @@ -99,7 +99,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, req
}
}

func (s *StreamingServer) generateHeaders(ctx context.Context, reqCtx *RequestContext) []*configPb.HeaderValueOption {
func (s *Server) generateHeaders(ctx context.Context, reqCtx *RequestContext) []*configPb.HeaderValueOption {
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
headers := []*configPb.HeaderValueOption{
{
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *StreamingServer) generateHeaders(ctx context.Context, reqCtx *RequestCo
return headers
}

func (s *StreamingServer) generateMetadata(endpoint string) *structpb.Struct {
func (s *Server) generateMetadata(endpoint string) *structpb.Struct {
return &structpb.Struct{
Fields: map[string]*structpb.Value{
metadata.DestinationEndpointNamespace: {
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/handlers/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestHandleRequestHeaders(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
server := &StreamingServer{}
server := &Server{}
reqCtx := &RequestContext{
Request: &Request{Headers: make(map[string]string)},
}
Expand All @@ -81,7 +81,7 @@ func TestHandleRequestHeaders(t *testing.T) {
}

func TestGenerateHeaders_Sanitization(t *testing.T) {
server := &StreamingServer{}
server := &Server{}
reqCtx := &RequestContext{
TargetEndpoint: "1.2.3.4:8080",
RequestSize: 123,
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestGenerateHeaders_Sanitization(t *testing.T) {
func TestGenerateRequestHeaderResponse_MergeMetadata(t *testing.T) {
t.Parallel()

server := &StreamingServer{}
server := &Server{}
reqCtx := &RequestContext{
TargetEndpoint: "1.2.3.4:8080",
Request: &Request{
Expand Down
30 changes: 7 additions & 23 deletions pkg/epp/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"sigs.k8s.io/controller-runtime/pkg/log"

envoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy"
reqenvoy "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/request"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
)

// HandleResponseBody always returns the requestContext even in the error case, as the request context is used in error handling.
func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *RequestContext, responseBytes []byte) (*RequestContext, error) {
func (s *Server) HandleResponseBody(ctx context.Context, reqCtx *RequestContext, responseBytes []byte) (*RequestContext, error) {
logger := log.FromContext(ctx)

parsedResponse, parseErr := s.parser.ParseResponse(ctx, responseBytes, reqCtx.Response.Headers, true)
Expand All @@ -45,7 +45,7 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques
}

// The function is to handle streaming response if the modelServer is streaming.
func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseBytes []byte, endOfStream bool) {
func (s *Server) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseBytes []byte, endOfStream bool) {
logger := log.FromContext(ctx)
_, err := s.director.HandleResponseBodyStreaming(ctx, reqCtx)
if err != nil {
Expand All @@ -64,17 +64,17 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context,
}
}

func (s *StreamingServer) HandleResponseHeaders(ctx context.Context, reqCtx *RequestContext, resp *extProcPb.ProcessingRequest_ResponseHeaders) (*RequestContext, error) {
func (s *Server) HandleResponseHeaders(ctx context.Context, reqCtx *RequestContext, resp *extProcPb.ProcessingRequest_ResponseHeaders) (*RequestContext, error) {
for _, header := range resp.ResponseHeaders.Headers.Headers {
reqCtx.Response.Headers[header.Key] = envoy.GetHeaderValue(header)
reqCtx.Response.Headers[header.Key] = reqenvoy.GetHeaderValue(header)
}

reqCtx, err := s.director.HandleResponseReceived(ctx, reqCtx)

return reqCtx, err
}

func (s *StreamingServer) generateResponseHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {
func (s *Server) generateResponseHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {
return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
Expand All @@ -88,23 +88,7 @@ func (s *StreamingServer) generateResponseHeaderResponse(reqCtx *RequestContext)
}
}

func generateResponseBodyResponses(responseBodyBytes []byte, setEoS bool) []*extProcPb.ProcessingResponse {
commonResponses := envoy.BuildChunkedBodyResponses(responseBodyBytes, setEoS)
responses := make([]*extProcPb.ProcessingResponse, 0, len(commonResponses))
for _, commonResp := range commonResponses {
resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
Response: commonResp,
},
},
}
responses = append(responses, resp)
}
return responses
}

func (s *StreamingServer) generateResponseHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
func (s *Server) generateResponseHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
headers := []*configPb.HeaderValueOption{
{
Expand Down
Loading
Loading