Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/controller/common/redis/heal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
rsvb2 "github.com/OT-CONTAINER-KIT/redis-operator/api/redissentinel/v1beta2"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/envs"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/k8sutils"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/service/redis"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/util/cryptutil"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -69,6 +70,10 @@ func (h *healer) UpdateRedisRoleLabel(ctx context.Context, ns string, labels map
}
}
for _, pod := range pods.Items {
if !k8sutils.IsRedisPodProbeable(&pod) {
continue
}

connInfo := createConnectionInfo(ctx, pod, password, tlsConfig, h.k8s, ns, "6379")
isMaster, err := h.redis.Connect(connInfo).IsMaster(ctx)
if err != nil {
Expand Down
124 changes: 124 additions & 0 deletions internal/controller/common/redis/heal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package redis

import (
"context"
"testing"

common "github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common"
redisservice "github.com/OT-CONTAINER-KIT/redis-operator/internal/service/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
)

func TestUpdateRedisRoleLabelSkipsUnprobeablePods(t *testing.T) {
labels := map[string]string{"app": "redis"}
clientset := k8sfake.NewSimpleClientset(
newLabeledRedisPod("redis-0", labels, "10.0.0.10", corev1.PodRunning, true),
newLabeledRedisPod("redis-1", labels, "", corev1.PodRunning, true),
newLabeledRedisPod("redis-2", labels, "10.0.0.12", corev1.PodRunning, false),
newLabeledRedisPod("redis-3", labels, "", corev1.PodPending, false),
)
redisClient := &fakeRedisClient{
isMasterByHost: map[string]bool{
"10.0.0.10": true,
},
}
h := &healer{
k8s: clientset,
redis: redisClient,
}

err := h.UpdateRedisRoleLabel(context.Background(), "default", labels, nil, nil)

require.NoError(t, err)
assert.Equal(t, []string{"10.0.0.10"}, redisClient.connectHosts)

readyPod, err := clientset.CoreV1().Pods("default").Get(context.Background(), "redis-0", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, common.RedisRoleLabelMaster, readyPod.Labels[common.RedisRoleLabelKey])

for _, podName := range []string{"redis-1", "redis-2", "redis-3"} {
pod, getErr := clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
require.NoError(t, getErr)
assert.Empty(t, pod.Labels[common.RedisRoleLabelKey])
}
}

type fakeRedisClient struct {
connectHosts []string
isMasterByHost map[string]bool
}

func (f *fakeRedisClient) Connect(info *redisservice.ConnectionInfo) redisservice.Service {
f.connectHosts = append(f.connectHosts, info.Host)
return &fakeRedisService{
host: info.Host,
isMasterByHost: f.isMasterByHost,
}
}

type fakeRedisService struct {
host string
isMasterByHost map[string]bool
}

func (f *fakeRedisService) IsMaster(context.Context) (bool, error) {
return f.isMasterByHost[f.host], nil
}

func (f *fakeRedisService) GetAttachedReplicaCount(context.Context) (int, error) {
return 0, nil
}

func (f *fakeRedisService) SentinelMonitor(context.Context, *redisservice.ConnectionInfo, string, string) error {
return nil
}

func (f *fakeRedisService) SentinelSet(context.Context, string, string, string) error {
return nil
}

func (f *fakeRedisService) SentinelReset(context.Context, string) error {
return nil
}

func (f *fakeRedisService) GetInfoSentinel(context.Context) (*redisservice.InfoSentinelResult, error) {
return &redisservice.InfoSentinelResult{}, nil
}

func (f *fakeRedisService) GetClusterInfo(context.Context) (*redisservice.ClusterStatus, error) {
return &redisservice.ClusterStatus{}, nil
}

func newLabeledRedisPod(name string, labels map[string]string, podIP string, phase corev1.PodPhase, ready bool) *corev1.Pod {
podLabels := map[string]string{}
for key, value := range labels {
podLabels[key] = value
}

readyStatus := corev1.ConditionFalse
if ready {
readyStatus = corev1.ConditionTrue
}

return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Labels: podLabels,
},
Status: corev1.PodStatus{
Phase: phase,
PodIP: podIP,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: readyStatus,
},
},
},
}
}
121 changes: 80 additions & 41 deletions internal/controller/redisreplication/redisreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ const (
type Reconciler struct {
client.Client
k8sutils.StatefulSet
Healer redishealer.Healer
K8sClient kubernetes.Interface
Healer redishealer.Healer
K8sClient kubernetes.Interface
RedisNodesByRole func(context.Context, kubernetes.Interface, *rrvb2.RedisReplication, string) ([]string, error)
RedisReplicationRealMaster func(context.Context, kubernetes.Interface, *rrvb2.RedisReplication, []string) string
CreateRedisReplicationLink func(context.Context, kubernetes.Interface, *rrvb2.RedisReplication, []string, string) error
ConfigureSentinel func(context.Context, *rrvb2.RedisReplication, string) error
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -113,6 +117,46 @@ func connectionInfoEqual(a, b *rrvb2.ConnectionInfo) bool {
return a.Host == b.Host && a.Port == b.Port && a.MasterName == b.MasterName
}

func (r *Reconciler) redisNodesByRole(ctx context.Context, instance *rrvb2.RedisReplication, role string) ([]string, error) {
if r.RedisNodesByRole != nil {
return r.RedisNodesByRole(ctx, r.K8sClient, instance, role)
}
return k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, role)
}

func (r *Reconciler) redisReplicationRealMaster(ctx context.Context, instance *rrvb2.RedisReplication, masterPods []string) string {
if r.RedisReplicationRealMaster != nil {
return r.RedisReplicationRealMaster(ctx, r.K8sClient, instance, masterPods)
}
return k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterPods)
}

func (r *Reconciler) createRedisReplicationLink(ctx context.Context, instance *rrvb2.RedisReplication, pods []string, realMaster string) error {
if r.CreateRedisReplicationLink != nil {
return r.CreateRedisReplicationLink(ctx, r.K8sClient, instance, pods, realMaster)
}
return k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, instance, pods, realMaster)
}

func (r *Reconciler) configureReplicationSentinel(ctx context.Context, instance *rrvb2.RedisReplication, masterPodName string) error {
if r.ConfigureSentinel != nil {
return r.ConfigureSentinel(ctx, instance, masterPodName)
}
return r.configureSentinel(ctx, instance, masterPodName)
}

func (r *Reconciler) observedRedisReplicationMaster(ctx context.Context, instance *rrvb2.RedisReplication, masterPods []string) (string, bool) {
switch len(masterPods) {
case 0:
return "", false
case 1:
return masterPods[0], true
default:
realMaster := r.redisReplicationRealMaster(ctx, instance, masterPods)
return realMaster, realMaster != ""
}
}

type reconciler struct {
typ string
rec func(ctx context.Context, instance *rrvb2.RedisReplication) (ctrl.Result, error)
Expand Down Expand Up @@ -355,60 +399,51 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
}

var realMaster string
masterNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "master")
masterNodes, err := r.redisNodesByRole(ctx, instance, "master")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
slaveNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "slave")
slaveNodes, err := r.redisNodesByRole(ctx, instance, "slave")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
incompleteTopology := instance.Spec.Size != nil && (len(masterNodes)+len(slaveNodes)) < int(*instance.Spec.Size)
realMaster, masterPositivelyIdentified := r.observedRedisReplicationMaster(ctx, instance, masterNodes)
if len(masterNodes) > 1 {
log.FromContext(ctx).Info("Creating redis replication by executing replication creation commands")

realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterNodes)

// Cascading fallback when no pod has connected_slaves > 0
if realMaster == "" {
// Fallback 1: use last-known master from Status.MasterNode if valid
if instance.Status.MasterNode != "" && k8sutils.IsPodRunning(ctx, r.K8sClient, instance.Namespace, instance.Status.MasterNode) {
log.FromContext(ctx).Info("No master with attached slaves found, falling back to Status.MasterNode",
"statusMasterNode", instance.Status.MasterNode)
realMaster = instance.Status.MasterNode
}
// Last resort: all pods are standalone masters (fresh cluster or full restart).
// Arbitrarily pick masterNodes[0] as the new master to bootstrap replication.
// This choice is stable within a reconcile cycle and will be corrected by
// Status.MasterNode on subsequent cycles once replication is established.
if realMaster == "" && len(masterNodes) > 0 {
log.FromContext(ctx).Info("No real master found via slave count or Status.MasterNode; "+
"electing first master node as bootstrap master", "podName", masterNodes[0])
realMaster = masterNodes[0]
}
}

if realMaster == "" {
log.FromContext(ctx).Error(nil, "No valid master found after all fallbacks, requeueing")
return intctrlutil.RequeueAfter(ctx, time.Second*60, "no valid master found")
if realMaster == "" && len(slaveNodes) == 0 && !incompleteTopology {
realMaster = masterNodes[0]
}
if err := k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, instance, masterNodes, realMaster); err != nil {
if incompleteTopology {
log.FromContext(ctx).Info("Skipping replication reconfiguration because the observed topology is incomplete",
"observedPods", len(masterNodes)+len(slaveNodes),
"expectedPods", *instance.Spec.Size)
} else if realMaster == "" {
log.FromContext(ctx).Info("Skipping replication reconfiguration because the current master could not be identified")
} else if err := r.createRedisReplicationLink(ctx, instance, masterNodes, realMaster); err != nil {
return intctrlutil.RequeueAfter(ctx, time.Second*60, "")
}
} else if len(masterNodes) == 1 && len(slaveNodes) > 0 {
realMaster = masterNodes[0]
currentRealMaster := k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterNodes)
currentRealMaster := r.redisReplicationRealMaster(ctx, instance, masterNodes)

if currentRealMaster == "" && !instance.EnableSentinel() {
log.FromContext(ctx).Info("Detected disconnected slaves, reconfiguring replication",
"master", realMaster, "slaves", slaveNodes)

allPods := append(masterNodes, slaveNodes...)
if err := k8sutils.CreateMasterSlaveReplication(ctx, r.K8sClient, instance, allPods, realMaster); err != nil {
log.FromContext(ctx).Error(err, "Failed to reconfigure master-slave replication",
"master", realMaster, "slaves", slaveNodes)
return intctrlutil.RequeueAfter(ctx, time.Second*60, "")
if incompleteTopology {
log.FromContext(ctx).Info("Skipping master-slave reconfiguration because the observed topology is incomplete",
"observedPods", len(masterNodes)+len(slaveNodes),
"expectedPods", *instance.Spec.Size)
} else {
allPods := append(masterNodes, slaveNodes...)
if err := r.createRedisReplicationLink(ctx, instance, allPods, realMaster); err != nil {
log.FromContext(ctx).Error(err, "Failed to reconfigure master-slave replication",
"master", realMaster, "slaves", slaveNodes)
return intctrlutil.RequeueAfter(ctx, time.Second*60, "")
}
log.FromContext(ctx).Info("Successfully reconfigured slave replication")
}
log.FromContext(ctx).Info("Successfully reconfigured slave replication")
}
}

Expand All @@ -421,7 +456,11 @@ func (r *Reconciler) reconcileRedis(ctx context.Context, instance *rrvb2.RedisRe
monitoring.RedisReplicationReplicasSizeDesired.WithLabelValues(instance.Namespace, instance.Name).Set(float64(*instance.Spec.Size))

if instance.EnableSentinel() {
if err := r.configureSentinel(ctx, instance, realMaster); err != nil {
if incompleteTopology && !masterPositivelyIdentified {
log.FromContext(ctx).Info("Skipping sentinel reconfiguration because topology is incomplete and the master is ambiguous",
"observedPods", len(masterNodes)+len(slaveNodes),
"expectedPods", *instance.Spec.Size)
} else if err := r.configureReplicationSentinel(ctx, instance, realMaster); err != nil {
log.FromContext(ctx).Error(err, "failed to configure sentinel")
}
}
Expand All @@ -434,11 +473,11 @@ func (r *Reconciler) reconcileStatus(ctx context.Context, instance *rrvb2.RedisR
var err error
var realMaster string

masterNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "master")
masterNodes, err := r.redisNodesByRole(ctx, instance, "master")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
realMaster = k8sutils.GetRedisReplicationRealMaster(ctx, r.K8sClient, instance, masterNodes)
realMaster, _ = r.observedRedisReplicationMaster(ctx, instance, masterNodes)
if err = r.UpdateRedisReplicationMaster(ctx, instance, realMaster); err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
Expand All @@ -447,7 +486,7 @@ func (r *Reconciler) reconcileStatus(ctx context.Context, instance *rrvb2.RedisR
return intctrlutil.RequeueE(ctx, err, "")
}

slaveNodes, err := k8sutils.GetRedisNodesByRole(ctx, r.K8sClient, instance, "slave")
slaveNodes, err := r.redisNodesByRole(ctx, instance, "slave")
if err != nil {
return intctrlutil.RequeueE(ctx, err, "")
}
Expand Down
Loading