Skip to content

Commit 581f7e5

Browse files
committed
Use spark.execution.id as the Velox's queryId
1 parent d7885c3 commit 581f7e5

5 files changed

Lines changed: 42 additions & 15 deletions

File tree

cpp/core/compute/Runtime.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ struct SparkTaskInfo {
4242
int32_t partitionId{0};
4343
// Same as TID.
4444
int64_t taskId{0};
45+
// Same as Spark SQL execution id. -1 means unavailable.
46+
int64_t executionId{-1};
4547
// virtual id for each backend internal use
4648
int32_t vId{0};
4749

4850
std::string toString() const {
49-
return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) +
50-
"]";
51+
return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) +
52+
" TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]";
5153
}
5254

5355
friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) {

cpp/core/jni/JniWrapper.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,13 +468,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith
468468
jint stageId,
469469
jint partitionId,
470470
jlong taskId,
471+
jlong executionId,
471472
jboolean enableDumping,
472473
jstring spillDir) {
473474
JNI_METHOD_START
474475

475476
auto ctx = getRuntime(env, wrapper);
476477

477-
ctx->setSparkTaskInfo({stageId, partitionId, taskId});
478+
ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId});
478479

479480
if (enableDumping) {
480481
ctx->enableDumping();

cpp/velox/compute/WholeStageResultIterator.cc

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,19 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
7272

7373
} // namespace
7474

75+
namespace {
76+
std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) {
77+
if (taskInfo.executionId != -1) {
78+
return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId));
79+
}
80+
return fmt::format(
81+
"Gluten_Stage_{}_TID_{}_VTID_{}",
82+
std::to_string(taskInfo.stageId),
83+
std::to_string(taskInfo.taskId),
84+
std::to_string(taskInfo.vId));
85+
}
86+
} // namespace
87+
7588
WholeStageResultIterator::WholeStageResultIterator(
7689
VeloxMemoryManager* memoryManager,
7790
const std::shared_ptr<const facebook::velox::core::PlanNode>& planNode,
@@ -111,11 +124,7 @@ WholeStageResultIterator::WholeStageResultIterator(
111124
velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
112125
std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx();
113126
task_ = velox::exec::Task::create(
114-
fmt::format(
115-
"Gluten_Stage_{}_TID_{}_VTID_{}",
116-
std::to_string(taskInfo_.stageId),
117-
std::to_string(taskInfo_.taskId),
118-
std::to_string(taskInfo.vId)),
127+
getVeloxTaskId(taskInfo_),
119128
std::move(planFragment),
120129
0,
121130
std::move(queryCtx),
@@ -233,11 +242,7 @@ std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQ
233242
gluten::VeloxBackend::get()->getAsyncDataCache(),
234243
memoryManager_->getAggregateMemoryPool(),
235244
spillExecutor_,
236-
fmt::format(
237-
"Gluten_Stage_{}_TID_{}_VTID_{}",
238-
std::to_string(taskInfo_.stageId),
239-
std::to_string(taskInfo_.taskId),
240-
std::to_string(taskInfo_.vId)));
245+
getVeloxTaskId(taskInfo_));
241246
return ctx;
242247
}
243248

gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
public class NativePlanEvaluator {
3535
private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class);
3636
private static final AtomicInteger id = new AtomicInteger(0);
37+
private static final long INVALID_EXECUTION_ID = -1L;
38+
private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id";
3739

3840
private final Runtime runtime;
3941
private final PlanEvaluatorJniWrapper jniWrapper;
@@ -78,14 +80,16 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator(
7880
int partitionIndex,
7981
String spillDirPath)
8082
throws RuntimeException {
83+
final TaskContext taskContext = TaskContext.get();
8184
final long itrHandle =
8285
jniWrapper.nativeCreateKernelWithIterator(
8386
wsPlan,
8487
splitInfo,
8588
iterList,
86-
TaskContext.get().stageId(),
89+
taskContext.stageId(),
8790
partitionIndex, // TaskContext.getPartitionId(),
88-
TaskContext.get().taskAttemptId(),
91+
taskContext.taskAttemptId(),
92+
getExecutionId(taskContext),
8993
DebugUtil.isDumpingEnabledForTask(),
9094
spillDirPath);
9195
final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
@@ -113,4 +117,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
113117
private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) {
114118
return new ColumnarBatchOutIterator(runtime, itrHandle);
115119
}
120+
121+
private static long getExecutionId(TaskContext taskContext) {
122+
final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY);
123+
if (executionId == null) {
124+
return INVALID_EXECUTION_ID;
125+
}
126+
try {
127+
return Long.parseLong(executionId);
128+
} catch (NumberFormatException e) {
129+
LOGGER.warn(
130+
"Invalid Spark execution id '{}', fallback to {}", executionId, INVALID_EXECUTION_ID);
131+
return INVALID_EXECUTION_ID;
132+
}
133+
}
116134
}

gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public native long nativeCreateKernelWithIterator(
7676
int stageId,
7777
int partitionId,
7878
long taskId,
79+
long executionId,
7980
boolean enableDumping,
8081
String spillDir)
8182
throws RuntimeException;

0 commit comments

Comments
 (0)