Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions pkg/clusteragent/autoscaling/workload/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewController(
c.limitHeap = limitHeap
c.store = store
c.podWatcher = podWatcher
c.scaler = newScaler(restMapper, scaleClient)
c.scaler = newScaler(restMapper, scaleClient, dynamicClient)

// Initialize metrics store
c.metricsStore = metricsstore.NewMetricsStore(metrics.GeneratePodAutoscalerMetrics, localSender, c.IsLeader, globalTagsFunc)
Expand Down Expand Up @@ -272,6 +272,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// Deletion can only happen if the object is owned by remote config.
if podAutoscalerInternal.Deleted() {
log.Infof("Remote owned PodAutoscaler with Deleted flag, deleting object: %s", key)
c.releaseTargetReplicasOwnership(ctx, podAutoscalerInternal)
Comment thread
celenechang marked this conversation as resolved.
err := c.deletePodAutoscaler(ns, name)
// In case of not found, it means the object is gone but informer cache is not updated yet, we can safely delete it from our store
if err != nil && k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -359,6 +360,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if podAutoscalerInternal.IsProfileManaged() {
if podAutoscalerInternal.Deleted() {
log.Infof("Profile-managed PodAutoscaler with Deleted flag, deleting object: %s", key)
c.releaseTargetReplicasOwnership(ctx, podAutoscalerInternal)
err := c.deletePodAutoscaler(ns, name)
if err != nil && k8serrors.IsNotFound(err) {
c.store.UnlockDelete(key, c.ID)
Expand Down Expand Up @@ -466,7 +468,7 @@ func (c *Controller) handleScaling(ctx context.Context, podAutoscaler *datadoghq

// TODO: While horizontal scaling is in progress we should not start vertical scaling
// While vertical scaling is in progress we should only allow horizontal scale up
horizontalRes, err := c.horizontalController.sync(ctx, podAutoscaler, podAutoscalerInternal, scale, gr, scaleErr)
horizontalRes, err := c.horizontalController.sync(ctx, podAutoscaler, podAutoscalerInternal, targetGVK, scale, gr, scaleErr)
if err != nil {
return horizontalRes, err
}
Expand Down Expand Up @@ -606,6 +608,26 @@ func (c *Controller) deletePodAutoscaler(ns, name string) error {
return nil
}

// releaseTargetReplicasOwnership asks the scaler to drop the cluster agent's
// managed-fields entry for `.spec.replicas` (scale subresource) on the
// workload targeted by the given DPA. It is invoked right before a DPA is
// deleted so that later SSA writers (e.g. Helm) do not run into a stale
// field manager.
//
// The cleanup is best-effort: nothing is returned and failures are only
// logged, so it can never block the deletion itself.
func (c *Controller) releaseTargetReplicasOwnership(ctx context.Context, podAutoscalerInternal model.PodAutoscalerInternal) {
	dpaSpec := podAutoscalerInternal.Spec()
	if dpaSpec == nil {
		// Without a spec there is no target reference to clean up.
		return
	}
	targetName := dpaSpec.TargetRef.Name
	if targetName == "" {
		return
	}

	namespace := podAutoscalerInternal.Namespace()
	gvk, gvkErr := podAutoscalerInternal.TargetGVK()
	if gvkErr != nil {
		log.Debugf("Skipping replicas-ownership release for %s/%s: unable to resolve target GVK: %v", namespace, podAutoscalerInternal.Name(), gvkErr)
		return
	}

	releaseErr := c.scaler.releaseReplicasOwnership(ctx, namespace, targetName, gvk)
	if releaseErr == nil {
		return
	}
	// Only warn: the caller proceeds with the deletion regardless.
	log.Warnf("Failed to release replicas ownership for %s %s/%s: %v", gvk.Kind, namespace, targetName, releaseErr)
}

func (c *Controller) validateAutoscaler(podAutoscalerInternal model.PodAutoscalerInternal) error {
// Check that we are within the limit of 100 DatadogPodAutoscalers
key := podAutoscalerInternal.ID()
Expand Down
13 changes: 12 additions & 1 deletion pkg/clusteragent/autoscaling/workload/controller_horizontal.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,25 @@ func newHorizontalReconciler(clock clock.Clock, eventRecorder record.EventRecord
}
}

func (hr *horizontalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal, scale *autoscalingv1.Scale, gr schema.GroupResource, scaleErr error) (autoscaling.ProcessResult, error) {
func (hr *horizontalController) sync(ctx context.Context, podAutoscaler *datadoghq.DatadogPodAutoscaler, autoscalerInternal *model.PodAutoscalerInternal, targetGVK schema.GroupVersionKind, scale *autoscalingv1.Scale, gr schema.GroupResource, scaleErr error) (autoscaling.ProcessResult, error) {
// If we have no Spec, nothing to do
if autoscalerInternal.Spec() == nil {
return autoscaling.NoRequeue, nil
}

// If horizontal scaling is disabled, clear horizontal state and exit.
if !autoscalerInternal.IsHorizontalScalingEnabled() {
// If we previously scaled this target, release ownership of
// `.spec.replicas` so SSA writers (e.g. Helm) do not conflict with
// a stale field manager once horizontal scaling is off. Gated on
// HorizontalLastActions to avoid issuing a GET on every reconcile
// while horizontal stays disabled — ClearHorizontalState below
// nils out actions so subsequent ticks skip this branch.
if len(autoscalerInternal.HorizontalLastActions()) > 0 && autoscalerInternal.Spec().TargetRef.Name != "" {
if err := hr.scaler.releaseReplicasOwnership(ctx, autoscalerInternal.Namespace(), autoscalerInternal.Spec().TargetRef.Name, targetGVK); err != nil {
log.Warnf("Failed to release replicas ownership for %s %s/%s after disabling horizontal scaling: %v", targetGVK.Kind, autoscalerInternal.Namespace(), autoscalerInternal.Spec().TargetRef.Name, err)
}
}
autoscalerInternal.ClearHorizontalState()
Comment thread
celenechang marked this conversation as resolved.
return autoscaling.NoRequeue, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func (f *horizontalControllerFixture) runSync(fakePai *model.FakePodAutoscalerIn
// Pre-fetch scale subresource, mirroring what handleScaling does in the parent controller.
var scale *autoscalingv1.Scale
var gr schema.GroupResource
var targetGVK schema.GroupVersionKind
var scaleErr error
if autoscalerInternal.Spec() != nil {
if gvk, err := autoscalerInternal.TargetGVK(); err == nil {
targetGVK = gvk
scale, gr, scaleErr = f.scaler.get(context.Background(), fakePai.Namespace, autoscalerInternal.Spec().TargetRef.Name, gvk)
}
}

res, err := f.controller.sync(context.Background(), fakeAutoscaler, &autoscalerInternal, scale, gr, scaleErr)
res, err := f.controller.sync(context.Background(), fakeAutoscaler, &autoscalerInternal, targetGVK, scale, gr, scaleErr)
return autoscalerInternal, res, err
}

Expand Down Expand Up @@ -352,6 +354,58 @@ func TestHorizontalControllerSyncPrerequisites(t *testing.T) {
// assert.NoError(t, err)
}

// TestHorizontalControllerReleaseOwnershipOnDisable checks that syncing an
// autoscaler whose scale-up and scale-down policies are both disabled calls
// releaseReplicasOwnership on the scaler only when horizontal actions were
// previously recorded.
func TestHorizontalControllerReleaseOwnershipOnDisable(t *testing.T) {
	const (
		ns         = "default"
		targetName = "test"
	)
	fixture := newHorizontalControllerFixture(t, time.Now())
	deploymentGVK := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}

	// Both directions use the Disabled strategy so horizontal scaling is off.
	strategy := datadoghqcommon.DatadogPodAutoscalerDisabledStrategySelect
	dpaSpec := &datadoghq.DatadogPodAutoscalerSpec{
		TargetRef: v2.CrossVersionObjectReference{
			Name:       targetName,
			Kind:       deploymentGVK.Kind,
			APIVersion: deploymentGVK.Group + "/" + deploymentGVK.Version,
		},
		ApplyPolicy: &datadoghq.DatadogPodAutoscalerApplyPolicy{
			ScaleUp:   &datadoghqcommon.DatadogPodAutoscalerScalingPolicy{Strategy: &strategy},
			ScaleDown: &datadoghqcommon.DatadogPodAutoscalerScalingPolicy{Strategy: &strategy},
		},
	}

	// syncDisabled builds a fake autoscaler carrying the given past actions,
	// mocks the scale lookup, runs one sync and checks the common outcome.
	syncDisabled := func(pastActions []datadoghqcommon.DatadogPodAutoscalerHorizontalAction) {
		pai := &model.FakePodAutoscalerInternal{
			Namespace:             ns,
			Name:                  targetName,
			Spec:                  dpaSpec,
			TargetGVK:             deploymentGVK,
			HorizontalLastActions: pastActions,
		}
		fixture.scaler.mockGet(*pai, 5, 5, nil)
		_, res, syncErr := fixture.runSync(pai)
		assert.Equal(t, autoscaling.NoRequeue, res)
		assert.NoError(t, syncErr)
	}

	// Case 1: horizontal disabled with prior actions → release exactly once.
	syncDisabled([]datadoghqcommon.DatadogPodAutoscalerHorizontalAction{
		{Time: metav1.NewTime(fixture.clock.Now()), FromReplicas: 3, ToReplicas: 5},
	})
	fixture.scaler.AssertNumberOfCalls(t, "releaseReplicasOwnership", 1)
	fixture.scaler.AssertCalled(t, "releaseReplicasOwnership", mock.Anything, ns, targetName, deploymentGVK)

	// Case 2: horizontal disabled with no prior actions → no release.
	fixture.resetFakeScaler()
	syncDisabled(nil)
	fixture.scaler.AssertNumberOfCalls(t, "releaseReplicasOwnership", 0)
}

func TestHorizontalControllerSyncScaleDecisions(t *testing.T) {
testTime := time.Now()
startTime := testTime.Add(-time.Hour)
Expand Down
6 changes: 6 additions & 0 deletions pkg/clusteragent/autoscaling/workload/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ func TestLeaderCreateDeleteRemote(t *testing.T) {
f.ExpectDeleteAction("default", "dpa-0")
f.RunControllerSync(true, "default/dpa-0")
assert.Len(t, f.store.GetAll(), 1) // Still in store
// The controller must release `.spec.replicas` ownership on the target
// workload before deleting the DPA, so SSA writers (e.g. Helm) do not
// conflict with a stale `datadog-cluster-agent` field manager.
f.scaler.AssertNumberOfCalls(t, "releaseReplicasOwnership", 1)
f.scaler.AssertCalled(t, "releaseReplicasOwnership", mock.Anything, "default", "app-0",
schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"})

// Next reconcile the controller is going to remove the object from the store
f.InformerObjects = nil
Expand Down
95 changes: 90 additions & 5 deletions pkg/clusteragent/autoscaling/workload/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,50 @@ package workload

import (
"context"
"encoding/json"
"fmt"

autoscalingv1 "k8s.io/api/autoscaling/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
scaleclient "k8s.io/client-go/scale"
)

// datadogClusterAgentFieldManager is the field-manager name Kubernetes
// records for writes the cluster agent issues against scalable workloads.
// It is derived from the binary's user-agent. See the support case context
// in the PR description for how stale entries with this manager surface as
// SSA conflicts for users.
const datadogClusterAgentFieldManager = "datadog-cluster-agent"

// scaler abstracts the reads and writes this package issues against a
// target workload, so that tests can substitute a fake implementation.
type scaler interface {
// get looks up the Scale of the workload identified by namespace, name
// and gvk. It also returns a GroupResource — presumably the resource the
// lookup resolved to, which update takes as input (TODO confirm).
get(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (*autoscalingv1.Scale, schema.GroupResource, error)
// update submits the given Scale for the resource identified by gr and
// returns the Scale reported back by the API.
update(ctx context.Context, gr schema.GroupResource, scale *autoscalingv1.Scale) (*autoscalingv1.Scale, error)
// releaseReplicasOwnership removes the cluster agent's managed-fields
// entry for the scale subresource on the target workload, so that
// server-side appliers (e.g. Helm SSA) can write `.spec.replicas`
// without conflicting with a stale entry left behind once the
// DatadogPodAutoscaler stops scaling the workload.
//
// Safe to call when no such entry exists (returns nil).
releaseReplicasOwnership(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) error
}

type scalerImpl struct {
restMapper apimeta.RESTMapper
scaleGetter scaleclient.ScalesGetter
restMapper apimeta.RESTMapper
scaleGetter scaleclient.ScalesGetter
dynamicClient dynamic.Interface
}

func newScaler(restMapper apimeta.RESTMapper, scaleGetter scaleclient.ScalesGetter) scaler {
func newScaler(restMapper apimeta.RESTMapper, scaleGetter scaleclient.ScalesGetter, dynamicClient dynamic.Interface) scaler {
return &scalerImpl{
restMapper: restMapper,
scaleGetter: scaleGetter,
restMapper: restMapper,
scaleGetter: scaleGetter,
dynamicClient: dynamicClient,
}
}

Expand Down Expand Up @@ -67,3 +88,67 @@ func (sg *scalerImpl) get(ctx context.Context, namespace, name string, gvk schem
// update writes the given Scale through the scale client for the resource
// identified by gr, using the namespace recorded on the Scale itself, and
// returns the Scale reported back by the API server.
func (sg *scalerImpl) update(ctx context.Context, gr schema.GroupResource, scale *autoscalingv1.Scale) (*autoscalingv1.Scale, error) {
	scalesClient := sg.scaleGetter.Scales(scale.Namespace)
	return scalesClient.Update(ctx, gr, scale, metav1.UpdateOptions{})
}

// releaseReplicasOwnership removes the cluster agent's scale-subresource
// managed-fields entries from the workload identified by namespace, name and
// gvk.
//
// The GVK's group/kind is resolved to its REST mappings and each mapping is
// tried in order until one succeeds. A NotFound error from a mapping means
// the workload is gone, which is treated as success. If every mapping fails,
// the error from the first one is returned; if the REST mapper yields no
// mapping at all, an "unrecognized resource" error is returned.
func (sg *scalerImpl) releaseReplicasOwnership(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) error {
	mappings, err := sg.restMapper.RESTMappings(gvk.GroupKind())
	if err != nil {
		// Wrap the cause so callers logging this error can see why the
		// lookup failed instead of only which GVK it failed for.
		return fmt.Errorf("failed to get REST mappings for GVK: %s: %w", gvk, err)
	}

	var firstErr error
	for _, mapping := range mappings {
		err := sg.releaseReplicasOwnershipForGVR(ctx, namespace, name, mapping.Resource)
		if err == nil || k8serrors.IsNotFound(err) {
			// Either released, or the target workload no longer exists —
			// nothing left to release.
			return nil
		}
		// The loop only continues after a failure, so this is set on the
		// first iteration and kept for the final return.
		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		// No mapping was tried at all.
		return fmt.Errorf("unrecognized resource: %s", gvk)
	}
	return firstErr
}

func (sg *scalerImpl) releaseReplicasOwnershipForGVR(ctx context.Context, namespace, name string, gvr schema.GroupVersionResource) error {
obj, err := sg.dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}

// Collect indices of managedFields entries owned by the cluster agent
// on the scale subresource. Iterate in descending order so each remove
// op leaves earlier indices stable.
managedFields := obj.GetManagedFields()
var indices []int
for i, mf := range managedFields {
if mf.Manager == datadogClusterAgentFieldManager && mf.Subresource == "scale" {
indices = append(indices, i)
}
}
if len(indices) == 0 {
return nil
}

patch := make([]map[string]string, 0, len(indices))
for j := len(indices) - 1; j >= 0; j-- {
patch = append(patch, map[string]string{
"op": "remove",
"path": fmt.Sprintf("/metadata/managedFields/%d", indices[j]),
})
Comment thread
celenechang marked this conversation as resolved.
Outdated
}
body, err := json.Marshal(patch)
if err != nil {
return fmt.Errorf("failed to marshal managedFields patch: %w", err)
}

_, err = sg.dynamicClient.Resource(gvr).Namespace(namespace).Patch(ctx, name, types.JSONPatchType, body, metav1.PatchOptions{})
Comment thread
celenechang marked this conversation as resolved.
return err
}
19 changes: 18 additions & 1 deletion pkg/clusteragent/autoscaling/workload/scaler_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ type fakeScaler struct {
}

// newFakeScaler returns a fakeScaler that already accepts any call to
// releaseReplicasOwnership and answers it with a nil error.
func newFakeScaler() *fakeScaler {
	fs := &fakeScaler{}
	// Default Maybe-expectation: most existing tests don't care about the
	// best-effort release-ownership cleanup path. Tests that need to assert
	// on releaseReplicasOwnership can register a stricter expectation via
	// mockReleaseReplicasOwnership.
	// NOTE(review): the mock library is believed to match expectations in
	// registration order — confirm that a later, stricter expectation
	// actually takes precedence over this catch-all.
	fs.On("releaseReplicasOwnership", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(nil).Maybe()
	return fs
}

func (fs *fakeScaler) get(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (*autoscalingv1.Scale, schema.GroupResource, error) {
Expand All @@ -35,6 +42,16 @@ func (fs *fakeScaler) update(ctx context.Context, gr schema.GroupResource, scale
return args.Get(0).(*autoscalingv1.Scale), args.Error(1)
}

// releaseReplicasOwnership records the call on the mock and returns the
// error configured as the first return value of the matching expectation.
func (fs *fakeScaler) releaseReplicasOwnership(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) error {
	return fs.Called(ctx, namespace, name, gvk).Error(0)
}

// mockReleaseReplicasOwnership registers an expectation for a
// releaseReplicasOwnership call targeting the namespace, target name and
// target GVK of pai (any context is accepted), returning err.
func (fs *fakeScaler) mockReleaseReplicasOwnership(pai model.FakePodAutoscalerInternal, err error) {
	targetName := pai.Spec.TargetRef.Name
	expectation := fs.On("releaseReplicasOwnership", mock.Anything, pai.Namespace, targetName, pai.TargetGVK)
	expectation.Return(err)
}

func (fs *fakeScaler) mockGet(pai model.FakePodAutoscalerInternal, specReplicas, statusReplicas int32, err error) {
mockCall := fs.On("get", mock.Anything, pai.Namespace, pai.Spec.TargetRef.Name, pai.TargetGVK)
if err != nil {
Expand Down
Loading
Loading