maropu commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r522556996



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,140 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("SPARK-33399: aliases should be handled properly in 
PartitioningCollection output" +
+    " partitioning") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t1", "t2", "t3") {
+        spark.range(10).repartition($"id").createTempView("t1")
+        spark.range(20).repartition($"id").createTempView("t2")
+        spark.range(30).repartition($"id").createTempView("t3")
+        val planned = sql(
+          """
+            |SELECT t3.id as t3id
+            |FROM (
+            |    SELECT t1.id as t1id, t2.id as t2id
+            |    FROM t1, t2
+            |    WHERE t1.id = t2.id
+            |) t12, t3
+            |WHERE t1id = t3.id
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+        assert(exchanges.size == 3)
+      }
+    }
+  }
+
+  test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t1", "t2", "t3") {
+        spark.range(10).repartition($"id").createTempView("t1")
+        spark.range(20).repartition($"id").createTempView("t2")
+        spark.range(30).repartition($"id").createTempView("t3")
+        val planned = sql(
+          """
+            |SELECT t1id, t3.id as t3id
+            |FROM (
+            |    SELECT t1.id as t1id
+            |    FROM t1 LEFT SEMI JOIN t2
+            |    ON t1.id = t2.id
+            |) t12 INNER JOIN t3
+            |WHERE t1id = t3.id
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+        assert(exchanges.size == 3)
+
+        val projects = planned.collect { case p: ProjectExec => p }
+        assert(projects.exists(_.outputPartitioning match {
+          case HashPartitioning(Seq(a: AttributeReference), _) => a.name == "t1id"
+          case _ => false
+        }))
+      }
+    }
+  }
+
+  test("SPARK-33399: alias handling should happen properly for 
RangePartitioning") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df = spark.range(1, 100)
+        .select(col("id").as("id1")).groupBy("id1").count()
+      // The plan for this query is Range -> Project (with alias) -> HashAggregate -> HashAggregate.
+      // If Project normalizes the alias in its RangePartitioning output, no Exchange should
+      // appear between the HashAggregates.
+      val planned = df.queryExecution.executedPlan
+      val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+      assert(exchanges.isEmpty)
+
+      val projects = planned.collect { case p: ProjectExec => p }
+      assert(projects.exists(_.outputPartitioning match {
+        case RangePartitioning(Seq(_@SortOrder(ar: AttributeReference, _, _, _)), _) =>

Review comment:
       Could you add the same assertion to the other tests, too?
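
       For reference, a minimal sketch of what that could look like in the PartitioningCollection test, mirroring the `ProjectExec` check from the HashPartitioning test above. The exact shape of the collection, and that the aliased attribute surfaces as `t1id`, are assumptions about the post-fix plan, not something this diff confirms:

       ```scala
       // Hypothetical assertion for the PartitioningCollection test: after alias
       // normalization, some ProjectExec should report a PartitioningCollection
       // whose first element hashes on the aliased attribute "t1id" (assumed name).
       val projects = planned.collect { case p: ProjectExec => p }
       assert(projects.exists(_.outputPartitioning match {
         case PartitioningCollection(
             Seq(HashPartitioning(Seq(a: AttributeReference), _), _*)) =>
           a.name == "t1id"
         case _ => false
       }))
       ```

       The RangePartitioning test below would keep its `SortOrder`-based pattern; only the matched `Partitioning` subtype changes per test.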




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
