Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ spec:
- WhenEmpty
- WhenEmptyOrUnderutilized
type: string
spotToSpotConsolidation:
description: |-
SpotToSpotConsolidation enables or disables spot-to-spot consolidation for this NodePool.
When enabled, Karpenter can replace spot nodes with cheaper spot nodes for cost optimization.
When not set (nil), the global SpotToSpotConsolidation feature gate is used.
This only affects "replace" consolidation (launching new spot nodes); "delete" consolidation
(moving pods to existing nodes without launching new ones) is unaffected by this setting.
For multi-node consolidation involving multiple NodePools, ALL source NodePools
must have this enabled for spot-to-spot replacement to occur.
type: boolean
required:
- consolidateAfter
type: object
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ spec:
- WhenEmpty
- WhenEmptyOrUnderutilized
type: string
spotToSpotConsolidation:
description: |-
SpotToSpotConsolidation enables or disables spot-to-spot consolidation for this NodePool.
When enabled, Karpenter can replace spot nodes with cheaper spot nodes for cost optimization.
When not set (nil), the global SpotToSpotConsolidation feature gate is used.
This only affects "replace" consolidation (launching new spot nodes); "delete" consolidation
(moving pods to existing nodes without launching new ones) is unaffected by this setting.
For multi-node consolidation involving multiple NodePools, ALL source NodePools
must have this enabled for spot-to-spot replacement to occur.
type: boolean
required:
- consolidateAfter
type: object
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/v1/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ type Disruption struct {
// +kubebuilder:validation:MaxItems=50
// +optional
Budgets []Budget `json:"budgets,omitempty" hash:"ignore"`
//nolint:kubeapilinter
// SpotToSpotConsolidation enables or disables spot-to-spot consolidation for this NodePool.
// When enabled, Karpenter can replace spot nodes with cheaper spot nodes for cost optimization.
// When not set (nil), the global SpotToSpotConsolidation feature gate is used.
// This only affects "replace" consolidation (launching new spot nodes); "delete" consolidation
// (moving pods to existing nodes without launching new ones) is unaffected by this setting.
// For multi-node consolidation involving multiple NodePools, ALL source NodePools
// must have this enabled for spot-to-spot replacement to occur.
// +optional
SpotToSpotConsolidation *bool `json:"spotToSpotConsolidation,omitempty" hash:"ignore"`
}

// Budget defines when Karpenter will restrict the
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 49 additions & 6 deletions pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,61 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
return cmd, nil
}

// spotToSpotConsolidationEnabled reports whether spot-to-spot consolidation may be
// performed for the given candidates. A NodePool's explicit spotToSpotConsolidation
// setting takes precedence; when it is unset (nil), the global SpotToSpotConsolidation
// feature gate decides. Every candidate must resolve to "enabled" for this to return
// true (multi-node consolidation requires all source NodePools to allow it). When a
// single-node consolidation is blocked, an Unconsolidatable event is published
// explaining why.
func (c *consolidation) spotToSpotConsolidationEnabled(ctx context.Context, candidates []*Candidate) bool {
	globalEnabled := options.FromContext(ctx).FeatureGates.SpotToSpotConsolidation

	for _, cd := range candidates {
		override := cd.NodePool.Spec.Disruption.SpotToSpotConsolidation

		// Resolve the effective setting: the NodePool override wins; otherwise
		// fall back to the global feature gate.
		enabled := globalEnabled
		if override != nil {
			enabled = *override
		}
		if enabled {
			continue
		}

		// Only surface an event for single-node consolidation; multi-node attempts
		// fail silently here so other combinations can still be tried.
		if len(candidates) == 1 {
			msg := "SpotToSpotConsolidation is disabled, can't replace a spot node with a spot node"
			if override != nil {
				// The NodePool itself disabled it — name it in the message.
				msg = fmt.Sprintf("SpotToSpotConsolidation is disabled for NodePool %q, can't replace a spot node with a spot node", cd.NodePool.Name)
			}
			c.recorder.Publish(disruptionevents.Unconsolidatable(cd.Node, cd.NodeClaim, msg)...)
		}
		return false
	}

	// Every candidate resolved to enabled (or there were no candidates at all).
	return true
}

// Compute command to execute spot-to-spot consolidation if:
// 1. The SpotToSpotConsolidation feature flag is set to true.
// 1. The SpotToSpotConsolidation feature flag is set to true, or the NodePool explicitly enables it.
// 2. For single-node consolidation:
// a. There are at least 15 cheapest instance type replacement options to consolidate.
// b. The current candidate is NOT part of the first 15 cheapest instance types inorder to avoid repeated consolidation.
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results pscheduling.Results, candidatePrice float64) (Command, error) {

// Spot consolidation is turned off.
if !options.FromContext(ctx).FeatureGates.SpotToSpotConsolidation {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "SpotToSpotConsolidation is disabled, can't replace a spot node with a spot node")...)
}
// Check if spot-to-spot consolidation is enabled (per-NodePool or global)
if !c.spotToSpotConsolidationEnabled(ctx, candidates) {
return Command{}, nil
}

Expand Down
163 changes: 163 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,85 @@ var _ = Describe("Consolidation", func() {
})
Expect(ok).To(BeTrue())
})
It("can replace spot with spot when NodePool explicitly enables spotToSpotConsolidation (overrides global=false)", func() {
// Global disabled, but NodePool explicitly enables
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{SpotToSpotConsolidation: lo.ToPtr(false)}}))
nodePool.Spec.Disruption.SpotToSpotConsolidation = lo.ToPtr(true)

// Back the pod with a ReplicaSet owner reference so it is treated as
// controller-managed and reschedulable — NOTE(review): presumably required
// for the node to be a consolidation candidate; confirm against the helpers.
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})
// Bind the pod to the spot node and sync cluster state before reconciling.
ExpectApplied(ctx, env.Client, rs, pod, spotNode, spotNodeClaim, nodePool)
ExpectManualBinding(ctx, env.Client, pod, spotNode)
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{spotNode}, []*v1.NodeClaim{spotNodeClaim})

// Run one disruption reconciliation pass.
ExpectSingletonReconciled(ctx, disruptionController)

// Should have consolidation command because NodePool override enables it
cmds := queue.GetCommands()
Expect(cmds).To(HaveLen(1))
// Verify the command targets our spot node
Expect(cmds[0].Candidates).To(HaveLen(1))
Expect(cmds[0].Candidates[0].NodeClaim.Name).To(Equal(spotNodeClaim.Name))

// Process the command and verify replacement works
ExpectMakeNewNodeClaimsReady(ctx, env.Client, cluster, cloudProvider, cmds[0])
ExpectObjectReconciled(ctx, env.Client, queue, spotNodeClaim)
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, spotNodeClaim)

// Should have created a replacement and deleted the original
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectNotFound(ctx, env.Client, spotNodeClaim, spotNode)
})
It("cannot replace spot with spot when NodePool explicitly disables spotToSpotConsolidation (overrides global=true)", func() {
// Global enabled, but NodePool explicitly disables
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{SpotToSpotConsolidation: lo.ToPtr(true)}}))
nodePool.Spec.Disruption.SpotToSpotConsolidation = lo.ToPtr(false)

// Controller-owned pod so the node would otherwise be a valid candidate —
// the only thing blocking consolidation here should be the NodePool setting.
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

pod := test.Pod(test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}}})
ExpectApplied(ctx, env.Client, rs, pod, spotNode, spotNodeClaim, nodePool)
ExpectManualBinding(ctx, env.Client, pod, spotNode)
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{spotNode}, []*v1.NodeClaim{spotNodeClaim})

// A reconcile pass must produce no consolidation commands.
ExpectSingletonReconciled(ctx, disruptionController)
cmds := queue.GetCommands()
Expect(cmds).To(HaveLen(0))

// Expect per-NodePool event message
_, ok := lo.Find(recorder.Events(), func(e events.Event) bool {
return strings.Contains(e.Message, fmt.Sprintf("SpotToSpotConsolidation is disabled for NodePool %q", nodePool.Name))
})
Expect(ok).To(BeTrue())
})
It("cannot replace spot with spot if it is part of the 15 cheapest instance types.", func() {
cloudProvider.InstanceTypes = lo.Slice(fake.InstanceTypesAssorted(), 0, 20)
// Forcefully assign lowest possible instancePrice to make sure we have atleast one instance
Expand Down Expand Up @@ -4072,6 +4151,90 @@ var _ = Describe("Consolidation", func() {
Entry("if the candidate is on-demand node", false),
Entry("if the candidate is spot node", true),
)
It("excludes spot nodes from NodePool with spotToSpotConsolidation disabled in multi-node consolidation", func() {
// Global SpotToSpotConsolidation is enabled (set in BeforeEach)
// Create a second NodePool with SpotToSpotConsolidation explicitly disabled
// Verify that nodes from the disabled pool are excluded from consolidation
disabledNodePool := test.NodePool(v1.NodePool{
Spec: v1.NodePoolSpec{
Template: v1.NodeClaimTemplate{
Spec: v1.NodeClaimTemplateSpec{
Requirements: []v1.NodeSelectorRequirementWithMinValues{
{Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, Values: []string{v1.CapacityTypeSpot}},
},
},
},
Disruption: v1.Disruption{
ConsolidationPolicy: v1.ConsolidationPolicyWhenEmptyOrUnderutilized,
ConsolidateAfter: v1.MustParseNillableDuration("0s"),
Budgets: []v1.Budget{{Nodes: "100%"}},
SpotToSpotConsolidation: lo.ToPtr(false), // Explicitly disabled
},
},
})
// Create a spot node for the disabled pool
// Uses the most expensive spot instance so a cheaper replacement would
// normally exist — proving exclusion is due to the setting, not pricing.
disabledNodeClaim, disabledNode := test.NodeClaimAndNode(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: disabledNodePool.Name,
corev1.LabelInstanceTypeStable: mostExpensiveSpotInstance.Name,
v1.CapacityTypeLabelKey: mostExpensiveSpotOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(),
corev1.LabelTopologyZone: mostExpensiveSpotOffering.Requirements.Get(corev1.LabelTopologyZone).Any(),
},
},
Status: v1.NodeClaimStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
})
// Mark the node as eligible for consolidation so only the NodePool setting
// can be the reason it is excluded.
disabledNodeClaim.StatusConditions().SetTrue(v1.ConditionTypeConsolidatable)

// Use existing spotNodeClaims/spotNodes from BeforeEach (global enabled via nodePool)
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)
pods := test.Pods(4, test.PodOptions{
ObjectMeta: metav1.ObjectMeta{Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: rs.Name, UID: rs.UID, Controller: lo.ToPtr(true), BlockOwnerDeletion: lo.ToPtr(true)},
}},
})

// Apply: 3 nodes from enabled pool (BeforeEach) + 1 node from disabled pool
ExpectApplied(ctx, env.Client, pods[0], pods[1], pods[2], pods[3],
spotNodeClaims[0], spotNodes[0], spotNodeClaims[1], spotNodes[1], spotNodeClaims[2], spotNodes[2],
disabledNodeClaim, disabledNode,
nodePool, disabledNodePool)
ExpectMakeNodesInitialized(ctx, env.Client, spotNodes[0], spotNodes[1], spotNodes[2], disabledNode)
// One pod per node so every node carries workload.
ExpectManualBinding(ctx, env.Client, pods[0], spotNodes[0])
ExpectManualBinding(ctx, env.Client, pods[1], spotNodes[1])
ExpectManualBinding(ctx, env.Client, pods[2], spotNodes[2])
ExpectManualBinding(ctx, env.Client, pods[3], disabledNode)
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController,
[]*corev1.Node{spotNodes[0], spotNodes[1], spotNodes[2], disabledNode},
[]*v1.NodeClaim{spotNodeClaims[0], spotNodeClaims[1], spotNodeClaims[2], disabledNodeClaim})

ExpectSingletonReconciled(ctx, disruptionController)

// REPLACE commands (spot-to-spot) never include nodes from disabled pool
// Note: DELETE commands can include the disabled pool since they don't create new spot nodes
cmds := queue.GetCommands()
Expect(cmds).ToNot(BeEmpty())
for _, cmd := range cmds {
if cmd.Decision() == disruption.ReplaceDecision {
for _, c := range cmd.Candidates {
Expect(c.NodePool.Name).ToNot(Equal(disabledNodePool.Name),
"node from disabled NodePool should not be included in REPLACE consolidation")
}
}
}

// Execute first consolidation command
// NOTE(review): cmds[0] is executed regardless of whether it is a REPLACE
// or DELETE decision — confirm ExpectMakeNewNodeClaimsReady tolerates both.
ExpectMakeNewNodeClaimsReady(ctx, env.Client, cluster, cloudProvider, cmds[0])
ExpectObjectReconciled(ctx, env.Client, queue, cmds[0].Candidates[0].NodeClaim)
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, lo.Map(cmds[0].Candidates, func(c *disruption.Candidate, _ int) *v1.NodeClaim { return c.NodeClaim })...)
})
})
Context("Node Lifetime Consideration", func() {
var nodeClaims []*v1.NodeClaim
Expand Down
Loading