[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454391588

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

## @@ -102,23 +102,127 @@ import org.apache.spark.sql.types.IntegerType
  * {{{
  * Aggregate(
  *    key = ['key]
- *    functions = [count(if (('gid = 1)) 'cat1 else null),
- *                 count(if (('gid = 2)) 'cat2 else null),
+ *    functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ *                 count(if (('gid = 2)) '_gen_attr_2 else null),
  *                 first(if (('gid = 0)) 'total else null) ignore nulls]
  *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  *   Aggregate(
- *      key = ['key, 'cat1, 'cat2, 'gid]
- *      functions = [sum('value) with FILTER('id > 1)]
- *      output = ['key, 'cat1, 'cat2, 'gid, 'total])
+ *      key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *      functions = [sum('_gen_attr_3)]
+ *      output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
  *     Expand(
- *        projections = [('key, null, null, 0, cast('value as bigint), 'id),
+ *        projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
  *                       ('key, 'cat1, null, 1, null, null),
  *                       ('key, null, 'cat2, 2, null, null)]
- *        output = ['key, 'cat1, 'cat2, 'gid, 'value, 'id])
+ *        output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3, 'id])
  *       LocalTableScan [...]
  * }}}
  *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses and have
+ * not other distinct aggregate function (in sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ *     SUM(value) AS total
+ *   FROM
+ *     data
+ *   GROUP BY
+ *     key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [count('_gen_attr_1),
+ *                 sum('_gen_attr_2)]
+ *    output = ['key, 'cat1_cnt, 'total])
+ *   Project(
+ *      projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]

Review comment:
I mean to unify the implementations of the filter clause that are handled by this rule. This case is not handled by this rule before your PR. Sorry if I didn't make myself clear enough.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
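The rewrite quoted above hinges on one equivalence: because aggregate functions ignore null inputs, `COUNT(DISTINCT cat1) FILTER (WHERE id > 1)` can be computed over the projected column `if ('id > 1) 'cat1 else null`. A minimal plain-Python sketch of that equivalence (illustrative only; the row data is invented and this is not Spark code):

```python
# Rows mimicking the `data` table from the example (invented values).
rows = [
    {"key": "a", "cat1": "x", "id": 0},
    {"key": "a", "cat1": "x", "id": 2},
    {"key": "a", "cat1": "y", "id": 3},
    {"key": "a", "cat1": "z", "id": 1},
]

# Direct evaluation of COUNT(DISTINCT cat1) FILTER (WHERE id > 1).
direct = len({r["cat1"] for r in rows if r["id"] > 1})

# Rewritten form: Project `if ('id > 1) 'cat1 else null` (here: None),
# then COUNT(DISTINCT _gen_attr_1), which skips the nulls.
projected = [r["cat1"] if r["id"] > 1 else None for r in rows]
rewritten = len({v for v in projected if v is not None})

assert direct == rewritten == 2  # only 'x' and 'y' survive the filter
```

The same null-projection idea is what lets the rule drop the FILTER clause entirely after the Project node.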
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454327250

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the first comment above; the comment is on the added line
`projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]`.)

Review comment:
This rule should be skipped if there is only one distinct. Having a filter or not shouldn't change it.
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r454326711

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the first comment above; the comment is on the added line
`projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]`.)

Review comment:
Is this necessary? The query can work fine even if we don't add this Project in this rule, right?
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453513546

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the first comment above; the comment is on the line
`LocalTableScan [...]` under the third example's untransformed logical plan.)

Review comment:
Do we need to rewrite this query? The planner can handle single distinct agg func AFAIK.
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453511010

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

## @@ -102,23 +102,126 @@ (the diff context repeats the third example shown in the first comment above, then continues:)

+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [count('_gen_attr_1),
+ *                 sum('_gen_attr_2)]
+ *    output = ['key, 'cat1_cnt, 'total])
+ *   Project(
+ *      projectionList = ['key, if ('id > 1) 'cat1 else null, cast('value as bigint)]
+ *      output = ['key, '_gen_attr_1, '_gen_attr_2])
 *     LocalTableScan [...]
 * }}}
 *
- * The rule does the following things here:
+ * Four example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ *     COUNT(DISTINCT cat2) as cat2_cnt,
+ *     SUM(value) AS total
+ *   FROM
+ *     data
+ *   GROUP BY
+ *     key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 COUNT(DISTINCT 'cat2),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [count(if (('gid = 1)) '_gen_attr_1 else null),
+ *                 count(if (('gid = 2)) '_gen_attr_2 else null),
+ *                 first(if (('gid = 0)) 'total else null) ignore nulls]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   Aggregate(
+ *      key = ['key, '_gen_attr_1, '_gen_attr_2, 'gid]
+ *      functions = [sum('_gen_attr_3)]
+ *      output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, 'total])
+ *     Expand(
+ *        projections = [('key, null, null, 0, cast('value as bigint)),
+ *                       ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ *                       ('key, null, 'cat2, 2, null)]
+ *        output = ['key, '_gen_attr_1, '_gen_attr_2, 'gid, '_gen_attr_3])
+ *       LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query exists filter clauses, project the output of
+ * the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:
+ *    i. the non-distinct group;
+ *    ii. the distinct 'cat1 group;
+ *    iii. the distinct 'cat2 group with filter clause.
+ *    Because there is at least one group with filter clause (e.g. the distinct 'cat2
+ *    group with filter clause), then will project the data.
+ * 2. Avoid projections that may output the same attributes. There are three aggregation groups
+ *    in this query:
+ *    i. the non-distinct 'cat1 group;
+ *    ii. the distinct
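The Expand/gid plan in the fourth example above can be simulated outside Spark. The following plain-Python sketch (invented data; attribute names mirror the pseudo plan, not real Spark internals) walks the three stages: Expand into one projection per aggregation group, a first Aggregate keyed by (key, _gen_attr_1, _gen_attr_2, gid), and a second Aggregate keyed by key.

```python
rows = [
    {"key": "a", "cat1": "x", "cat2": "p", "id": 0, "value": 1},
    {"key": "a", "cat1": "x", "cat2": "q", "id": 2, "value": 2},
    {"key": "a", "cat1": "y", "cat2": "p", "id": 3, "value": 3},
]

# Expand: one projection per aggregation group, tagged with 'gid'.
# Tuple layout: (key, _gen_attr_1, _gen_attr_2, gid, _gen_attr_3)
expanded = []
for r in rows:
    expanded.append((r["key"], None, None, 0, r["value"]))                          # non-distinct group
    expanded.append((r["key"], r["cat1"] if r["id"] > 1 else None, None, 1, None))  # distinct cat1, filter applied
    expanded.append((r["key"], None, r["cat2"], 2, None))                           # distinct cat2

# First Aggregate: key = (key, _gen_attr_1, _gen_attr_2, gid), functions = [sum(_gen_attr_3)].
partial = {}
for key, a1, a2, gid, v in expanded:
    k = (key, a1, a2, gid)
    if k not in partial:
        partial[k] = None
    if v is not None:
        partial[k] = (partial[k] or 0) + v

# Second Aggregate: key = (key,); count distinct values per gid, first(total) for gid 0.
result = {}
for (key, a1, a2, gid), total in partial.items():
    agg = result.setdefault(key, {"cat1_cnt": 0, "cat2_cnt": 0, "total": None})
    if gid == 1 and a1 is not None:
        agg["cat1_cnt"] += 1
    elif gid == 2 and a2 is not None:
        agg["cat2_cnt"] += 1
    elif gid == 0:
        agg["total"] = total

assert result["a"] == {"cat1_cnt": 2, "cat2_cnt": 2, "total": 6}
```

Note how the filter on `COUNT(DISTINCT cat1)` is applied once, in the Expand projection for gid 1, which is why the rest of the plan needs no FILTER clause.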
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453510469

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the previous comment; the comment is on the added line
`Because there is at least one group with filter clause (e.g. the distinct 'cat2`.)

Review comment:
When at least one aggregate function has a filter clause, we add a project node on the input plan.
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453509648

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the previous comments; the comment is on the added line
`iii. the distinct 'cat2 group with filter clause.`)

Review comment:
This doesn't match the group. Maybe just make it general: `the distinct group without filter clause` and `the distinct group with filter clause`
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r453508040

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the previous comments; the comment is on the added line
`Four example: single distinct aggregate function with filter clauses (in sql):`.)

Review comment:
`single` -> `more than one`?
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451635926

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

## @@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType
 *       LocalTableScan [...]
 * }}}
 *
- * The rule does the following things here:
+ * Third example: single distinct aggregate function with filter clauses (in sql):
+ * {{{
+ *   SELECT
+ *     COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt,
+ *     COUNT(DISTINCT cat2) as cat2_cnt,
+ *     SUM(value) AS total
+ *   FROM
+ *     data
+ *   GROUP BY
+ *     key
+ * }}}
+ *
+ * This translates to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1),
+ *                 COUNT(DISTINCT 'cat2),
+ *                 sum('value)]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   LocalTableScan [...]
+ * }}}
+ *
+ * This rule rewrites this logical plan to the following (pseudo) logical plan:
+ * {{{
+ * Aggregate(
+ *    key = ['key]
+ *    functions = [count(if (('gid = 1)) '_gen_distinct_1 else null),
+ *                 count(if (('gid = 2)) '_gen_distinct_2 else null),
+ *                 first(if (('gid = 0)) 'total else null) ignore nulls]
+ *    output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
+ *   Aggregate(
+ *      key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid]
+ *      functions = [sum('value)]
+ *      output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total])
+ *     Expand(
+ *        projections = [('key, null, null, 0, cast('value as bigint)),
+ *                       ('key, if ('id > 1) 'cat1 else null, null, 1, null),
+ *                       ('key, null, 'cat2, 2, null)]
+ *        output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])
+ *       LocalTableScan [...]
+ * }}}
+ *
+ * The rule consists of the two phases as follows:
+ *
+ * In the first phase, if the aggregate query with distinct aggregations and
+ * filter clauses, project the output of the child of the aggregate query:
+ * 1. Project the data. There are three aggregation groups in this query:

Review comment:
can you update the comment?
[GitHub] [spark] cloud-fan commented on a change in pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
cloud-fan commented on a change in pull request #27428:
URL: https://github.com/apache/spark/pull/27428#discussion_r451635560

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

(Same diff context as the previous comment; the comment is on the added line
`This rule rewrites this logical plan to the following (pseudo) logical plan:`.)

Review comment:
This plan rewriting LGTM. Shall we update the second example to make it consistent with this example?

    ...
    Aggregate(
       key = ...
       functions = [sum('value)]
       output = ...
      Expand(
         projections = [('key, null, null, 0, if ('id > 1) cast('value as bigint) else null, 'id),
                        (...),
                        (...)]
         output = [...])
        LocalTableScan [...]
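The suggestion above pushes the non-distinct aggregate's filter into the Expand projection. The equivalence it relies on (that `SUM(value) FILTER (WHERE id > 1)` equals a plain sum over `if ('id > 1) cast('value as bigint) else null`, since sum ignores nulls) can be sketched in plain Python with invented data:

```python
rows = [{"value": 1, "id": 0}, {"value": 2, "id": 2}, {"value": 3, "id": 5}]

# Direct evaluation of SUM(value) FILTER (WHERE id > 1).
direct = sum(r["value"] for r in rows if r["id"] > 1)

# Rewritten form: project `if ('id > 1) cast('value as bigint) else null`
# inside Expand, then a plain sum, which skips the nulls.
projected = [r["value"] if r["id"] > 1 else None for r in rows]
rewritten = sum(v for v in projected if v is not None)

assert direct == rewritten == 5  # rows with id 2 and 5 contribute 2 + 3
```

Folding the filter into the projection is what removes the `with FILTER('id > 1)` annotation from the first Aggregate in the rewritten second example.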
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r451630462

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)]

Review comment: We can list some builtin agg functions that ignore nulls, and omit the FILTER for them.
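For null-ignoring aggregate functions, pre-filtering the null inputs is a no-op, which is why the explicit FILTER could be omitted for them. A small Python sanity check (the function list and the `agg_ignore_nulls` helper are illustrative stand-ins, with `len` modeling count):

```python
values = [3, None, 1, None, 7]
non_null = [v for v in values if v is not None]

def agg_ignore_nulls(fn, xs):
    # models an aggregate function that skips null (None) inputs
    return fn([x for x in xs if x is not None])

# sum/min/max/count all ignore nulls, so filtering nulls up front changes nothing
for fn in (sum, min, max, len):
    assert agg_ignore_nulls(fn, values) == fn(non_null)
print("ok")
```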
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r451629863

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)]

Review comment: We assume the agg func ignores null inputs, which may not be true. Shall we rewrite it to `sum('value) with FILTER('value is not null)`?
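The concern can be illustrated in plain Python: the rewrite feeds the first aggregate a column like `if ('id > 1) value else null`, which is only correct if the aggregate skips nulls. A sketch (toy rows; `count_all` is a hypothetical aggregate that does not ignore null inputs):

```python
rows = [(1, 10), (2, 20), (3, 30)]  # (id, value)

# the Expand projection folds the filter in as: if (id > 1) value else null
projected = [v if id_ > 1 else None for id_, v in rows]

def sum_ignore_nulls(xs):
    # null-ignoring, like Spark's sum: the injected nulls are invisible
    return sum(x for x in xs if x is not None)

def count_all(xs):
    # hypothetical aggregate that does NOT ignore null inputs
    return len(xs)

assert sum_ignore_nulls(projected) == 50   # matches SUM(value) FILTER (WHERE id > 1)
assert count_all(projected) == 3           # wrong: it sees the injected nulls
# an explicit FILTER('value is not null) restores the intended semantics
assert count_all([x for x in projected if x is not None]) == 2
print("ok")
```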
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r451624190

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

@@ -118,7 +118,75 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, cast('value as bigint)), + * ('key, if ('id > 1) 'cat1 else null, null, 1, null), + * ('key, null, 'cat2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value])

Review comment: can we keep the names `cat1`, `cat2`?
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r450729347 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -118,7 +118,78 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, 'value), + * ('key, '_gen_distinct_1, null, 1, null), + * ('key, null, '_gen_distinct_2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value]) + * Project( Review comment: We can even merge this project with the expand, right? 
```
Expand(
  projections = [('key, null, null, 0, cast('value as bigint)),
                 ('key, if ('id > 1) 'cat1 else null, null, 1, null),
                 ('key, null, 'cat2, 2, null)]
  output = ['key, 'cat1, 'cat2, 'gid, 'value])
```
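Merging the Project into the Expand works because `if ('id > 1) 'cat1 else null` inside the projection, followed by a null-ignoring distinct count, is equivalent to filtering first. A quick Python check (toy data, illustrative names):

```python
rows = [("x", 1), ("x", 2), ("y", 3), ("z", 1)]  # (cat1, id)

# COUNT(DISTINCT cat1) FILTER (WHERE id > 1): filter first, then distinct-count
filtered_first = len({c for c, id_ in rows if id_ > 1})

# Expand-style: project 'if (id > 1) cat1 else null', then count distinct non-nulls
projected = [c if id_ > 1 else None for c, id_ in rows]
folded = len({c for c in projected if c is not None})

assert filtered_first == folded == 2
print("ok")
```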
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r450157642 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -148,24 +204,105 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val distinctAggs = exprs.flatMap { _.collect { case ae: AggregateExpression if ae.isDistinct => ae }} -// We need at least two distinct aggregates for this rule because aggregation -// strategy can handle a single distinct group. +// We need at least two distinct aggregates or a single distinct aggregate with a filter for +// this rule because aggregation strategy can handle a single distinct group without a filter. // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a). -distinctAggs.size > 1 +distinctAggs.size > 1 || distinctAggs.exists(_.filter.isDefined) } def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => rewrite(a) +case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => + val expandAggregate = extractFiltersInDistinctAggregates(a) + rewriteDistinctAggregates(expandAggregate) } - def rewrite(a: Aggregate): Aggregate = { + private def extractFiltersInDistinctAggregates(a: Aggregate): Aggregate = { +val aggExpressions = collectAggregateExprs(a) +val (distinctAggExpressions, regularAggExpressions) = aggExpressions.partition(_.isDistinct) +if (distinctAggExpressions.exists(_.filter.isDefined)) { + // Constructs pairs between old and new expressions for regular aggregates. Because we + // will construct a new `Aggregate` and the children of the distinct aggregates will be + // changed to generated ones, we need to create new references to avoid collisions between + // distinct and regular aggregate children. 
+ val regularAggExprs = regularAggExpressions.filter(_.children.exists(!_.foldable)) + val regularFunChildren = regularAggExprs +.flatMap(_.aggregateFunction.children.filter(!_.foldable)) + val regularFilterAttrs = regularAggExprs.flatMap(_.filterAttributes) + val regularAggChildren = (regularFunChildren ++ regularFilterAttrs).distinct + val regularAggChildrenMap = regularAggChildren.map { +case ne: NamedExpression => ne -> ne +case other => other -> Alias(other, other.toString)() + } + val namedRegularAggChildren = regularAggChildrenMap.map(_._2) + val regularAggChildAttrLookup = regularAggChildrenMap.map { kv => +(kv._1, kv._2.toAttribute) + }.toMap + val regularAggPairs = regularAggExprs.map { +case ae @ AggregateExpression(af, _, _, filter, _) => + val newChildren = af.children.map(c => regularAggChildAttrLookup.getOrElse(c, c)) + val raf = af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] + val filterOpt = filter.map(_.transform { +case a: Attribute => regularAggChildAttrLookup.getOrElse(a, a) + }) + val aggExpr = ae.copy(aggregateFunction = raf, filter = filterOpt) + (ae, aggExpr) + } -// Collect all aggregate expressions. -val aggExpressions = a.aggregateExpressions.flatMap { e => - e.collect { -case ae: AggregateExpression => ae + // Constructs pairs between old and new expressions for distinct aggregates, too. + val distinctAggExprs = distinctAggExpressions.filter(e => e.children.exists(!_.foldable)) + val (projections, distinctAggPairs) = distinctAggExprs.map { +case ae @ AggregateExpression(af, _, _, filter, _) => + // First, In order to reduce costs, it is better to handle the filter clause locally. + // e.g. COUNT (DISTINCT a) FILTER (WHERE id > 1), evaluate expression + // If(id > 1) 'a else null first, and use the result as output. + // Second, If at least two DISTINCT aggregate expression which may references the + // same attributes. We need to construct the generated attributes so as the output not + // lost. e.g. 
SUM (DISTINCT a), COUNT (DISTINCT a) FILTER (WHERE id > 1) will output + // attribute '_gen_distinct_1 and attribute '_gen_distinct_2 instead of two 'a. + // Note: The illusionary mechanism may result in at least two distinct groups, so we + // still need to call `rewrite`. + val unfoldableChildren = af.children.filter(!_.foldable) + // Expand projection + val projectionMap = unfoldableChildren.map { +case e if filter.isDefined => + val ife = If(filter.get, e, nullify(e)) + e -> Alias(ife, s"_gen_distinct_${NamedExpression.newExprId.id}")() +case e => e -> Alias(e,
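The need for generated attributes can be shown concretely: SUM(DISTINCT a) and COUNT(DISTINCT a) FILTER (WHERE id > 1) cannot share one projected column, because the filtered variant nulls out some of the values. A Python sketch (toy rows; the `gen_distinct_*` lists stand in for the generated `_gen_distinct_N` attributes):

```python
rows = [(5, 1), (5, 2), (7, 3)]  # (a, id)

# one projected copy of 'a per distinct aggregate
gen_distinct_1 = [a for a, _ in rows]                         # for SUM(DISTINCT a)
gen_distinct_2 = [a if id_ > 1 else None for a, id_ in rows]  # FILTER (id > 1) folded in

sum_distinct = sum(set(gen_distinct_1))                        # 5 + 7
count_distinct_filtered = len({a for a in gen_distinct_2 if a is not None})

assert sum_distinct == 12
assert count_distinct_filtered == 2
print("ok")
```

Had both aggregates read the single filtered column, SUM(DISTINCT a) would have silently lost the rows excluded by the other aggregate's filter.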
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r450157305 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, 'value), + * ('key, '_gen_distinct_1, null, 1, null), + * ('key, null, '_gen_distinct_2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value]) + * Project( + *projectList = ['key, if ('id > 1) 'cat1 else null, 'cat2, cast('value as bigint)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value]) + * LocalTableScan [...] 
+ * }}} + * + * The rule consists of the two phases as follows: + * + * In the first phase, expands data for the distinct aggregates where filter clauses exist:

Review comment: Please explain "what" first, not "why".

> Guaranteed to compute filter clauses in the first aggregate locally.

This is not a "what".
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r450007512 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -148,24 +204,105 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val distinctAggs = exprs.flatMap { _.collect { case ae: AggregateExpression if ae.isDistinct => ae }} -// We need at least two distinct aggregates for this rule because aggregation -// strategy can handle a single distinct group. +// We need at least two distinct aggregates or a single distinct aggregate with a filter for +// this rule because aggregation strategy can handle a single distinct group without a filter. // This check can produce false-positives, e.g., SUM(DISTINCT a) & COUNT(DISTINCT a). -distinctAggs.size > 1 +distinctAggs.size > 1 || distinctAggs.exists(_.filter.isDefined) } def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { -case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => rewrite(a) +case a: Aggregate if mayNeedtoRewrite(a.aggregateExpressions) => + val expandAggregate = extractFiltersInDistinctAggregates(a) + rewriteDistinctAggregates(expandAggregate) } - def rewrite(a: Aggregate): Aggregate = { + private def extractFiltersInDistinctAggregates(a: Aggregate): Aggregate = { +val aggExpressions = collectAggregateExprs(a) +val (distinctAggExpressions, regularAggExpressions) = aggExpressions.partition(_.isDistinct) +if (distinctAggExpressions.exists(_.filter.isDefined)) { + // Constructs pairs between old and new expressions for regular aggregates. Because we + // will construct a new `Aggregate` and the children of the distinct aggregates will be + // changed to generated ones, we need to create new references to avoid collisions between + // distinct and regular aggregate children. 
+ val regularAggExprs = regularAggExpressions.filter(_.children.exists(!_.foldable)) + val regularFunChildren = regularAggExprs +.flatMap(_.aggregateFunction.children.filter(!_.foldable)) + val regularFilterAttrs = regularAggExprs.flatMap(_.filterAttributes) + val regularAggChildren = (regularFunChildren ++ regularFilterAttrs).distinct + val regularAggChildrenMap = regularAggChildren.map { +case ne: NamedExpression => ne -> ne +case other => other -> Alias(other, other.toString)() + } + val namedRegularAggChildren = regularAggChildrenMap.map(_._2) + val regularAggChildAttrLookup = regularAggChildrenMap.map { kv => +(kv._1, kv._2.toAttribute) + }.toMap + val regularAggPairs = regularAggExprs.map { +case ae @ AggregateExpression(af, _, _, filter, _) => + val newChildren = af.children.map(c => regularAggChildAttrLookup.getOrElse(c, c)) + val raf = af.withNewChildren(newChildren).asInstanceOf[AggregateFunction] + val filterOpt = filter.map(_.transform { +case a: Attribute => regularAggChildAttrLookup.getOrElse(a, a) + }) + val aggExpr = ae.copy(aggregateFunction = raf, filter = filterOpt) + (ae, aggExpr) + } -// Collect all aggregate expressions. -val aggExpressions = a.aggregateExpressions.flatMap { e => - e.collect { -case ae: AggregateExpression => ae + // Constructs pairs between old and new expressions for distinct aggregates, too. + val distinctAggExprs = distinctAggExpressions.filter(e => e.children.exists(!_.foldable)) + val (projections, distinctAggPairs) = distinctAggExprs.map { +case ae @ AggregateExpression(af, _, _, filter, _) => + // First, In order to reduce costs, it is better to handle the filter clause locally. + // e.g. COUNT (DISTINCT a) FILTER (WHERE id > 1), evaluate expression + // If(id > 1) 'a else null first, and use the result as output. + // Second, If at least two DISTINCT aggregate expression which may references the + // same attributes. We need to construct the generated attributes so as the output not + // lost. e.g. 
SUM (DISTINCT a), COUNT (DISTINCT a) FILTER (WHERE id > 1) will output + // attribute '_gen_distinct_1 and attribute '_gen_distinct_2 instead of two 'a. + // Note: The illusionary mechanism may result in at least two distinct groups, so we + // still need to call `rewrite`. + val unfoldableChildren = af.children.filter(!_.foldable) + // Expand projection + val projectionMap = unfoldableChildren.map { +case e if filter.isDefined => + val ife = If(filter.get, e, nullify(e)) + e -> Alias(ife, s"_gen_distinct_${NamedExpression.newExprId.id}")() +case e => e -> Alias(e,
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r450003275 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, 'value), + * ('key, '_gen_distinct_1, null, 1, null), + * ('key, null, '_gen_distinct_2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value]) + * Project( + *projectList = ['key, if ('id > 1) 'cat1 else null, 'cat2, cast('value as bigint)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'value]) + * LocalTableScan [...] 
+ * }}} + * + * The rule consists of the two phases as follows: + * + * In the first phase, expands data for the distinct aggregates where filter clauses exist:

Review comment: what do you mean by this paragraph?
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r449090476

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, 'value), + * ('key, '_gen_distinct_1, null, 1, null), + * ('key, null, '_gen_distinct_2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value]) + * Expand(

Review comment: Then we can merge the `Project` with the above `Expand`
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r449088934

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

@@ -118,7 +118,63 @@ import org.apache.spark.sql.types.IntegerType * LocalTableScan [...] * }}} * - * The rule does the following things here: + * Third example: single distinct aggregate function with filter clauses (in sql): + * {{{ + * SELECT + * COUNT(DISTINCT cat1) FILTER (WHERE id > 1) as cat1_cnt, + * COUNT(DISTINCT cat2) as cat2_cnt, + * SUM(value) AS total + * FROM + *data + * GROUP BY + *key + * }}} + * + * This translates to the following (pseudo) logical plan: + * {{{ + * Aggregate( + *key = ['key] + *functions = [COUNT(DISTINCT 'cat1) with FILTER('id > 1), + * COUNT(DISTINCT 'cat2), + * sum('value)] + *output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * LocalTableScan [...] + * }}} + * + * This rule rewrites this logical plan to the following (pseudo) logical plan: + * {{{ + * Aggregate( + * key = ['key] + * functions = [count(if (('gid = 1)) '_gen_distinct_1 else null), + * count(if (('gid = 2)) '_gen_distinct_2 else null), + * first(if (('gid = 0)) 'total else null) ignore nulls] + * output = ['key, 'cat1_cnt, 'cat2_cnt, 'total]) + * Aggregate( + *key = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid] + *functions = [sum('value)] + *output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'total]) + * Expand( + * projections = [('key, null, null, 0, 'value), + * ('key, '_gen_distinct_1, null, 1, null), + * ('key, null, '_gen_distinct_2, 2, null)] + * output = ['key, '_gen_distinct_1, '_gen_distinct_2, 'gid, 'value]) + * Expand(

Review comment: This doesn't need to be an `Expand`: you just have one project list, and we can just use `Project`.
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r449072745

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala

@@ -216,15 +216,21 @@ abstract class AggregateFunction extends Expression { def toAggregateExpression(): AggregateExpression = toAggregateExpression(isDistinct = false) /** - * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] and sets `isDistinct` - * flag of the [[AggregateExpression]] to the given value because + * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] with `isDistinct` + * flag and `filter` option of the [[AggregateExpression]] to the given value because * [[AggregateExpression]] is the container of an [[AggregateFunction]], aggregation mode, - * and the flag indicating if this aggregation is distinct aggregation or not. + * the flag indicating if this aggregation is distinct aggregation or not and filter option.

Review comment: ditto, `and the optional 'filter'`
cloud-fan commented on a change in pull request #27428: URL: https://github.com/apache/spark/pull/27428#discussion_r449072337

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala

@@ -216,15 +216,21 @@ abstract class AggregateFunction extends Expression { def toAggregateExpression(): AggregateExpression = toAggregateExpression(isDistinct = false) /** - * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] and sets `isDistinct` - * flag of the [[AggregateExpression]] to the given value because + * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] with `isDistinct` + * flag and `filter` option of the [[AggregateExpression]] to the given value because

Review comment: `filter option` looks weird, how about `and an optional 'filter'`?