Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19050#discussion_r137167260
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -875,4 +876,70 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(e.message.contains("cannot resolve '`a`' given input columns: [t.i, t.j]"))
   }
 }
+
+ test("SPARK-21835: Join in correlated subquery should be
duplicateResolved: case 1") {
+ withTable("t1") {
+ withTempPath { path =>
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+ sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}'")
+
+ val sqlText =
+ """
+ |SELECT * FROM t1
+ |WHERE
+ |NOT EXISTS (SELECT * FROM t1)
+ """.stripMargin
+ val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+ val join = optimizedPlan.collect {
+ case j: Join => j
+ }.head.asInstanceOf[Join]
+ assert(join.duplicateResolved)
+ assert(optimizedPlan.resolved)
+ }
+ }
+ }
+
+ test("SPARK-21835: Join in correlated subquery should be
duplicateResolved: case 2") {
+ withTable("t1", "t2", "t3") {
+ withTempPath { path =>
+ val data = Seq((1, 1, 1), (2, 0, 2))
+
+ data.toDF("t1a", "t1b", "t1c").write.parquet(path.getCanonicalPath
+ "/t1")
+ data.toDF("t2a", "t2b", "t2c").write.parquet(path.getCanonicalPath
+ "/t2")
+ data.toDF("t3a", "t3b", "t3c").write.parquet(path.getCanonicalPath
+ "/t3")
+
+ sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}/t1'")
+ sql(s"CREATE TABLE t2 USING parquet LOCATION '${path.toURI}/t2'")
+ sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}/t3'")
+
+ val sqlText =
+ s"""
+ |SELECT *
+ |FROM (SELECT *
+ | FROM t2
+ | WHERE t2c IN (SELECT t1c
+ | FROM t1
+ | WHERE t1a = t2a)
+ | UNION
+ | SELECT *
+ | FROM t3
+ | WHERE t3a IN (SELECT t2a
+ | FROM t2
+ | UNION ALL
+ | SELECT t1a
+ | FROM t1
+ | WHERE t1b > 0)) t4
+ |WHERE t4.t2b IN (SELECT Min(t3b)
+ | FROM t3
+ | WHERE t4.t2a = t3a)
+ """.stripMargin
+ val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+ val joinNodes = optimizedPlan.collect {
+ case j: Join => j
+ }.map(_.asInstanceOf[Join])
+ joinNodes.map(j => assert(j.duplicateResolved))
--- End diff --
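The pattern match inside `collect` already narrows the element type to `Join`, so the `asInstanceOf[Join]` cast is redundant, and since the assertions are run only for their side effects, `foreach` states the intent more clearly than `map`: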
```Scala
val joinNodes = optimizedPlan.collect { case j: Join => j }
joinNodes.foreach(j => assert(j.duplicateResolved))
```
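For context, a minimal sketch of the property these assertions verify, assuming `Join.duplicateResolved` follows the usual Catalyst definition (the two children of a join must not share output attributes once the correlated subquery plan has been de-duplicated):

```Scala
// Sketch only; assumes the standard Catalyst definition of duplicateResolved.
// A Join is considered duplicate-resolved when its left and right children do
// not expose any overlapping output attributes, i.e. no shared ExprIds remain
// after the correlated subquery plan has been de-duplicated.
import org.apache.spark.sql.catalyst.plans.logical.Join

def duplicateResolvedSketch(j: Join): Boolean =
  j.left.outputSet.intersect(j.right.outputSet).isEmpty
```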