Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions extensions/spark/kyuubi-spark-lineage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
## Build

```shell
build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.2.1
build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.version=3.5.1
```

### Supported Apache Spark Versions
Expand All @@ -37,6 +37,4 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am -Dspark.v
- [x] 3.5.x (default)
- [x] 3.4.x
- [x] 3.3.x
- [x] 3.2.x
- [x] 3.1.x

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ trait LineageParser {
val SUBQUERY_COLUMN_IDENTIFIER = "__subquery__"
val AGGREGATE_COUNT_COLUMN_IDENTIFIER = "__count__"
val LOCAL_TABLE_IDENTIFIER = "__local__"
val METADATA_COL_ATTR_KEY = "__metadata_col"
val ORIGINAL_ROW_ID_VALUE_PREFIX: String = "__original_row_id_"
val OPERATION_COLUMN: String = "__row_operation"

type AttributeMap[A] = ListMap[Attribute, A]

Expand Down Expand Up @@ -307,7 +310,35 @@ trait LineageParser {
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "MergeRows" =>
val instructionsOutputs =
Comment thread
wForget marked this conversation as resolved.
getField[Seq[Expression]](p, "matchedInstructions")
.map(extractInstructionOutputs) ++
getField[Seq[Expression]](p, "notMatchedInstructions")
.map(extractInstructionOutputs)
val nextColumnsLineage = ListMap(p.output.indices.map { index =>
val keyAttr = p.output(index)
val instructionOutputs = instructionsOutputs.map(_(index))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this index match always correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am busy these days. I'll confirm later.

@yabola yabola Jul 14, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wForget I think index match should always be correct.

  1. ResolveRowLevelCommandAssignments#alignActions will align the length of the assignments in the merge-into actions to match the target table output.
  2. RewriteMergeIntoTable will rewrite the merge-into operation as either WriteDelta or ReplaceData only when it has being an already aligned plan.
  3. I have reviewed all the places where mergeRows is referenced.
    buildReplaceDataPlan -> target table output + metadataAttrs
    buildWriteDeltaPlan -> operationTypeAttr + target table output + metadataAttrs + originalRowIdAttrs
    RewriteMergeIntoTable -> assignments(target table output)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your explanation, I will continue to review this PR later.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wForget Hi~ Please review the code when you have time. I have another PR that needs to solve a case for asubquery (not related to this PR, but there will be code conflicts).

(keyAttr, instructionOutputs)
}.collect {
case (keyAttr: Attribute, instructionsOutput)
if instructionsOutput
.exists(!_.references.isEmpty) =>
Comment thread
wForget marked this conversation as resolved.
Outdated
val attributeSet = AttributeSet.apply(instructionsOutput)
keyAttr -> attributeSet
}: _*)
p.children.map(
extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)

case p if p.nodeName == "WriteDelta" || p.nodeName == "ReplaceData" =>
val table = getV2TableName(getField[NamedRelation](plan, "table"))
val query = getQuery(plan)
val columnsLineage = extractColumnsLineage(query, parentColumnsLineage)
columnsLineage
.filter { case (k, _) => !isMetadataAttr(k) }
.map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "MergeIntoTable" =>
val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
val notMatchedActions = getField[Seq[MergeAction]](plan, "notMatchedActions")
Expand Down Expand Up @@ -507,6 +538,19 @@ trait LineageParser {
case _ => qualifiedName
}
}

private def isMetadataAttr(attr: Attribute): Boolean = {
attr.metadata.contains(METADATA_COL_ATTR_KEY) ||
attr.name.startsWith(ORIGINAL_ROW_ID_VALUE_PREFIX) ||
attr.name.startsWith(OPERATION_COLUMN)
}

private def extractInstructionOutputs(instruction: Expression): Seq[Expression] = {
instruction match {
case p if p.nodeName == "Split" => getField[Seq[Expression]](p, "otherOutput")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case p => getField[Seq[Expression]](p, "output")
}
}
}

case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends LineageParser
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.lineage.helper

import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION

class RowLevelCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite {

override def catalogName: String = {
"org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog"
}

test("columns lineage extract - WriteDelta") {
assume(
SPARK_RUNTIME_VERSION >= "3.5",
"WriteDelta is only supported in SPARK_RUNTIME_VERSION >= 3.5")
val ddls =
"""
|create table v2_catalog.db.target_t(pk int not null, name string, price float)
| TBLPROPERTIES ('supports-deltas'='true');
|create table v2_catalog.db.source_t(pk int not null, name string, price float)
| TBLPROPERTIES ('supports-deltas'='true');
|create table v2_catalog.db.pivot_t(pk int not null, price float)
| TBLPROPERTIES ('supports-deltas'='true')
|""".stripMargin
ddls.split(";").filter(_.nonEmpty).foreach(spark.sql(_).collect())

withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t", "v2_catalog.db.pivot_t") { _ =>
val ret0 = extractLineageWithoutExecuting(
"MERGE INTO v2_catalog.db.target_t AS target " +
"USING v2_catalog.db.source_t AS source " +
"ON target.pk = source.pk " +
"WHEN MATCHED THEN " +
" UPDATE SET target.name = source.name, target.price = source.price " +
"WHEN NOT MATCHED THEN " +
" INSERT (pk, name, price) VALUES (cast(source.pk as int), source.name, source.price)")
assert(ret0 == Lineage(
List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
List("v2_catalog.db.target_t"),
List(
(
"v2_catalog.db.target_t.pk",
Set("v2_catalog.db.source_t.pk", "v2_catalog.db.target_t.pk")),
("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
("v2_catalog.db.target_t.price", Set("v2_catalog.db.source_t.price")))))

val ret1 = extractLineageWithoutExecuting(
"MERGE INTO v2_catalog.db.target_t AS target " +
"USING v2_catalog.db.source_t AS source " +
"ON target.pk = source.pk " +
"WHEN MATCHED THEN " +
" UPDATE SET * " +
"WHEN NOT MATCHED THEN " +
" INSERT *")
assert(ret1 == Lineage(
List("v2_catalog.db.source_t"),
List("v2_catalog.db.target_t"),
List(
("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
("v2_catalog.db.target_t.price", Set("v2_catalog.db.source_t.price")))))

val ret2 = extractLineageWithoutExecuting(
"MERGE INTO v2_catalog.db.target_t AS target " +
"USING (select a.pk, a.name, b.price " +
"from v2_catalog.db.source_t a join " +
"v2_catalog.db.pivot_t b) AS source " +
"ON target.pk = source.pk " +
"WHEN MATCHED THEN " +
" UPDATE SET * " +
"WHEN NOT MATCHED THEN " +
" INSERT *")

assert(ret2 == Lineage(
List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
List("v2_catalog.db.target_t"),
List(
("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
("v2_catalog.db.target_t.price", Set("v2_catalog.db.pivot_t.price")))))

val ret3 = extractLineageWithoutExecuting(
"update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
assert(ret3 == Lineage(
List("v2_catalog.db.target_t"),
List("v2_catalog.db.target_t"),
List(
("v2_catalog.db.target_t.pk", Set("v2_catalog.db.target_t.pk")),
("v2_catalog.db.target_t.name", Set()),
("v2_catalog.db.target_t.price", Set("v2_catalog.db.target_t.price")))))
}
}

test("columns lineage extract - ReplaceData") {
assume(
SPARK_RUNTIME_VERSION >= "3.5",
"ReplaceData[SPARK-43963] for merge into is supported in SPARK_RUNTIME_VERSION >= 3.5")
val ddls =
"""
|create table v2_catalog.db.target_t(id int, name string, price float)
|create table v2_catalog.db.source_t(id int, name string, price float)
|create table v2_catalog.db.pivot_t(id int, price float)
|""".stripMargin
ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t", "v2_catalog.db.pivot_t") { _ =>
val ret0 = extractLineageWithoutExecuting("MERGE INTO v2_catalog.db.target_t AS target " +
"USING v2_catalog.db.source_t AS source " +
"ON target.id = source.id " +
"WHEN MATCHED THEN " +
" UPDATE SET target.name = source.name, target.price = source.price " +
"WHEN NOT MATCHED THEN " +
" INSERT (id, name, price) VALUES (cast(source.id as int), source.name, source.price)")

/**
* The ReplaceData operation requires that target records which are read but do not match
* any of the MATCHED or NOT MATCHED BY SOURCE clauses also be copied.
* (refer to [[RewriteMergeIntoTable#buildReplaceDataMergeRowsPlan]])
*/
assert(ret0 == Lineage(
List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
List("v2_catalog.db.target_t"),
List(
(
"v2_catalog.db.target_t.id",
Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
(
"v2_catalog.db.target_t.name",
Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
(
"v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price", "v2_catalog.db.target_t.price")))))

val ret1 = extractLineageWithoutExecuting("MERGE INTO v2_catalog.db.target_t AS target " +
"USING v2_catalog.db.source_t AS source " +
"ON target.id = source.id " +
"WHEN MATCHED THEN " +
" UPDATE SET * " +
"WHEN NOT MATCHED THEN " +
" INSERT *")
assert(ret1 == Lineage(
List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
List("v2_catalog.db.target_t"),
List(
(
"v2_catalog.db.target_t.id",
Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
(
"v2_catalog.db.target_t.name",
Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
(
"v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price", "v2_catalog.db.target_t.price")))))

val ret2 = extractLineageWithoutExecuting("MERGE INTO v2_catalog.db.target_t AS target " +
"USING (select a.id, a.name, b.price " +
"from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source " +
"ON target.id = source.id " +
"WHEN MATCHED THEN " +
" UPDATE SET * " +
"WHEN NOT MATCHED THEN " +
" INSERT *")

assert(ret2 == Lineage(
List("v2_catalog.db.source_t", "v2_catalog.db.target_t", "v2_catalog.db.pivot_t"),
List("v2_catalog.db.target_t"),
List(
(
"v2_catalog.db.target_t.id",
Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
(
"v2_catalog.db.target_t.name",
Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
(
"v2_catalog.db.target_t.price",
Set("v2_catalog.db.pivot_t.price", "v2_catalog.db.target_t.price")))))

val ret3 = extractLineageWithoutExecuting(
"update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
// For tables that do not support row-level deletion,
// duplicate data of the same group may be included when writing.
// plan is:
// ReplaceData
// +- Project [if ((price#1160 < cast(10 as float))) id#1158 else id#1158 AS id#1163,
// if ((price#1160 < cast(10 as float))) abc else name#1159 AS name#1164,
// if ((price#1160 < cast(10 as float))) price#1160 else price#1160 AS price#1165,
// _partition#1162]
// +- RelationV2[id#1158, name#1159, price#1160, _partition#1162]
// v2_catalog.db.target_t v2_catalog.db.target_t
assert(ret3 == Lineage(
List("v2_catalog.db.target_t"),
List("v2_catalog.db.target_t"),
List(
(
"v2_catalog.db.target_t.id",
Set("v2_catalog.db.target_t.price", "v2_catalog.db.target_t.id")),
(
"v2_catalog.db.target_t.name",
Set("v2_catalog.db.target_t.price", "v2_catalog.db.target_t.name")),
("v2_catalog.db.target_t.price", Set("v2_catalog.db.target_t.price")))))
}
}
}
Loading
Loading