Skip to content

Commit 3ebc264

Browse files
authored
Fix MIGRATE REGION falsely reported complete when ConfigNode leader switches during AddRegionPeer (#17908)
1 parent 08c55b9 commit 3ebc264

11 files changed

Lines changed: 241 additions & 7 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,27 @@ public class IoTDBRegionOperationReliabilityITFramework {
119119
LOGGER.info("Cluster has been restarted");
120120
};
121121

122+
/**
123+
* Gracefully stop (SIGTERM, not a forcible kill) the ConfigNode that hit the kill point, then
124+
* restart it. A graceful stop lets the ConfigNode run its shutdown hooks, which interrupts the
125+
* in-flight region-operation procedure worker. This reproduces a leader switch / graceful
126+
* shutdown during AddRegionPeer: the interrupted {@code waitTaskFinish()} returns PROCESSING
127+
* while the AddRegionPeerTask is still running on the coordinator DataNode. The procedure must
128+
* NOT silently end here, otherwise the parent RegionMigrateProcedure would falsely treat AddPeer
129+
* as complete and remove the source replica before the destination replica is actually Running.
130+
* See the PROCESSING branch of DO_ADD_REGION_PEER in AddRegionPeerProcedure#executeFromState.
131+
*/
132+
public static Consumer<KillPointContext> actionOfGracefullyRestartConfigNode =
133+
context -> {
134+
Assert.assertTrue(context.getNodeWrapper() instanceof ConfigNodeWrapper);
135+
context.getNodeWrapper().stop();
136+
LOGGER.info("ConfigNode {} gracefully stopped.", context.getNodeWrapper().getId());
137+
Assert.assertFalse(context.getNodeWrapper().isAlive());
138+
context.getNodeWrapper().start();
139+
LOGGER.info("ConfigNode {} restarted.", context.getNodeWrapper().getId());
140+
Assert.assertTrue(context.getNodeWrapper().isAlive());
141+
};
142+
122143
@Before
123144
public void setUp() throws Exception {
124145
EnvFactory.getEnv()
@@ -155,6 +176,28 @@ public void successTest(
155176
killNode);
156177
}
157178

179+
public void successTestWithAction(
180+
final int dataReplicateFactor,
181+
final int schemaReplicationFactor,
182+
final int configNodeNum,
183+
final int dataNodeNum,
184+
KeySetView<String, Boolean> killConfigNodeKeywords,
185+
KeySetView<String, Boolean> killDataNodeKeywords,
186+
Consumer<KillPointContext> actionWhenDetectKeyWords,
187+
KillNode killNode)
188+
throws Exception {
189+
generalTestWithAllOptions(
190+
dataReplicateFactor,
191+
schemaReplicationFactor,
192+
configNodeNum,
193+
dataNodeNum,
194+
killConfigNodeKeywords,
195+
killDataNodeKeywords,
196+
actionWhenDetectKeyWords,
197+
true,
198+
killNode);
199+
}
200+
158201
public void failTest(
159202
final int dataReplicateFactor,
160203
final int schemaReplicationFactor,

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.KillPoint.KillNode;
2323
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
24+
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
2425
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
2526
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
2627
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -92,6 +93,31 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
9293
KillNode.CONFIG_NODE);
9394
}
9495

96+
/**
97+
* Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in
98+
* waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish()
99+
* (after the first poll confirms the task is still running), so the graceful shutdown
100+
* deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration
101+
* must still finish correctly after a leader switch: previously the AddRegionPeerProcedure
102+
* silently ended on PROCESSING, letting the parent procedure remove the source replica before the
103+
* destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens.
104+
* The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen
105+
* once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to
106+
* be exercised.
107+
*/
108+
@Test
109+
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
110+
successTestWithAction(
111+
1,
112+
1,
113+
3,
114+
2,
115+
buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
116+
noKillPoints(),
117+
actionOfGracefullyRestartConfigNode,
118+
KillNode.CONFIG_NODE);
119+
}
120+
95121
@Test
96122
public void cnCrashDuringUpdateCacheTest() throws Exception {
97123
successTest(

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.KillPoint.KillNode;
2323
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
24+
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
2425
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
2526
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
2627
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -78,6 +79,31 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
7879
KillNode.CONFIG_NODE);
7980
}
8081

82+
/**
83+
* Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in
84+
* waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish()
85+
* (after the first poll confirms the task is still running), so the graceful shutdown
86+
* deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration
87+
* must still finish correctly after a leader switch: previously the AddRegionPeerProcedure
88+
* silently ended on PROCESSING, letting the parent procedure remove the source replica before the
89+
* destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens.
90+
* The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen
91+
* once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to
92+
* be exercised.
93+
*/
94+
@Test
95+
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
96+
successTestWithAction(
97+
1,
98+
1,
99+
3,
100+
2,
101+
buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
102+
noKillPoints(),
103+
actionOfGracefullyRestartConfigNode,
104+
KillNode.CONFIG_NODE);
105+
}
106+
81107
@Test
82108
public void cnCrashDuringUpdateCacheTest() throws Exception {
83109
successTest(

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.KillPoint.KillNode;
2323
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
24+
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
2425
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
2526
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
2627
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -92,6 +93,31 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
9293
KillNode.CONFIG_NODE);
9394
}
9495

96+
/**
97+
* Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in
98+
* waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish()
99+
* (after the first poll confirms the task is still running), so the graceful shutdown
100+
* deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration
101+
* must still finish correctly after a leader switch: previously the AddRegionPeerProcedure
102+
* silently ended on PROCESSING, letting the parent procedure remove the source replica before the
103+
* destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens.
104+
* The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen
105+
* once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to
106+
* be exercised.
107+
*/
108+
@Test
109+
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
110+
successTestWithAction(
111+
1,
112+
1,
113+
3,
114+
2,
115+
buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
116+
noKillPoints(),
117+
actionOfGracefullyRestartConfigNode,
118+
KillNode.CONFIG_NODE);
119+
}
120+
95121
@Test
96122
public void cnCrashDuringUpdateCacheTest() throws Exception {
97123
successTest(

integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/ratis/IoTDBRegionMigrateConfigNodeCrashForRatisIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.KillPoint.KillNode;
2323
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
24+
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
2425
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis;
2526
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
2627
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
@@ -76,6 +77,31 @@ public void cnCrashDuringDoAddPeerTest() throws Exception {
7677
KillNode.CONFIG_NODE);
7778
}
7879

80+
/**
81+
* Gracefully restart (not forcibly kill) the ConfigNode leader while AddRegionPeer is blocked in
82+
* waitTaskFinish() polling the coordinator. The kill point fires from inside waitTaskFinish()
83+
* (after the first poll confirms the task is still running), so the graceful shutdown
84+
* deterministically interrupts that wait and waitTaskFinish() returns PROCESSING. The migration
85+
* must still finish correctly after a leader switch: previously the AddRegionPeerProcedure
86+
* silently ended on PROCESSING, letting the parent procedure remove the source replica before the
87+
* destination replica was actually Running. Uses 3 ConfigNodes so a real leader switch happens.
88+
* The framework requires the WAIT_TASK_FINISH_POLLING kill point to fire, which can only happen
89+
* once the worker is blocked inside waitTaskFinish(), so the PROCESSING branch is guaranteed to
90+
* be exercised.
91+
*/
92+
@Test
93+
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
94+
successTestWithAction(
95+
1,
96+
1,
97+
3,
98+
2,
99+
buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
100+
noKillPoints(),
101+
actionOfGracefullyRestartConfigNode,
102+
KillNode.CONFIG_NODE);
103+
}
104+
79105
@Test
80106
public void cnCrashDuringUpdateCacheTest() throws Exception {
81107
successTest(

iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ public final class ProcedureMessages {
10011001
public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES =
10021002
"Validate table for table {}.{} when setting properties";
10031003
public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED =
1004-
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback";
1004+
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted (ConfigNode shutdown or leader change); the AddRegionPeer task is still running on the coordinator, this procedure will stay at DO_ADD_REGION_PEER and resume polling after recovery";
10051005

10061006
public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "Failed to create database. The TTL should be non-negative.";
10071007
public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "Failed to create database. The dataRegionGroupNum should be positive.";

iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ public final class ProcedureMessages {
999999
public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES =
10001000
"Validate table for table {}.{} when setting properties";
10011001
public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED =
1002-
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback";
1002+
"waitTaskFinish() 返回 PROCESSING,表示等待被中断(ConfigNode 关闭或主节点切换);AddRegionPeer 任务仍在协调者上运行,该流程将停留在 DO_ADD_REGION_PEER 状态,恢复后继续轮询";
10031003

10041004
public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "创建数据库失败。TTL 不能为负数。";
10051005
public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "创建数据库失败。dataRegionGroupNum 应为正数。";

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,15 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
454454

455455
updateStoreOnExecution(rootProcStack, proc, subprocs);
456456

457-
if (!store.isRunning()) {
457+
// Stop the in-place re-execution loop once this executor is shutting down (e.g. ConfigNode
458+
// leader switch / restart). Checking store.isRunning() alone is not enough: stopExecutor()
459+
// calls executor.stop() and executor.join() before store.stop(), so the store is still
460+
// running while join() waits for this very worker to finish. Without also checking the
461+
// executor's own running flag, a procedure that keeps returning HAS_MORE_STATE for the same
462+
// state (e.g. AddRegionPeerProcedure parking at DO_ADD_REGION_PEER after waitTaskFinish() is
463+
// interrupted) would re-execute forever here and join() would hang. The persisted state lets
464+
// the next leader resume from where it stopped.
465+
if (!isRunning() || !store.isRunning()) {
458466
return;
459467
}
460468

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
9090
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
9191
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
92+
import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
9293
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
9394
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
9495
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
@@ -360,10 +361,26 @@ public Map<Integer, TSStatus> resetPeerList(
360361

361362
// TODO: will use 'procedure yield' to refactor later
362363
public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) {
364+
return waitTaskFinish(taskId, dataNodeLocation, null);
365+
}
366+
367+
/**
368+
* Poll the coordinator DataNode until the region-maintain task identified by {@code taskId}
369+
* reaches a terminal state.
370+
*
371+
* @param killPoint if non-null, fired once right after the first poll confirms the task is still
372+
* PROCESSING. At that point the worker thread is provably blocked inside this method, so
373+
* tests can use the kill point to deterministically interrupt the wait (e.g. by gracefully
374+
* stopping the ConfigNode leader) and exercise the interrupted-PROCESSING path. The kill point is a no-op
375+
* outside integration tests.
376+
*/
377+
public <T extends Enum<T>> TRegionMigrateResult waitTaskFinish(
378+
long taskId, TDataNodeLocation dataNodeLocation, T killPoint) {
363379
final long MAX_DISCONNECTION_TOLERATE_MS = 600_000;
364380
final long INITIAL_DISCONNECTION_TOLERATE_MS = 60_000;
365381
long startTime = System.nanoTime();
366382
long lastReportTime = System.nanoTime();
383+
boolean killPointTriggered = false;
367384
while (true) {
368385
try (SyncDataNodeInternalServiceClient dataNodeClient =
369386
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
@@ -372,6 +389,12 @@ public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNo
372389
if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) {
373390
return report;
374391
}
392+
// The task is confirmed still running and this thread is blocked here, so it is now safe to
393+
// fire the kill point that tests use to interrupt waitTaskFinish() deterministically.
394+
if (killPoint != null && !killPointTriggered) {
395+
setKillPoint(killPoint);
396+
killPointTriggered = true;
397+
}
375398
} catch (Exception ignore) {
376399

377400
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
2727
import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
2828
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
29+
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
2930
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
3031
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
3132
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -104,8 +105,15 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s
104105
break;
105106
case DO_ADD_REGION_PEER:
106107
handler.forceUpdateRegionCache(regionId, targetDataNode, RegionStatus.Adding);
107-
// We don't want to re-submit AddRegionPeerTask when leader change or ConfigNode reboot
108-
if (!this.isStateDeserialized()) {
108+
// Only submit the AddRegionPeerTask on the very first entry of this state. We must NOT
109+
// re-submit when:
110+
// - the state was restored from disk after a leader change / ConfigNode reboot
111+
// (isStateDeserialized()), or
112+
// - this state is being re-entered in place because a previous attempt parked here on
113+
// PROCESSING (getCycles() > 0, see the PROCESSING branch below).
114+
// The coordinator DataNode also deduplicates tasks by taskId, so a duplicate submit would be a no-op,
115+
// but skipping it here avoids the useless RPC and keeps the re-poll cheap.
116+
if (!this.isStateDeserialized() && getCycles() == 0) {
109117
TSStatus tsStatus =
110118
handler.submitAddRegionPeerTask(
111119
this.getProcId(), targetDataNode, regionId, coordinator);
@@ -115,7 +123,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s
115123
env, handler, "submit DO_ADD_REGION_PEER task fail");
116124
}
117125
}
118-
TRegionMigrateResult result = handler.waitTaskFinish(this.getProcId(), coordinator);
126+
TRegionMigrateResult result =
127+
handler.waitTaskFinish(
128+
this.getProcId(), coordinator, RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING);
119129
switch (result.getTaskStatus()) {
120130
case TASK_NOT_EXIST:
121131
// coordinator crashed and lost its task table
@@ -124,10 +134,22 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState s
124134
return warnAndRollBackAndNoMoreState(
125135
env, handler, String.format("%s result is %s", state, result.getTaskStatus()));
126136
case PROCESSING:
137+
// waitTaskFinish() only returns PROCESSING when its polling loop was interrupted by
138+
// an InterruptedException, i.e. this ConfigNode is shutting down / losing leadership
139+
// (a user CANCEL or a coordinator disconnection both go through the FAIL branch
140+
// above). The AddRegionPeerTask is still running on the coordinator DataNode, so we
141+
// must NOT silently end here: doing so would let the parent RegionMigrateProcedure
142+
// proceed to CHECK_ADD_REGION_PEER / REMOVE_REGION_PEER and remove the source replica
143+
// before the destination replica has actually finished receiving the snapshot.
144+
// Instead, stay in DO_ADD_REGION_PEER and persist it; after recovery the new leader
145+
// re-enters this state and re-polls the still-running coordinator task (the
146+
// isStateDeserialized() guard above prevents re-submitting the task) until it really
147+
// reaches SUCCESS or FAIL.
127148
LOGGER.info(
128149
ProcedureMessages
129150
.WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED);
130-
return Flow.NO_MORE_STATE;
151+
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
152+
break outerSwitch;
131153
case SUCCESS:
132154
setNextState(UPDATE_REGION_LOCATION_CACHE);
133155
break outerSwitch;

0 commit comments

Comments
 (0)