cloud-fan commented on a change in pull request #29614:
URL: https://github.com/apache/spark/pull/29614#discussion_r481931375



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1224,4 +1224,68 @@ class AdaptiveQueryExecSuite
       })
     }
   }
+
+  test("SPARK-32765: EliminateJoinToEmptyRelation should respect exchange 
behavior " +
+    "when canChangeNumPartitions == false") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString,
+      // Exclude the ConvertToLocalRelation rule to make the test easier.
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) 
{
+      withTempView("m", "s", "pm", "ps") {
+        sql(
+          """
+            |CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES
+            |  (null, 1.0),
+            |  (2, 3.0),
+            |  (4, 5.0)
+            |  AS m(a, b)
+          """.stripMargin)
+
+        sql(
+          """
+            |CREATE TEMPORARY VIEW s AS SELECT * FROM VALUES
+            |  (null, 1.0),
+            |  (2, 3.0),
+            |  (6, 7.0)
+            |  AS s(c, d)
+          """.stripMargin)
+
+        spark.table("m").repartition(77).createOrReplaceTempView("pm")
+        spark.table("s").repartition(99).createOrReplaceTempView("ps")
+
+        // The following cases can still be converted from Join to EmptyRelation,
+        // because the optimizer puts the Repartition node after the Join node via
+        // rule RewritePredicateSubquery or PushDownLeftSemiAntiJoin; hence converting
+        // the Join to an EmptyRelation does not lose the user-specified number of partitions.
+        Seq(
+          // NAAJ streamedSide canChangeNumPartitions == false
+          "SELECT a FROM pm WHERE pm.a not in (SELECT c FROM s)",
+          // NAAJ (hand-written left anti join) streamedSide canChangeNumPartitions == false
+          "SELECT a FROM pm LEFT ANTI JOIN s ON a = c or isnull(a = c)",
+          // LeftSemi streamedSide canChangeNumPartitions == false
+          "SELECT a FROM pm LEFT SEMI JOIN (SELECT * FROM s WHERE s.d > 100) 
ts ON pm.a = ts.c"
+        ).foreach(query => {
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          val bhj = findTopLevelBroadcastHashJoin(plan)
+          assert(bhj.size == 1)
+          val join = findTopLevelBaseJoin(adaptivePlan)
+          assert(join.isEmpty)
+        })
+
+        Seq(
+          // Inner streamedSide(Left) canChangeNumPartitions == false
+          "SELECT /*+ broadcast(s) */a FROM pm, s WHERE pm.a = s.c and s.d > 
100",
+          // Inner streamedSide(Right) canChangeNumPartitions == false
+          "SELECT /*+ broadcast(m) */ a FROM m, ps WHERE m.a = ps.c and m.b > 
100"
+        ).foreach(query => {
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          val bhj = findTopLevelBroadcastHashJoin(plan)
+          assert(bhj.size == 1)
+          val join = findTopLevelBaseJoin(adaptivePlan)
+          assert(join.nonEmpty)

Review comment:
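       nit: a more precise assertion here is that the adaptive plan still contains exactly
   one broadcast hash join:
   ```scala
   val bhjInAQE = findTopLevelBroadcastHashJoin(adaptivePlan)
   assert(bhjInAQE.size == 1)
   ```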




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to