prakharjain09 commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r522717726
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
##########
@@ -68,11 +65,8 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
final override def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
- orderingExpressions.map { s =>
- s.child match {
- case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a))
- case _ => s
- }
+ orderingExpressions.map { sortOrder =>
+ normalizeExpression(sortOrder).asInstanceOf[SortOrder]
}
Review comment:
done.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,140 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
}
}
+ test("SPARK-33399: aliases should be handled properly in PartitioningCollection output" +
+ " partitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("t1", "t2", "t3") {
+ spark.range(10).repartition($"id").createTempView("t1")
+ spark.range(20).repartition($"id").createTempView("t2")
+ spark.range(30).repartition($"id").createTempView("t3")
+ val planned = sql(
+ """
+ |SELECT t3.id as t3id
+ |FROM (
+ | SELECT t1.id as t1id, t2.id as t2id
+ | FROM t1, t2
+ | WHERE t1.id = t2.id
+ |) t12, t3
+ |WHERE t1id = t3.id
+ """.stripMargin).queryExecution.executedPlan
+ val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+ assert(exchanges.size == 3)
+ }
+ }
+ }
+
+ test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("t1", "t2", "t3") {
+ spark.range(10).repartition($"id").createTempView("t1")
+ spark.range(20).repartition($"id").createTempView("t2")
+ spark.range(30).repartition($"id").createTempView("t3")
+ val planned = sql(
+ """
+ |SELECT t1id, t3.id as t3id
+ |FROM (
+ | SELECT t1.id as t1id
+ | FROM t1 LEFT SEMI JOIN t2
+ | ON t1.id = t2.id
+ |) t12 INNER JOIN t3
+ |WHERE t1id = t3.id
+ """.stripMargin).queryExecution.executedPlan
+ val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+ assert(exchanges.size == 3)
+
+ val projects = planned.collect { case p: ProjectExec => p }
+ assert(projects.exists(_.outputPartitioning match {
+ case HashPartitioning(Seq(a: AttributeReference), _) => a.name == "t1id"
+ case _ => false
+ }))
+ }
+ }
+ }
+
+ test("SPARK-33399: alias handling should happen properly for RangePartitioning") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = spark.range(1, 100)
+ .select(col("id").as("id1")).groupBy("id1").count()
+ // Plan for this will be Range -> ProjectWithAlias -> HashAggregate -> HashAggregate
+ // if Project normalizes alias in its Range outputPartitioning, then no Exchange should come
+ // in between HashAggregates
+ val planned = df.queryExecution.executedPlan
+ val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+ assert(exchanges.isEmpty)
+
+ val projects = planned.collect { case p: ProjectExec => p }
+ assert(projects.exists(_.outputPartitioning match {
+ case RangePartitioning(Seq(_@SortOrder(ar: AttributeReference, _, _, _)), _) =>
Review comment:
done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]