Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ utilityStatement
| showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding
| revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile
| removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser
| repairDataPartitionTable
| repairDataPartitionTable | showRepairDataPartitionTableProgress
;

/**
Expand Down Expand Up @@ -1283,6 +1283,11 @@ repairDataPartitionTable
: REPAIR DATA PARTITION TABLE
;

// Show Repair Data Partition Table Progress
showRepairDataPartitionTableProgress
: SHOW REPAIR DATA PARTITION TABLE PROGRESS
;

// Explain
explain
: EXPLAIN (ANALYZE VERBOSE?)? selectStatement?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,10 @@ REPAIR
: R E P A I R
;

PROGRESS
: P R O G R E S S
;

SCHEMA_REPLICATION_FACTOR
: S C H E M A '_' R E P L I C A T I O N '_' F A C T O R
;
Expand Down Expand Up @@ -1411,4 +1415,4 @@ fragment V: [vV];
fragment W: [wW];
fragment X: [xX];
fragment Y: [yY];
fragment Z: [zZ];
fragment Z: [zZ];
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum CnToDnSyncRequestType {
COLLECT_EARLIEST_TIMESLOTS,
GENERATE_DATA_PARTITION_TABLE,
GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,

// PartitionCache
INVALIDATE_PARTITION_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ private void buildActionMap() {
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
(req, client) ->
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,
(req, client) -> client.getDataPartitionTableGeneratorProgress());
actionMap = actionMapBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -1180,6 +1181,17 @@ public TSStatus dataPartitionTableIntegrityCheck() {
return partitionManager.dataPartitionTableIntegrityCheck();
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return new TShowRepairDataPartitionTableProgressResp(status, "UNKNOWN", 0.0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Maintainability] Magic state strings duplicated across modules.

The state string is produced ad hoc in three different places: "UNKNOWN" here, "IDLE" in PartitionManager.showRepairDataPartitionTableProgress, and the raw enum name() in DataPartitionTableIntegrityCheckProcedure.getProgress. Please define these as shared constants or a small enum so the state contract is single-sourced and testable, rather than relying on matching string literals across modules.

.setMessage(status.getMessage());
}

return partitionManager.showRepairDataPartitionTableProgress();
}

private void printNewCreatedDataPartition(
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -480,6 +481,8 @@ TDataPartitionTableResp getOrCreateDataPartition(

TSStatus dataPartitionTableIntegrityCheck();

TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress();

/**
* Get AuditLogger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
Expand Down Expand Up @@ -2468,6 +2469,18 @@ public boolean isExistUnfinishedProcedure(
return false;
}

public Optional<DataPartitionTableIntegrityCheckProcedure>
getUnfinishedDataPartitionTableIntegrityCheckProcedure() {
for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
if (!procedure.isFinished()
&& procedure instanceof DataPartitionTableIntegrityCheckProcedure) {
return Optional.of((DataPartitionTableIntegrityCheckProcedure) procedure);
}
}

return Optional.empty();
}

// ======================================================
/*
GET-SET Region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
Expand Down Expand Up @@ -543,6 +544,18 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() {
dataPartitionTableIntegrityCheckProcedureRunning.set(false);
}

public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
return configManager
.getProcedureManager()
.getUnfinishedDataPartitionTableIntegrityCheckProcedure()
.map(procedure -> procedure.getProgress(configManager.getProcedureManager().getEnv()))
.orElseGet(
() ->
new TShowRepairDataPartitionTableProgressResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Semantics] IDLE returns progress = 100.0.

When no procedure is running, this returns IDLE with progress = 100.0. "Idle at 100%" is ambiguous — a user who has never triggered a repair will see 100% progress. Suggest returning 0.0 (or a distinct sentinel) for the idle case, or at minimum a comment clarifying the semantics.

.setMessage("No running DataPartitionTable integrity check procedure"));
}

private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
TSStatus status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
Expand Down Expand Up @@ -1114,4 +1117,86 @@ public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) {
public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) {
this.failedDataNodes = failedDataNodes;
}

public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) {
final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState();
final String state = currentState == null ? "UNKNOWN" : currentState.name();
final double progress =
currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100;

return new TShowRepairDataPartitionTableProgressResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress)
.setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress));
}

private double calculateProgressByState(
final ConfigNodeProcedureEnv env,
final DataPartitionTableIntegrityCheckProcedureState currentState) {
switch (currentState) {
case COLLECT_EARLIEST_TIMESLOTS:
return 0.0;
case ANALYZE_MISSING_PARTITIONS:
return 0.05;
case REQUEST_PARTITION_TABLES:
return 0.1;
case REQUEST_PARTITION_TABLES_HEART_BEAT:
return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(env);
case MERGE_PARTITION_TABLES:
return 0.95;
case WRITE_PARTITION_TABLE_TO_CONSENSUS:
return 0.99;
default:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Maintainability] Silent default branch.

The default branch maps any unhandled state to 0.0. If a new DataPartitionTableIntegrityCheckProcedureState is added later, progress will silently report 0% for it. Please add a LOG.warn here to catch enum drift.

return 0.0;
}
}

private double calculateDataNodeGeneratorProgress(final ConfigNodeProcedureEnv env) {
final LoadManager currentLoadManager =
loadManager == null ? env.getConfigManager().getLoadManager() : loadManager;

final Set<TDataNodeConfiguration> targetDataNodes = new HashSet<>(allDataNodes);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Thread-safety] Unsynchronized cross-thread read of procedure fields.

This method runs on the RPC handler thread, but allDataNodes is a non-volatile ArrayList that the procedure-executor thread reassigns on every step (allDataNodes = dataNodeManager.getRegisteredDataNodes()) and clears in rollbackState (allDataNodes.clear()). Copying it via new HashSet<>(allDataNodes) while another thread structurally modifies or reassigns it can throw ConcurrentModificationException or observe a half-published reference. The same concern applies to skipDataNodes/failedDataNodes, which are reassigned with = new HashSet<>() in executeFromState/getInitialState. A transient exception here would fail the user's SHOW ... PROGRESS even though the repair itself is healthy.

Please snapshot these fields under a lock, make them volatile, or wrap the read so getProgress degrades to UNKNOWN instead of throwing.

targetDataNodes.removeAll(skipDataNodes);
if (targetDataNodes.isEmpty()) {
return dataPartitionTables.isEmpty() ? 0.0 : 1.0;
}

double progressSum = 0.0;
for (TDataNodeConfiguration dataNode : targetDataNodes) {
final int dataNodeId = dataNode.getLocation().getDataNodeId();
if (dataPartitionTables.containsKey(dataNodeId)
|| failedDataNodes.contains(dataNode)
|| !NodeStatus.Running.equals(currentLoadManager.getNodeStatus(dataNodeId))) {
progressSum += 1.0;
continue;
}

try {
Object response =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Performance] Synchronous fan-out RPC inside a status query.

This issues a synchronous RPC with up to MAX_RETRY_COUNT (3) retries to every target DataNode, sequentially, on the RPC handler thread. On a large cluster with one slow/unreachable DataNode, a SHOW ... PROGRESS query can block for roughly N × retries × timeout.

The REQUEST_PARTITION_TABLES_HEART_BEAT state already polls per-DataNode generation progress — consider caching the latest per-DataNode progress on the procedure and reading it here, which avoids the extra fan-out entirely and is both faster and race-free. If the synchronous fan-out is kept, please bound or parallelize it.

dataNode.getLocation().getInternalEndPoint(),
null,
CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,
MAX_RETRY_COUNT);
if (response instanceof TGetDataPartitionTableGeneratorProgressResp) {
TGetDataPartitionTableGeneratorProgressResp resp =
(TGetDataPartitionTableGeneratorProgressResp) response;
DataPartitionTableGeneratorState state =
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());
if (state == DataPartitionTableGeneratorState.SUCCESS) {
progressSum += 1.0;
} else if (state == DataPartitionTableGeneratorState.IN_PROGRESS) {
progressSum += Math.max(0.0, Math.min(1.0, resp.getProgress()));
}
}
} catch (Exception e) {
LOG.warn(
"[DataPartitionIntegrity] Failed to get DataPartitionTable generation progress from DataNode[id={}]: {}",
dataNodeId,
e.getMessage());
}
}

return progressSum / targetDataNodes.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp;
Expand Down Expand Up @@ -639,6 +640,11 @@ public TSStatus dataPartitionTableIntegrityCheck() {
return configManager.dataPartitionTableIntegrityCheck();
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
return configManager.showRepairDataPartitionTableProgress();
}

@Override
public TSStatus operatePermission(final TAuthorizerReq req) {
ConfigPhysicalPlanType configPhysicalPlanType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp;
Expand Down Expand Up @@ -746,6 +747,14 @@ public TSStatus dataPartitionTableIntegrityCheck() throws TException {
() -> client.dataPartitionTableIntegrityCheck(), status -> !updateConfigNodeLeader(status));
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress()
throws TException {
return executeRemoteCallWithRetry(
() -> client.showRepairDataPartitionTableProgress(),
resp -> !updateConfigNodeLeader(resp.status));
}

@Override
public TSStatus operatePermission(TAuthorizerReq req) throws TException {
return executeRemoteCallWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
Expand Down Expand Up @@ -3522,6 +3523,52 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb
return resp;
}

@Override
public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() {
TGetDataPartitionTableGeneratorProgressResp resp =
new TGetDataPartitionTableGeneratorProgressResp();

if (currentGenerator == null) {
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setProgress(0.0);
resp.setMessage("No DataPartitionTable generation task found");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
return resp;
}

switch (currentGenerator.getStatus()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Possible NPE] Re-reading volatile currentGenerator without a local copy.

currentGenerator is volatile, but this method reads it multiple times (getStatus(), getProgress(), getErrorMessage()) after the null-check above. The generation-with-heartbeat path sets currentGenerator = null on completion, so a concurrent null-assignment between the null-check and this switch would NPE. Please capture final DataPartitionTableGenerator generator = currentGenerator; once at the top and operate on the local.

case IN_PROGRESS:
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage(
String.format(
"DataPartitionTable generation in progress: %.1f%%",
currentGenerator.getProgress() * 100));
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
break;
case COMPLETED:
resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode());
resp.setProgress(1.0);
resp.setMessage("DataPartitionTable generation completed successfully");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
break;
case FAILED:
resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage(
"DataPartitionTable generation failed: " + currentGenerator.getErrorMessage());
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
break;
default:
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage("Unknown task status: " + currentGenerator.getStatus());
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
break;
}
return resp;
}

private void parseGenerationStatus(Object resp) {
if (currentGenerator == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public static DatasetHeader getShowPipeHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true);
}

public static DatasetHeader getShowRepairDataPartitionTableProgressHeader() {
return new DatasetHeader(
ColumnHeaderConstant.showRepairDataPartitionTableProgressColumnHeaders, true);
}

public static DatasetHeader getShowTopicHeader() {
return new DatasetHeader(ColumnHeaderConstant.showTopicColumnHeaders, true);
}
Expand Down
Loading
Loading