iodone commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115517270
##########
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala:
##########
@@ -1094,6 +1094,92 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
}
}
+ test("test group by") {
+ withTable("t1", "t2", "v2_catalog.db.t1", "v2_catalog.db.t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE v2_catalog.db.t1 (a string, b string, c string)")
+ spark.sql("CREATE TABLE v2_catalog.db.t2 (a string, b string, c string)")
+ val ret0 =
+ exectractLineage(
+ s"insert into table t1 select a," +
+ s"concat_ws('/', collect_set(b))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from t2 group by a")
+ assert(ret0 == Lineage(
+ List("default.t2"),
+ List("default.t1"),
+ List(
+ ("default.t1.a", Set("default.t2.a")),
+ ("default.t1.b", Set("default.t2.b")),
+ ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+
+ val ret1 =
+ exectractLineage(
+ s"insert into table v2_catalog.db.t1 select a," +
+ s"concat_ws('/', collect_set(b))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from v2_catalog.db.t2 group by a")
+ assert(ret1 == Lineage(
+ List("v2_catalog.db.t2"),
+ List("v2_catalog.db.t1"),
+ List(
+ ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+ ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b")),
+ ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")))))
+
+ val ret2 =
+ exectractLineage(
+ s"insert into table v2_catalog.db.t1 select a," +
+ s"count(distinct(b+c))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from v2_catalog.db.t2 group by a")
+ assert(ret2 == Lineage(
+ List("v2_catalog.db.t2"),
+ List("v2_catalog.db.t1"),
+ List(
+ ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+ ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")),
+ ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")))))
+ }
+ }
+
+ test("test grouping sets") {
+ withTable("t1", "t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string, c string, d string)
USING hive")
+ val ret0 =
+ exectractLineage(
+ s"insert into table t1 select a,b,GROUPING__ID " +
+ s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
+ assert(ret0 == Lineage(
+ List("default.t2"),
+ List("default.t1"),
+ List(
+ ("default.t1.a", Set("default.t2.a")),
+ ("default.t1.b", Set("default.t2.b")),
+ ("default.t1.c", Set()))))
+ }
+ }
+
+ test("test catch table with window function") {
+ withTable("t1", "t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+ spark.sql(
+ s"cache table c1 select * from (" +
+ s"select a, b, row_number() over (partition by a order by b asc )
rank from t2)" +
+ s" where rank=1")
+ val ret0 = exectractLineage("insert overwrite table t1 select a, b from
c1")
Review Comment:
yes, it does work
##########
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala:
##########
@@ -160,15 +162,22 @@ trait LineageParser {
}
}
+ private def isNameWithQualifier(attr: Attribute, qualifier: Seq[String]):
Boolean = {
+ val nameTokens = attr.name.split('.')
+ val namespace = nameTokens.init.mkString(".")
+ nameTokens.length > 1 && namespace.endsWith(qualifier.mkString("."))
+ }
+
private def mergeRelationColumnLineage(
parentColumnsLineage: AttributeMap[AttributeSet],
relationOutput: Seq[Attribute],
relationColumnLineage: AttributeMap[AttributeSet]):
AttributeMap[AttributeSet] = {
val mergedRelationColumnLineage = {
- relationOutput.foldLeft((ListMap[Attribute, AttributeSet](),
relationColumnLineage)) {
- case ((acc, x), attr) =>
- (acc + (attr -> x.head._2), x.tail)
- }._1
+ relationOutput.slice(0, relationColumnLineage.size)
Review Comment:
this case:
```
cache table c2 select * from (
select b, a, row_number() over (partition by a order by b asc ) rank
from t2)
where rank=1
```
the logicalPlan of `CacheTable`'s subquery is
```
Filter (isnotnull(rank#4) AND (rank#4 = 1))
+- Window [row_number() windowspecdefinition(a#9, b#10 ASC NULLS FIRST,
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS
rank#4], [a#9], [b#10 ASC NULLS FIRST]
+- Project [b#10, a#9]
+- HiveTableRelation [`default`.`t2`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#9, b#10],
Partition Cols: []]
```
the output of `CacheTable` is [#a, #b, #rank], but the subquery's lineage of
`CacheTable` does not include the `#rank` column
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]