@@ -21,14 +21,16 @@ import org.apache.gluten.execution.{DeltaScanTransformer, FilterExecTransformerB
2121import org .apache .gluten .extension .columnar .transition .RemoveTransitions
2222
2323import org .apache .spark .sql .SparkSession
24- import org .apache .spark .sql .catalyst .expressions .{Alias , And , Attribute , AttributeReference , Expression , InputFileBlockLength , InputFileBlockStart , InputFileName , NamedExpression }
24+ import org .apache .spark .sql .catalyst .expressions .{Alias , And , Attribute , AttributeReference , CreateNamedStruct , Expression , GetStructField , If , InputFileBlockLength , InputFileBlockStart , InputFileName , IsNull , LambdaFunction , Literal , NamedExpression , NamedLambdaVariable }
25+ import org .apache .spark .sql .catalyst .expressions .{ArrayTransform , TransformKeys , TransformValues }
2526import org .apache .spark .sql .catalyst .rules .Rule
2627import org .apache .spark .sql .catalyst .trees .TreeNodeTag
2728import org .apache .spark .sql .delta .{DeltaColumnMapping , DeltaParquetFileFormat , NoMapping }
2829import org .apache .spark .sql .execution .{FilterExec , ProjectExec , SparkPlan }
2930import org .apache .spark .sql .execution .datasources .FileFormat
30- import org .apache .spark .sql .types .StructType
31+ import org .apache .spark .sql .types .{ ArrayType , DataType , MapType , StructType }
3132
33+ import scala .collection .mutable
3234import scala .collection .mutable .ListBuffer
3335
3436object DeltaPostTransformRules {
@@ -262,6 +264,73 @@ object DeltaPostTransformRules {
262264 }
263265 }
264266
267+ /**
268+ * Checks whether two structurally compatible DataTypes have different struct field names at any
269+ * nesting level.
270+ */
271+ private def nestedFieldNamesDiffer (logical : DataType , physical : DataType ): Boolean = {
272+ (logical, physical) match {
273+ case (l : StructType , p : StructType ) if l.length == p.length =>
274+ l.zip(p).exists {
275+ case (lf, pf) =>
276+ lf.name != pf.name || nestedFieldNamesDiffer(lf.dataType, pf.dataType)
277+ }
278+ case (l : ArrayType , p : ArrayType ) =>
279+ nestedFieldNamesDiffer(l.elementType, p.elementType)
280+ case (l : MapType , p : MapType ) =>
281+ nestedFieldNamesDiffer(l.keyType, p.keyType) ||
282+ nestedFieldNamesDiffer(l.valueType, p.valueType)
283+ case _ => false
284+ }
285+ }
286+
287+ /**
288+ * Rebuilds an expression tree so that nested struct field names match the logical schema. Uses
289+ * positional extraction (GetStructField) and reconstruction (CreateNamedStruct) instead of Cast,
290+ * so correctness does not depend on Velox's cast_match_struct_by_name config.
291+ */
292+ private def reconcileFieldNames (
293+ expr : Expression ,
294+ logical : DataType ,
295+ physical : DataType ): Expression = {
296+ (logical, physical) match {
297+ case (l : StructType , p : StructType ) if l.length == p.length =>
298+ val rebuiltFields = l.zip(p).zipWithIndex.flatMap {
299+ case ((lf, pf), i) =>
300+ val extracted = GetStructField (expr, i, None )
301+ val reconciled = reconcileFieldNames(extracted, lf.dataType, pf.dataType)
302+ Seq (Literal (lf.name), reconciled)
303+ }
304+ val rebuilt = CreateNamedStruct (rebuiltFields)
305+ If (IsNull (expr), Literal .create(null , l), rebuilt)
306+ case (l : ArrayType , p : ArrayType ) if nestedFieldNamesDiffer(l.elementType, p.elementType) =>
307+ val lambdaVar = NamedLambdaVariable (" element" , p.elementType, p.containsNull)
308+ val body = reconcileFieldNames(lambdaVar, l.elementType, p.elementType)
309+ ArrayTransform (expr, LambdaFunction (body, Seq (lambdaVar)))
310+ case (l : MapType , p : MapType ) =>
311+ val needKeys = nestedFieldNamesDiffer(l.keyType, p.keyType)
312+ val needValues = nestedFieldNamesDiffer(l.valueType, p.valueType)
313+ var result = expr
314+ if (needValues) {
315+ val keyVar = NamedLambdaVariable (" key" , p.keyType, false )
316+ val valueVar = NamedLambdaVariable (" value" , p.valueType, p.valueContainsNull)
317+ val body = reconcileFieldNames(valueVar, l.valueType, p.valueType)
318+ result = TransformValues (result, LambdaFunction (body, Seq (keyVar, valueVar)))
319+ }
320+ if (needKeys) {
321+ val keyVar = NamedLambdaVariable (" key" , p.keyType, false )
322+ val valueVar = NamedLambdaVariable (
323+ " value" ,
324+ if (needValues) l.valueType else p.valueType,
325+ p.valueContainsNull)
326+ val body = reconcileFieldNames(keyVar, l.keyType, p.keyType)
327+ result = TransformKeys (result, LambdaFunction (body, Seq (keyVar, valueVar)))
328+ }
329+ result
330+ case _ => expr
331+ }
332+ }
333+
265334 /**
266335 * Used for Delta ColumnMapping FileFormat (nameMapping and idMapping). Each plan is transformed
267336 * at most once; the first run is tagged so re-runs are no-ops.
@@ -319,8 +388,9 @@ object DeltaPostTransformRules {
319388 )(SparkSession .active)
320389 // transform output's name into physical name so Reader can read data correctly
321390 // should keep the columns order the same as the origin output
322- val originColumnNames = ListBuffer .empty[String ]
323- val transformedAttrs = ListBuffer .empty[Attribute ]
391+ case class ColumnMapping (logicalName : String , logicalType : DataType , physicalAttr : Attribute )
392+ val columnMappings = ListBuffer .empty[ColumnMapping ]
393+ val seenNames = mutable.Set .empty[String ]
324394 def mapAttribute (attr : Attribute ) = {
325395 val newAttr = if (plan.isMetadataColumn(attr)) {
326396 attr
@@ -333,9 +403,8 @@ object DeltaPostTransformRules {
333403 .createPhysicalAttributes(Seq (attr), fmt.referenceSchema, fmt.columnMappingMode)
334404 .head
335405 }
336- if (! originColumnNames.contains(attr.name)) {
337- transformedAttrs += newAttr
338- originColumnNames += attr.name
406+ if (seenNames.add(attr.name)) {
407+ columnMappings += ColumnMapping (attr.name, attr.dataType, newAttr)
339408 }
340409 newAttr
341410 }
@@ -372,9 +441,20 @@ object DeltaPostTransformRules {
372441 scanExecTransformer.copyTagsFrom(plan)
373442 tagColumnMappingRule(scanExecTransformer)
374443
375- // alias physicalName into tableName
376- val expr = (transformedAttrs, originColumnNames).zipped.map {
377- (attr, columnName) => Alias (attr, columnName)(exprId = attr.exprId)
444+ // Alias physical names back to logical names. For struct-typed columns, Delta column
445+ // mapping renames internal field names to physical UUIDs. A top-level Alias only restores
446+ // the column name, not the struct's internal field names. We rebuild the struct with
447+ // logical field names using positional extraction (GetStructField/CreateNamedStruct)
448+ // instead of Cast, so correctness does not depend on any Velox cast config.
449+ val expr = columnMappings.map {
450+ cm =>
451+ val projectedExpr : Expression =
452+ if (nestedFieldNamesDiffer(cm.logicalType, cm.physicalAttr.dataType)) {
453+ reconcileFieldNames(cm.physicalAttr, cm.logicalType, cm.physicalAttr.dataType)
454+ } else {
455+ cm.physicalAttr
456+ }
457+ Alias (projectedExpr, cm.logicalName)(exprId = cm.physicalAttr.exprId)
378458 }
379459 val projectExecTransformer = ProjectExecTransformer (expr.toSeq, scanExecTransformer)
380460 projectExecTransformer
0 commit comments