maropu commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r522091225
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,121 @@ class PlannerSuite extends SharedSparkSession with
AdaptiveSparkPlanHelper {
}
}
+ test("SPARK-33399: aliases should be handled properly in
PartitioningCollection output" +
+ " partitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("t1", "t2", "t3") {
+ spark.range(10).repartition($"id").createTempView("t1")
+ spark.range(20).repartition($"id").createTempView("t2")
+ spark.range(30).repartition($"id").createTempView("t3")
+ val planned = sql(
+ """
+ |SELECT t3.id as t3id
+ |FROM (
+ | SELECT t1.id as t1id, t2.id as t2id
+ | FROM t1, t2
+ | WHERE t1.id = t2.id
+ |) t12, t3
+ |WHERE t1id = t3.id
+ """.stripMargin).queryExecution.executedPlan
+ val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+ assert(exchanges.size == 3)
+ }
+ }
+ }
+
+ test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("t1", "t2", "t3") {
+ spark.range(10).repartition($"id").createTempView("t1")
+ spark.range(20).repartition($"id").createTempView("t2")
+ spark.range(30).repartition($"id").createTempView("t3")
+ val planned = sql(
+ """
+ |SELECT t1id, t3.id as t3id
+ |FROM (
+ | SELECT t1.id as t1id
+ | FROM t1 LEFT SEMI JOIN t2
+ | ON t1.id = t2.id
+ |) t12 INNER JOIN t3
+ |WHERE t1id = t3.id
+ """.stripMargin).queryExecution.executedPlan
+ val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+ assert(exchanges.size == 3)
+ }
+ }
+ }
+
+ test("SPARK-33399: alias handling should happen properly for
RangePartitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = spark.range(1, 100)
+ .select(col("id").as("id1")).groupBy("id1").count()
+ // Plan for this will be Range -> ProjectWithAlias -> HashAggregate ->
HashAggregate
+ // if Project normalizes alias in its Range outputPartitioning, then no
Exchange should come
+ // in between HashAggregates
+ val planned = df.queryExecution.executedPlan
Review comment:
Ah, one last nit comment: could you check that the query uses
`RangePartitioning` correctly in this test (I have the same comment for the
other tests, too)?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]