Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions cluster-autoscaler/cloudprovider/test/fake_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
if n.targetSize+delta > n.maxSize {
return fmt.Errorf("size too large")
}
if n.template == nil || n.template.Node() == nil {
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
}

n.provider.Lock()
defer n.provider.Unlock()
Expand All @@ -354,9 +357,6 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
instanceNum := n.targetSize + i
instanceId := fmt.Sprintf("%s-node-%d", n.id, instanceNum)

if n.template == nil || n.template.Node() == nil {
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
}
newNode := n.template.Node().DeepCopy()
newNode.Name = instanceId

Expand All @@ -370,6 +370,39 @@ func (n *NodeGroup) IncreaseSize(delta int) error {
return nil
}

// IncreaseSizeDirect is a performance-optimized version of IncreaseSize that adds nodes
// directly to the fake client's object tracker to bypass API serialization and watch event overhead.
// Only safe to call before Informer Factory starts.
func (n *NodeGroup) IncreaseSizeDirect(delta int) error {
n.Lock()
defer n.Unlock()
if n.targetSize+delta > n.maxSize {
return fmt.Errorf("size too large")
}
if n.template == nil || n.template.Node() == nil {
return fmt.Errorf("node group %s has no template to create new nodes", n.id)
}

n.provider.Lock()
defer n.provider.Unlock()

for i := 0; i < delta; i++ {
instanceNum := n.targetSize + i
instanceId := fmt.Sprintf("%s-node-%d", n.id, instanceNum)

newNode := n.template.Node().DeepCopy()
newNode.Name = instanceId

n.instances[instanceId] = cloudprovider.InstanceRunning
n.provider.nodeToGroup[instanceId] = n.id
if n.provider.k8s != nil {
n.provider.k8s.AddNodeDirect(newNode)
}
}
n.targetSize += delta
return nil
}

// TemplateNodeInfo returns the template node information for this node group.
func (n *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) {
if n.template == nil {
Expand Down
22 changes: 14 additions & 8 deletions cluster-autoscaler/core/bench/benchmark_runonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,21 @@
}

for i := 0; i < b.N; i++ {
ctx, cancel := context.WithCancel(context.Background())
clusterFakes := newClusterFakes()
if err := s.setup(clusterFakes); err != nil {
cancel()
b.Fatalf("setup failed: %v", err)
}
autoscaler := newAutoscaler(b, s, clusterFakes)
autoscaler := newAutoscaler(b, s, clusterFakes, ctx)

// Manually trigger GC before the timed section to ensure a clean state
// for each iteration.
runtime.GC()

if f != nil && i == 0 {
if err := pprof.StartCPUProfile(f); err != nil {
cancel()
b.Fatalf("Failed to start cpu profile: %v", err)
}
}
Expand All @@ -158,14 +161,17 @@
}

if err != nil {
cancel()
b.Fatalf("RunOnce failed: %v", err)
}

if s.verify != nil {
if err := s.verify(clusterFakes); err != nil {
cancel()
b.Fatalf("verify failed: %v", err)
}
}
cancel()
}
}

Expand All @@ -178,7 +184,7 @@
}

// newAutoscaler constructs a core.Autoscaler instance configured for the given scenario.
func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet) core.Autoscaler {
func newAutoscaler(b *testing.B, s scenario, clusterFakes *integration.FakeSet, ctx context.Context) core.Autoscaler {

Check failure on line 187 in cluster-autoscaler/core/bench/benchmark_runonce_test.go

View workflow job for this annotation

GitHub Actions / verify

context.Context should be the first parameter of a function
opts := defaultCAOptions()
if s.config != nil {
s.config(&opts)
Expand All @@ -190,7 +196,7 @@
ftkc := &fastTaintingKubeClient{taintedNodes: make(map[string]bool)}
ftkc.registerReactors(clusterFakes.KubeClient)

kubeClients := ca_context.NewAutoscalingKubeClients(context.Background(), opts, clusterFakes.KubeClient, clusterFakes.InformerFactory)
kubeClients := ca_context.NewAutoscalingKubeClients(ctx, opts, clusterFakes.KubeClient, clusterFakes.InformerFactory)
kubeClients.Recorder = &noOpRecorder{}

wrappedCloudProvider := &fastScaleUpCloudProvider{
Expand All @@ -204,7 +210,7 @@
WithAutoscalingKubeClients(kubeClients).
WithInformerFactory(clusterFakes.InformerFactory).
WithCloudProvider(wrappedCloudProvider).
WithPodObserver(clusterFakes.PodObserver).Build(context.Background())
WithPodObserver(clusterFakes.PodObserver).Build(ctx)
if err != nil {
b.Fatalf("Failed to build: %v", err)
}
Expand Down Expand Up @@ -381,7 +387,7 @@
cpu := int64(nodeCPU / podsPerNode)
mem := int64(nodeMem / podsPerNode)
pod := BuildTestPod(podName, cpu, mem, MarkUnschedulable())
clusterFakes.K8s.AddPod(pod)
clusterFakes.K8s.AddPodDirect(pod)
}
return nil
}
Expand All @@ -401,8 +407,8 @@
testprovider.WithNGSize(0, maxNGSize),
)

ng := clusterFakes.CloudProvider.GetNodeGroup(ngName)
if err := ng.IncreaseSize(nodesCount); err != nil {
ng := clusterFakes.CloudProvider.GetNodeGroup(ngName).(*testprovider.NodeGroup)
if err := ng.IncreaseSizeDirect(nodesCount); err != nil {
return err
}

Expand All @@ -420,7 +426,7 @@
pod.Annotations = make(map[string]string)
}
pod.Annotations["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "true"
clusterFakes.K8s.AddPod(pod)
clusterFakes.K8s.AddPodDirect(pod)
}

return nil
Expand Down
26 changes: 26 additions & 0 deletions cluster-autoscaler/utils/fake/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ func (k *Kubernetes) AddNode(node *apiv1.Node) {
_, _ = k.Client.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}

// AddNodeDirect adds a node directly to the fake client's underlying object tracker
// via Tracker().Add(node) instead of going through CoreV1().Nodes().Create().
// This completely bypasses the fake clientset's serialization, deep-copy, and reactor overhead,
// reducing setup times for large-scale simulations (like benchmarks) by orders of magnitude.
//
// WARNING: Because Tracker().Add() does NOT trigger active watch events in client-go,
// this method MUST ONLY be used for pre-populating the cluster state during the initial
// setup phase BEFORE the Informer Factory is started.
// To dynamically add a node AFTER informers are running, use the standard AddNode method instead.
func (k *Kubernetes) AddNodeDirect(node *apiv1.Node) {
_ = k.Client.Tracker().Add(node)
}

// UpdateNode updates a node.
func (k *Kubernetes) UpdateNode(node *apiv1.Node) {
_, _ = k.Client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
Expand All @@ -66,6 +79,19 @@ func (k *Kubernetes) AddPod(pod *apiv1.Pod) {
_, _ = k.Client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}

// AddPodDirect adds a pod directly to the fake client's underlying object tracker
// via Tracker().Add(pod) instead of going through CoreV1().Pods().Create().
// This completely bypasses the fake clientset's serialization, deep-copy, and reactor overhead,
// reducing setup times for large-scale simulations (like benchmarks) by orders of magnitude.
//
// WARNING: Because Tracker().Add() does NOT trigger active watch events in client-go,
// this method MUST ONLY be used for pre-populating the cluster state during the initial
// setup phase BEFORE the Informer Factory is started.
// To dynamically add a pod AFTER informers are running, use the standard AddPod method instead.
func (k *Kubernetes) AddPodDirect(pod *apiv1.Pod) {
_ = k.Client.Tracker().Add(pod)
}

// DeletePod deletes a pod.
func (k *Kubernetes) DeletePod(namespace, name string) {
_ = k.Client.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
Expand Down
Loading