Skip to content

Commit 5fc3afb

Browse files
committed
[Perf] Optimize and fix informer leak in core/bench benchmark tests
1. Introduce performance-optimized AddNodeDirect and AddPodDirect methods in fake Kubernetes client helper that bypass standard API-like Create serialization, deep-copy, and reactor overhead by writing directly to the object tracker. 2. Leave existing AddNode and AddPod methods unchanged to ensure absolute zero regression impact on existing integration tests. 3. Pass a localized cancellable context to the Autoscaler builder per benchmark iteration and cancel it at the end of the iteration, resolving informer goroutine and cache leaks.
1 parent 35976b2 commit 5fc3afb

3 files changed

Lines changed: 76 additions & 11 deletions

File tree

cluster-autoscaler/cloudprovider/test/fake_cloud_provider.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
346346
if n.targetSize+delta > n.maxSize {
347347
return fmt.Errorf("size too large")
348348
}
349+
if n.template == nil || n.template.Node() == nil {
350+
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
351+
}
349352

350353
n.provider.Lock()
351354
defer n.provider.Unlock()
@@ -354,9 +357,6 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
354357
instanceNum := n.targetSize + i
355358
instanceId := fmt.Sprintf("%s-node-%d", n.id, instanceNum)
356359

357-
if n.template == nil || n.template.Node() == nil {
358-
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
359-
}
360360
newNode := n.template.Node().DeepCopy()
361361
newNode.Name = instanceId
362362

@@ -370,6 +370,39 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
370370
return nil
371371
}
372372

373+
// IncreaseSizeDirect is a performance-optimized version of IncreaseSize that adds nodes
374+
// directly to the fake client's object tracker to bypass API serialization and watch event overhead.
375+
// Only safe to call before Informer Factory starts.
376+
func (n *NodeGroup) IncreaseSizeDirect(delta int) error {
377+
n.Lock()
378+
defer n.Unlock()
379+
if n.targetSize+delta > n.maxSize {
380+
return fmt.Errorf("size too large")
381+
}
382+
if n.template == nil || n.template.Node() == nil {
383+
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
384+
}
385+
386+
n.provider.Lock()
387+
defer n.provider.Unlock()
388+
389+
for i := 0; i < delta; i++ {
390+
instanceNum := n.targetSize + i
391+
instanceId := fmt.Sprintf("%s-node-%d", n.id, instanceNum)
392+
393+
newNode := n.template.Node().DeepCopy()
394+
newNode.Name = instanceId
395+
396+
n.instances[instanceId] = cloudprovider.InstanceRunning
397+
n.provider.nodeToGroup[instanceId] = n.id
398+
if n.provider.k8s != nil {
399+
n.provider.k8s.AddNodeDirect(newNode)
400+
}
401+
}
402+
n.targetSize += delta
403+
return nil
404+
}
405+
373406
// TemplateNodeInfo returns the template node information for this node group.
374407
func (n *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) {
375408
if n.template == nil {

cluster-autoscaler/core/bench/benchmark_runonce_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,21 @@ func (s scenario) run(b *testing.B) {
133133
}
134134

135135
for i := 0; i < b.N; i++ {
136+
ctx, cancel := context.WithCancel(context.Background())
136137
clusterFakes := newClusterFakes()
137138
if err := s.setup(clusterFakes); err != nil {
139+
cancel()
138140
b.Fatalf("setup failed: %v", err)
139141
}
140-
autoscaler := newAutoscaler(b, s, clusterFakes)
142+
autoscaler := newAutoscaler(b, s, clusterFakes, ctx)
141143

142144
// Manually trigger GC before the timed section to ensure a clean state
143145
// for each iteration.
144146
runtime.GC()
145147

146148
if f != nil && i == 0 {
147149
if err := pprof.StartCPUProfile(f); err != nil {
150+
cancel()
148151
b.Fatalf("Failed to start cpu profile: %v", err)
149152
}
150153
}
@@ -158,14 +161,17 @@ func (s scenario) run(b *testing.B) {
158161
}
159162

160163
if err != nil {
164+
cancel()
161165
b.Fatalf("RunOnce failed: %v", err)
162166
}
163167

164168
if s.verify != nil {
165169
if err := s.verify(clusterFakes); err != nil {
170+
cancel()
166171
b.Fatalf("verify failed: %v", err)
167172
}
168173
}
174+
cancel()
169175
}
170176
}
171177

@@ -178,7 +184,7 @@ func newClusterFakes() *integration.FakeSet {
178184
}
179185

180186
// newAutoscaler constructs a core.Autoscaler instance configured for the given scenario.
181-
func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet) core.Autoscaler {
187+
func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet, ctx context.Context) core.Autoscaler {
182188
opts := defaultCAOptions()
183189
if s.config != nil {
184190
s.config(&opts)
@@ -190,7 +196,7 @@ func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet)
190196
ftkc := &fastTaintingKubeClient{taintedNodes: make(map[string]bool)}
191197
ftkc.registerReactors(clusterFakes.KubeClient)
192198

193-
kubeClients := ca_context.NewAutoscalingKubeClients(context.Background(), opts, clusterFakes.KubeClient, clusterFakes.InformerFactory)
199+
kubeClients := ca_context.NewAutoscalingKubeClients(ctx, opts, clusterFakes.KubeClient, clusterFakes.InformerFactory)
194200
kubeClients.Recorder = &noOpRecorder{}
195201

196202
wrappedCloudProvider := &fastScaleUpCloudProvider{
@@ -204,7 +210,7 @@ func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet)
204210
WithAutoscalingKubeClients(kubeClients).
205211
WithInformerFactory(clusterFakes.InformerFactory).
206212
WithCloudProvider(wrappedCloudProvider).
207-
WithPodObserver(clusterFakes.PodObserver).Build(context.Background())
213+
WithPodObserver(clusterFakes.PodObserver).Build(ctx)
208214
if err != nil {
209215
b.Fatalf("Failed to build: %v", err)
210216
}
@@ -381,7 +387,7 @@ func setupScaleUp(nodes int) func(*integration.FakeSet) error {
381387
cpu := int64(nodeCPU / podsPerNode)
382388
mem := int64(nodeMem / podsPerNode)
383389
pod := BuildTestPod(podName, cpu, mem, MarkUnschedulable())
384-
clusterFakes.K8s.AddPod(pod)
390+
clusterFakes.K8s.AddPodDirect(pod)
385391
}
386392
return nil
387393
}
@@ -401,8 +407,8 @@ func setupScaleDown60Percent(nodesCount int) func(*integration.FakeSet) error {
401407
testprovider.WithNGSize(0, maxNGSize),
402408
)
403409

404-
ng := clusterFakes.CloudProvider.GetNodeGroup(ngName)
405-
if err := ng.IncreaseSize(nodesCount); err != nil {
410+
ng := clusterFakes.CloudProvider.GetNodeGroup(ngName).(*testprovider.NodeGroup)
411+
if err := ng.IncreaseSizeDirect(nodesCount); err != nil {
406412
return err
407413
}
408414

@@ -420,7 +426,7 @@ func setupScaleDown60Percent(nodesCount int) func(*integration.FakeSet) error {
420426
pod.Annotations = make(map[string]string)
421427
}
422428
pod.Annotations["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "true"
423-
clusterFakes.K8s.AddPod(pod)
429+
clusterFakes.K8s.AddPodDirect(pod)
424430
}
425431

426432
return nil

cluster-autoscaler/utils/fake/kubernetes.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@ func (k *Kubernetes) AddNode(node *apiv1.Node) {
4545
_, _ = k.Client.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
4646
}
4747

48+
// AddNodeDirect adds a node directly to the fake client's underlying object tracker
49+
// via Tracker().Add(node) instead of going through CoreV1().Nodes().Create().
50+
// This completely bypasses the fake clientset's serialization, deep-copy, and reactor overhead,
51+
// reducing setup times for large-scale simulations (like benchmarks) by orders of magnitude.
52+
//
53+
// WARNING: Because Tracker().Add() does NOT trigger active watch events in client-go,
54+
// this method MUST ONLY be used for pre-populating the cluster state during the initial
55+
// setup phase BEFORE the Informer Factory is started.
56+
// To dynamically add a node AFTER informers are running, use the standard AddNode method instead.
57+
func (k *Kubernetes) AddNodeDirect(node *apiv1.Node) {
58+
_ = k.Client.Tracker().Add(node)
59+
}
60+
4861
// UpdateNode updates a node.
4962
func (k *Kubernetes) UpdateNode(node *apiv1.Node) {
5063
_, _ = k.Client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
@@ -66,6 +79,19 @@ func (k *Kubernetes) AddPod(pod *apiv1.Pod) {
6679
_, _ = k.Client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
6780
}
6881

82+
// AddPodDirect adds a pod directly to the fake client's underlying object tracker
83+
// via Tracker().Add(pod) instead of going through CoreV1().Pods().Create().
84+
// This completely bypasses the fake clientset's serialization, deep-copy, and reactor overhead,
85+
// reducing setup times for large-scale simulations (like benchmarks) by orders of magnitude.
86+
//
87+
// WARNING: Because Tracker().Add() does NOT trigger active watch events in client-go,
88+
// this method MUST ONLY be used for pre-populating the cluster state during the initial
89+
// setup phase BEFORE the Informer Factory is started.
90+
// To dynamically add a pod AFTER informers are running, use the standard AddPod method instead.
91+
func (k *Kubernetes) AddPodDirect(pod *apiv1.Pod) {
92+
_ = k.Client.Tracker().Add(pod)
93+
}
94+
6995
// DeletePod deletes a pod.
7096
func (k *Kubernetes) DeletePod(namespace, name string) {
7197
_ = k.Client.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})

0 commit comments

Comments
 (0)