maropu commented on a change in pull request #30659:
URL: https://github.com/apache/spark/pull/30659#discussion_r540950144
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
##########
@@ -61,15 +63,24 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
        val keepOrdering = a.aggregateExpressions
          .exists(ae => ae.mode.equals(Final) || ae.mode.equals(PartialMerge))
        a.mapChildren(removeProject(_, keepOrdering))
-      // GenerateExec requires column ordering since it binds input rows directly with its
-      // requiredChildOutput without using child's output schema.
-      case g: GenerateExec => g.mapChildren(removeProject(_, true))
-      // JoinExec ordering requirement will inherit from its parent. If there is no ProjectExec in
-      // its ancestors, JoinExec should require output columns to be ordered.
-      case o => o.mapChildren(removeProject(_, requireOrdering))
+      case p if canPassThrough(p) => p.mapChildren(removeProject(_, requireOrdering))
+      case o => o.mapChildren(removeProject(_, requireOrdering = true))
    }
  }
+
+  /**
+   * Check if the given node can pass the ordering requirement from its parent.
+   */
+  private def canPassThrough(plan: SparkPlan): Boolean = plan match {
+    case _: FilterExec => true
+    // JoinExec ordering requirement should inherit from its parent. If there is no ProjectExec in
+    // its ancestors, JoinExec should require output columns to be ordered, and vice versa.
+    case _: BaseJoinExec => true
+    case _: WindowExec => true
+    case _: ExpandExec => true
Review comment:
Just a question: how did you come up with the list of operators above (filter, join, window, and expand)? Did you extract it from the existing tests?
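
For other readers' context, here is a minimal sketch of the positional-binding hazard that makes the pass-through set matter. It is not taken from this PR; the local SparkSession, the table names u1/u2, and the query are illustrative assumptions only (e.g. for a spark-shell session):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("union-ordering-sketch").getOrCreate()

    // Two parquet tables with the same columns but opposite physical column order.
    spark.range(0, 5).selectExpr("id", "cast(id as string) strcol")
      .write.mode("overwrite").format("parquet").saveAsTable("u1")
    spark.range(0, 5).selectExpr("cast(id as string) strcol", "id")
      .write.mode("overwrite").format("parquet").saveAsTable("u2")

    // UnionExec binds its children positionally. The Project under the u2 branch only reorders
    // columns (same column set), so treating it as redundant would pair u1's `id` with u2's
    // `strcol` in the union output.
    spark.sql(
      """SELECT DISTINCT id, strcol FROM
        |(
        |(SELECT id, strcol FROM u1)
        | UNION ALL
        |(SELECT id, strcol FROM u2)
        |)""".stripMargin).show()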
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
##########
@@ -166,6 +166,53 @@ abstract class RemoveRedundantProjectsSuiteBase
      assertProjectExec(query, 0, 1)
    }
  }
+
+  test("SPARK-33697: UnionExec should require column ordering") {
+    withTable("t1", "t2") {
+      spark.range(-10, 20)
+        .selectExpr(
+          "id",
+          "date_add(date '1950-01-01', cast(id as int)) as datecol",
+          "cast(id as string) strcol")
+        .write.mode("overwrite").format("parquet").saveAsTable("t1")
+      spark.range(-10, 20)
+        .selectExpr(
+          "cast(id as string) strcol",
+          "id",
+          "date_add(date '1950-01-01', cast(id as int)) as datecol")
+        .write.mode("overwrite").format("parquet").saveAsTable("t2")
+
+      val queryTemplate =
+        """
+          |SELECT DISTINCT datecol, strcol FROM
+          |(
+          |(SELECT datecol, id, strcol from t1)
+          | %s
+          |(SELECT datecol, id, strcol from t2)
+          |)
+          |""".stripMargin
+
+      Seq(("UNION", 2, 2), ("UNION ALL", 1, 2)).foreach { case (setOperation, enabled, disabled) =>
+        val query = queryTemplate.format(setOperation)
+        assertProjectExec(query, enabled = enabled, disabled = disabled)
+      }
+    }
+  }
+
+  test("SPARK-33697: expand") {
Review comment:
Could you make the test title clearer?
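For example (just a suggestion, the exact wording is up to you), something along the lines of:

    test("SPARK-33697: ExpandExec should require column ordering") {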