beliefer commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451985794



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
##########
@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
  *       LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ *     COUNT(DISTINCT cat2) as cat2_cnt,
+ *     SUM(value) AS total
+ *  FROM
+ *    data
+ *  GROUP BY
+ *    key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 COUNT(DISTINCT 'cat2),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ *   Aggregate(
+ *      key = ['key]
+ *      functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *                   count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *                   first(if (('gid = 0)) 'total else null) ignore nulls]
+ *      output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *     Aggregate(
+ *        key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *        functions = [sum('value)]
+ *        output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *       Expand(
+ *          projections = [('key, null, null, 0, cast('value as bigint)),
+ *                         ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ *                         ('key, null, 'cat2, 2, null)]
+ *          output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ *         LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query with distinct aggregations and
+ * filter clauses, project the output of the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:

Review comment:
   1. If the SQL looks like
   `SELECT b, COUNT(DISTINCT cat1), COUNT(DISTINCT cat1) FILTER (WHERE id > 0) FROM cachedData GROUP BY b`
   then the first phase projects the data, and the second phase expands the data and merges the Project with the Expand.
   I added some comments at
   https://github.com/apache/spark/blob/16d8c1d26681c875d61e50305de43f3ca5e75154/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala#L206

   2. If the SQL looks like
   `SELECT b, COUNT(DISTINCT cat1) FILTER (WHERE id > 0) FROM cachedData GROUP BY b`
   then the first phase projects the data, and the second phase does not expand the data and still preserves the Project.
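
   To make the two cases concrete, here is a minimal plain-Scala sketch of the semantics only. It does not use Spark and is not the code in `RewriteDistinctAggregates`. The `Row` and `Expanded` case classes, the sample data, and all function names are hypothetical, and `Option` stands in for SQL NULL. It checks that folding the filter into a projected column (`if (id > 0) cat1 else null`) gives the same counts as evaluating the FILTER clause directly, both without an Expand (case 2) and with a gid-tagged Expand (case 1).

   ```scala
   object FilterProjectionSketch {
     // Hypothetical stand-in for rows of cachedData(b, cat1, id).
     final case class Row(b: String, cat1: String, id: Int)

     val data: Seq[Row] = Seq(
       Row("x", "a", 0), Row("x", "a", 2), Row("x", "b", 1),
       Row("x", "c", 0), Row("y", "a", 0), Row("y", "d", 3)
     )

     // Reference semantics: COUNT(DISTINCT cat1) FILTER (WHERE id > 0) GROUP BY b.
     def direct(rows: Seq[Row]): Map[String, Int] =
       rows.groupBy(_.b).map { case (b, group) =>
         (b, group.filter(_.id > 0).map(_.cat1).distinct.size)
       }

     // Case 2, first phase only: project `if (id > 0) cat1 else null`.
     def project(rows: Seq[Row]): Seq[(String, Option[String])] =
       rows.map(r => (r.b, if (r.id > 0) Some(r.cat1) else None))

     // A plain COUNT(DISTINCT ...) over the projected column; nulls are ignored.
     def viaProjection(rows: Seq[Row]): Map[String, Int] =
       project(rows).groupBy(_._1).map { case (b, group) =>
         (b, group.flatMap(_._2).distinct.size)
       }

     // Case 1: one expanded row per distinct group, tagged with gid.
     // d1 feeds COUNT(DISTINCT cat1); d2 feeds the filtered count.
     final case class Expanded(b: String, d1: Option[String], d2: Option[String], gid: Int)

     def expand(rows: Seq[Row]): Seq[Expanded] =
       rows.flatMap { r =>
         Seq(
           Expanded(r.b, Some(r.cat1), None, 1),
           Expanded(r.b, None, if (r.id > 0) Some(r.cat1) else None, 2)
         )
       }

     // `distinct` models the inner aggregate grouped by (b, d1, d2, gid);
     // the outer aggregate counts non-null values per gid.
     def viaExpand(rows: Seq[Row]): Map[String, (Int, Int)] =
       expand(rows).distinct.groupBy(_.b).map { case (b, group) =>
         val plainCount = group.count(e => e.gid == 1 && e.d1.isDefined)
         val filteredCount = group.count(e => e.gid == 2 && e.d2.isDefined)
         (b, (plainCount, filteredCount))
       }

     def main(args: Array[String]): Unit = {
       println(direct(data))
       println(viaProjection(data))
       println(viaExpand(data))
     }
   }
   ```

   In this sketch the filter never reaches the aggregate itself: it is evaluated once in the projection, so the later COUNT only has to skip nulls. That is the reason the Project can be merged into the Expand when an Expand is needed.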
   



