[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-14 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454391588



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,127 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]

Review comment:
   I mean to unify how this rule handles the filter clause. This case was not 
handled by this rule before your PR. Sorry if I didn't make myself clear enough.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-14 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454327250



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,127 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]

Review comment:
   This rule should be skipped if there is only one distinct. Having a 
filter or not shouldn't change it.
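   For reference, a minimal sketch of the check being discussed, assuming the 
`mayNeedtoRewrite` guard quoted later in this thread (the exact condition is what is 
under review here):
   ```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

// Sketch only: the reviewer's suggestion is that a single distinct aggregate
// should not trigger the Expand-based rewrite, with or without a FILTER clause.
def needsDistinctRewrite(exprs: Seq[Expression]): Boolean = {
  val distinctAggs = exprs.flatMap(_.collect {
    case ae: AggregateExpression if ae.isDistinct => ae
  })
  distinctAggs.size > 1
}
   ```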








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-14 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454326711



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,127 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]

Review comment:
   Is this necessary? The query can work fine even if we don't add this 
Project in this rule, right?








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-13 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453513546



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,126 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]

Review comment:
   Do we need to rewrite this query? The planner can handle single distinct 
agg func AFAIK.








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-13 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453511010



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,126 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Four example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ *   count(if (('gid = 2)) '_gen_attr_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *functions = [sum('_gen_attr_3)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
+ *   Expand(
+ *  projections = [('key, null, null, 0, cast('value as bigint)),
+ * ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ * ('key, null, 'cat2, 2, null)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3])
+ * LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query exists filter clauses, project the output of
+ * the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:
+ *i. the non-distinct group;
+ *ii. the distinct 'cat1 group;
+ *iii. the distinct 'cat2 group with filter clause.
+ *Because there is at least one group with filter clause (e.g. the distinct 'cat2
+ *group with filter clause), then will project the data.
+ * 2. Avoid projections that may output the same attributes. There are three aggregation groups
+ *in this query:
+ *i. the non-distinct 'cat1 group;
+ *ii. the distinct 

[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-13 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453510469



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,126 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Four example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ *   count(if (('gid = 2)) '_gen_attr_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *functions = [sum('_gen_attr_3)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
+ *   Expand(
+ *  projections = [('key, null, null, 0, cast('value as bigint)),
+ * ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ * ('key, null, 'cat2, 2, null)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3])
+ * LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query exists filter clauses, project the output of
+ * the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:
+ *i. the non-distinct group;
+ *ii. the distinct 'cat1 group;
+ *iii. the distinct 'cat2 group with filter clause.
+ *Because there is at least one group with filter clause (e.g. the distinct 'cat2

Review comment:
   When at least one aggregate function has a filter clause, we add a Project 
node on the input plan.
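   A minimal sketch of that shape, assuming a hypothetical helper and a 
`projectList` built elsewhere from the rewritten aggregate children (as in the plan 
above):
   ```scala
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}

// Sketch only: when any aggregate expression carries a FILTER clause, evaluate
// the (possibly filtered) aggregate children once in a Project below the Aggregate.
def projectChildIfFiltered(a: Aggregate, projectList: Seq[NamedExpression]): LogicalPlan = {
  val hasFilter = a.aggregateExpressions.exists(_.collectFirst {
    case ae: AggregateExpression if ae.filter.isDefined => ae
  }.isDefined)
  if (hasFilter) a.copy(child = Project(projectList, a.child)) else a
}
   ```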

##
File path: 

[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-13 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453509648



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,126 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Four example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ *   count(if (('gid = 2)) '_gen_attr_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *functions = [sum('_gen_attr_3)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
+ *   Expand(
+ *  projections = [('key, null, null, 0, cast('value as bigint)),
+ * ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ * ('key, null, 'cat2, 2, null)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3])
+ * LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query exists filter clauses, project the output of
+ * the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:
+ *i. the non-distinct group;
+ *ii. the distinct 'cat1 group;
+ *iii. the distinct 'cat2 group with filter clause.

Review comment:
   This doesn't match the group. Maybe just make it general `the distinct 
group without filter clause` and `the distinct group with filter clause`






[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-13 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453508040



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -102,23 +102,126 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *key = ['key]
- *functions = [count(if (('gid = 1)) 'cat1 else null),
- * count(if (('gid = 2)) 'cat2 else null),
+ *functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ * count(if (('gid = 2)) '_gen_attr_2 else null),
  * first(if (('gid = 0)) 'total else null) ignore nulls]
  *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *  key = ['key, 'cat1, 'cat2, 'gid]
- *  functions = [sum('value) with FILTER('id > 1)]
- *  output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *  key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *  functions = [sum('_gen_attr_3)]
+ *  output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  * Expand(
- *projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *   ('key, 'cat1, null, 1, null, null),
  *   ('key, null, 'cat2, 2, null, null)]
- *output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count('_gen_attr_1),
+  *  sum('_gen_attr_2)]
+ *  output = ['key, 'cat1_cnt, 'total])
+ * Project(
+ *projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
+ *output = ['key, '_gen_attr_1, '_gen_attr_2])
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Four example: single distinct aggregate function with filter clauses (in sql):

Review comment:
   `single`  -> `more than one`?








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-08 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451635926



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *  projections = [('key, null, null, 0, cast('value as bigint)),
+ * ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ * ('key, null, 'cat2, 2, null)]
+ *  output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query with distinct aggregations and
+ * filter clauses, project the output of the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:

Review comment:
   can you update the comment?








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-08 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451635560



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(

Review comment:
   This plan rewriting LGTM. Shall we update the second example to make it 
consistent with this example?
   ```
  ...
  Aggregate(
 key = ...
 functions = [sum('value)]
 output = ...
Expand(
    projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  (...),
  (...)]
   output = [...])
  LocalTableScan [...]
   ```
   








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-08 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451630462



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]

Review comment:
   We can list some builtin agg functions that ignore nulls, and omit the 
FILTER for them.
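   A sketch of that idea; the set of null-ignoring functions below is an 
illustrative assumption, not an exhaustive list:
   ```scala
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Average, Count, Max, Min, Sum}

// Sketch only: these builtin aggregate functions already ignore null inputs,
// so the extra null-check FILTER discussed in this thread could be omitted for them.
def ignoresNullInputs(af: AggregateFunction): Boolean = af match {
  case _: Sum | _: Average | _: Count | _: Min | _: Max => true
  case _ => false
}
   ```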








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-08 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451629863



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]

Review comment:
   We assume the agg func ignores null inputs, which may not be true. Shall 
we rewrite it to `sum('value) with FILTER('value is not null)`?
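   Roughly, that could look like the sketch below; combining an existing filter 
with the null check via `And` is an assumption about how the two predicates would be 
merged:
   ```scala
import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

// Sketch only: make the null-skipping behaviour explicit, e.g. turn sum('value)
// into sum('value) FILTER ('value IS NOT NULL), instead of assuming the
// aggregate function ignores null inputs.
def withNotNullFilter(ae: AggregateExpression): AggregateExpression = {
  val notNullChecks: Seq[Expression] =
    ae.aggregateFunction.children.filterNot(_.foldable).map(IsNotNull(_))
  val combined: Option[Expression] =
    (ae.filter.toSeq ++ notNullChecks).reduceOption(And(_, _))
  ae.copy(filter = combined)
}
   ```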








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-08 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451624190



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *  projections = [('key, null, null, 0, cast('value as bigint)),
+ * ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ * ('key, null, 'cat2, 2, null)]
+ *  output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])

Review comment:
   can we keep the name `cat1`, `cat2`?








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-07 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r450729347



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,78 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *   projections = [('key, null, null, 0, 'value),
+ *  ('key, '_gen_distinct_1, null, 1, null),
+ *  ('key, null, '_gen_distinct_2, 2, null)]
+ *   output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * Project(

Review comment:
   We can even merge this project with the expand, right?
   ```
*   Expand(
*   projections = [('key, null, null, 0, cast('value as bigint)),
*  ('key, if ('id > 1) 'cat1 else null, null, 1, null),
*  ('key, null, 'cat2, 2, null)]
*   output = ['key, 'cat1, 'cat2, 'gid, 'value])
   ```








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-06 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r450157642



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -148,24 +204,105 @@ object RewriteDistinctAggregates extends 
Rule[LogicalPlan] {
 val distinctAggs = exprs.flatMap { _.collect {
   case ae: AggregateExpression if ae.isDistinct => ae
 }}
-// We need at least two distinct aggregates for this rule because 
aggregation
-// strategy can handle a single distinct group.
+// We need at least two distinct aggregates or a single distinct aggregate 
with a filter for
+// this rule because aggregation strategy can handle a single distinct 
group without a filter.
 // This check can produce false-positives, e.g., SUM(DISTINCT a) & 
COUNT(DISTINCT a).
-distinctAggs.size > 1
+distinctAggs.size > 1 || distinctAggs.exists(_.filter.isDefined)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => rewrite(a)
+case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) =>
+  val expandAggregate = extractFiltersInDistinctAggregates(a)
+  rewriteDistinctAggregates(expandAggregate)
   }
 
-  def rewrite(a: Aggregate): Aggregate = {
+  private def extractFiltersInDistinctAggregates(a: Aggregate): Aggregate = {
+val aggExpressions = collectAggregateExprs(a)
+val (distinctAggExpressions, regularAggExpressions) = 
aggExpressions.partition(_.isDistinct)
+if (distinctAggExpressions.exists(_.filter.isDefined)) {
+  // Constructs pairs between old and new expressions for regular 
aggregates. Because we
+  // will construct a new `Aggregate` and the children of the distinct 
aggregates will be
+  // changed to generated ones, we need to create new references to avoid 
collisions between
+  // distinct and regular aggregate children.
+  val regularAggExprs = 
regularAggExpressions.filter(_.children.exists(!_.foldable))
+  val regularFunChildren = regularAggExprs
+.flatMap(_.aggregateFunction.children.filter(!_.foldable))
+  val regularFilterAttrs = regularAggExprs.flatMap(_.filterAttributes)
+  val regularAggChildren = (regularFunChildren ++ 
regularFilterAttrs).distinct
+  val regularAggChildrenMap = regularAggChildren.map {
+case ne: NamedExpression => ne -> ne
+case other => other -> Alias(other, other.toString)()
+  }
+  val namedRegularAggChildren = regularAggChildrenMap.map(_._2)
+  val regularAggChildAttrLookup = regularAggChildrenMap.map { kv =>
+(kv._1, kv._2.toAttribute)
+  }.toMap
+  val regularAggPairs = regularAggExprs.map {
+case ae @ AggregateExpression(af, _, _, filter, _) =>
+  val newChildren = af.children.map(c => 
regularAggChildAttrLookup.getOrElse(c, c))
+  val raf = 
af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
+  val filterOpt = filter.map(_.transform {
+case a: Attribute => regularAggChildAttrLookup.getOrElse(a, a)
+  })
+  val aggExpr = ae.copy(aggregateFunction = raf, filter = filterOpt)
+  (ae, aggExpr)
+  }
 
-// Collect all aggregate expressions.
-val aggExpressions = a.aggregateExpressions.flatMap { e =>
-  e.collect {
-case ae: AggregateExpression => ae
+  // Constructs pairs between old and new expressions for distinct 
aggregates, too.
+  val distinctAggExprs = distinctAggExpressions.filter(e => 
e.children.exists(!_.foldable))
+  val (projections, distinctAggPairs) = distinctAggExprs.map {
+case ae @ AggregateExpression(af, _, _, filter, _) =>
+  // First, In order to reduce costs, it is better to handle the 
filter clause locally.
+  // e.g. COUNT (DISTINCT a) FILTER (WHERE id > 1), evaluate expression
+  // If(id > 1) 'a else null first, and use the result as output.
+  // Second, If at least two DISTINCT aggregate expression which may 
references the
+  // same attributes. We need to construct the generated attributes so 
as the output not
+  // lost. e.g. SUM (DISTINCT a), COUNT (DISTINCT a) FILTER (WHERE id 
> 1) will output
+  // attribute '_gen_distinct-1 and attribute '_gen_distinct-2 instead 
of two 'a.
+  // Note: The illusionary mechanism may result in at least two 
distinct groups, so we
+  // still need to call `rewrite`.
+  val unfoldableChildren = af.children.filter(!_.foldable)
+  // Expand projection
+  val projectionMap = unfoldableChildren.map {
+case e if filter.isDefined =>
+  val ife = If(filter.get, e, nullify(e))
+  e -> Alias(ife, 
s"_gen_distinct_${NamedExpression.newExprId.id}")()
+case e => e -> Alias(e, 

[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-06 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r450157305



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *   projections = [('key, null, null, 0, 'value),
+ *  ('key, '_gen_distinct_1, null, 1, null),
+ *  ('key, null, '_gen_distinct_2, 2, null)]
+ *   output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * Project(
+ *projectList = ['key, if ('id > 1) 'cat1 else null, 'cat2, cast('value as bigint)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, expands data for the distinct aggregates where filter clauses exist:

Review comment:
   Please explain "what" first, not "why".
   
   > Guaranteed to compute filter clauses in the first aggregate locally.
   
   This is not a "what".








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-06 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r450007512



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -148,24 +204,105 @@ object RewriteDistinctAggregates extends 
Rule[LogicalPlan] {
 val distinctAggs = exprs.flatMap { _.collect {
   case ae: AggregateExpression if ae.isDistinct => ae
 }}
-// We need at least two distinct aggregates for this rule because 
aggregation
-// strategy can handle a single distinct group.
+// We need at least two distinct aggregates or a single distinct aggregate 
with a filter for
+// this rule because aggregation strategy can handle a single distinct 
group without a filter.
 // This check can produce false-positives, e.g., SUM(DISTINCT a) & 
COUNT(DISTINCT a).
-distinctAggs.size > 1
+distinctAggs.size > 1 || distinctAggs.exists(_.filter.isDefined)
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => rewrite(a)
+case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) =>
+  val expandAggregate = extractFiltersInDistinctAggregates(a)
+  rewriteDistinctAggregates(expandAggregate)
   }
 
-  def rewrite(a: Aggregate): Aggregate = {
+  private def extractFiltersInDistinctAggregates(a: Aggregate): Aggregate = {
+val aggExpressions = collectAggregateExprs(a)
+val (distinctAggExpressions, regularAggExpressions) = 
aggExpressions.partition(_.isDistinct)
+if (distinctAggExpressions.exists(_.filter.isDefined)) {
+  // Constructs pairs between old and new expressions for regular 
aggregates. Because we
+  // will construct a new `Aggregate` and the children of the distinct 
aggregates will be
+  // changed to generated ones, we need to create new references to avoid 
collisions between
+  // distinct and regular aggregate children.
+  val regularAggExprs = 
regularAggExpressions.filter(_.children.exists(!_.foldable))
+  val regularFunChildren = regularAggExprs
+.flatMap(_.aggregateFunction.children.filter(!_.foldable))
+  val regularFilterAttrs = regularAggExprs.flatMap(_.filterAttributes)
+  val regularAggChildren = (regularFunChildren ++ 
regularFilterAttrs).distinct
+  val regularAggChildrenMap = regularAggChildren.map {
+case ne: NamedExpression => ne -> ne
+case other => other -> Alias(other, other.toString)()
+  }
+  val namedRegularAggChildren = regularAggChildrenMap.map(_._2)
+  val regularAggChildAttrLookup = regularAggChildrenMap.map { kv =>
+(kv._1, kv._2.toAttribute)
+  }.toMap
+  val regularAggPairs = regularAggExprs.map {
+case ae @ AggregateExpression(af, _, _, filter, _) =>
+  val newChildren = af.children.map(c => 
regularAggChildAttrLookup.getOrElse(c, c))
+  val raf = 
af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
+  val filterOpt = filter.map(_.transform {
+case a: Attribute => regularAggChildAttrLookup.getOrElse(a, a)
+  })
+  val aggExpr = ae.copy(aggregateFunction = raf, filter = filterOpt)
+  (ae, aggExpr)
+  }
 
-// Collect all aggregate expressions.
-val aggExpressions = a.aggregateExpressions.flatMap { e =>
-  e.collect {
-case ae: AggregateExpression => ae
+  // Constructs pairs between old and new expressions for distinct 
aggregates, too.
+  val distinctAggExprs = distinctAggExpressions.filter(e => 
e.children.exists(!_.foldable))
+  val (projections, distinctAggPairs) = distinctAggExprs.map {
+case ae @ AggregateExpression(af, _, _, filter, _) =>
+  // First, In order to reduce costs, it is better to handle the 
filter clause locally.
+  // e.g. COUNT (DISTINCT a) FILTER (WHERE id > 1), evaluate expression
+  // If(id > 1) 'a else null first, and use the result as output.
+  // Second, If at least two DISTINCT aggregate expression which may 
references the
+  // same attributes. We need to construct the generated attributes so 
as the output not
+  // lost. e.g. SUM (DISTINCT a), COUNT (DISTINCT a) FILTER (WHERE id 
> 1) will output
+  // attribute '_gen_distinct-1 and attribute '_gen_distinct-2 instead 
of two 'a.
+  // Note: The illusionary mechanism may result in at least two 
distinct groups, so we
+  // still need to call `rewrite`.
+  val unfoldableChildren = af.children.filter(!_.foldable)
+  // Expand projection
+  val projectionMap = unfoldableChildren.map {
+case e if filter.isDefined =>
+  val ife = If(filter.get, e, nullify(e))
+  e -> Alias(ife, 
s"_gen_distinct_${NamedExpression.newExprId.id}")()
+case e => e -> Alias(e, 

[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-06 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r450003275



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in 
sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *   projections = [('key, null, null, 0, 'value),
+ *  ('key, '_gen_distinct_1, null, 1, null),
+ *  ('key, null, '_gen_distinct_2, 2, null)]
+ *   output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * Project(
+ *projectList = ['key, if ('id > 1) 'cat1 else null, 'cat2, cast('value as bigint)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, expands data for the distinct aggregates where filter 
clauses exist:

Review comment:
   what do you mean by this paragraph?








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-02 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r449090476



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in 
sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *   projections = [('key, null, null, 0, 'value),
+ *  ('key, '_gen_distinct_1, null, 1, null),
+ *  ('key, null, '_gen_distinct_2, 2, null)]
+ *   output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * Expand(

Review comment:
   Then we can merge the `Project` with the above `Expand`
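
To make the suggestion concrete, a hedged sketch of folding such a `Project` into the `Expand` above it, assuming Catalyst's `Expand`, `Project` and `AttributeMap` APIs; `mergeProjectIntoExpand` is an illustrative name, not code from this PR:

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan, Project}

def mergeProjectIntoExpand(expand: Expand): LogicalPlan = expand.child match {
  case project: Project =>
    // Map each attribute produced by the Project to the expression it aliases.
    val aliasMap = AttributeMap(project.projectList.collect {
      case a: Alias => a.toAttribute -> a.child
    })
    // Inline those expressions into every Expand projection and drop the Project.
    val merged = expand.projections.map(_.map(_.transform {
      case attr: Attribute => aliasMap.getOrElse(attr, attr)
    }))
    Expand(merged, expand.output, project.child)
  case _ => expand
}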








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-02 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r449088934



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##
@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType
  *   LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in 
sql):
+ * {{{
+ *   SELECT
+ * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ * COUNT(DISTINCT cat2) as cat2_cnt,
+ * SUM(value) AS total
+ *  FROM
+ *data
+ *  GROUP BY
+ *key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *key = ['key]
+ *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ * COUNT(DISTINCT 'cat2),
+ * sum('value)]
+ *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *  key = ['key]
+ *  functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *  output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ * Aggregate(
+ *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *functions = [sum('value)]
+ *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *   Expand(
+ *   projections = [('key, null, null, 0, 'value),
+ *  ('key, '_gen_distinct_1, null, 1, null),
+ *  ('key, null, '_gen_distinct_2, 2, null)]
+ *   output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ * Expand(

Review comment:
   This doesn't need to be an `Expand`: you just have one project list, and 
we can just use `Project`.
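
A hedged sketch of that alternative, assuming Catalyst's plan nodes; `expandOrProject` is an illustrative helper, not code from this PR. With a single projection list there is no row multiplication, so a plain `Project` carries the same semantics:

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan, Project}

def expandOrProject(
    projections: Seq[Seq[Expression]],
    output: Seq[Attribute],
    child: LogicalPlan): LogicalPlan = projections match {
  // Exactly one projection list: emit a Project instead of a one-group Expand.
  case Seq(single) =>
    val named = single.zip(output).map {
      case (e, attr) => Alias(e, attr.name)(exprId = attr.exprId)
    }
    Project(named, child)
  // Multiple projection lists: Expand is still required to duplicate the rows.
  case _ => Expand(projections, output, child)
}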








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-02 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r449072745



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
##
@@ -216,15 +216,21 @@ abstract class AggregateFunction extends Expression {
   def toAggregateExpression(): AggregateExpression = 
toAggregateExpression(isDistinct = false)
 
   /**
-   * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] and sets 
`isDistinct`
-   * flag of the [[AggregateExpression]] to the given value because
+   * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] with 
`isDistinct`
+   * flag and `filter` option of the [[AggregateExpression]] to the given 
value because
* [[AggregateExpression]] is the container of an [[AggregateFunction]], 
aggregation mode,
-   * and the flag indicating if this aggregation is distinct aggregation or 
not.
+   * the flag indicating if this aggregation is distinct aggregation or not 
and filter option.

Review comment:
   ditto, `and the optional 'filter'`








[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-07-02 Thread GitBox


cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r449072337



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
##
@@ -216,15 +216,21 @@ abstract class AggregateFunction extends Expression {
   def toAggregateExpression(): AggregateExpression = 
toAggregateExpression(isDistinct = false)
 
   /**
-   * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] and sets 
`isDistinct`
-   * flag of the [[AggregateExpression]] to the given value because
+   * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] with 
`isDistinct`
+   * flag and `filter` option of the [[AggregateExpression]] to the given 
value because

Review comment:
   `filter option` looks weird, how about `and an optional 'filter'`?
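
For context, a hedged sketch of how the documented wrapper might be used once the optional filter is threaded through; the signature with a `filter` parameter is assumed from the quoted doc comment, not copied from this PR:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.Count

// COUNT(DISTINCT cat1) FILTER (WHERE id > 1): wrap the AggregateFunction with
// the DISTINCT flag and an optional filter predicate (signature assumed).
val cat1Cnt = Count('cat1).toAggregateExpression(isDistinct = true, filter = Some('id > 1))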




