Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions internal/agent/bootstrap/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ import (
)

// defaultRedisConfig from https://github.com/OT-CONTAINER-KIT/redis/blob/master/redis.conf
// tcp-keepalive is lowered from the Redis default (300s) to 60s so that dead
// peer connections are detected faster, which helps the cluster gossip layer
// converge sooner after pod restarts with new IPs.
const defaultRedisConfig = `
bind 0.0.0.0 ::
tcp-backlog 511
timeout 0
tcp-keepalive 300
tcp-keepalive 60
daemonize no
supervised no
pidfile /var/run/redis.pid
Expand Down Expand Up @@ -57,11 +60,23 @@ func GenerateConfig() error {
if clusterMode == "cluster" {
nodeConfPath := filepath.Join(nodeConfDir, "nodes.conf")

// Cluster-mode tuning (see also tcp-keepalive above):
// - cluster-node-timeout raised from 5000ms to 15000ms (configurable
// via CLUSTER_NODE_TIMEOUT) to give gossip time to converge after
// pod restarts before marking nodes as failed.
// - cluster-allow-reads-when-down (and pubsubshard variant on v7)
// keeps clients unblocked while the operator repairs nodes.
clusterNodeTimeout := util.CoalesceEnv1("CLUSTER_NODE_TIMEOUT", "15000")

cfg.Append("cluster-enabled", "yes")
cfg.Append("cluster-node-timeout", "5000")
cfg.Append("cluster-node-timeout", clusterNodeTimeout)
cfg.Append("cluster-require-full-coverage", "no")
cfg.Append("cluster-migration-barrier", "1")
cfg.Append("cluster-config-file", nodeConfPath)
cfg.Append("cluster-allow-reads-when-down", "yes")
if redisMajorVersion == "v7" {
cfg.Append("cluster-allow-pubsubshard-when-down", "yes")
}

if ip, err := util.GetLocalIP(); err != nil {
log.Printf("Warning: Failed to get local IP: %v", err)
Expand Down
24 changes: 19 additions & 5 deletions internal/controller/rediscluster/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return intctrlutil.Requeue()
}

logger.Info("healthy leader count does not match desired; attempting to repair disconnected masters")
if err = k8sutils.RepairDisconnectedMasters(ctx, r.K8sClient, instance); err != nil {
logger.Error(err, "failed to repair disconnected masters")
logger.Info("Cluster has unhealthy nodes; attempting to repair disconnected nodes")
if err = k8sutils.RepairDisconnectedNodes(ctx, r.K8sClient, instance); err != nil {
logger.Error(err, "failed to repair disconnected nodes")
}

err = retry.Do(func() error {
Expand All @@ -329,8 +329,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}, retry.Attempts(3), retry.Delay(time.Second*5))

if err == nil {
logger.Info("repairing unhealthy masters successful, no unhealthy masters left")
return intctrlutil.RequeueAfter(ctx, time.Second*30, "no unhealthy nodes found after repairing disconnected masters")
logger.Info("Repair successful, no unhealthy nodes left")
return intctrlutil.RequeueAfter(ctx, time.Second*30, "no unhealthy nodes found after repair")
}
// recheck whether there are still unhealthy nodes after attempting to repair the disconnected nodes
unhealthyNodeCount, err = k8sutils.UnhealthyNodesInCluster(ctx, r.K8sClient, instance)
Expand All @@ -342,6 +342,20 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
}

// Repair followers that are connected in gossip but have broken replication
// (stale master IP after pod restart). This catches the case that
// RepairDisconnectedNodes misses: the follower isn't "fail"/"disconnected"
// but master_link_status is down.
if followerReplicas > 0 {
repaired, err := k8sutils.RepairStaleReplication(ctx, r.K8sClient, instance)
if err != nil {
logger.Error(err, "failed to repair stale replication links")
}
if repaired > 0 {
return intctrlutil.RequeueAfter(ctx, time.Second*15, "repaired stale replication, rechecking")
}
}

// Check If there is No Empty Master Node
if k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "") == totalReplicas {
k8sutils.CheckIfEmptyMasters(ctx, r.K8sClient, instance)
Expand Down
127 changes: 109 additions & 18 deletions internal/k8sutils/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,52 +138,140 @@ func CreateSingleLeaderRedisCommand(ctx context.Context, cr *rcvb2.RedisCluster)
return cmd
}

// RepairDisconnectedMasters attempts to repair disconnected/failed masters by issuing
// a CLUSTER MEET with the updated address of the host
func RepairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
// RepairDisconnectedNodes attempts to repair disconnected/failed nodes (both masters and slaves)
// by issuing CLUSTER MEET with the updated address, and for slaves, re-establishing replication
// via CLUSTER REPLICATE so the follower resolves its master's current IP from gossip.
func RepairDisconnectedNodes(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) error {
redisClient := configureRedisClient(ctx, client, cr, cr.Name+"-leader-0")
defer redisClient.Close()
return repairDisconnectedMasters(ctx, client, cr, redisClient)
return repairDisconnectedNodes(ctx, client, cr, redisClient)
}

func repairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, redisClient *redis.Client) error {
func repairDisconnectedNodes(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, redisClient *redis.Client) error {
nodes, err := clusterNodes(ctx, redisClient)
if err != nil {
return err
}
masterNodeType := "master"
var lastError error
for _, node := range nodes {
if !nodeIsOfType(node, masterNodeType) {
continue
}
if !nodeFailedOrDisconnected(node) {
continue
}
host, err := getMasterHostFromClusterNode(node)
host, err := getHostFromClusterNode(node)
if err != nil {
lastError = err
log.FromContext(ctx).V(1).Error(err, "Failed to get pod name from cluster node. Continuing with other nodes.", "Node", node)
continue
}
podName := strings.Split(host, ".")[0]
ip := getRedisServerIP(ctx, client, RedisDetails{
// host may be FQDN like redis-cluster-leader-0.redis-cluster-leader-headless.default.svc.cluster.local
// or it may be like redis-cluster-leader-0
// we need to adapt
PodName: strings.Split(host, ".")[0],
PodName: podName,
Namespace: cr.Namespace,
})
err = redisClient.ClusterMeet(ctx, ip, strconv.Itoa(*cr.Spec.Port)).Err()
if err != nil {
if ip == "" {
lastError = fmt.Errorf("failed to get IP for pod %s", podName)
log.FromContext(ctx).V(1).Error(lastError, "Empty IP for pod, skipping.", "Pod", podName)
continue
}
if err = redisClient.ClusterMeet(ctx, ip, strconv.Itoa(*cr.Spec.Port)).Err(); err != nil {
lastError = err
log.FromContext(ctx).V(1).Error(err, "Failed to execute CLUSTER MEET on node. Continuing with other nodes.", "Node", node)
continue
}
if nodeIsOfType(node, "slave") {
masterNodeID := node[3]
followerClient := configureRedisClient(ctx, client, cr, podName)
if err = followerClient.ClusterReplicate(ctx, masterNodeID).Err(); err != nil {
lastError = err
log.FromContext(ctx).V(1).Error(err, "Failed to execute CLUSTER REPLICATE on follower.", "Follower", podName, "MasterNodeID", masterNodeID)
}
followerClient.Close()
}
}
return lastError
}

func getMasterHostFromClusterNode(node clusterNodesResponse) (string, error) {
// RepairStaleReplication scans followers that look healthy in gossip but whose
// replication link is broken (master_link_status != up) and re-issues
// CLUSTER REPLICATE so each affected follower re-resolves its master's current
// IP from gossip. This covers the case where a master pod restarts with a new
// IP: gossip propagates the change, yet the follower keeps replicating from
// the stale address until explicitly refreshed.
// It returns how many followers were repaired, along with any error encountered.
func RepairStaleReplication(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster) (int, error) {
	leaderClient := configureRedisClient(ctx, client, cr, cr.Name+"-leader-0")
	defer leaderClient.Close()
	makeFollowerClient := func(podName string) *redis.Client {
		return configureRedisClient(ctx, client, cr, podName)
	}
	return repairStaleReplication(ctx, leaderClient, makeFollowerClient)
}

// repairStaleReplication inspects every connected follower known to the
// cluster and re-issues CLUSTER REPLICATE for any whose replication link is
// down, forcing it to re-resolve its master's address from gossip.
// It returns the number of followers repaired and the last error seen;
// per-node failures are logged and do not stop the scan.
func repairStaleReplication(ctx context.Context, redisClient *redis.Client, makeClient func(podName string) *redis.Client) (int, error) {
	logger := log.FromContext(ctx)

	nodes, err := clusterNodes(ctx, redisClient)
	if err != nil {
		return 0, err
	}

	// checkFollower dials one follower, inspects its replication link, and
	// re-issues CLUSTER REPLICATE when the link is down. The deferred Close
	// releases the per-follower connection on every path, instead of the
	// original's three hand-placed Close calls.
	checkFollower := func(podName, masterNodeID string) (bool, error) {
		followerClient := makeClient(podName)
		defer followerClient.Close()

		info, err := followerClient.Info(ctx, "replication").Result()
		if err != nil {
			logger.V(1).Error(err, "Failed to get replication info", "Follower", podName)
			return false, err
		}
		if replicationLinkUp(info) {
			return false, nil
		}

		logger.Info("Follower replication link is down, re-issuing CLUSTER REPLICATE",
			"Follower", podName, "MasterNodeID", masterNodeID)
		if err := followerClient.ClusterReplicate(ctx, masterNodeID).Err(); err != nil {
			logger.Error(err, "Failed to re-establish replication",
				"Follower", podName, "MasterNodeID", masterNodeID)
			return false, err
		}
		return true, nil
	}

	repaired := 0
	var lastError error
	for _, node := range nodes {
		// Only connected followers are candidates; failed/disconnected nodes
		// are handled by the CLUSTER MEET repair path instead.
		if !nodeIsOfType(node, "slave") || nodeFailedOrDisconnected(node) {
			continue
		}
		host, err := getHostFromClusterNode(node)
		if err != nil {
			lastError = err
			continue
		}
		// host may be a bare pod name or an FQDN; the pod name is the first label.
		podName := strings.Split(host, ".")[0]

		// node[3] is the master node ID field of the CLUSTER NODES line.
		didRepair, err := checkFollower(podName, node[3])
		if err != nil {
			lastError = err
			continue
		}
		if didRepair {
			repaired++
		}
	}
	return repaired, lastError
}

// replicationLinkUp reports whether the INFO Replication output indicates a
// healthy replication link (master_link_status:up). Master nodes emit no
// master_link_status field and therefore report true, as does empty input.
// Lines are split on "\n" with any trailing "\r" trimmed, so both the CRLF
// terminators real Redis emits and bare-LF output from tooling are handled.
func replicationLinkUp(info string) bool {
	for _, line := range strings.Split(info, "\n") {
		line = strings.TrimSuffix(line, "\r")
		if strings.HasPrefix(line, "master_link_status:") {
			return strings.TrimPrefix(line, "master_link_status:") == "up"
		}
	}
	return true
}

func getHostFromClusterNode(node clusterNodesResponse) (string, error) {
addressAndHost := node[1]
s := strings.Split(addressAndHost, ",")
if len(s) != 2 {
Expand Down Expand Up @@ -404,7 +492,10 @@ func executeFailoverCommand(ctx context.Context, client kubernetes.Interface, cr
return nil
}

// CheckRedisNodeCount will check the count of redis nodes
// CheckRedisNodeCount will check the count of redis nodes known to the cluster
// (including failed/disconnected ones). This is used by the controller to
// decide whether the cluster topology exists at all. For detecting unhealthy
// nodes that need repair, use UnhealthyNodesInCluster instead.
func CheckRedisNodeCount(ctx context.Context, client kubernetes.Interface, cr *rcvb2.RedisCluster, nodeType string) int32 {
redisClient := configureRedisClient(ctx, client, cr, cr.Name+"-leader-0")
defer redisClient.Close()
Expand Down
90 changes: 85 additions & 5 deletions internal/k8sutils/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func TestCheckRedisNodePresence(t *testing.T) {
}
}

func TestRepairDisconnectedMasters(t *testing.T) {
func TestRepairDisconnectedNodes(t *testing.T) {
ctx := context.Background()
redisClient, mock := redismock.NewClientMock()
mock.ExpectClusterNodes().SetVal(`
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,redis-cluster-follower-0 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,redis-cluster-leader-0 master - 0 1426238316232 2 disconnected 5461-10922
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,redis-cluster-follower-1 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 disconnected
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,redis-cluster-follower-1 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-leader-1 myself,master - 0 0 1 connected 0-5460
`)

Expand All @@ -77,7 +77,7 @@ e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-lea
})
mock.ExpectClusterMeet(newPodIP, "6379").SetVal("OK")
port := 6379
err := repairDisconnectedMasters(ctx, k8sClient, &rcvb2.RedisCluster{
err := repairDisconnectedNodes(ctx, k8sClient, &rcvb2.RedisCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
Expand All @@ -88,7 +88,7 @@ e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-lea
assert.NoError(t, err)
}

func TestRepairDisconnectedMastersAttemptedOnAllFailedMasters(t *testing.T) {
func TestRepairDisconnectedNodesAttemptedOnAllFailedMasters(t *testing.T) {
ctx := context.Background()
redisClient, mock := redismock.NewClientMock()
mock.ExpectClusterNodes().SetVal(`
Expand Down Expand Up @@ -129,7 +129,7 @@ bffda5dec210cd73576a3993156dc134b5c63a4f :6379@16379,redis-cluster-leader-9 mast
k8sClient := k8sClientFake.NewSimpleClientset(k8sObjects...)

port := 6379
err := repairDisconnectedMasters(ctx, k8sClient, &rcvb2.RedisCluster{
err := repairDisconnectedNodes(ctx, k8sClient, &rcvb2.RedisCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
Expand All @@ -141,6 +141,86 @@ bffda5dec210cd73576a3993156dc134b5c63a4f :6379@16379,redis-cluster-leader-9 mast
assert.Equal(t, expectedErr, err, "Expected error to match the one set in the mock")
}

// TestReplicationLinkUp exercises the INFO Replication parser across the
// healthy, broken, master-role, and empty-input cases.
func TestReplicationLinkUp(t *testing.T) {
	cases := map[string]struct {
		info string
		want bool
	}{
		"replication up": {
			info: "# Replication\r\nrole:slave\r\nmaster_host:10.0.0.1\r\nmaster_port:6379\r\nmaster_link_status:up\r\nmaster_last_io_seconds_ago:1\r\n",
			want: true,
		},
		"replication down": {
			info: "# Replication\r\nrole:slave\r\nmaster_host:10.0.0.1\r\nmaster_port:6379\r\nmaster_link_status:down\r\nmaster_last_io_seconds_ago:-1\r\n",
			want: false,
		},
		"master node - no master_link_status field": {
			info: "# Replication\r\nrole:master\r\nconnected_slaves:2\r\n",
			want: true,
		},
		"empty info": {
			info: "",
			want: true,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			assert.Equal(t, tc.want, replicationLinkUp(tc.info))
		})
	}
}

// TestRepairStaleReplication_replicationDown verifies that a connected
// follower reporting master_link_status:down is repaired via CLUSTER
// REPLICATE and counted exactly once.
func TestRepairStaleReplication_replicationDown(t *testing.T) {
	ctx := context.Background()
	redisClient, mock := redismock.NewClientMock()

	// One connected master and one connected follower; the follower's
	// replication link is down, so exactly one repair is expected.
	mock.ExpectClusterNodes().SetVal(`
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-leader-0 myself,master - 0 0 1 connected 0-16383
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30002@31002,redis-cluster-follower-0 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 1 connected
`)

	followerClient, followerMock := redismock.NewClientMock()
	followerMock.ExpectInfo("replication").SetVal(
		"# Replication\r\nrole:slave\r\nmaster_host:10.130.24.167\r\nmaster_port:6379\r\nmaster_link_status:down\r\nmaster_last_io_seconds_ago:-1\r\n",
	)
	followerMock.ExpectClusterReplicate("e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca").SetVal("OK")

	repaired, lastError := repairStaleReplication(ctx, redisClient, func(_ string) *redis.Client {
		return followerClient
	})
	assert.NoError(t, lastError)
	assert.Equal(t, 1, repaired)
}

// TestRepairStaleReplication_replicationUp verifies that a follower with a
// healthy replication link is left untouched and nothing is counted as repaired.
func TestRepairStaleReplication_replicationUp(t *testing.T) {
	ctx := context.Background()

	leaderClient, leaderMock := redismock.NewClientMock()
	leaderMock.ExpectClusterNodes().SetVal(`
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-leader-0 myself,master - 0 0 1 connected 0-16383
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30002@31002,redis-cluster-follower-0 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 1 connected
`)

	// The follower reports a healthy link, so no CLUSTER REPLICATE is issued.
	followerClient, followerMock := redismock.NewClientMock()
	followerMock.ExpectInfo("replication").SetVal(
		"# Replication\r\nrole:slave\r\nmaster_host:10.0.0.1\r\nmaster_port:6379\r\nmaster_link_status:up\r\nmaster_last_io_seconds_ago:0\r\n",
	)

	repaired, repairErr := repairStaleReplication(ctx, leaderClient, func(_ string) *redis.Client {
		return followerClient
	})
	assert.NoError(t, repairErr)
	assert.Equal(t, 0, repaired)
}

func TestGetRedisServerIP(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading