Karl-WangSK commented on a change in pull request #29360:
URL: https://github.com/apache/spark/pull/29360#discussion_r468955369



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
       empty(ll)
   }
 }
+
+/**
+ * Split [[Expand]] into several Expand if the projection size of Expand is 
larger
+ * than default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  private def splitExpand(expand: Expand, num: Int): Seq[Expand] = {
+    val groupedProjections = expand.projections.grouped(num).toList
+    val expands: Seq[Expand] = groupedProjections.map {
+      projectionSeq => Expand(projectionSeq, expand.output, expand.child)
+    }
+    expands
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+      if (SQLConf.get.groupingWithUnion && projections.length
+        > SQLConf.get.groupingExpandProjections) {
+        val num = SQLConf.get.groupingExpandProjections
+        val subExpands = splitExpand(e, num)
+        val aggregates: Seq[Aggregate] = subExpands.map { expand =>
+          Aggregate(a.groupingExpressions, a.aggregateExpressions, expand)
+        }

Review comment:
       not the same data, they have passed the sql tests, the output is the 
same as expected




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to