Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
fwkrh "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requesthandling"
extractormetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/extractor/metrics"
Expand Down Expand Up @@ -508,13 +509,13 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context, opts *runserver
}

// Return a function that can be used in the EPP Handle to list pod names.
func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
return func() []types.NamespacedName {
func makePodListFunc(ds datastore.Datastore) func() []plugin.EndPointKey {
return func() []plugin.EndPointKey {
pods := ds.PodList(datastore.AllPodsPredicate)
names := make([]types.NamespacedName, 0, len(pods))
names := make([]plugin.EndPointKey, 0, len(pods))

for _, p := range pods {
names = append(names, p.GetMetadata().NamespacedName)
names = append(names, p.GetMetadata().Key)
}
return names
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/epp/backend/metrics/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"

logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

// FakePodMetrics is an implementation of PodMetrics that doesn't run the async refresh loop.
Expand Down Expand Up @@ -66,35 +66,35 @@ func (fpm *FakePodMetrics) UpdateMetrics(updated *MetricsState) {

type FakePodMetricsClient struct {
errMu sync.RWMutex
Err map[types.NamespacedName]error
Err map[plugin.EndPointKey]error
resMu sync.RWMutex
Res map[types.NamespacedName]*MetricsState
Res map[plugin.EndPointKey]*MetricsState
}

func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod *fwkdl.EndpointMetadata, existing *MetricsState) (*MetricsState, error) {
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, endPoint *fwkdl.EndpointMetadata, existing *MetricsState) (*MetricsState, error) {
f.errMu.RLock()
err, ok := f.Err[pod.NamespacedName]
err, ok := f.Err[endPoint.Key]
f.errMu.RUnlock()
if ok {
return nil, err
}
f.resMu.RLock()
res, ok := f.Res[pod.NamespacedName]
res, ok := f.Res[endPoint.Key]
f.resMu.RUnlock()
if !ok {
return nil, fmt.Errorf("no pod found: %v", pod.NamespacedName)
return nil, fmt.Errorf("no pod found: %v", endPoint.Key)
}
log.FromContext(ctx).V(logutil.VERBOSE).Info("Fetching metrics for pod", "existing", existing, "new", res)
return res.Clone(), nil
}

func (f *FakePodMetricsClient) SetRes(new map[types.NamespacedName]*MetricsState) {
func (f *FakePodMetricsClient) SetRes(new map[plugin.EndPointKey]*MetricsState) {
f.resMu.Lock()
defer f.resMu.Unlock()
f.Res = new
}

func (f *FakePodMetricsClient) SetErr(new map[types.NamespacedName]error) {
func (f *FakePodMetricsClient) SetErr(new map[plugin.EndPointKey]error) {
f.errMu.Lock()
defer f.errMu.Unlock()
f.Err = new
Expand Down
10 changes: 5 additions & 5 deletions pkg/epp/backend/metrics/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
eppmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool"
)
Expand Down Expand Up @@ -77,12 +77,12 @@ func (f *fakeOddMetricsDataStore) PoolGet() (*datalayer.EndpointPool, error) {

func (f *fakeOddMetricsDataStore) PodList(predicate func(fwkdl.Endpoint) bool) []fwkdl.Endpoint {
pod1 := &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"},
Address: "1.2.3.4:5678",
Key: plugin.NewEndPointKey("pod1", "default", 8000),
Address: "1.2.3.4:5678",
}
pod2 := &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{Name: "pod2", Namespace: "default"},
Address: "1.2.3.4:5679",
Key: plugin.NewEndPointKey("pod2", "default", 8000),
Address: "1.2.3.4:5679",
}
m1 := &fwkdl.Metrics{
RunningRequestsSize: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/backend/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ func (p *PodMetricsClientImpl) FetchMetrics(ctx context.Context, metadata *fwkdl
}
resp, err := p.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", metadata.NamespacedName, err)
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", &metadata.Key, err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code from %s: %v", metadata.NamespacedName, resp.StatusCode)
return nil, fmt.Errorf("unexpected status code from %s: %v", &metadata.Key, resp.StatusCode)
}

parser := expfmt.NewTextParser(model.LegacyValidation)
Expand Down
7 changes: 2 additions & 5 deletions pkg/epp/backend/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/types"

logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

// --- Test Helpers ---
Expand Down Expand Up @@ -581,10 +581,7 @@ func TestFetchMetrics(t *testing.T) {
Address: "127.0.0.1",
Port: "9999",
MetricsHost: "127.0.0.1:9999",
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "pod",
},
Key: plugin.NewEndPointKey("test", "pod", 9999),
}
existing := &MetricsState{}
// No MetricMapping needed for this basic test
Expand Down
9 changes: 3 additions & 6 deletions pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

var (
pod1Info = &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod1-rank-0",
Namespace: "default",
},
Key: plugin.NewEndPointKey("default", "pod1", 9999),
PodName: "pod1",
}
initial = &MetricsState{
Expand All @@ -59,7 +56,7 @@ func TestMetricsRefresh(t *testing.T) {

// Use SetRes to simulate an update of metrics from the pod.
// Verify that the metrics are updated.
pmc.SetRes(map[types.NamespacedName]*MetricsState{pod1Info.NamespacedName: initial})
pmc.SetRes(map[plugin.EndPointKey]*MetricsState{pod1Info.Key: initial})
condition := func(collect *assert.CollectT) {
assert.True(collect, cmp.Equal(pm.GetMetrics(), initial, cmpopts.IgnoreFields(MetricsState{}, "UpdateTime")))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *fwk
startOnce: sync.Once{},
stopOnce: sync.Once{},
done: make(chan struct{}),
logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName),
logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.Key),
}
pm.metadata.Store(metadata)
pm.metrics.Store(fwkdl.NewMetrics())
Expand Down
16 changes: 8 additions & 8 deletions pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
endpointPool1 := pool.InferencePoolToEndpointPool(pool1)
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1", "pod2"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -147,7 +147,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -163,7 +163,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func diffStore(store datastore.Datastore, params diffStoreParams) string {
}
gotEndpoints := []string{}
for _, em := range store.PodList(datastore.AllPodsPredicate) {
gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name)
gotEndpoints = append(gotEndpoints, em.GetMetadata().GetNamespacedName().Name)
}
if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "endpoints:" + diff
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestXInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
endpointPool1 := pool.AlphaInferencePoolToEndpointPool(pool1)
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1", "pod2"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -300,7 +300,7 @@ func TestXInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 := pool.AlphaInferencePoolToEndpointPool(newPool1)
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -316,7 +316,7 @@ func TestXInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 = pool.AlphaInferencePoolToEndpointPool(newPool1)
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand Down Expand Up @@ -358,7 +358,7 @@ func xDiffStore(store datastore.Datastore, params xDiffStoreParams) string {
}
gotPods := []string{}
for _, em := range store.PodList(datastore.AllPodsPredicate) {
gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name)
gotPods = append(gotPods, em.GetMetadata().GetNamespacedName().Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "pods:" + diff
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestPodReconciler(t *testing.T) {

var gotPods []*corev1.Pod
for _, pm := range store.PodList(datastore.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}}
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().GetNamespacedName().Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}}
gotPods = append(gotPods, pod)
}
if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) {
Expand Down
7 changes: 2 additions & 5 deletions pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/mocks"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
datasourcemocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source/mocks"
)

Expand Down Expand Up @@ -58,10 +58,7 @@ func (e *errSource) Poll(_ context.Context, _ fwkdl.Endpoint) (any, error) {

func defaultEndpoint() fwkdl.Endpoint {
meta := &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod-name",
Namespace: "default",
},
Key: plugin.NewEndPointKey("pod-name", "default", 5678),
Address: "1.2.3.4:5678",
}
ms := fwkdl.NewEndpoint(meta, nil)
Expand Down
12 changes: 3 additions & 9 deletions pkg/epp/datalayer/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@ import (
"time"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"

fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
)

func TestFactory(t *testing.T) {
runtime := NewTestRuntime(t, 100*time.Millisecond)

pod1 := &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod1",
Namespace: "default",
},
Key: plugin.NewEndPointKey("pod1", "default", 5678),
Address: "1.2.3.4:5678",
}
endpoint1 := runtime.NewEndpoint(context.Background(), pod1, nil)
Expand All @@ -44,10 +41,7 @@ func TestFactory(t *testing.T) {
assert.Nil(t, dup, "expected to fail to create a duplicate collector")

pod2 := &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod2",
Namespace: "default",
},
Key: plugin.NewEndPointKey("pod2", "default", 5679),
Address: "1.2.3.4:5679",
}
endpoint2 := runtime.NewEndpoint(context.Background(), pod2, nil)
Expand Down
17 changes: 6 additions & 11 deletions pkg/epp/datalayer/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool"
)
Expand Down Expand Up @@ -76,10 +76,11 @@ func TestLogger(t *testing.T) {

logOutput := b.read()
assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}")
assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678")
assert.Contains(t, logOutput, "Current Pods and metrics gathered")
assert.Contains(t, logOutput, "Metadata: {Key:default/pod1:0")
assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5")
assert.Contains(t, logOutput, "RunningRequestsSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048")
assert.Contains(t, logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679")
assert.Contains(t, logOutput, "Metadata: {Key:default/pod2:0 PodName: Address:1.2.3.4:5679")
assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"")
}

Expand Down Expand Up @@ -205,17 +206,11 @@ func (f *FakeOddMetricsDataStore) PodList(predicate func(fwkdl.Endpoint) bool) [
}

var pod1 = &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod1",
Namespace: "default",
},
Key: plugin.NewEndPointKey("pod1", "default", 0),
Address: "1.2.3.4:5678",
}
var pod2 = &fwkdl.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod2",
Namespace: "default",
},
Key: plugin.NewEndPointKey("pod2", "default", 0),
Address: "1.2.3.4:5679",
}

Expand Down
Loading
Loading