Skip to content

Commit 01eff19

Browse files
committed
Skip stale worker heartbeats from orphaned worker directories
The supervisor builds the heartbeat batch it sends to Nimbus from every worker directory present on disk, with no check on whether the worker is still alive. If a worker directory is ever left behind (a worker that died before the supervisor finished cleanup), its frozen heartbeat keeps being reported until the next supervisor restart -- the only time orphaned directories are reclaimed. Nimbus then repeatedly reads the (often deleted) topology conf, flooding its log with "Exception when getting heartbeat timeout" / NotAliveException. Filter out heartbeats older than supervisor.worker.timeout.secs in ReportWorkerHeartbeats: such a worker is already considered dead by the same timeout the slot uses, so it should not be reported. A live worker always refreshes its heartbeat well within the timeout, so this never drops a valid heartbeat.
1 parent 85a5cbc commit 01eff19

2 files changed

Lines changed: 114 additions & 1 deletion

File tree

storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.ArrayList;
1616
import java.util.List;
1717
import java.util.Map;
18+
import org.apache.storm.Config;
1819
import org.apache.storm.daemon.supervisor.Supervisor;
1920
import org.apache.storm.daemon.supervisor.SupervisorUtils;
2021
import org.apache.storm.generated.LSWorkerHeartbeat;
@@ -23,6 +24,8 @@
2324
import org.apache.storm.thrift.TException;
2425
import org.apache.storm.utils.ConfigUtils;
2526
import org.apache.storm.utils.NimbusClient;
27+
import org.apache.storm.utils.ObjectReader;
28+
import org.apache.storm.utils.Time;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

@@ -36,10 +39,12 @@ public class ReportWorkerHeartbeats implements Runnable {
3639

3740
private Supervisor supervisor;
3841
private Map<String, Object> conf;
42+
private final int workerTimeoutSecs;
3943

4044
public ReportWorkerHeartbeats(Map<String, Object> conf, Supervisor supervisor) {
4145
this.conf = conf;
4246
this.supervisor = supervisor;
47+
this.workerTimeoutSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
4348
}
4449

4550
@Override
@@ -59,7 +64,7 @@ private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
5964
}
6065
}
6166

62-
private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) {
67+
SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> localHeartbeats) {
6368
SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = new SupervisorWorkerHeartbeats();
6469

6570
List<SupervisorWorkerHeartbeat> heartbeatList = new ArrayList<>();
@@ -70,6 +75,18 @@ private SupervisorWorkerHeartbeats getSupervisorWorkerHeartbeatsFromLocal(Map<St
7075
continue;
7176
}
7277

78+
// Skip stale heartbeats left by worker directories that were never cleaned up
79+
// (e.g. a worker that died before the supervisor finished cleanup). Such a worker
80+
// has not heartbeat within the timeout, so it is already considered dead; forwarding
81+
// it would make Nimbus repeatedly read the (often deleted) topology conf and log noise.
82+
// A live worker always refreshes its heartbeat well within the timeout.
83+
int hbAgeSecs = Time.currentTimeSecs() - lsWorkerHeartbeat.get_time_secs();
84+
if (hbAgeSecs > workerTimeoutSecs) {
85+
LOG.debug("Skipping stale heartbeat for topology {}: age {}s > worker timeout {}s",
86+
lsWorkerHeartbeat.get_topology_id(), hbAgeSecs, workerTimeoutSecs);
87+
continue;
88+
}
89+
7390
SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new SupervisorWorkerHeartbeat();
7491
supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id());
7592
supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors());
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
3+
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
4+
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
10+
* and limitations under the License.
11+
*/
12+
13+
package org.apache.storm.daemon.supervisor.timer;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.mockito.Mockito.mock;
17+
import static org.mockito.Mockito.when;
18+
19+
import java.util.HashMap;
20+
import java.util.LinkedHashMap;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.stream.Collectors;
24+
import org.apache.storm.Config;
25+
import org.apache.storm.daemon.supervisor.Supervisor;
26+
import org.apache.storm.generated.LSWorkerHeartbeat;
27+
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
28+
import org.apache.storm.generated.SupervisorWorkerHeartbeats;
29+
import org.apache.storm.utils.Time;
30+
import org.junit.jupiter.api.Test;
31+
32+
public class ReportWorkerHeartbeatsTest {
33+
34+
private static final int WORKER_TIMEOUT_SECS = 30;
35+
private static final String SUPERVISOR_ID = "test-supervisor";
36+
37+
private static LSWorkerHeartbeat mkHeartbeat(String topologyId, int timeSecs) {
38+
LSWorkerHeartbeat hb = new LSWorkerHeartbeat();
39+
hb.set_topology_id(topologyId);
40+
hb.set_time_secs(timeSecs);
41+
return hb;
42+
}
43+
44+
private ReportWorkerHeartbeats mkReporter() {
45+
Map<String, Object> conf = new HashMap<>();
46+
conf.put(Config.SUPERVISOR_WORKER_TIMEOUT_SECS, WORKER_TIMEOUT_SECS);
47+
Supervisor supervisor = mock(Supervisor.class);
48+
when(supervisor.getId()).thenReturn(SUPERVISOR_ID);
49+
return new ReportWorkerHeartbeats(conf, supervisor);
50+
}
51+
52+
private static Set<String> reportedTopologies(SupervisorWorkerHeartbeats heartbeats) {
53+
return heartbeats.get_worker_heartbeats().stream()
54+
.map(SupervisorWorkerHeartbeat::get_storm_id)
55+
.collect(Collectors.toSet());
56+
}
57+
58+
@Test
59+
public void freshHeartbeatsAreReportedAndStaleOnesAreFilteredOut() {
60+
try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
61+
Time.advanceTimeSecs(100_000);
62+
int now = Time.currentTimeSecs();
63+
64+
Map<String, LSWorkerHeartbeat> local = new LinkedHashMap<>();
65+
// live worker: heartbeat just refreshed
66+
local.put("w-fresh", mkHeartbeat("topo-fresh", now));
67+
// exactly at the timeout boundary is still considered alive (age == timeout, not > timeout)
68+
local.put("w-boundary", mkHeartbeat("topo-boundary", now - WORKER_TIMEOUT_SECS));
69+
// just past the timeout: the worker is already considered dead
70+
local.put("w-stale", mkHeartbeat("topo-stale", now - WORKER_TIMEOUT_SECS - 1));
71+
// orphaned worker directory left behind long ago for a topology that no longer exists
72+
local.put("w-orphan", mkHeartbeat("topo-orphan", now - 86_400));
73+
74+
SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local);
75+
76+
assertEquals(SUPERVISOR_ID, result.get_supervisor_id());
77+
assertEquals(Set.of("topo-fresh", "topo-boundary"), reportedTopologies(result));
78+
}
79+
}
80+
81+
@Test
82+
public void nullLocalHeartbeatsAreSkipped() {
83+
try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
84+
Time.advanceTimeSecs(100_000);
85+
int now = Time.currentTimeSecs();
86+
87+
Map<String, LSWorkerHeartbeat> local = new LinkedHashMap<>();
88+
local.put("w-null", null);
89+
local.put("w-fresh", mkHeartbeat("topo-fresh", now));
90+
91+
SupervisorWorkerHeartbeats result = mkReporter().getSupervisorWorkerHeartbeatsFromLocal(local);
92+
93+
assertEquals(Set.of("topo-fresh"), reportedTopologies(result));
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)