Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions pkg/epp/framework/plugins/datalayer/source/http/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ import (
"crypto/tls"
"fmt"
"io"
"net"
"net/url"
"reflect"
"strconv"

fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

// HTTPDataSource is a data source that receives its data using HTTP client.
type HTTPDataSource struct {
typedName fwkplugin.TypedName
scheme string // scheme to use
path string // path to use
typedName fwkplugin.TypedName
scheme string // scheme to use
path string // path to use
metricsPort int // when non-zero, overrides the port in MetricsHost for scraping

client Client // client (e.g. a wrapped http.Client) used to get data
parser func(io.Reader) (any, error)
outputType reflect.Type
}

// NewHTTPDataSource returns a new data source, configured with
// the provided scheme, path and certificate verification parameters.
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string,
// NewHTTPDataSource returns a new data source configured with the given scheme, path,
// and certificate verification. metricsPort overrides the port in MetricsHost when
// non-zero; pass 0 to use MetricsHost as-is.
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, metricsPort int, pluginType string,
pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) (*HTTPDataSource, error) {
if scheme != "http" && scheme != "https" {
return nil, fmt.Errorf("unsupported scheme: %s", scheme)
Expand All @@ -59,11 +63,12 @@ func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pl
Type: pluginType,
Name: pluginName,
},
scheme: scheme,
path: path,
client: defaultClient,
parser: parser,
outputType: outputType,
scheme: scheme,
path: path,
metricsPort: metricsPort,
client: defaultClient,
parser: parser,
outputType: outputType,
}
return dataSrc, nil
}
Expand All @@ -90,9 +95,18 @@ func (dataSrc *HTTPDataSource) Poll(ctx context.Context, ep fwkdl.Endpoint) (any
}

func (dataSrc *HTTPDataSource) getEndpoint(ep Addressable) *url.URL {
host := ep.GetMetricsHost()
if dataSrc.metricsPort != 0 {
ip, _, err := net.SplitHostPort(host)
if err == nil {
host = net.JoinHostPort(ip, strconv.Itoa(dataSrc.metricsPort))
}
// If SplitHostPort fails (e.g. host has no port), use MetricsHost unchanged
// so we still attempt a scrape rather than silently dropping the endpoint.
}
return &url.URL{
Scheme: dataSrc.scheme,
Host: ep.GetMetricsHost(),
Host: host,
Path: dataSrc.path,
}
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/epp/framework/plugins/datalayer/source/http/datasource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package http

import (
"io"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/types"
)

// fakeAddressable is a test double for the Addressable interface.
type fakeAddressable struct {
metricsHost string
}

func (f *fakeAddressable) GetIPAddress() string { return "" }
func (f *fakeAddressable) GetPort() string { return "" }
func (f *fakeAddressable) GetMetricsHost() string { return f.metricsHost }
func (f *fakeAddressable) GetNamespacedName() types.NamespacedName { return types.NamespacedName{Name: "pod", Namespace: "test"} }

func noopParser(r io.Reader) (any, error) { return nil, nil }

func TestGetEndpoint(t *testing.T) {
tests := []struct {
name string
metricsHost string
metricsPort int
wantHost string
}{
{
name: "metricsPort=0 preserves MetricsHost unchanged",
metricsHost: "1.2.3.4:8000",
metricsPort: 0,
wantHost: "1.2.3.4:8000",
},
{
name: "metricsPort overrides port in MetricsHost",
metricsHost: "1.2.3.4:8000",
metricsPort: 9090,
wantHost: "1.2.3.4:9090",
},
{
name: "metricsPort with IPv6 address",
metricsHost: "[::1]:8000",
metricsPort: 9090,
wantHost: "[::1]:9090",
},
{
name: "metricsPort with IPv6 address, no override when port=0",
metricsHost: "[::1]:8000",
metricsPort: 0,
wantHost: "[::1]:8000",
},
{
name: "malformed host falls back to original MetricsHost",
metricsHost: "not-a-host-with-port",
metricsPort: 9090,
wantHost: "not-a-host-with-port",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ds, err := NewHTTPDataSource("http", "/metrics", false, tc.metricsPort,
"test-type", "test-name", noopParser, reflect.TypeOf(""))
if err != nil {
t.Fatalf("NewHTTPDataSource() error = %v", err)
}

got := ds.getEndpoint(&fakeAddressable{metricsHost: tc.metricsHost})

if diff := cmp.Diff(tc.wantHost, got.Host); diff != "" {
t.Errorf("getEndpoint() host mismatch (-want +got):\n%s", diff)
}
})
}
}
14 changes: 12 additions & 2 deletions pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ type metricsDatasourceParams struct {
Path string `json:"path"`
// InsecureSkipVerify defines whether model server certificate should be verified or not.
InsecureSkipVerify bool `json:"insecureSkipVerify"`
// MetricsPort defines the port to use for scraping metrics from model server pods.
// When set, this overrides the inference port encoded in the endpoint's MetricsHost.
// Useful when the model server exposes metrics on a separate port from inference
// (e.g., vLLM with --metrics-port 9090).
// Defaults to 0, which means the inference port is used.
MetricsPort int `json:"metricsPort"`
}

// NewHTTPMetricsDataSource constructs a MetricsDataSource with the given scheme and path.
// InsecureSkipVerify defaults to true (matching the factory default).
// Use this function directly in tests to bypass JSON parameter marshaling.
func NewHTTPMetricsDataSource(scheme, path, name string) (*http.HTTPDataSource, error) {
return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify,
return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify, 0,
MetricsDataSourceType, name, parseMetrics, PrometheusMetricType)
}

Expand All @@ -72,7 +78,11 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle fw
}
}

return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType,
if cfg.MetricsPort != 0 && (cfg.MetricsPort < 1 || cfg.MetricsPort > 65535) {
return nil, fmt.Errorf("metricsPort must be between 1 and 65535, got %d", cfg.MetricsPort)
}

return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, cfg.MetricsPort, MetricsDataSourceType,
name, parseMetrics, PrometheusMetricType)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ package metrics

import (
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"

fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source/http"
)

func TestDatasource(t *testing.T) {
_, err := http.NewHTTPDataSource("invalid", "/metrics", true, MetricsDataSourceType,
_, err := http.NewHTTPDataSource("invalid", "/metrics", true, 0, MetricsDataSourceType,
"metrics-data-source", parseMetrics, PrometheusMetricType)
assert.NotNil(t, err, "expected to fail with invalid scheme")

source, err := http.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType,
source, err := http.NewHTTPDataSource("https", "/metrics", true, 0, MetricsDataSourceType,
"metrics-data-source", parseMetrics, PrometheusMetricType)
assert.Nil(t, err, "failed to create HTTP datasource")

Expand All @@ -50,3 +52,27 @@ func TestDatasource(t *testing.T) {
_, err = source.Poll(ctx, endpoint)
assert.NotNil(t, err, "expected to fail polling for metrics")
}

func TestMetricsDataSourceFactory_MetricsPortOverride(t *testing.T) {
params, err := json.Marshal(map[string]any{
"scheme": "http",
"metricsPort": 9090,
})
require.NoError(t, err)

plugin, err := MetricsDataSourceFactory("test-ds", params, nil)
require.NoError(t, err)

ds, ok := plugin.(fwkdl.PollingDataSource)
require.True(t, ok, "expected MetricsDataSourceFactory to return a PollingDataSource")

// Poll will fail (no real server), but the error must reference port 9090.
// If metricsPort were ignored, EPP would dial :8000 instead.
endpoint := fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"},
MetricsHost: "1.2.3.4:8000",
}, nil)
_, err = ds.Poll(context.Background(), endpoint)
assert.Error(t, err)
assert.Contains(t, err.Error(), "9090", "expected scrape target to use metricsPort 9090, not inference port 8000")
}
10 changes: 5 additions & 5 deletions test/integration/epp/runtime_polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestRuntimePollingDispatch(t *testing.T) {
r := datalayer.NewRuntime(pollingInterval)
ext := mocks.NewPollingExtractor("test-extractor")

httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
require.NoError(t, err)

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestRuntimePollingMultipleExtractors(t *testing.T) {
ext1 := mocks.NewPollingExtractor("extractor-1")
ext2 := mocks.NewPollingExtractor("extractor-2")

httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
require.NoError(t, err)

Expand Down Expand Up @@ -187,7 +187,7 @@ func TestRuntimePollingEndpointLifecycle(t *testing.T) {
r := datalayer.NewRuntime(pollingInterval)
ext := mocks.NewPollingExtractor("lifecycle-extractor")

httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
require.NoError(t, err)

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestRuntimePollingWithoutExtractors(t *testing.T) {

r := datalayer.NewRuntime(50 * time.Millisecond)

httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
require.NoError(t, err)

Expand Down Expand Up @@ -280,7 +280,7 @@ func TestRuntimePollingHTTPError(t *testing.T) {
r := datalayer.NewRuntime(pollingInterval)
ext := mocks.NewPollingExtractor("error-extractor")

httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
require.NoError(t, err)

Expand Down