Skip to content

Commit d61c773

Browse files
authored
[GLUTEN-11058][VL][1.5] Backport #11021: remove support for BNLJ full outer join without condition (#11060)
1 parent a292d8a commit d61c773

10 files changed

Lines changed: 24 additions & 61 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
549549

550550
override def needPreComputeRangeFrameBoundary(): Boolean = true
551551

552-
override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
553-
554552
override def supportIcebergEqualityDeleteRead(): Boolean = false
555553

556554
override def reorderColumnsForPartitionWrite(): Boolean = true

backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.spark.SparkConf
2424
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
2525
import org.apache.spark.sql.execution._
2626
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec, ShuffleQueryStageExec}
27-
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
2827
import org.apache.spark.sql.execution.window.WindowExec
2928
import org.apache.spark.sql.functions._
3029
import org.apache.spark.sql.internal.SQLConf
@@ -2058,25 +2057,6 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
20582057
}
20592058
}
20602059

2061-
test("FullOuter in BroadcastNestLoopJoin") {
2062-
withTable("t1", "t2") {
2063-
spark.range(10).write.format("parquet").saveAsTable("t1")
2064-
spark.range(10).write.format("parquet").saveAsTable("t2")
2065-
2066-
// with join condition should fall back.
2067-
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "1MB") {
2068-
runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.id < t2.id") {
2069-
checkSparkOperatorMatch[BroadcastNestedLoopJoinExec]
2070-
}
2071-
2072-
// without a join condition should offload to the Gluten operator.
2073-
runQueryAndCompare("SELECT * FROM t1 FULL OUTER JOIN t2") {
2074-
checkGlutenOperatorMatch[BroadcastNestedLoopJoinExecTransformer]
2075-
}
2076-
}
2077-
}
2078-
}
2079-
20802060
test("test get_struct_field with scalar function as input") {
20812061
withSQLConf("spark.sql.json.enablePartialResults" -> "true") {
20822062
withTable("t") {

cpp/velox/substrait/SubstraitToVeloxPlan.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,6 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
350350
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
351351
}
352352
break;
353-
case ::substrait::CrossRel_JoinType::CrossRel_JoinType_JOIN_TYPE_OUTER:
354-
joinType = core::JoinType::kFull;
355-
break;
356353
default:
357354
VELOX_NYI("Unsupported Join type: {}", std::to_string(crossRel.type()));
358355
}

cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,13 +1101,6 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::CrossRel& crossR
11011101
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT:
11021102
case ::substrait::CrossRel_JoinType_JOIN_TYPE_LEFT_SEMI:
11031103
break;
1104-
case ::substrait::CrossRel_JoinType_JOIN_TYPE_OUTER:
1105-
if (crossRel.has_expression()) {
1106-
LOG_VALIDATION_MSG("Full outer join type with condition is not supported in CrossRel");
1107-
return false;
1108-
} else {
1109-
break;
1110-
}
11111104
default:
11121105
LOG_VALIDATION_MSG("Unsupported Join type in CrossRel");
11131106
return false;

gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
152152

153153
def needPreComputeRangeFrameBoundary(): Boolean = false
154154

155-
def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
156-
157155
def supportIcebergEqualityDeleteRead(): Boolean = true
158156

159157
def reorderColumnsForPartitionWrite(): Boolean = false

gluten-substrait/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil
2424

2525
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
2626
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
27-
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftOuter, RightOuter}
27+
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, JoinType, LeftOuter, RightOuter}
2828
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
2929
import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
3030
import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -148,14 +148,6 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
148148
def validateJoinTypeAndBuildSide(): ValidationResult = {
149149
val result = joinType match {
150150
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
151-
case FullOuter
152-
if BackendsApiManager.getSettings.broadcastNestedLoopJoinSupportsFullOuterJoin() =>
153-
if (condition.isEmpty) {
154-
ValidationResult.succeeded
155-
} else {
156-
ValidationResult.failed(
157-
s"FullOuter join with join condition is not supported with BroadcastNestedLoopJoin")
158-
}
159151
case ExistenceJoin(_) =>
160152
ValidationResult.succeeded
161153
case _ =>

gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/joins/GlutenBroadcastJoinSuite.scala

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,9 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
143143
// INNER JOIN && t1Size < t2Size => BuildLeft
144144
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", blt, BuildLeft)
145145
// FULL JOIN && t1Size < t2Size => BuildLeft
146-
assertJoinBuildSide(
147-
"SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2 ON t1.key < t2.key",
148-
bl,
149-
BuildLeft)
146+
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft)
150147
// FULL OUTER && t1Size < t2Size => BuildLeft
151-
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 ON t1.key < t2.key", bl, BuildLeft)
148+
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
152149
// LEFT JOIN => BuildRight
153150
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", blt, BuildRight)
154151
// RIGHT JOIN => BuildLeft
@@ -160,13 +157,10 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
160157
// INNER JOIN && broadcast(t2) => BuildRight
161158
assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2", blt, BuildRight)
162159
// FULL OUTER && broadcast(t1) => BuildLeft
163-
assertJoinBuildSide(
164-
"SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
165-
bl,
166-
BuildLeft)
160+
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
167161
// FULL OUTER && broadcast(t2) => BuildRight
168162
assertJoinBuildSide(
169-
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key",
163+
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2",
170164
bl,
171165
BuildRight)
172166
// LEFT JOIN && broadcast(t1) => BuildLeft
@@ -200,11 +194,8 @@ class GlutenBroadcastJoinSuite extends BroadcastJoinSuite with GlutenTestsCommon
200194
/* ######## test cases for non-equal join ######### */
201195
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
202196
// For full outer join, prefer to broadcast the smaller side.
203-
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2 on t1.key < t2.key", bl, BuildLeft)
204-
assertJoinBuildSide(
205-
"SELECT * FROM t2 FULL OUTER JOIN t1 on t1.key < t2.key",
206-
bl,
207-
BuildRight)
197+
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
198+
assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight)
208199

209200
// For inner join, prefer to broadcast the smaller side, if broadcast-able.
210201
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) {

gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenBuildInfo
2020
import org.apache.gluten.config.GlutenConfig
2121
import org.apache.gluten.events.GlutenPlanFallbackEvent
2222
import org.apache.gluten.execution.FileSourceScanExecTransformer
23+
import org.apache.gluten.utils.BackendTestUtils
2324

2425
import org.apache.spark.SparkConf
2526
import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -118,7 +119,14 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
118119

119120
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
120121
val execution = glutenStore.execution(id)
121-
execution.get.numFallbackNodes == 0
122+
if (BackendTestUtils.isVeloxBackendLoaded()) {
123+
assert(execution.get.numFallbackNodes == 1)
124+
assert(
125+
execution.get.fallbackNodeToReason.head._2
126+
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
127+
} else {
128+
assert(execution.get.numFallbackNodes == 0)
129+
}
122130
}
123131

124132
// [GLUTEN-4119] Skip add ReusedExchange to fallback node

gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
121121
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
122122
val execution = glutenStore.execution(id)
123123
if (BackendTestUtils.isVeloxBackendLoaded()) {
124-
assert(execution.get.numFallbackNodes == 0)
124+
assert(execution.get.numFallbackNodes == 1)
125+
assert(
126+
execution.get.fallbackNodeToReason.head._2
127+
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
125128
} else {
126129
assert(execution.get.numFallbackNodes == 2)
127130
}

gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
121121
val id = runExecution("SELECT * FROM t1 FULL OUTER JOIN t2")
122122
val execution = glutenStore.execution(id)
123123
if (BackendTestUtils.isVeloxBackendLoaded()) {
124-
assert(execution.get.numFallbackNodes == 0)
124+
assert(execution.get.numFallbackNodes == 1)
125+
assert(
126+
execution.get.fallbackNodeToReason.head._2
127+
.contains("FullOuter join is not supported with BroadcastNestedLoopJoin"))
125128
} else {
126129
assert(execution.get.numFallbackNodes == 2)
127130
}

0 commit comments

Comments
 (0)