viirya commented on a change in pull request #29360:
URL: https://github.com/apache/spark/pull/29360#discussion_r467601888
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
empty(ll)
}
}
+
+/**
+ * Split [[Expand]] into several Expand if the projection size of Expand is larger
+ * than default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
Review comment:
I'm curious how this reduces the amount of shuffled data. I think the
original idea is that, in cases where Expand would produce huge data, we
split it into smaller ones.
But the shuffle happens during the Aggregate here, right? By splitting, the
total amount of shuffled data is unchanged; it is only divided into several
parts. Does it really result in a significant improvement?
I'm curious to see the benchmark results.
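For reference, a minimal sketch (hypothetical table and column names) of the
kind of query that produces a wide Expand. Each grouping set becomes one
projection, so the rows fed into the shuffle equal the input rows times the
number of projections, whether or not the Expand is later split:

    import org.apache.spark.sql.SparkSession

    object ExpandShuffleSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]").appName("expand-sketch").getOrCreate()
        import spark.implicits._

        Seq(("a", "x", 1), ("b", "y", 2), ("a", "y", 3))
          .toDF("dim1", "dim2", "metric")
          .createOrReplaceTempView("t")

        // CUBE over k columns yields 2^k projections in a single Expand.
        val cubed = spark.sql(
          "SELECT dim1, dim2, SUM(metric) FROM t GROUP BY CUBE(dim1, dim2)")

        // The plan shows one Expand feeding the aggregate; the shuffled row
        // count is (#input rows) * (#projections) either way.
        cubed.explain(true)
        spark.stop()
      }
    }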
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
empty(ll)
}
}
+
+/**
+ * Split [[Expand]] into several Expand if the projection size of Expand is larger
+ * than default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+ private def splitExpand(expand: Expand, num: Int): Seq[Expand] = {
+ val groupedProjections = expand.projections.grouped(num).toList
+ val expands: Seq[Expand] = groupedProjections.map {
+ projectionSeq => Expand(projectionSeq, expand.output, expand.child)
+ }
+ expands
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+ if (SQLConf.get.groupingWithUnion && projections.length > SQLConf.get.groupingExpandProjections) {
+ val num = SQLConf.get.groupingExpandProjections
+ val subExpands = splitExpand(e, num)
+ val aggregates: Seq[Aggregate] = subExpands.map { expand =>
+ Aggregate(a.groupingExpressions, a.aggregateExpressions, expand)
+ }
Review comment:
These split Expand + Aggregate pairs read the same data. That is to say,
this optimization will read the same data multiple times.
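To illustrate, a Catalyst-level sketch (not the PR's code; the final Union is
inferred from the `groupingWithUnion` config name) of the plan shape this rule
produces. Every piece wraps the identical `e.child`, so that child subtree,
e.g. a file scan, is evaluated once per piece unless some reuse mechanism
applies:

    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan, Union}

    object SplitPlanShapeSketch {
      // Build the post-split plan shape: a Union of Aggregate(Expand(child)),
      // where each Expand carries a slice of the original projections but
      // shares the same child plan.
      def splitPlanShape(a: Aggregate, e: Expand, groupSize: Int): LogicalPlan = {
        val pieces = e.projections.grouped(groupSize).toSeq.map { projs =>
          Aggregate(a.groupingExpressions, a.aggregateExpressions,
            Expand(projs, e.output, e.child)) // identical child in every piece
        }
        Union(pieces)
      }
    }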