[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...
Github user DonnyZone commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140741265

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

And I think we may figure out some cost-based methods.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user DonnyZone commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140741053

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Can we set another config param here?
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140731720

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

How do you propose optimizing further?
Github user DonnyZone commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140729331

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Shall we keep Expand here for further optimization? I think we may put more than one projection together for each Union child. In our production environment, we have seen cube queries with as many as 22 dimensions, which results in 2^22 = 4194304 projections. In such cases, it is not appropriate to transform the plan into one projection per child of the Union node.
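The batching idea in this comment can be sketched outside Catalyst. The snippet below is a minimal, Spark-free illustration, assuming a hypothetical `Projection` stand-in for Catalyst's `Seq[Expression]`; the batch size would presumably come from a config parameter rather than being hard-coded.

```scala
// Hypothetical sketch: instead of one projection per Union child,
// partition the projections into fixed-size batches, so a cube with
// 2^22 projections yields far fewer Union children.
object BatchProjections {
  type Projection = Seq[String] // stand-in for Catalyst's Seq[Expression]

  // Seq.grouped does the partitioning: every batch has `batchSize`
  // elements except possibly the last.
  def batch(projections: Seq[Projection], batchSize: Int): Seq[Seq[Projection]] =
    projections.grouped(batchSize).toSeq
}
```

With a batch size of, say, 512, each batch would become one `Expand` (with that subset of projections) feeding one `Aggregate`, interpolating between the current plan (one `Expand` with all projections) and the one-projection-per-child extreme.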
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140719222

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -481,6 +481,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val GROUPING_WITH_UNION = buildConf("spark.sql.grouping.union.enabled")
+    .doc("When true, the grouping analytics (i.e., cube, rollup, grouping sets) will be " +
+      "implemented by Union with a number of aggregates for each group. " +
--- End diff --

`implemented using a union with aggregates for each group.`
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140718061

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
+    )
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+      if (SQLConf.get.groupingWithUnion && projections.length > 1) {
+        val expands = splitExpand(e)
+        val aggregates: Seq[Aggregate] = Seq.tabulate(expands.length)(
--- End diff --

You can just map over the `expands`
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140717934

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
--- End diff --

You can just map over the `expand.projections`
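The refactor being suggested here can be illustrated without Catalyst. In this sketch, the hypothetical `Item` case class and `wrap` functions stand in for `Expand` and its construction; the two forms produce the same result, but the `map` form drops the length and index bookkeeping.

```scala
// Seq.tabulate(len)(i => f(xs(i))) over a collection's indices is
// equivalent to xs.map(f); the latter is the idiomatic form.
object TabulateVsMap {
  case class Item(payload: String) // stand-in for Expand

  // Original shape: explicit length plus index lookups.
  def viaTabulate(items: Seq[String]): Seq[Item] = {
    val len = items.length
    Seq.tabulate(len)(i => Item(items(i)))
  }

  // Suggested shape: map directly over the collection.
  def viaMap(items: Seq[String]): Seq[Item] =
    items.map(Item(_))
}
```

Applied to the rule, `splitExpand` would reduce to `expand.projections.map(p => Expand(Seq(p), expand.output, expand.child))`, and the subsequent `Seq.tabulate` over `expands` would likewise become a `map`.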
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19175#discussion_r140717827

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Why not remove the Expand altogether? The only thing you might need to do is make sure that the expressions produce the correct attributes.
GitHub user DonnyZone opened a pull request:

    https://github.com/apache/spark/pull/19175

[SPARK-21964][SQL] Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-21964

The current implementation of grouping analytics (i.e., cube, rollup, and grouping sets) is heavyweight in many scenarios (e.g., high-dimension cubes): the Expand operator produces a large number of projections, resulting in vast shuffle writes. This can lead to low performance or even OOM errors for direct buffer memory.

This PR provides another choice, which enables splitting the heavyweight aggregate into a number of lightweight aggregates, one per group. In effect, it implements the grouping analytics as a Union and executes the aggregates one by one. This optimization runs counter to the general principle of avoiding redundant data scans.

The current splitting strategy is simple: one aggregation per group. In the future, we may devise more intelligent splitting strategies (e.g., cost-based methods).

## How was this patch tested?

Unit tests
Manual tests in a production environment

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DonnyZone/spark spark-21964

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19175.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19175

commit da36e37df9c31901975c29dfa77cb7d648e94f40
Author: donnyzone
Date: 2017-09-09T13:46:48Z

    grouping with union
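The rewrite this PR proposes can be sketched without Spark at all. The snippet below is a minimal, Spark-free illustration of the idea: a cube over two dimensions is logically a union of one plain aggregate per grouping set. Plain Scala collections stand in for the Aggregate and Union operators; the row data is invented for the example.

```scala
// Sketch: cube(a, b) over (a, b, value) rows, computed as the union of
// four independent group-by aggregates, one per grouping set. A None
// key component marks a dimension that is rolled up.
object CubeAsUnion {
  type Row = (String, String, Int)
  type Key = (Option[String], Option[String])

  val rows: Seq[Row] = Seq(("x", "u", 1), ("x", "v", 2), ("y", "u", 3))

  // One lightweight aggregate for a single grouping set.
  def aggregate(key: Row => Key): Map[Key, Int] =
    rows.groupBy(key).map { case (k, vs) => k -> vs.map(_._3).sum }

  // cube(a, b) = union of the results for the four grouping sets
  // (a, b), (a), (b), and ().
  val cube: Map[Key, Int] =
    aggregate(r => (Some(r._1), Some(r._2))) ++
    aggregate(r => (Some(r._1), None)) ++
    aggregate(r => (None, Some(r._2))) ++
    aggregate(r => (None, None))
}
```

Each `aggregate` call corresponds to one lightweight Aggregate child of the Union in the rewritten plan, in contrast to the single Expand-based plan that materializes all four projections of every input row before one heavyweight Aggregate.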