Skip to content

Commit 6f8b94d

Browse files
authored
Merge pull request #8788 from dpol1/fix/7897
Fix Y2038 worker heartbeat overflow by migrating timestamps to i64
2 parents 85a5cbc + b562a74 commit 6f8b94d

20 files changed

Lines changed: 360 additions & 87 deletions

File tree

docs/Cluster-State-Serialization.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,34 @@ non-Thrift Java-serialization variant).
122122
The zstd codec is provided by Apache Commons Compress
123123
(`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni`
124124
native binding.
125+
126+
## Heartbeat timestamps and the year 2038
127+
128+
Since STORM-7897, the `time_secs` field of `ClusterWorkerHeartbeat`,
129+
`SupervisorWorkerHeartbeat` and `LSWorkerHeartbeat` is a 64-bit integer
130+
(`i64`), and all heartbeat writers and timeout checks use the long-based
131+
clock (`Time.currentTimeSecsLong()` / `Time.deltaSecsLong(...)`). Earlier
132+
releases carried these timestamps as `i32` seconds, which overflows on
133+
2038-01-19T03:14:07Z and would have caused Nimbus to treat live workers
134+
as dead.
135+
136+
`uptime_secs` fields remain `i32`: they are relative durations, not
137+
absolute timestamps. `Time.currentTimeSecs()` and `Time.deltaSecs(int)`
138+
are deprecated but retained for such relative-duration callers.
139+
140+
### Upgrade implications
141+
142+
Thrift tags `i32` and `i64` values differently on the wire, so heartbeat
143+
blobs written by a pre-upgrade daemon do **not** deserialize under the
144+
new schema: the reader skips the mistyped field and the blob then fails
145+
required-field validation with a `TProtocolException`. In practice:
146+
147+
1. **A full-cluster upgrade is required.** Do not run a mixed-version
148+
cluster across Nimbus, Supervisors and workers: heartbeats do not
149+
round-trip between the old and new schema in either direction.
150+
2. **In-flight heartbeats are dropped once, then self-heal.** Nimbus
151+
times workers out by *receipt* time, so a dropped heartbeat is
152+
replaced on the next report interval; supervisors rewrite their
153+
on-disk `LSWorkerHeartbeat` local state the same way. Expect at most
154+
one report cycle of staleness around the restart, which is within the
155+
normal tolerance of a full-cluster bounce.

storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<Executor
262262
Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
263263
for (ExecutorInfo executor : executors) {
264264
if (executorStatsMap.containsKey(executor)) {
265-
int time = workerHeartbeat.get_time_secs();
265+
long time = workerHeartbeat.get_time_secs();
266266
int uptime = workerHeartbeat.get_uptime_secs();
267267
ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor);
268268
ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);

storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@
1515
import org.apache.storm.generated.ExecutorStats;
1616

1717
public class ExecutorBeat {
18-
private final int timeSecs;
18+
private final long timeSecs;
1919
private final int uptime;
2020
private final ExecutorStats stats;
2121

22-
public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
22+
public ExecutorBeat(long timeSecs, int uptime, ExecutorStats stats) {
2323
this.timeSecs = timeSecs;
2424
this.uptime = uptime;
2525
this.stats = stats;
2626
}
2727

28-
public int getTimeSecs() {
28+
public long getTimeSecs() {
2929
return timeSecs;
3030
}
3131

storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public byte[] get_worker_hb(String path, boolean watch) {
149149
while (true) {
150150
try {
151151
byte[] ret = null;
152-
int latestTimeSecs = 0;
152+
long latestTimeSecs = 0;
153153
boolean gotResponse = false;
154154

155155
HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));

storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ public void doRefreshLoad() {
387387

388388
public void doHeartBeat() throws IOException {
389389
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
390-
LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
390+
LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecsLong(), workerState.topologyId,
391391
workerState.localExecutors.stream()
392392
.map(executor -> new ExecutorInfo(
393393
executor.get(0).intValue(),

storm-client/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ public class ClusterWorkerHeartbeat implements org.apache.storm.thrift.TBase<Clu
3030

3131
private static final org.apache.storm.thrift.protocol.TField STORM_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("storm_id", org.apache.storm.thrift.protocol.TType.STRING, (short)1);
3232
private static final org.apache.storm.thrift.protocol.TField EXECUTOR_STATS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executor_stats", org.apache.storm.thrift.protocol.TType.MAP, (short)2);
33-
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)3);
33+
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)3);
3434
private static final org.apache.storm.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("uptime_secs", org.apache.storm.thrift.protocol.TType.I32, (short)4);
3535

3636
private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterWorkerHeartbeatStandardSchemeFactory();
3737
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ClusterWorkerHeartbeatTupleSchemeFactory();
3838

3939
private @org.apache.storm.thrift.annotation.Nullable java.lang.String storm_id; // required
4040
private @org.apache.storm.thrift.annotation.Nullable java.util.Map<ExecutorInfo,ExecutorStats> executor_stats; // required
41-
private int time_secs; // required
41+
private long time_secs; // required
4242
private int uptime_secs; // required
4343

4444
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@@ -126,7 +126,7 @@ public java.lang.String getFieldName() {
126126
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorInfo.class),
127127
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorStats.class))));
128128
tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
129-
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
129+
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64)));
130130
tmpMap.put(_Fields.UPTIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
131131
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
132132
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
@@ -139,7 +139,7 @@ public ClusterWorkerHeartbeat() {
139139
public ClusterWorkerHeartbeat(
140140
java.lang.String storm_id,
141141
java.util.Map<ExecutorInfo,ExecutorStats> executor_stats,
142-
int time_secs,
142+
long time_secs,
143143
int uptime_secs)
144144
{
145145
this();
@@ -252,11 +252,11 @@ public void set_executor_stats_isSet(boolean value) {
252252
}
253253
}
254254

255-
public int get_time_secs() {
255+
public long get_time_secs() {
256256
return this.time_secs;
257257
}
258258

259-
public void set_time_secs(int time_secs) {
259+
public void set_time_secs(long time_secs) {
260260
this.time_secs = time_secs;
261261
set_time_secs_isSet(true);
262262
}
@@ -319,7 +319,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul
319319
if (value == null) {
320320
unset_time_secs();
321321
} else {
322-
set_time_secs((java.lang.Integer)value);
322+
set_time_secs((java.lang.Long)value);
323323
}
324324
break;
325325

@@ -438,7 +438,7 @@ public int hashCode() {
438438
if (is_set_executor_stats())
439439
hashCode = hashCode * 8191 + executor_stats.hashCode();
440440

441-
hashCode = hashCode * 8191 + time_secs;
441+
hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs);
442442

443443
hashCode = hashCode * 8191 + uptime_secs;
444444

@@ -634,8 +634,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, ClusterWorker
634634
}
635635
break;
636636
case 3: // TIME_SECS
637-
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) {
638-
struct.time_secs = iprot.readI32();
637+
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) {
638+
struct.time_secs = iprot.readI64();
639639
struct.set_time_secs_isSet(true);
640640
} else {
641641
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
@@ -682,7 +682,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, ClusterWorke
682682
oprot.writeFieldEnd();
683683
}
684684
oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
685-
oprot.writeI32(struct.time_secs);
685+
oprot.writeI64(struct.time_secs);
686686
oprot.writeFieldEnd();
687687
oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
688688
oprot.writeI32(struct.uptime_secs);
@@ -714,7 +714,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorker
714714
_iter827.getValue().write(oprot);
715715
}
716716
}
717-
oprot.writeI32(struct.time_secs);
717+
oprot.writeI64(struct.time_secs);
718718
oprot.writeI32(struct.uptime_secs);
719719
}
720720

@@ -738,7 +738,7 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorkerH
738738
}
739739
}
740740
struct.set_executor_stats_isSet(true);
741-
struct.time_secs = iprot.readI32();
741+
struct.time_secs = iprot.readI64();
742742
struct.set_time_secs_isSet(true);
743743
struct.uptime_secs = iprot.readI32();
744744
struct.set_uptime_secs_isSet(true);

storm-client/src/jvm/org/apache/storm/generated/LSWorkerHeartbeat.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828
public class LSWorkerHeartbeat implements org.apache.storm.thrift.TBase<LSWorkerHeartbeat, LSWorkerHeartbeat._Fields>, java.io.Serializable, Cloneable, Comparable<LSWorkerHeartbeat> {
2929
private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("LSWorkerHeartbeat");
3030

31-
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)1);
31+
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)1);
3232
private static final org.apache.storm.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topology_id", org.apache.storm.thrift.protocol.TType.STRING, (short)2);
3333
private static final org.apache.storm.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executors", org.apache.storm.thrift.protocol.TType.LIST, (short)3);
3434
private static final org.apache.storm.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("port", org.apache.storm.thrift.protocol.TType.I32, (short)4);
3535

3636
private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new LSWorkerHeartbeatStandardSchemeFactory();
3737
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new LSWorkerHeartbeatTupleSchemeFactory();
3838

39-
private int time_secs; // required
39+
private long time_secs; // required
4040
private @org.apache.storm.thrift.annotation.Nullable java.lang.String topology_id; // required
4141
private @org.apache.storm.thrift.annotation.Nullable java.util.List<ExecutorInfo> executors; // required
4242
private int port; // required
@@ -120,7 +120,7 @@ public java.lang.String getFieldName() {
120120
static {
121121
java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
122122
tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
123-
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
123+
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64)));
124124
tmpMap.put(_Fields.TOPOLOGY_ID, new org.apache.storm.thrift.meta_data.FieldMetaData("topology_id", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
125125
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
126126
tmpMap.put(_Fields.EXECUTORS, new org.apache.storm.thrift.meta_data.FieldMetaData("executors", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
@@ -136,7 +136,7 @@ public LSWorkerHeartbeat() {
136136
}
137137

138138
public LSWorkerHeartbeat(
139-
int time_secs,
139+
long time_secs,
140140
java.lang.String topology_id,
141141
java.util.List<ExecutorInfo> executors,
142142
int port)
@@ -184,11 +184,11 @@ public void clear() {
184184
this.port = 0;
185185
}
186186

187-
public int get_time_secs() {
187+
public long get_time_secs() {
188188
return this.time_secs;
189189
}
190190

191-
public void set_time_secs(int time_secs) {
191+
public void set_time_secs(long time_secs) {
192192
this.time_secs = time_secs;
193193
set_time_secs_isSet(true);
194194
}
@@ -299,7 +299,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul
299299
if (value == null) {
300300
unset_time_secs();
301301
} else {
302-
set_time_secs((java.lang.Integer)value);
302+
set_time_secs((java.lang.Long)value);
303303
}
304304
break;
305305

@@ -426,7 +426,7 @@ public boolean equals(LSWorkerHeartbeat that) {
426426
public int hashCode() {
427427
int hashCode = 1;
428428

429-
hashCode = hashCode * 8191 + time_secs;
429+
hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs);
430430

431431
hashCode = hashCode * 8191 + ((is_set_topology_id()) ? 131071 : 524287);
432432
if (is_set_topology_id())
@@ -600,8 +600,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, LSWorkerHeart
600600
}
601601
switch (schemeField.id) {
602602
case 1: // TIME_SECS
603-
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) {
604-
struct.time_secs = iprot.readI32();
603+
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) {
604+
struct.time_secs = iprot.readI64();
605605
struct.set_time_secs_isSet(true);
606606
} else {
607607
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
@@ -657,7 +657,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, LSWorkerHear
657657

658658
oprot.writeStructBegin(STRUCT_DESC);
659659
oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
660-
oprot.writeI32(struct.time_secs);
660+
oprot.writeI64(struct.time_secs);
661661
oprot.writeFieldEnd();
662662
if (struct.topology_id != null) {
663663
oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC);
@@ -697,7 +697,7 @@ private static class LSWorkerHeartbeatTupleScheme extends org.apache.storm.thrif
697697
@Override
698698
public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException {
699699
org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
700-
oprot.writeI32(struct.time_secs);
700+
oprot.writeI64(struct.time_secs);
701701
oprot.writeString(struct.topology_id);
702702
{
703703
oprot.writeI32(struct.executors.size());
@@ -712,7 +712,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeart
712712
@Override
713713
public void read(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException {
714714
org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
715-
struct.time_secs = iprot.readI32();
715+
struct.time_secs = iprot.readI64();
716716
struct.set_time_secs_isSet(true);
717717
struct.topology_id = iprot.readString();
718718
struct.set_topology_id_isSet(true);

0 commit comments

Comments
 (0)