Skip to content

Commit 3dc8950

Browse files
committed
fix
1 parent 98fd257 commit 3dc8950

18 files changed

Lines changed: 147 additions & 90 deletions

File tree

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
259259
GlutenPartition(
260260
index,
261261
planByteArray,
262+
wsCtx.queryId,
262263
splitInfos.toArray
263264
)
264265
}
@@ -315,6 +316,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
315316
inputIterators: Seq[Iterator[ColumnarBatch]],
316317
sparkConf: SparkConf,
317318
rootNode: PlanNode,
319+
queryId: String,
318320
pipelineTime: SQLMetric,
319321
updateNativeMetrics: IMetrics => Unit,
320322
partitionIndex: Int,

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
481481
child: SparkPlan,
482482
numOutputRows: SQLMetric,
483483
dataSize: SQLMetric,
484-
buildThreads: SQLMetric): BuildSideRelation = {
484+
buildThreads: SQLMetric,
485+
queryId: String): BuildSideRelation = {
485486

486487
val (buildKeys, isNullAware) = mode match {
487488
case mode1: HashedRelationBroadcastMode =>

backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ object GlutenDeltaJobStatsTracker extends Logging {
194194
null,
195195
null,
196196
0,
197-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
197+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
198+
s"Gluten_Delta_Stats_${UUID.randomUUID()}"
198199
)
199200
nativeOutItr
200201
}

backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ object GlutenDeltaJobStatsTracker extends Logging {
198198
null,
199199
null,
200200
0,
201-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
201+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
202+
s"Gluten_Delta_Stats_${UUID.randomUUID()}"
202203
)
203204
nativeOutItr
204205
}

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
130130
GlutenPartition(
131131
index,
132132
planByteArray,
133+
wsCtx.queryId,
133134
splitInfos.toArray
134135
)
135136
}
@@ -216,7 +217,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
216217
if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null,
217218
if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null,
218219
partitionIndex,
219-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
220+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
221+
inputPartition.queryId
220222
)
221223
resIter.noMoreSplits()
222224
val itrMetrics = IteratorMetricsJniWrapper.create()
@@ -243,6 +245,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
243245
inputIterators: Seq[Iterator[ColumnarBatch]],
244246
sparkConf: SparkConf,
245247
rootNode: PlanNode,
248+
queryId: String,
246249
pipelineTime: SQLMetric,
247250
updateNativeMetrics: IMetrics => Unit,
248251
partitionIndex: Int,
@@ -270,7 +273,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
270273
null,
271274
if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null,
272275
partitionIndex,
273-
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
276+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath),
277+
queryId
274278
)
275279
nativeResultIterator.noMoreSplits()
276280
val itrMetrics = IteratorMetricsJniWrapper.create()

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
715715
child: SparkPlan,
716716
numOutputRows: SQLMetric,
717717
dataSize: SQLMetric,
718-
buildThreads: SQLMetric): BuildSideRelation = {
718+
buildThreads: SQLMetric,
719+
queryId: String): BuildSideRelation = {
719720

720721
val buildKeys = mode match {
721722
case mode1: HashedRelationBroadcastMode =>
@@ -867,25 +868,36 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
867868
val buildThreadsValue = if (rawThreads < 1) 1 else rawThreads
868869
buildThreads += buildThreadsValue
869870

870-
if (useOffheapBroadcastBuildRelation) {
871-
TaskResources.runUnsafe {
872-
UnsafeColumnarBuildSideRelation(
871+
val relation =
872+
if (useOffheapBroadcastBuildRelation) {
873+
TaskResources.runUnsafe {
874+
UnsafeColumnarBuildSideRelation(
875+
newOutput,
876+
serialized.flatMap(_.offHeapData().asScala),
877+
mode,
878+
newBuildKeys,
879+
offload,
880+
buildThreadsValue,
881+
queryId)
882+
}
883+
} else {
884+
ColumnarBuildSideRelation(
873885
newOutput,
874-
serialized.flatMap(_.offHeapData().asScala),
886+
serialized.flatMap(_.onHeapData().asScala).toArray,
875887
mode,
876888
newBuildKeys,
877889
offload,
878-
buildThreadsValue)
890+
buildThreadsValue,
891+
queryId)
879892
}
880-
} else {
881-
ColumnarBuildSideRelation(
882-
newOutput,
883-
serialized.flatMap(_.onHeapData().asScala).toArray,
884-
mode,
885-
newBuildKeys,
886-
offload,
887-
buildThreadsValue)
888-
}
893+
logWarning(
894+
s"GlutenQueryIdTrace[driver-relation-create]: queryId=$queryId, " +
895+
s"relationType=${relation.getClass.getSimpleName}, " +
896+
s"mode=${mode.getClass.getSimpleName}, offload=$offload, " +
897+
s"useOffheap=$useOffheapBroadcastBuildRelation, rawSize=$rawSize, " +
898+
s"numOutputRows=${numOutputRows.value}, buildThreads=$buildThreadsValue, " +
899+
s"child=${child.nodeName}, buildPlan=${newChild.nodeName}")
900+
relation
889901
}
890902

891903
override def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {

backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ import org.apache.spark.sql.catalyst.expressions._
2626
import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide}
2727
import org.apache.spark.sql.catalyst.plans._
2828
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
29+
import org.apache.spark.sql.execution.ColumnarBuildSideRelation
2930
import org.apache.spark.sql.execution.joins.BuildSideRelation
3031
import org.apache.spark.sql.execution.metric.SQLMetric
32+
import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation
3133
import org.apache.spark.sql.vectorized.ColumnarBatch
3234

3335
import io.substrait.proto.JoinRel
@@ -135,8 +137,13 @@ case class BroadcastHashJoinExecTransformer(
135137
override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
136138
val streamedRDD = getColumnarInputRDDs(streamedPlan)
137139
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
138-
val queryId = GlutenQueryContext.getOrCreateQueryId(sparkContext)
140+
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
141+
val queryId = extractQueryId(broadcast.value)
139142
val cacheKey = s"$queryId:$buildHashTableId"
143+
logWarning(
144+
s"GlutenQueryIdTrace[driver-cache-key]: queryId=$queryId, " +
145+
s"buildHashTableId=$buildHashTableId, cacheKey=$cacheKey, " +
146+
s"relationType=${broadcast.value.getClass.getSimpleName}, executionId=$executionId")
140147
if (executionId != null) {
141148
GlutenDriverEndpoint.collectResources(executionId, cacheKey)
142149
} else {
@@ -145,8 +152,6 @@ case class BroadcastHashJoinExecTransformer(
145152
s" because execution id is null." +
146153
s" Will clean up until expire time.")
147154
}
148-
149-
val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
150155
val bloomFilterPushdownSize = if (VeloxConfig.get.hashProbeDynamicFilterPushdownEnabled) {
151156
VeloxConfig.get.hashProbeBloomFilterPushdownMaxSize
152157
} else {
@@ -185,6 +190,17 @@ case class BroadcastHashJoinExecTransformer(
185190
// FIXME: Do we have to make build side a RDD?
186191
streamedRDD :+ broadcastRDD
187192
}
193+
194+
private def extractQueryId(relation: BuildSideRelation): String = relation match {
195+
case columnar: ColumnarBuildSideRelation if columnar.queryId.nonEmpty =>
196+
columnar.queryId
197+
case unsafe: UnsafeColumnarBuildSideRelation if unsafe.queryIdValue.nonEmpty =>
198+
unsafe.queryIdValue
199+
case _ =>
200+
throw new IllegalStateException(
201+
s"Missing query id in build-side relation for broadcast hash join " +
202+
s"$buildBroadcastTableId, relationType=${relation.getClass.getName}")
203+
}
188204
}
189205

190206
case class BroadcastHashJoinContext(

backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ object ColumnarBuildSideRelation {
5252
mode: BroadcastMode,
5353
newBuildKeys: Seq[Expression] = Seq.empty,
5454
offload: Boolean = false,
55-
buildThreads: Int = 1): ColumnarBuildSideRelation = {
55+
buildThreads: Int = 1,
56+
queryId: String = ""): ColumnarBuildSideRelation = {
5657
val boundMode = mode match {
5758
case HashedRelationBroadcastMode(keys, isNullAware) =>
5859
// Bind each key to the build-side output so simple cols become BoundReference
@@ -68,7 +69,8 @@ object ColumnarBuildSideRelation {
6869
BroadcastModeUtils.toSafe(boundMode),
6970
newBuildKeys,
7071
offload,
71-
buildThreads)
72+
buildThreads,
73+
queryId)
7274
}
7375
}
7476

@@ -78,7 +80,8 @@ case class ColumnarBuildSideRelation(
7880
safeBroadcastMode: SafeBroadcastMode,
7981
newBuildKeys: Seq[Expression],
8082
offload: Boolean,
81-
buildThreads: Int)
83+
buildThreads: Int,
84+
queryId: String)
8285
extends BuildSideRelation
8386
with Logging
8487
with KnownSizeEstimation {

backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ object UnsafeColumnarBuildSideRelation {
5757
mode: BroadcastMode,
5858
newBuildKeys: Seq[Expression] = Seq.empty,
5959
offload: Boolean = false,
60-
buildThreads: Int = 1): UnsafeColumnarBuildSideRelation = {
60+
buildThreads: Int = 1,
61+
queryId: String = ""): UnsafeColumnarBuildSideRelation = {
6162
val boundMode = mode match {
6263
case HashedRelationBroadcastMode(keys, isNullAware) =>
6364
// Bind each key to the build-side output so simple cols become BoundReference
@@ -73,7 +74,8 @@ object UnsafeColumnarBuildSideRelation {
7374
BroadcastModeUtils.toSafe(boundMode),
7475
newBuildKeys,
7576
offload,
76-
buildThreads)
77+
buildThreads,
78+
queryId)
7779
}
7880
}
7981

@@ -94,7 +96,8 @@ class UnsafeColumnarBuildSideRelation(
9496
private var safeBroadcastMode: SafeBroadcastMode,
9597
private var newBuildKeys: Seq[Expression],
9698
private var offload: Boolean,
97-
private var buildThreads: Int)
99+
private var buildThreads: Int,
100+
private var queryId: String)
98101
extends BuildSideRelation
99102
with Externalizable
100103
with Logging
@@ -116,13 +119,15 @@ class UnsafeColumnarBuildSideRelation(
116119

117120
/** needed for serialization. */
118121
def this() = {
119-
this(null, null, null, Seq.empty, false, 1)
122+
this(null, null, null, Seq.empty, false, 1, "")
120123
}
121124

122125
private[unsafe] def getBatches(): Seq[UnsafeByteArray] = {
123126
batches
124127
}
125128

129+
def queryIdValue: String = queryId
130+
126131
private var hashTableData: Long = 0L
127132

128133
def buildHashTable(broadcastContext: BroadcastHashJoinContext): (Long, BuildSideRelation) =
@@ -219,6 +224,7 @@ class UnsafeColumnarBuildSideRelation(
219224
out.writeObject(newBuildKeys)
220225
out.writeBoolean(offload)
221226
out.writeInt(buildThreads)
227+
out.writeUTF(queryId)
222228
}
223229

224230
override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
@@ -228,6 +234,7 @@ class UnsafeColumnarBuildSideRelation(
228234
kryo.writeClassAndObject(out, newBuildKeys)
229235
out.writeBoolean(offload)
230236
out.writeInt(buildThreads)
237+
out.writeString(queryId)
231238
}
232239

233240
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -237,6 +244,7 @@ class UnsafeColumnarBuildSideRelation(
237244
newBuildKeys = in.readObject().asInstanceOf[Seq[Expression]]
238245
offload = in.readBoolean()
239246
buildThreads = in.readInt()
247+
queryId = in.readUTF()
240248
}
241249

242250
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
@@ -246,6 +254,7 @@ class UnsafeColumnarBuildSideRelation(
246254
newBuildKeys = kryo.readClassAndObject(in).asInstanceOf[Seq[Expression]]
247255
offload = in.readBoolean()
248256
buildThreads = in.readInt()
257+
queryId = in.readString()
249258
}
250259

251260
private def transformProjection: UnsafeProjection = safeBroadcastMode match {

backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,25 +190,29 @@ class UnsafeColumnarBuildSideRelationTest extends SharedSparkSession {
190190
}
191191

192192
test("Verify offload field serialization") {
193+
val queryId = "query-id-for-serialization"
193194
val relation = UnsafeColumnarBuildSideRelation(
194195
output,
195196
Seq(sampleUnsafeByteArrayInKb(1)),
196197
IdentityBroadcastMode,
197198
Seq.empty,
198-
offload = true
199+
offload = true,
200+
queryId = queryId
199201
)
200202

201203
// Java Serialization
202204
val javaSerializer = new JavaSerializer(SparkEnv.get.conf).newInstance()
203205
val javaBuffer = javaSerializer.serialize(relation)
204206
val javaObj = javaSerializer.deserialize[UnsafeColumnarBuildSideRelation](javaBuffer)
205207
assert(javaObj.isOffload, "Java deserialization failed to restore offload=true")
208+
assert(javaObj.queryIdValue == queryId, "Java deserialization failed to restore queryId")
206209

207210
// Kryo Serialization
208211
val kryoSerializer = new KryoSerializer(SparkEnv.get.conf).newInstance()
209212
val kryoBuffer = kryoSerializer.serialize(relation)
210213
val kryoObj = kryoSerializer.deserialize[UnsafeColumnarBuildSideRelation](kryoBuffer)
211214
assert(kryoObj.isOffload, "Kryo deserialization failed to restore offload=true")
215+
assert(kryoObj.queryIdValue == queryId, "Kryo deserialization failed to restore queryId")
212216

213217
// Create another relation with offload=false to compare byte size if possible,
214218
// but boolean only takes 1 byte, might be hard to distinguish from metadata noise.

0 commit comments

Comments
 (0)