ulysses-you commented on code in PR #4393:
URL: https://github.com/apache/kyuubi/pull/4393#discussion_r1115292467
##########
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala:
##########
@@ -1094,6 +1094,92 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
}
}
+ test("test group by") {
+ withTable("t1", "t2", "v2_catalog.db.t1", "v2_catalog.db.t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE v2_catalog.db.t1 (a string, b string, c string)")
+ spark.sql("CREATE TABLE v2_catalog.db.t2 (a string, b string, c string)")
+ val ret0 =
+ exectractLineage(
+ s"insert into table t1 select a," +
+ s"concat_ws('/', collect_set(b))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from t2 group by a")
+ assert(ret0 == Lineage(
+ List("default.t2"),
+ List("default.t1"),
+ List(
+ ("default.t1.a", Set("default.t2.a")),
+ ("default.t1.b", Set("default.t2.b")),
+ ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+
+ val ret1 =
+ exectractLineage(
+ s"insert into table v2_catalog.db.t1 select a," +
+ s"concat_ws('/', collect_set(b))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from v2_catalog.db.t2 group by a")
+ assert(ret1 == Lineage(
+ List("v2_catalog.db.t2"),
+ List("v2_catalog.db.t1"),
+ List(
+ ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+ ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b")),
+ ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")))))
+
+ val ret2 =
+ exectractLineage(
+ s"insert into table v2_catalog.db.t1 select a," +
+ s"count(distinct(b+c))," +
+ s"count(distinct(b)) * count(distinct(c))" +
+ s"from v2_catalog.db.t2 group by a")
+ assert(ret2 == Lineage(
+ List("v2_catalog.db.t2"),
+ List("v2_catalog.db.t1"),
+ List(
+ ("v2_catalog.db.t1.a", Set("v2_catalog.db.t2.a")),
+ ("v2_catalog.db.t1.b", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")),
+ ("v2_catalog.db.t1.c", Set("v2_catalog.db.t2.b",
"v2_catalog.db.t2.c")))))
+ }
+ }
+
+ test("test grouping sets") {
+ withTable("t1", "t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string, c string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string, c string, d string)
USING hive")
+ val ret0 =
+ exectractLineage(
+ s"insert into table t1 select a,b,GROUPING__ID " +
+ s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
+ assert(ret0 == Lineage(
+ List("default.t2"),
+ List("default.t1"),
+ List(
+ ("default.t1.a", Set("default.t2.a")),
+ ("default.t1.b", Set("default.t2.b")),
+ ("default.t1.c", Set()))))
+ }
+ }
+
+ test("test catch table with window function") {
+ withTable("t1", "t2") { _ =>
+ spark.sql("CREATE TABLE t1 (a string, b string) USING hive")
+ spark.sql("CREATE TABLE t2 (a string, b string) USING hive")
+ spark.sql(
+ s"cache table c1 select * from (" +
+ s"select a, b, row_number() over (partition by a order by b asc )
rank from t2)" +
+ s" where rank=1")
+ val ret0 = exectractLineage("insert overwrite table t1 select a, b from
c1")
Review Comment:
this is the test case for the code `relationOutput.slice(0,
relationColumnLineage.size)`, right?
Does it work with `insert overwrite table t1 select a, rank from c1`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]