[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread DonnyZone
Github user DonnyZone commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140741265
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
     a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Splits [[Aggregate]] on [[Expand]], which has large number of projections,
+ * into various [[Aggregate]]s.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  /**
+   * Split [[Expand]] operator to a number of [[Expand]] operators
+   */
+  private def splitExpand(expand: Expand): Seq[Expand] = {
+    val len = expand.projections.length
+    val allProjections = expand.projections
+    Seq.tabulate(len)(
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

I also think we could devise some cost-based methods.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread DonnyZone
Github user DonnyZone commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140741053
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
(same SplitAggregateWithExpand hunk as quoted above, ending at:)
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Can we add another config parameter here?


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140731720
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
(same SplitAggregateWithExpand hunk as quoted above, ending at:)
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

How do you propose optimizing further?




---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread DonnyZone
Github user DonnyZone commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140729331
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
(same SplitAggregateWithExpand hunk as quoted above, ending at:)
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Shall we keep Expand here for further optimization? I think we may put 
more than one projection together per Union child.

In our production environment, we found that some cube queries can have as many 
as 22 dimensions, which results in 2^22 = 4194304 projections. In such cases, it 
is not appropriate to transform the plan into "one-projection-one-child" for the 
Union node.
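
The batching idea above can be sketched as follows. This is illustrative only: `Expand` is reduced to a plain case class standing in for Catalyst's operator, and `chunkSize` is a hypothetical tuning knob, not an actual Spark config:

```scala
// Simplified stand-ins for Catalyst's Projection list and Expand operator.
case class Projection(exprs: Seq[String])
case class Expand(projections: Seq[Projection], output: Seq[String], child: String)

// Instead of one projection per Union child, group projections into chunks,
// so a 2^22-projection cube yields far fewer Union children.
// chunkSize is a hypothetical parameter, not an existing Spark setting.
def splitExpandInChunks(expand: Expand, chunkSize: Int): Seq[Expand] =
  expand.projections
    .grouped(chunkSize)                                  // batches of up to chunkSize
    .map(batch => Expand(batch, expand.output, expand.child))
    .toSeq
```

With `chunkSize = 1` this degenerates to the one-projection-one-child strategy in the PR; larger chunks trade per-aggregate Expand size against the number of Union children.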


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140719222
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -481,6 +481,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val GROUPING_WITH_UNION = buildConf("spark.sql.grouping.union.enabled")
+    .doc("When true, the grouping analytics (i.e., cube, rollup, grouping sets) will be " +
+      "implemented by Union with a number of aggregates for each group. " +
--- End diff --

`implemented using a union with aggregates for each group.`


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140718061
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1287,3 +1288,33 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
(same SplitAggregateWithExpand hunk as quoted above, continuing:)
+    )
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+      if (SQLConf.get.groupingWithUnion && projections.length > 1) {
+        val expands = splitExpand(e)
+        val aggregates: Seq[Aggregate] = Seq.tabulate(expands.length)(
--- End diff --

You can just map over `expands`.


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140717934
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
(same SplitAggregateWithExpand hunk as quoted above, ending at:)
+    Seq.tabulate(len)(
--- End diff --

You can just map over `expand.projections`.
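
As a sketch of this suggestion (with `Expand` again reduced to a plain case class for illustration, not Catalyst's actual operator), the index-based `Seq.tabulate` collapses to a direct `map`:

```scala
// Simplified stand-ins for Catalyst's Projection list and Expand operator.
case class Projection(exprs: Seq[String])
case class Expand(projections: Seq[Projection], output: Seq[String], child: String)

// Equivalent to the Seq.tabulate version in the diff, but without the
// intermediate index: each projection becomes its own single-projection Expand.
def splitExpand(expand: Expand): Seq[Expand] =
  expand.projections.map(p => Expand(Seq(p), expand.output, expand.child))
```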


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19175#discussion_r140717827
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
(same SplitAggregateWithExpand hunk as quoted above, ending at:)
+      i => Expand(Seq(allProjections(i)), expand.output, expand.child)
--- End diff --

Why not remove the Expand altogether? The only thing you might need to do is 
make sure that the expressions produce the correct attributes.


---




[GitHub] spark pull request #19175: [SPARK-21964][SQL]Enable splitting the Aggregate ...

2017-09-09 Thread DonnyZone
GitHub user DonnyZone opened a pull request:

https://github.com/apache/spark/pull/19175

[SPARK-21964][SQL] Enable splitting the Aggregate (on Expand) into a number 
of Aggregates for grouping analytics

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-21964

The current implementation of grouping analytics (i.e., cube, rollup, and 
grouping sets) is "heavyweight" in many scenarios (e.g., high-dimension cubes), 
because the Expand operator produces a large number of projections, resulting 
in vast shuffle writes. This can lead to poor performance or even OOM errors in 
direct buffer memory.

This PR provides an alternative that splits the heavyweight aggregate into a 
number of lightweight aggregates, one per group. In effect, it implements the 
grouping analytics as a Union and executes the aggregates one by one. This 
optimization runs counter to the usual goal of "avoiding redundant data scans".

The current splitting strategy is simple: one aggregation per group. In the 
future, we may devise more intelligent splitting strategies (e.g., cost-based 
methods).
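
The rewrite can be illustrated by how a cube enumerates its grouping sets: each subset of the grouping columns drives one lightweight aggregate, and the results are combined by Union. A minimal sketch of that enumeration (illustrative only, not Spark's implementation):

```scala
// Enumerate the grouping sets of CUBE(cols): every subset of the columns.
// Each resulting set would back one plain GROUP BY aggregate in the Union.
def cubeGroupingSets(cols: Seq[String]): Seq[Seq[String]] =
  cols.foldLeft(Seq(Seq.empty[String])) { (sets, c) =>
    // Each new column doubles the sets: every existing set with and without it.
    sets ++ sets.map(_ :+ c)
  }
```

With 22 dimensions this enumeration produces 2^22 = 4194304 grouping sets, which is the scale that motivates the splitting (and, eventually, batching or cost-based) strategies discussed in the review.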

## How was this patch tested?

Unit tests
Manual tests in production environment


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DonnyZone/spark spark-21964

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19175


commit da36e37df9c31901975c29dfa77cb7d648e94f40
Author: donnyzone 
Date:   2017-09-09T13:46:48Z

grouping with union




---
