beliefer commented on a change in pull request #27058: [SPARK-30276][SQL] 
Support Filter expression allows simultaneous use of DISTINCT
URL: https://github.com/apache/spark/pull/27058#discussion_r369943790
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
 ##########
 @@ -118,7 +118,66 @@ import org.apache.spark.sql.types.IntegerType
  *       LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in 
sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt1,
+ *     COUNT(DISTINCT cat1) as cat1_cnt2,
+ *     SUM(value) AS total
+ *  FROM
+ *    data
+ *  GROUP BY
+ *    key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 COUNT(DISTINCT 'cat1),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt1, 'cat1_cnt2, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *      key = ['key]
+ *      functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *                   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *                   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *      output = ['key, 'cat1_cnt, 'cat1_cnt2, 'total])
+ *     Aggregate(
+ *        key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *        functions = [sum('value)]
+ *        output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *       Expand(
+ *           projections = [('key, null, null, 0, 'value),
+ *                         ('key, '_gen_distinct_1, null, 1, null),
+ *                         ('key, null, '_gen_distinct_2, 2, null)]
+ *           output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ *         Expand(
+ *            projections = [('key, if ('id > 1) 'cat1 else null, 'cat1, 
cast('value as bigint))]
+ *            output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value])
+ *           LocalTableScan [...]
+ * }}}
+ *
+ * The rule serves two purposes:
+ * 1. Expand distinct aggregates which exists filter clause.
+ * 2. Rewrite when aggregate exists at least two distinct aggregates.
+ *
+ * The first child rule does the following things here:
+ * 1. Guaranteed to compute filter clause locally.
 
 Review comment:
   I have update this statement.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to