Skip to content

Commit 52b845f

Browse files
taiyang-li and AIME
authored and committed
[GLUTEN] Fix wrong input_file_name() for BHJ build-side LocalRelation
## Background

When running queries like the one below on Gluten (e.g. with the Velox backend), `input_file_name()` returns an empty string:

```sql
SELECT a.event, input_file_name() AS fname
FROM parquet_table a
JOIN (SELECT X AS k1, 123L AS k2) b
  ON a.event = b.k1 AND a.device_id = b.k2;
```

The physical plan contains (key fragment):

```
ProjectExecTransformer [..., input_file_name#1816 AS fname]
+- BroadcastHashJoinExecTransformer ..., BuildLeft
   :- InputIteratorTransformer[..., input_file_name#1816]
   :  +- RowToColumnar
   :     +- *(1) Project [..., input_file_name() AS input_file_name#1816]   ← BUG
   :        +- ColumnarToRow
   :           +- BroadcastQueryStage 0
   :              +- ColumnarBroadcastExchange
   :                 +- LocalTableScan [...]   ← no file context
   +- ProjectExecTransformer [..., input_file_name#1816]
      +- FileScanTransformer parquet ...[..., input_file_name#1816]   ← same ExprId
```

## Root cause

`PushDownInputFileExpression.PreOffload` originally injects `Project [..., input_file_name() AS attr#N]` above **every** `LeafExecNode`.

1. When BHJ's build side is a `LocalTableScanExec` / `RangeExec` / `RDDScanExec` etc. (no real file context), `input_file_name()` has no `InputFileBlockHolder` thread-local and always returns `""`.
2. Both leaves end up reusing the same `ExprId`. When BHJ resolves `left ++ right`, the outer `Project` is rebound to the build-side empty attribute, so the final query returns an empty file name.

## Fix

Only inject `input_file_name()` on leaves that can really populate `InputFileBlockHolder`:

- `FileSourceScanExec`
- v2 `BatchScanExec`
- Hive table scan (`HiveTableScanExecTransformer.isHiveTableScan`)
- `BatchScanExecTransformerBase` (already special-cased in community)

In addition, the `ProjectExec` match in `PreOffload` now requires that the subtree actually contains at least one such file-aware source via the new `hasInputFileRelatedSource` helper. This avoids producing a fake `input_file_name` attribute on non-file leaves and avoids polluting the common ExprId with the empty string from the BHJ build side.

## Test

Added `input_file_name() with BHJ build-side LocalRelation must return real path` in `ScalarFunctionsValidateSuite` (backends-velox) covering:

- `fname` column is non-empty for every joined row;
- `fname` contains the real parquet path;
- compared against vanilla Spark.

The existing `test("input_file_name")` (file/Hive scan paths) is unchanged because those scans are still in the whitelist.

Co-Authored-By: AIME <aime@bytedance.com>
Change-Id: I77c1fa343444488fb8b71deb8dd0b13d587d2155
1 parent 9202345 commit 52b845f

2 files changed

Lines changed: 68 additions & 3 deletions

File tree

backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,50 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
10771077
}
10781078
}
10791079

1080+
// Regression test: input_file_name() must return the scanned parquet file's path when the
// table is joined with an inline single-row subquery (the non-file side of the join).
test("input_file_name() with BHJ build-side LocalRelation must return real path") {
1081+
withTempPath {
1082+
path =>
1083+
// Write a one-row parquet dataset under the temp path and register it as view `event_log`.
Seq(("mp_cert_face_result", 3915800915739947L, "param1"))
1084+
.toDF("event", "device_id", "params")
1085+
.write
1086+
.parquet(path.getCanonicalPath)
1087+
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("event_log")
1088+
1089+
// Run with a 10MB auto-broadcast threshold and adaptive execution enabled.
withSQLConf(
1090+
"spark.sql.autoBroadcastJoinThreshold" -> "10MB",
1091+
"spark.sql.adaptive.enabled" -> "true"
1092+
) {
1093+
val sql =
1094+
"""
1095+
|SELECT a.event,
1096+
| a.params,
1097+
| a.device_id,
1098+
| input_file_name() AS fname
1099+
|FROM event_log a
1100+
|JOIN
1101+
| (
1102+
| SELECT 'mp_cert_face_result' AS envent,
1103+
| 3915800915739947 AS device_id
1104+
| ) b
1105+
|ON a.event = b.envent
1106+
|AND a.device_id = b.device_id
1107+
|""".stripMargin
1108+
1109+
// The query result must match vanilla Spark.
compareResultsAgainstVanillaSpark(sql, true, { _ => })
1110+
1111+
val df = spark.sql(sql)
1112+
val rows = df.collect()
1113+
assert(rows.nonEmpty, "Join should match at least one row")
1114+
// Every joined row must carry a non-empty file name that contains the temp path's name.
rows.foreach {
1115+
r =>
1116+
val fname = r.getAs[String]("fname")
1117+
assert(fname != null && fname.nonEmpty)
1118+
assert(fname.contains(path.getName))
1119+
}
1120+
}
1121+
}
1122+
}
1123+
10801124
testWithMinSparkVersion("array insert", "3.4") {
10811125
withTempPath {
10821126
path =>

gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import org.apache.gluten.execution.{BatchScanExecTransformerBase, FileSourceScan
2121
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
2222
import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
2323
import org.apache.spark.sql.catalyst.rules.Rule
24-
import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
24+
import org.apache.spark.sql.execution.{DeserializeToObjectExec, FileSourceScanExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
25+
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2526
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
2627

2728
import java.util.Locale
@@ -87,7 +88,8 @@ object PushDownInputFileExpression {
8788

8889
object PreOffload extends Rule[SparkPlan] {
8990
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
90-
case ProjectExec(projectList, child) if projectList.exists(containsInputFileRelatedExpr) =>
91+
case ProjectExec(projectList, child)
92+
if projectList.exists(containsInputFileRelatedExpr) && hasInputFileRelatedSource(child) =>
9193
val replacedExprs = mutable.Map[String, Alias]()
9294
val newProjectList = projectList.map {
9395
expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
@@ -104,8 +106,10 @@ object PushDownInputFileExpression {
104106
// For BatchScanExecTransformerBase (includes Iceberg scans), add fallback tag
105107
// to prevent offloading when input_file expressions are present
106108
addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
107-
case p: LeafExecNode =>
109+
case p: LeafExecNode if shouldAddInputFileExpr(p) =>
108110
addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
111+
case p: LeafExecNode =>
112+
p
109113
// Output of SerializeFromObjectExec's child and output of DeserializeToObjectExec must be
110114
// a single-field row.
111115
case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
@@ -127,6 +131,23 @@ object PushDownInputFileExpression {
127131
u.copy(children = newFirstChild +: newOtherChildren)
128132
case p => p.withNewChildren(p.children.map(child => addMetadataCol(child, replacedExprs)))
129133
}
134+
135+
// Returns true when the given plan subtree contains at least one source that is treated as
// file-aware: a BatchScanExecTransformerBase, or a leaf accepted by shouldAddInputFileExpr.
// Used as a guard so that a ProjectExec containing input-file expressions is only rewritten
// when such a source exists beneath it.
private def hasInputFileRelatedSource(plan: SparkPlan): Boolean = {
136+
plan match {
137+
// Checked before the generic leaf case so this scan type is always accepted.
case _: BatchScanExecTransformerBase => true
138+
// Any other leaf is accepted only if it is on the shouldAddInputFileExpr whitelist.
case p: LeafExecNode => shouldAddInputFileExpr(p)
139+
// Non-leaf node: search the children recursively.
case _ => plan.children.exists(hasInputFileRelatedSource)
140+
}
141+
}
142+
143+
// Whitelist of leaf scans above which the input-file projection may be injected:
// FileSourceScanExec, v2 BatchScanExec, and any plan that
// HiveTableScanExecTransformer.isHiveTableScan recognizes. Every other plan yields false.
private def shouldAddInputFileExpr(plan: SparkPlan): Boolean = {
144+
plan match {
145+
case _: FileSourceScanExec => true
146+
case _: BatchScanExec => true
147+
case p if HiveTableScanExecTransformer.isHiveTableScan(p) => true
148+
// Anything else is not on the whitelist.
case _ => false
149+
}
150+
}
130151
}
131152

132153
object PostOffload extends Rule[SparkPlan] {

0 commit comments

Comments
 (0)