viirya commented on a change in pull request #32769:
URL: https://github.com/apache/spark/pull/32769#discussion_r647624637
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
##########
@@ -42,9 +42,33 @@ case class ExpandExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
- // The GroupExpressions can output data with arbitrary partitioning, so set it
- // as UNKNOWN partitioning
- override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+ /**
+ * It is common for Expand to keep some of its inputs unchanged in all projections.
+ * If the child's output is partitioned by those attributes, then so will be
+ * the output of the Expand.
+ * In the general case, the Expand can output data with arbitrary partitioning, so set it
+ * as UNKNOWN partitioning.
+ */
+ override def outputPartitioning: Partitioning = {
+ lazy val passThroughAttrs = ExpressionSet(output.zipWithIndex.filter {
+ case (attr, i) => projections.forall(_(i).semanticEquals(attr))
+ }.map(_._1))
+
+ def getOutputPartitioning(partitioning: Partitioning): Partitioning = {
+ partitioning match {
+ case HashPartitioning(exprs, _) if exprs.forall(passThroughAttrs.contains) =>
+ partitioning
+ case RangePartitioning(ordering, _)
+   if ordering.map(_.child).forall(passThroughAttrs.contains) =>
+ partitioning
+ case PartitioningCollection(partitionings) =>
+ PartitioningCollection(partitionings.map(getOutputPartitioning))
Review comment:
I think we should. Just curious: is there an example case, and maybe a test case for it too?
BTW, should we drop any `UnknownPartitioning` results from
`partitionings.map(getOutputPartitioning)`?
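For concreteness, here is a minimal sketch of what I have in mind, assuming we keep the current `getOutputPartitioning` helper. The `pruneUnknown` name is just for illustration, not something that exists in the codebase:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}

// Sketch only: keep the members of the collection that still carry a useful
// partitioning after mapping, and fall back to UnknownPartitioning(0) when
// every member degraded to unknown.
def pruneUnknown(mapped: Seq[Partitioning]): Partitioning =
  mapped.filterNot(_.isInstanceOf[UnknownPartitioning]) match {
    case Seq() => UnknownPartitioning(0)
    case kept => PartitioningCollection(kept)
  }
```

The `PartitioningCollection` case above would then become something like `pruneUnknown(partitionings.map(getOutputPartitioning))`.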