-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Display data partition repair progress by providing a new SQL statement #17637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e634a25
b705abd
1bbd982
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,7 @@ | |
| import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; | ||
| import org.apache.iotdb.consensus.exception.ConsensusException; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; | ||
|
|
@@ -543,6 +544,18 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() { | |
| dataPartitionTableIntegrityCheckProcedureRunning.set(false); | ||
| } | ||
|
|
||
| public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { | ||
| return configManager | ||
| .getProcedureManager() | ||
| .getUnfinishedDataPartitionTableIntegrityCheckProcedure() | ||
| .map(procedure -> procedure.getProgress(configManager.getProcedureManager().getEnv())) | ||
| .orElseGet( | ||
| () -> | ||
| new TShowRepairDataPartitionTableProgressResp( | ||
| RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Semantics] When no procedure is running, this returns |
||
| .setMessage("No running DataPartitionTable integrity check procedure")); | ||
| } | ||
|
|
||
| private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { | ||
| TSStatus status = getConsensusManager().confirmLeader(); | ||
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,11 +42,14 @@ | |
| import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; | ||
| import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; | ||
| import org.apache.iotdb.confignode.procedure.store.ProcedureType; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; | ||
| import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; | ||
| import org.apache.iotdb.rpc.RpcUtils; | ||
| import org.apache.iotdb.rpc.TSStatusCode; | ||
|
|
||
| import org.apache.thrift.TException; | ||
|
|
@@ -1114,4 +1117,86 @@ public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) { | |
| public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) { | ||
| this.failedDataNodes = failedDataNodes; | ||
| } | ||
|
|
||
| public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) { | ||
| final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); | ||
| final String state = currentState == null ? "UNKNOWN" : currentState.name(); | ||
| final double progress = | ||
| currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100; | ||
|
|
||
| return new TShowRepairDataPartitionTableProgressResp( | ||
| RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) | ||
| .setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); | ||
| } | ||
|
|
||
| private double calculateProgressByState( | ||
| final ConfigNodeProcedureEnv env, | ||
| final DataPartitionTableIntegrityCheckProcedureState currentState) { | ||
| switch (currentState) { | ||
| case COLLECT_EARLIEST_TIMESLOTS: | ||
| return 0.0; | ||
| case ANALYZE_MISSING_PARTITIONS: | ||
| return 0.05; | ||
| case REQUEST_PARTITION_TABLES: | ||
| return 0.1; | ||
| case REQUEST_PARTITION_TABLES_HEART_BEAT: | ||
| return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(env); | ||
| case MERGE_PARTITION_TABLES: | ||
| return 0.95; | ||
| case WRITE_PARTITION_TABLE_TO_CONSENSUS: | ||
| return 0.99; | ||
| default: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Maintainability] Silent The |
||
| return 0.0; | ||
| } | ||
| } | ||
|
|
||
| private double calculateDataNodeGeneratorProgress(final ConfigNodeProcedureEnv env) { | ||
| final LoadManager currentLoadManager = | ||
| loadManager == null ? env.getConfigManager().getLoadManager() : loadManager; | ||
|
|
||
| final Set<TDataNodeConfiguration> targetDataNodes = new HashSet<>(allDataNodes); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Thread-safety] Unsynchronized cross-thread read of procedure fields. This method runs on the RPC handler thread, but Please snapshot these fields under a lock, make them |
||
| targetDataNodes.removeAll(skipDataNodes); | ||
| if (targetDataNodes.isEmpty()) { | ||
| return dataPartitionTables.isEmpty() ? 0.0 : 1.0; | ||
| } | ||
|
|
||
| double progressSum = 0.0; | ||
| for (TDataNodeConfiguration dataNode : targetDataNodes) { | ||
| final int dataNodeId = dataNode.getLocation().getDataNodeId(); | ||
| if (dataPartitionTables.containsKey(dataNodeId) | ||
| || failedDataNodes.contains(dataNode) | ||
| || !NodeStatus.Running.equals(currentLoadManager.getNodeStatus(dataNodeId))) { | ||
| progressSum += 1.0; | ||
| continue; | ||
| } | ||
|
|
||
| try { | ||
| Object response = | ||
| SyncDataNodeClientPool.getInstance() | ||
| .sendSyncRequestToDataNodeWithGivenRetry( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Performance] Synchronous fan-out RPC inside a status query. This issues a synchronous RPC with up to The |
||
| dataNode.getLocation().getInternalEndPoint(), | ||
| null, | ||
| CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, | ||
| MAX_RETRY_COUNT); | ||
| if (response instanceof TGetDataPartitionTableGeneratorProgressResp) { | ||
| TGetDataPartitionTableGeneratorProgressResp resp = | ||
| (TGetDataPartitionTableGeneratorProgressResp) response; | ||
| DataPartitionTableGeneratorState state = | ||
| DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); | ||
| if (state == DataPartitionTableGeneratorState.SUCCESS) { | ||
| progressSum += 1.0; | ||
| } else if (state == DataPartitionTableGeneratorState.IN_PROGRESS) { | ||
| progressSum += Math.max(0.0, Math.min(1.0, resp.getProgress())); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warn( | ||
| "[DataPartitionIntegrity] Failed to get DataPartitionTable generation progress from DataNode[id={}]: {}", | ||
| dataNodeId, | ||
| e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| return progressSum / targetDataNodes.size(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -280,6 +280,7 @@ | |
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; | ||
| import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; | ||
|
|
@@ -3522,6 +3523,52 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb | |
| return resp; | ||
| } | ||
|
|
||
| @Override | ||
| public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() { | ||
| TGetDataPartitionTableGeneratorProgressResp resp = | ||
| new TGetDataPartitionTableGeneratorProgressResp(); | ||
|
|
||
| if (currentGenerator == null) { | ||
| resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); | ||
| resp.setProgress(0.0); | ||
| resp.setMessage("No DataPartitionTable generation task found"); | ||
| resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); | ||
| return resp; | ||
| } | ||
|
|
||
| switch (currentGenerator.getStatus()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Possible NPE] Re-reading
|
||
| case IN_PROGRESS: | ||
| resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); | ||
| resp.setProgress(currentGenerator.getProgress()); | ||
| resp.setMessage( | ||
| String.format( | ||
| "DataPartitionTable generation in progress: %.1f%%", | ||
| currentGenerator.getProgress() * 100)); | ||
| resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); | ||
| break; | ||
| case COMPLETED: | ||
| resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode()); | ||
| resp.setProgress(1.0); | ||
| resp.setMessage("DataPartitionTable generation completed successfully"); | ||
| resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); | ||
| break; | ||
| case FAILED: | ||
| resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode()); | ||
| resp.setProgress(currentGenerator.getProgress()); | ||
| resp.setMessage( | ||
| "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage()); | ||
| resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); | ||
| break; | ||
| default: | ||
| resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); | ||
| resp.setProgress(currentGenerator.getProgress()); | ||
| resp.setMessage("Unknown task status: " + currentGenerator.getStatus()); | ||
| resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); | ||
| break; | ||
| } | ||
| return resp; | ||
| } | ||
|
|
||
| private void parseGenerationStatus(Object resp) { | ||
| if (currentGenerator == null) { | ||
| return; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Maintainability] Magic state strings duplicated across modules.
The
statestring is produced ad hoc in three different places:"UNKNOWN"here,"IDLE"inPartitionManager.showRepairDataPartitionTableProgress, and the raw enumname()inDataPartitionTableIntegrityCheckProcedure.getProgress. Please define these as shared constants or a small enum so thestatecontract is single-sourced and testable, rather than relying on matching string literals across modules.