maropu commented on a change in pull request #32769:
URL: https://github.com/apache/spark/pull/32769#discussion_r647019453
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
##########
@@ -42,9 +42,27 @@ case class ExpandExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
- // The GroupExpressions can output data with arbitrary partitioning, so set
it
- // as UNKNOWN partitioning
- override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+ /**
+ * The Expand is commonly introduced by the RewriteDistinctAggregates
optimizer rule.
Review comment:
What does this comment mean? I think there are three cases for
`Expand`: distinct aggregates, grouping analytics, and window aggregates.
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
##########
@@ -4003,6 +4003,56 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
}
checkAnswer(sql(s"select /*+ REPARTITION(3, a) */ a b from values('123')
t(a)"), Row("123"))
}
+
+ test("SPARK-35630: ExpandExec should not introduce unnecessary exchanges") {
+ withTable("test_table") {
+ spark.range(11)
+ .withColumn("group1", $"id" % 2)
+ .withColumn("group2", $"id" % 4)
+ .withColumn("a", $"id" % 3)
+ .withColumn("b", $"id" % 6)
+ .write.saveAsTable("test_table")
Review comment:
nit: for better test performance, could you use a temporary view instead?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
##########
@@ -42,9 +42,27 @@ case class ExpandExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
- // The GroupExpressions can output data with arbitrary partitioning, so set
it
- // as UNKNOWN partitioning
- override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+ /**
+ * The Expand is commonly introduced by the RewriteDistinctAggregates
optimizer rule.
+ * In that case there can be several attributes that are kept as they are by
the Expand.
+ * If the child's output is partitioned by those attributes, then so will be
+ * the output of the Expand.
+ * In general case the Expand can output data with arbitrary partitioning,
so set it
+ * as UNKNOWN partitioning.
+ */
+ override def outputPartitioning: Partitioning = {
+ val stableAttrs = ExpressionSet(output.zipWithIndex.filter {
Review comment:
Could you make this var name clearer? e.g., `passThroughAttrs`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
##########
@@ -42,9 +42,27 @@ case class ExpandExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
- // The GroupExpressions can output data with arbitrary partitioning, so set
it
- // as UNKNOWN partitioning
- override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+ /**
+ * The Expand is commonly introduced by the RewriteDistinctAggregates
optimizer rule.
+ * In that case there can be several attributes that are kept as they are by
the Expand.
+ * If the child's output is partitioned by those attributes, then so will be
+ * the output of the Expand.
+ * In general case the Expand can output data with arbitrary partitioning,
so set it
+ * as UNKNOWN partitioning.
+ */
+ override def outputPartitioning: Partitioning = {
+ val stableAttrs = ExpressionSet(output.zipWithIndex.filter {
+ case (attr, i) => projections.forall(_(i).semanticEquals(attr))
+ }.map(_._1))
+
+ child.outputPartitioning match {
Review comment:
How about the `PartitioningCollection` case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]