2 changes: 1 addition & 1 deletion internal/cmd/manager/cmd.go
@@ -106,7 +106,7 @@ func addFlags(cmd *cobra.Command, opts *managerOptions) {
)
cmd.Flags().Float32(
operator.KubeClientQPSMGRFlag,
0,
300,
"Maximum number of queries per second to the Kubernetes API.",
)
zapFlagSet := flag.NewFlagSet("zap", flag.ExitOnError)
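The cmd.go change moves the default for the client QPS flag from 0 to 300, so an operator started without an explicit value is likely no longer left at client-go's conservative built-in throttle (QPS 5, Burst 10). A minimal sketch of how such a flag is typically wired into the manager's rest.Config; the flag name, the Burst heuristic, and the wiring below are assumptions for illustration, not code from this PR:

```go
package main

import (
	"flag"

	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// Assumed flag name; mirrors the PR's new default of 300 QPS so that an
	// unset flag no longer leaves the client at client-go's built-in throttle.
	qps := flag.Float64("kube-client-qps", 300, "Maximum number of queries per second to the Kubernetes API.")
	flag.Parse()

	cfg := ctrl.GetConfigOrDie()
	cfg.QPS = float32(*qps)
	cfg.Burst = int(*qps) * 2 // rule of thumb: allow short bursts above the sustained rate

	if _, err := ctrl.NewManager(cfg, ctrl.Options{}); err != nil {
		panic(err)
	}
}
```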
@@ -60,8 +60,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
reconcilers := []reconciler{
{typ: "finalizer", rec: r.reconcileFinalizer},
{typ: "resources", rec: r.reconcileResources},
{typ: "redis", rec: r.reconcileRedis},
{typ: "status", rec: r.reconcileStatus},
{typ: "redisAndStatus", rec: r.reconcileRedisAndStatus},
}

for _, reconciler := range reconcilers {
@@ -344,7 +343,10 @@ func (r *Reconciler) sentinelResetIfNeed(ctx context.Context, inst *rrvb2.RedisR
return nil
}

func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisReplication) (ctrl.Result, error) {
// reconcileRedisAndStatus combines the redis topology reconciliation and the status/label
// update into a single step so that GetRedisNodesByRole is called once per role per
// reconcile loop instead of twice (once in reconcileRedis, once in reconcileStatus).
func (r *Reconciler) reconcileRedisAndStatus(ctx context.Context, instance *rrvb2.RedisReplication) (ctrl.Result, error) {
if instance.EnableSentinel() {
if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.SentinelStatefulSet()) {
return intctrlutil.RequeueAfter(ctx, time.Second*30, "waiting for sentinel statefulset to be ready")
@@ -354,7 +356,7 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
}
}

var realMaster string
// --- Single round-trip to Redis pods ---
masterNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "master")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
@@ -363,23 +365,20 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}

// --- reconcileRedis logic ---
var realMaster string
if len(masterNodes) > 1 {
log.FromContext(ctx).Info("Creating redis replication by executing replication creation commands")

realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterNodes)

// Cascading fallback when no pod has connected_slaves > 0
if realMaster == "" {
// Fallback 1: use last-known master from Status.MasterNode if valid
if instance.Status.MasterNode != "" && k8sutils.IsPodRunning(ctx, r.K8sClient, instance.Namespace, instance.Status.MasterNode) {
log.FromContext(ctx).Info("No master with attached slaves found, falling back to Status.MasterNode",
"statusMasterNode", instance.Status.MasterNode)
realMaster = instance.Status.MasterNode
}
// Last resort: all pods are standalone masters (fresh cluster or full restart).
// Arbitrarily pick masterNodes[0] as the new master to bootstrap replication.
// This choice is stable within a reconcile cycle and will be corrected by
// Status.MasterNode on subsequent cycles once replication is established.
if realMaster == "" && len(masterNodes) > 0 {
log.FromContext(ctx).Info("No real master found via slave count or Status.MasterNode; "+
"electing first master node as bootstrap master", "podName", masterNodes[0])
@@ -416,7 +415,6 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
if instance.Spec.Size != nil && int(*instance.Spec.Size) != (len(masterNodes)+len(slaveNodes)) {
monitoring.RedisReplicationReplicasSizeMismatch.WithLabelValues(instance.Namespace, instance.Name).Set(1)
}

monitoring.RedisReplicationReplicasSizeCurrent.WithLabelValues(instance.Namespace, instance.Name).Set(float64(len(masterNodes) + len(slaveNodes)))
monitoring.RedisReplicationReplicasSizeDesired.WithLabelValues(instance.Namespace, instance.Name).Set(float64(*instance.Spec.Size))

@@ -426,31 +424,14 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
}
}

return intctrlutil.Reconciled()
}

// reconcileStatus update status and label.
func (r *Reconciler) reconcileStatus(ctx context.Context, instance *rrvb2.RedisReplication) (ctrl.Result, error) {
var err error
var realMaster string

masterNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "master")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterNodes)
// --- reconcileStatus logic (reuses masterNodes/slaveNodes already fetched above) ---
if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
labels := common.GetRedisLabels(instance.GetName(), common.SetupTypeReplication, "replication", instance.GetLabels())
if err = r.Healer.UpdateRedisRoleLabel(ctx, instance.GetNamespace(), labels, instance.Spec.KubernetesConfig.ExistingPasswordSecret, instance.Spec.TLS); err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}

slaveNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "slave")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
if realMaster != "" {
monitoring.RedisReplicationConnectedSlavesTotal.WithLabelValues(instance.Namespace, instance.Name).Set(float64(len(slaveNodes)))
} else {
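Beyond fetching the node lists once, the interesting logic inside the merged reconciler is the cascading master election: prefer a pod that already reports connected_slaves > 0, then fall back to the last-known Status.MasterNode if that pod is still running, and only then bootstrap from masterNodes[0]. A compact sketch of that cascade, with plain strings and placeholder arguments standing in for the operator's helpers such as GetRedisReplicationRealMaster and IsPodRunning:

```go
package main

import "fmt"

// electMaster distills the cascading fallback from reconcileRedisAndStatus
// (simplified types; not the operator's actual code).
func electMaster(withSlaves, statusMaster string, statusMasterRunning bool, masters []string) string {
	if withSlaves != "" { // a pod already reports connected_slaves > 0
		return withSlaves
	}
	if statusMaster != "" && statusMasterRunning { // fallback 1: last-known master from Status.MasterNode
		return statusMaster
	}
	if len(masters) > 0 { // last resort: fresh cluster or full restart, bootstrap from the first master
		return masters[0]
	}
	return ""
}

func main() {
	// Full restart: no pod has slaves and no usable status, so the first pod bootstraps replication.
	fmt.Println(electMaster("", "", false, []string{"redis-0", "redis-1", "redis-2"})) // redis-0

	// Later cycles: Status.MasterNode wins over an arbitrary pick.
	fmt.Println(electMaster("", "redis-1", true, []string{"redis-0", "redis-1", "redis-2"})) // redis-1
}
```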
24 changes: 17 additions & 7 deletions internal/controllerutil/resource_watcher.go
@@ -2,6 +2,7 @@ package controllerutil

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -15,6 +16,7 @@ import (
// a watched object changes. It's designed to only be used for a single type of object.
// If multiple types should be watched, one ResourceWatcher for each type should be used.
type ResourceWatcher struct {
mu sync.RWMutex
watched map[types.NamespacedName][]types.NamespacedName
}

@@ -28,7 +30,10 @@ func NewResourceWatcher() *ResourceWatcher {
}

// Watch will add a new object to watch.
func (w ResourceWatcher) Watch(ctx context.Context, watchedName, dependentName types.NamespacedName) {
func (w *ResourceWatcher) Watch(ctx context.Context, watchedName, dependentName types.NamespacedName) {
w.mu.Lock()
defer w.mu.Unlock()

existing, hasExisting := w.watched[watchedName]
if !hasExisting {
existing = []types.NamespacedName{}
@@ -42,33 +47,38 @@ func (w ResourceWatcher) Watch(ctx context.Context, watchedName, dependentName t
w.watched[watchedName] = append(existing, dependentName)
}

func (w ResourceWatcher) Create(ctx context.Context, event event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (w *ResourceWatcher) Create(ctx context.Context, event event.CreateEvent, queue workqueue.RateLimitingInterface) {
w.handleEvent(event.Object, queue)
}

func (w ResourceWatcher) Update(ctx context.Context, event event.UpdateEvent, queue workqueue.RateLimitingInterface) {
func (w *ResourceWatcher) Update(ctx context.Context, event event.UpdateEvent, queue workqueue.RateLimitingInterface) {
w.handleEvent(event.ObjectOld, queue)
}

func (w ResourceWatcher) Delete(ctx context.Context, event event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (w *ResourceWatcher) Delete(ctx context.Context, event event.DeleteEvent, queue workqueue.RateLimitingInterface) {
w.handleEvent(event.Object, queue)
}

func (w ResourceWatcher) Generic(ctx context.Context, event event.GenericEvent, queue workqueue.RateLimitingInterface) {
func (w *ResourceWatcher) Generic(ctx context.Context, event event.GenericEvent, queue workqueue.RateLimitingInterface) {
w.handleEvent(event.Object, queue)
}

// handleEvent is called when an event is received for an object.
// It will check if the object is being watched and trigger a reconciliation for
// the dependent object.
func (w ResourceWatcher) handleEvent(meta metav1.Object, queue workqueue.RateLimitingInterface) {
func (w *ResourceWatcher) handleEvent(meta metav1.Object, queue workqueue.RateLimitingInterface) {
changedObjectName := types.NamespacedName{
Name: meta.GetName(),
Namespace: meta.GetNamespace(),
}

w.mu.RLock()
deps := make([]types.NamespacedName, len(w.watched[changedObjectName]))
copy(deps, w.watched[changedObjectName])
w.mu.RUnlock()

// Enqueue reconciliation for each dependent object.
for _, dep := range w.watched[changedObjectName] {
for _, dep := range deps {
queue.Add(reconcile.Request{
NamespacedName: dep,
})
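The resource_watcher.go change has two parts that belong together: the receivers switch from value to pointer, and access to the watched map is guarded by a sync.RWMutex, with the dependent list copied under RLock so queue.Add runs outside the critical section. A stripped-down sketch of the same pattern, using plain strings instead of types.NamespacedName and a print instead of a workqueue; this is an illustration, not the operator's types:

```go
package main

import (
	"fmt"
	"sync"
)

type watcher struct {
	mu      sync.RWMutex
	watched map[string][]string
}

// Watch must use a pointer receiver: a value receiver would copy the struct
// (including the mutex), so each call would lock a throwaway copy and the
// map would effectively stay unprotected.
func (w *watcher) Watch(watchedName, dependentName string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.watched[watchedName] = append(w.watched[watchedName], dependentName)
}

func (w *watcher) handleEvent(changed string) {
	// Copy the slice under the read lock so enqueueing happens outside it.
	w.mu.RLock()
	deps := make([]string, len(w.watched[changed]))
	copy(deps, w.watched[changed])
	w.mu.RUnlock()

	for _, dep := range deps {
		fmt.Println("enqueue reconcile for", dep) // stands in for queue.Add(reconcile.Request{...})
	}
}

func main() {
	w := &watcher{watched: map[string][]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { // concurrent Watch calls are now safe
			defer wg.Done()
			w.Watch("secret/redis", fmt.Sprintf("redisreplication/rr-%d", i))
		}(i)
	}
	wg.Wait()
	w.handleEvent("secret/redis")
}
```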
10 changes: 7 additions & 3 deletions internal/k8sutils/redis.go
@@ -705,9 +705,13 @@ func GetRedisNodesByRole(ctx context.Context, cl kubernetes.Interface, cr *rrvb2

for i := 0; i < int(replicas); i++ {
podName := statefulset.Name + "-" + strconv.Itoa(i)
redisClient := configureRedisReplicationClient(ctx, cl, cr, podName)
defer redisClient.Close()
podRole, err := checkRedisServerRole(ctx, redisClient, podName)
// Use an anonymous function so redisClient.Close() is called at the end of each
// iteration rather than being deferred until the outer function returns.
podRole, err := func() (string, error) {
redisClient := configureRedisReplicationClient(ctx, cl, cr, podName)
defer redisClient.Close()
return checkRedisServerRole(ctx, redisClient, podName)
}()
if err != nil {
return nil, err
}
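The redis.go fix addresses the classic defer-in-a-loop pitfall: a defer inside a loop body does not run until the surrounding function returns, so one connection per pod stayed open for the whole of GetRedisNodesByRole. A minimal sketch of the pattern with a placeholder client type, not the operator's actual redis client:

```go
package main

import "fmt"

type client struct{ pod string }

func newClient(pod string) *client { return &client{pod: pod} }
func (c *client) Role() string     { return "master" }
func (c *client) Close()           { fmt.Println("closed connection to", c.pod) }

func getRoles(pods []string) []string {
	roles := make([]string, 0, len(pods))
	for _, pod := range pods {
		// Without the inner function, every Close would be deferred until
		// getRoles returns and all connections would stay open for the loop.
		role := func() string {
			c := newClient(pod)
			defer c.Close() // runs when this anonymous function returns, i.e. once per iteration
			return c.Role()
		}()
		roles = append(roles, role)
	}
	return roles
}

func main() {
	fmt.Println(getRoles([]string{"redis-0", "redis-1", "redis-2"}))
}
```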