tanelk commented on a change in pull request #32769:
URL: https://github.com/apache/spark/pull/32769#discussion_r647238152
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
##########
@@ -42,9 +42,33 @@ case class ExpandExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
-  // The GroupExpressions can output data with arbitrary partitioning, so set it
-  // as UNKNOWN partitioning
-  override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+  /**
+   * It is common for Expand to keep some of its inputs unchanged in all projections.
+   * If the child's output is partitioned by those attributes, then so will be
+   * the output of the Expand.
+   * In general case the Expand can output data with arbitrary partitioning, so set it
+   * as UNKNOWN partitioning.
+   */
+  override def outputPartitioning: Partitioning = {
+    lazy val passThroughAttrs = ExpressionSet(output.zipWithIndex.filter {
+      case (attr, i) => projections.forall(_(i).semanticEquals(attr))
+    }.map(_._1))
+
+    def getOutputPartitioning(partitioning: Partitioning): Partitioning = {
+      partitioning match {
+        case HashPartitioning(exprs, _) if exprs.forall(passThroughAttrs.contains) =>
+          partitioning
+        case RangePartitioning(ordering, _)
+            if ordering.map(_.child).forall(passThroughAttrs.contains) =>
+          partitioning
+        case PartitioningCollection(partitionings) =>
+          PartitioningCollection(partitionings.map(getOutputPartitioning))
Review comment:
Can't the child's output partitioning be any partitioning? It shouldn't
depend on the parent node, similar to how `FilterExec` simply has
`outputPartitioning = child.outputPartitioning`.
I initially covered only `HashPartitioning` and `RangePartitioning`, but
@maropu also requested `PartitioningCollection`.
https://github.com/apache/spark/pull/32769#discussion_r647018067
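To illustrate why the rule should accept any child partitioning, including nested `PartitioningCollection`s, here is a minimal sketch of the same pass-through logic on a simplified, hypothetical model. It is not Spark's Catalyst API. Plain column names stand in for attributes and sort orders, and `expandOutputPartitioning` is an illustrative name. The `UnknownPartitioning` fallback is an assumption, since the quoted hunk ends before the default case. The block is written as a script-style Scala file, with top-level definitions and statements.

```scala
// Hypothetical, simplified model of Catalyst partitionings: attributes and
// sort keys are plain column names, not Spark's Expression/SortOrder types.
sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(exprs: Seq[String], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning
case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
  // Assumes a non-empty collection whose members agree on the partition count.
  def numPartitions: Int = partitionings.head.numPartitions
}

// An output column is pass-through when every projection emits exactly that
// column at its position (mirrors `passThroughAttrs` in the diff).
def passThroughAttrs(output: Seq[String], projections: Seq[Seq[String]]): Set[String] =
  output.zipWithIndex.collect {
    case (attr, i) if projections.forall(p => p(i) == attr) => attr
  }.toSet

// Accepts any child partitioning and recurses into collections, as in the diff.
// The final fallback case is an assumption: the quoted hunk ends before it.
def expandOutputPartitioning(child: Partitioning, passThrough: Set[String]): Partitioning =
  child match {
    case HashPartitioning(exprs, _) if exprs.forall(passThrough.contains) => child
    case RangePartitioning(ordering, _) if ordering.forall(passThrough.contains) => child
    case PartitioningCollection(ps) =>
      PartitioningCollection(ps.map(expandOutputPartitioning(_, passThrough)))
    case other => UnknownPartitioning(other.numPartitions)
  }

// Hypothetical Expand: `a` is kept in both projections, `b` is nulled out in one.
val output = Seq("a", "b", "gid")
val projections = Seq(Seq("a", "b", "0"), Seq("a", "null", "1"))
val passThrough = passThroughAttrs(output, projections) // Set("a")

// Hash on `a` survives the Expand.
println(expandOutputPartitioning(HashPartitioning(Seq("a"), 200), passThrough))
// Hash on (a, b) does not, because `b` is not pass-through.
println(expandOutputPartitioning(HashPartitioning(Seq("a", "b"), 200), passThrough))
// Each member of a collection is handled independently.
println(expandOutputPartitioning(
  PartitioningCollection(Seq(HashPartitioning(Seq("a"), 200), RangePartitioning(Seq("b"), 200))),
  passThrough))
```

Because the function only inspects the partitioning it is given, it never needs to know which operator produced it. This matches the `FilterExec` analogy: the child may report any partitioning, and each member of a collection is kept or degraded on its own.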