Skip to content

Commit 0836a19

Browse files
committed
Address review: move Gluten checksum tests to dedicated suite
Review feedback from philo-he on PR #12067: - Drop stale TODO comment above enableSuite[GlutenMapStatusEndToEndSuite] in VeloxTestSettings. - Move the two Gluten-specific row-based checksum tests out of GlutenMapStatusEndToEndSuite (which is a wrapper around upstream Spark's MapStatusEndToEndSuite) into a new dedicated GlutenRowBasedChecksumSuite under org.apache.spark.sql.gluten. The new suite extends GlutenSQLTestsTrait directly and configures LEAF_NODE_DEFAULT_PARALLELISM, CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, and GlutenConfig.GLUTEN_ANSI_FALLBACK_ENABLED via sparkConf rather than mutating the live session in beforeAll.
1 parent fe887c6 commit 0836a19

3 files changed

Lines changed: 93 additions & 56 deletions

File tree

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.joins._
4040
import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQLMetricsSuite}
4141
import org.apache.spark.sql.execution.python._
4242
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite}
43-
import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite}
43+
import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite, GlutenRowBasedChecksumSuite}
4444
import org.apache.spark.sql.hive.execution._
4545
import org.apache.spark.sql.sources._
4646
import org.apache.spark.sql.streaming._
@@ -1026,7 +1026,6 @@ class VeloxTestSettings extends BackendTestSettings {
10261026
// TODO: fix on Spark-4.1 introduced by https://github.com/apache/spark/pull/47856
10271027
.exclude("SPARK-49386: test SortMergeJoin (with spill by size threshold)")
10281028
enableSuite[GlutenMathFunctionsSuite]
1029-
// TODO: fix on Spark-4.1 see https://github.com/apache/spark/pull/50230
10301029
enableSuite[GlutenMapStatusEndToEndSuite]
10311030
enableSuite[GlutenMetadataCacheSuite]
10321031
.exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
@@ -1093,6 +1092,7 @@ class VeloxTestSettings extends BackendTestSettings {
10931092
enableSuite[GlutenUnsafeRowChecksumSuite]
10941093
enableSuite[GlutenXPathFunctionsSuite]
10951094
enableSuite[GlutenFallbackSuite]
1095+
enableSuite[GlutenRowBasedChecksumSuite]
10961096
enableSuite[GlutenHashAggregationQuerySuite]
10971097
// TODO: fix on https://github.com/apache/gluten/issues/11919
10981098
.exclude("udaf with all data types")

gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenMapStatusEndToEndSuite.scala

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.spark.sql
1818

19-
import org.apache.spark.sql.functions.col
2019
import org.apache.spark.sql.internal.SQLConf
2120

2221
class GlutenMapStatusEndToEndSuite extends MapStatusEndToEndSuite with GlutenTestsTrait {
@@ -29,58 +28,5 @@ class GlutenMapStatusEndToEndSuite extends MapStatusEndToEndSuite with GlutenTes
2928
_spark.sparkContext.conf
3029
.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
3130
_spark.conf.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
32-
33-
// Disable ANSI fallback to force Gluten's ColumnarShuffleWriter path.
34-
_spark.conf.set("spark.gluten.sql.ansiFallback.enabled", "false")
35-
}
36-
37-
import org.apache.spark.MapOutputTrackerMaster
38-
39-
private def getLatestShuffleChecksumValues(): Array[Long] = {
40-
val tracker = _spark.sparkContext.env.mapOutputTracker
41-
.asInstanceOf[MapOutputTrackerMaster]
42-
val latestShuffleId = tracker.shuffleStatuses.keys.max
43-
tracker.shuffleStatuses(latestShuffleId).mapStatuses.map(_.checksumValue)
44-
}
45-
46-
test("Gluten row-based checksum is deterministic") {
47-
withSQLConf(
48-
SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
49-
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
50-
withTable("t_det1", "t_det2") {
51-
_spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det1")
52-
val checksums1 = getLatestShuffleChecksumValues()
53-
54-
_spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det2")
55-
val checksums2 = getLatestShuffleChecksumValues()
56-
57-
// Same input -> same checksumValue (deterministic)
58-
assert(
59-
checksums1.zip(checksums2).forall { case (a, b) => a == b },
60-
s"Checksums not deterministic: ${checksums1.toSeq} vs ${checksums2.toSeq}")
61-
}
62-
}
63-
}
64-
65-
test("Gluten row-based checksum detects data change") {
66-
withSQLConf(
67-
SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
68-
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
69-
withTable("t_diff1", "t_diff2") {
70-
_spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_diff1")
71-
val checksums1 = getLatestShuffleChecksumValues()
72-
73-
// Different data
74-
_spark.range(
75-
500,
76-
1000).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_diff2")
77-
val checksums2 = getLatestShuffleChecksumValues()
78-
79-
// Different input -> different checksumValue
80-
assert(
81-
checksums1.zip(checksums2).exists { case (a, b) => a != b },
82-
s"Checksums should differ for different data: ${checksums1.toSeq} vs ${checksums2.toSeq}")
83-
}
84-
}
8531
}
8632
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.gluten
18+
19+
import org.apache.gluten.config.GlutenConfig
20+
21+
import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
22+
import org.apache.spark.sql.GlutenSQLTestsTrait
23+
import org.apache.spark.sql.functions.col
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
/**
27+
* End-to-end tests for the row-based checksum (SPARK-51756) computed by Gluten's
28+
* ColumnarShuffleWriter. Verifies that `MapStatus.checksumValue` is propagated, deterministic
29+
* for identical input, and changes when row data changes.
30+
*/
31+
class GlutenRowBasedChecksumSuite extends GlutenSQLTestsTrait {
32+
33+
override def sparkConf: SparkConf = {
34+
super.sparkConf
35+
.set(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key, "5")
36+
.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
37+
// Disable ANSI fallback to force Gluten's ColumnarShuffleWriter path.
38+
.set(GlutenConfig.GLUTEN_ANSI_FALLBACK_ENABLED.key, "false")
39+
}
40+
41+
private def getLatestShuffleChecksumValues(): Array[Long] = {
42+
val tracker = spark.sparkContext.env.mapOutputTracker
43+
.asInstanceOf[MapOutputTrackerMaster]
44+
val latestShuffleId = tracker.shuffleStatuses.keys.max
45+
tracker.shuffleStatuses(latestShuffleId).mapStatuses.map(_.checksumValue)
46+
}
47+
48+
test("Gluten row-based checksum is deterministic") {
49+
withSQLConf(
50+
SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
51+
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
52+
withTable("t_det1", "t_det2") {
53+
spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det1")
54+
val checksums1 = getLatestShuffleChecksumValues()
55+
56+
spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det2")
57+
val checksums2 = getLatestShuffleChecksumValues()
58+
59+
// Same input -> same checksumValue (deterministic)
60+
assert(
61+
checksums1.zip(checksums2).forall { case (a, b) => a == b },
62+
s"Checksums not deterministic: ${checksums1.toSeq} vs ${checksums2.toSeq}")
63+
}
64+
}
65+
}
66+
67+
test("Gluten row-based checksum detects data change") {
68+
withSQLConf(
69+
SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
70+
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
71+
withTable("t_diff1", "t_diff2") {
72+
spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_diff1")
73+
val checksums1 = getLatestShuffleChecksumValues()
74+
75+
// Different data
76+
spark
77+
.range(500, 1000)
78+
.repartition(5, col("id"))
79+
.write
80+
.mode("overwrite")
81+
.saveAsTable("t_diff2")
82+
val checksums2 = getLatestShuffleChecksumValues()
83+
84+
// Different input -> different checksumValue
85+
assert(
86+
checksums1.zip(checksums2).exists { case (a, b) => a != b },
87+
s"Checksums should differ for different data: ${checksums1.toSeq} vs ${checksums2.toSeq}")
88+
}
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)