[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14996403#comment-14996403 ]

Apache Spark commented on SPARK-9241:
-------------------------------------

User 'hvanhovell' has created a pull request for this issue:
https://github.com/apache/spark/pull/9566

> Supporting multiple DISTINCT columns
> ------------------------------------
>
>                 Key: SPARK-9241
>                 URL: https://issues.apache.org/jira/browse/SPARK-9241
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Herman van Hovell
>            Priority: Critical
>             Fix For: 1.6.0
>
>
> Right now the new aggregation code path only supports a single distinct column
> (you can use it in multiple aggregate functions in the query). We need to
> support multiple distinct columns by generating a different plan for handling
> multiple distinct columns (without changing aggregate functions).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14995249#comment-14995249 ]

Apache Spark commented on SPARK-9241:
-------------------------------------

User 'hvanhovell' has created a pull request for this issue:
https://github.com/apache/spark/pull/9541
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14987069#comment-14987069 ]

Michael Armbrust commented on SPARK-9241:
-----------------------------------------

Thanks for working on this. It looks like there are still a bunch of TODOs in the PR, so I'm going to bump this from 1.6.
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14984983#comment-14984983 ]

Apache Spark commented on SPARK-9241:
-------------------------------------

User 'hvanhovell' has created a pull request for this issue:
https://github.com/apache/spark/pull/9406
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14974378#comment-14974378 ]

Apache Spark commented on SPARK-9241:
-------------------------------------

User 'hvanhovell' has created a pull request for this issue:
https://github.com/apache/spark/pull/9280
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958758#comment-14958758 ]

Herman van Hovell commented on SPARK-9241:
------------------------------------------

We could implement this using GROUPING SETS. That is how they did it in Calcite: https://issues.apache.org/jira/browse/CALCITE-732

For example, using the following data:
{noformat}
// Assumes a Spark 1.x shell where `sqlContext` is in scope.
import org.apache.spark.sql.functions.{rand, when}
import sqlContext.implicits._ // enables the $"..." column syntax

// Create random data similar to the Calcite query.
val df = sqlContext
  .range(1 << 20)
  .select(
    $"id".as("employee_id"),
    (rand(6321782L) * 4 + 1).cast("int").as("department_id"),
    when(rand(981293L) >= 0.5, "M").otherwise("F").as("gender"),
    (rand(7123L) * 3 + 1).cast("int").as("education_level")
  )

df.registerTempTable("employee")
{noformat}

We can query multiple distinct counts the regular way:
{noformat}
sql("""
  select department_id as d,
         count(distinct gender, education_level) as c0,
         count(distinct gender) as c1,
         count(distinct education_level) as c2
  from employee
  group by department_id
""").show()
{noformat}

This uses the old code path:
{noformat}
== Physical Plan ==
Limit 21
 Aggregate false, [department_id#64556], [department_id#64556 AS d#64595,CombineAndCount(partialSets#64599) AS c0#64596L,CombineAndCount(partialSets#64600) AS c1#64597L,CombineAndCount(partialSets#64601) AS c2#64598L]
  Exchange hashpartitioning(department_id#64556,200)
   Aggregate true, [department_id#64556], [department_id#64556,AddToHashSet(gender#64557,education_level#64558) AS partialSets#64599,AddToHashSet(gender#64557) AS partialSets#64600,AddToHashSet(education_level#64558) AS partialSets#64601]
    ConvertToSafe
     TungstenProject [department_id#64556,gender#64557,education_level#64558]
      TungstenProject [id#64554L AS employee_id#64555L,cast(((rand(6321782) * 4.0) + 1.0) as int) AS department_id#64556,CASE WHEN (rand(981293) >= 0.5) THEN M ELSE F AS gender#64557,cast(((rand(7123) * 3.0) + 1.0) as int) AS education_level#64558]
       Scan PhysicalRDD[id#64554L]
{noformat}

Or we can do this using grouping sets:
{noformat}
sql("""
  select A.d,
         count(case A.i when 3 then 1 else null end) as c0,
         count(case A.i when 5 then 1 else null end) as c1,
         count(case A.i when 7 then 1 else null end) as c2
  from (select department_id as d, grouping__id as i
        from employee
        group by department_id, gender, education_level
        grouping sets (
          (department_id, gender),
          (department_id, education_level),
          (department_id, gender, education_level))) A
  group by A.d
""").show
{noformat}

This uses the new Tungsten-based code path (except for the Expand operator):
{noformat}
== Physical Plan ==
TungstenAggregate(key=[d#64577], functions=[(count(CASE i#64578 WHEN 3 THEN 1 ELSE null),mode=Final,isDistinct=false),(count(CASE i#64578 WHEN 5 THEN 1 ELSE null),mode=Final,isDistinct=false),(count(CASE i#64578 WHEN 7 THEN 1 ELSE null),mode=Final,isDistinct=false)], output=[d#64577,c0#64579L,c1#64580L,c2#64581L])
 TungstenExchange hashpartitioning(d#64577,200)
  TungstenAggregate(key=[d#64577], functions=[(count(CASE i#64578 WHEN 3 THEN 1 ELSE null),mode=Partial,isDistinct=false),(count(CASE i#64578 WHEN 5 THEN 1 ELSE null),mode=Partial,isDistinct=false),(count(CASE i#64578 WHEN 7 THEN 1 ELSE null),mode=Partial,isDistinct=false)], output=[d#64577,currentCount#64587L,currentCount#64589L,currentCount#64591L])
   TungstenAggregate(key=[department_id#64556,gender#64557,education_level#64558,grouping__id#64582], functions=[], output=[d#64577,i#64578])
    TungstenExchange hashpartitioning(department_id#64556,gender#64557,education_level#64558,grouping__id#64582,200)
     TungstenAggregate(key=[department_id#64556,gender#64557,education_level#64558,grouping__id#64582], functions=[], output=[department_id#64556,gender#64557,education_level#64558,grouping__id#64582])
      Expand [ArrayBuffer(department_id#64556, gender#64557, null, 3),ArrayBuffer(department_id#64556, null, education_level#64558, 5),ArrayBuffer(department_id#64556, gender#64557, education_level#64558, 7)], [department_id#64556,gender#64557,education_level#64558,grouping__id#64582]
       ConvertToSafe
        TungstenProject [department_id#64556,gender#64557,education_level#64558]
         TungstenProject [id#64554L AS employee_id#64555L,cast(((rand(6321782) * 4.0) + 1.0) as int) AS department_id#64556,CASE WHEN (rand(981293) >= 0.5) THEN M ELSE F AS gender#64557,cast(((rand(7123) * 3.0) + 1.0) as int) AS education_level#64558]
          Scan PhysicalRDD[id#64554L]
{noformat}

We could implement this using an analysis rule. [~yhuai] / [~rxin] thoughts?
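The grouping-sets plan above can be sketched on plain Scala collections to show why it yields the distinct counts. This is a minimal illustration under stated assumptions, not Spark's implementation: `Employee`, `ExpandRewriteSketch`, `expand` and `distinctCounts` are hypothetical names, and the grouping__id values 3, 5 and 7 are read off the Expand operator in the second plan. Reading them back also shows that in the grouping-sets query c0 counts distinct gender, c1 distinct education_level and c2 distinct (gender, education_level), i.e. a different column order than c0/c1/c2 in the first query. The sketch assumes non-null inputs, as in the generated data; preserving COUNT(DISTINCT) semantics in general would also require skipping rows whose distinct columns are null.

```scala
// Hypothetical row type standing in for the generated `employee` table.
case class Employee(departmentId: Int, gender: String, educationLevel: Int)

object ExpandRewriteSketch {
  // One expanded row: (department_id, gender?, education_level?, grouping__id).
  type Expanded = (Int, Option[String], Option[Int], Int)

  // Emulates Expand: one copy per grouping set, with the columns outside
  // the set nulled out (None) and the set's grouping__id attached.
  def expand(e: Employee): Seq[Expanded] = Seq(
    (e.departmentId, Some(e.gender), None, 3),                  // (department_id, gender)
    (e.departmentId, None, Some(e.educationLevel), 5),          // (department_id, education_level)
    (e.departmentId, Some(e.gender), Some(e.educationLevel), 7) // all three columns
  )

  // Returns department_id -> (distinct gender, distinct education_level,
  // distinct (gender, education_level)).
  def distinctCounts(rows: Seq[Employee]): Map[Int, (Long, Long, Long)] = {
    // Inner aggregate with functions=[]: de-duplicate the expanded rows.
    val deduped: Set[Expanded] = rows.flatMap(expand).toSet
    // Outer aggregate: per department, count the rows carrying each id,
    // like count(case i when 3 then 1 else null end).
    deduped.groupBy(_._1).map { case (dept, group) =>
      def countId(id: Int): Long = group.count(_._4 == id).toLong
      dept -> ((countId(3), countId(5), countId(7)))
    }
  }
}
```

No distinct aggregate appears in the sketch: de-duplication happens once, on the expanded rows, and the final step is a plain count per grouping__id, which mirrors why the rewritten plan only contains regular (isDistinct=false) aggregates.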
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959604#comment-14959604 ]

Herman van Hovell commented on SPARK-9241:
------------------------------------------

It should grow linearly (or am I missing something?). For example, if we have 3 grouping sets (like in the example), we would duplicate and project the data 3 times. That is still bad, but similar to the approach in [~yhuai]'s example (while saving a join). We could have a problem with the {{GROUPING__ID}} bitmask field: only 32/64 fields can be in a grouping set.
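Both points can be sketched briefly, assuming the Hive-style grouping__id layout visible in the Expand operator earlier in this thread (bit i is set when the i-th GROUP BY column belongs to the grouping set). `GroupingIdSketch`, `groupingId` and `expandedRowCount` are hypothetical helpers, not Spark API. With a 64-bit `Long` mask the limit is 64 columns (32 with an `Int`). Expand emits one copy of each input row per listed grouping set, so the cost is linear in the number of sets written out; an exponential count only arises when every subset is enumerated, as CUBE does with 2^n sets for n columns.

```scala
// Hypothetical helpers; not part of Spark's API.
object GroupingIdSketch {
  // Bit i is set when the i-th GROUP BY column belongs to the grouping set.
  // A Long holds at most 64 columns (an Int would hold 32).
  def groupingId(groupBy: Seq[String], groupingSet: Set[String]): Long = {
    require(groupBy.size <= 64, "grouping__id bitmask overflows a Long")
    groupBy.zipWithIndex.foldLeft(0L) { case (mask, (column, i)) =>
      if (groupingSet.contains(column)) mask | (1L << i) else mask
    }
  }

  // Expand emits one copy of every input row per grouping set, so its
  // output grows linearly in the number of sets that are listed.
  def expandedRowCount(inputRows: Long, groupingSets: Seq[Set[String]]): Long =
    inputRows * groupingSets.size
}
```

For the example query, the three sets over (department_id, gender, education_level) map to the ids 3, 5 and 7 seen in the plan, and 2^20 input rows become 3 * 2^20 expanded rows.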
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959440#comment-14959440 ]

Reynold Xin commented on SPARK-9241:
------------------------------------

Do we have any idea of the performance characteristics of this rewrite? IIUC, the complexity of grouping sets grows exponentially with the number of items in the set?
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951212#comment-14951212 ]

Josh Rosen commented on SPARK-9241:
-----------------------------------

[~yhuai], [~rxin], would you like to update this ticket based on recent discussions?
[jira] [Commented] (SPARK-9241) Supporting multiple DISTINCT columns
[ https://issues.apache.org/jira/browse/SPARK-9241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14951290#comment-14951290 ]

Yin Huai commented on SPARK-9241:
---------------------------------

Yeah. When we compile the query, we can split a query with multiple distinct columns into multiple queries, each of which evaluates a single distinct aggregation. Then, we can join the results using the group by keys as the join keys. In the join, we need to use null-safe equality as the condition. Right now, we need another optimization to make it work efficiently. Here is an example:
{code}
SELECT COUNT(DISTINCT a), COUNT(DISTINCT b), c
FROM t
GROUP BY c
{code}
will be rewritten to
{code}
SELECT x.count_a, y.count_b, x.c
FROM (SELECT c, COUNT(DISTINCT a) count_a FROM t GROUP BY c) x
JOIN (SELECT c, COUNT(DISTINCT b) count_b FROM t GROUP BY c) y
ON coalesce(x.c, 0) = coalesce(y.c, 0) AND x.c <=> y.c
{code}
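The join-based rewrite can be sketched the same way on in-memory data. This is an illustration, not Spark code: `JoinRewriteSketch`, `Row`, `countDistinctBy` and `rewrite` are hypothetical names, and `Option[Int]` stands in for a nullable column. Looking up a `None` key in a `Map` matches another `None`, which mirrors `x.c <=> y.c`; with a plain `=` two NULL keys would never join and the NULL group would be dropped. The extra `coalesce(x.c, 0) = coalesce(y.c, 0)` term in the SQL is presumably there to give the planner an ordinary equi-join key; the key-set intersection below does not need it.

```scala
// Hypothetical names; Option[Int] models a nullable SQL column.
object JoinRewriteSketch {
  case class Row(a: Option[Int], b: Option[Int], c: Option[Int])

  // SELECT c, COUNT(DISTINCT <col>) FROM t GROUP BY c
  // NULL values of the counted column are skipped, as in COUNT(DISTINCT ...).
  def countDistinctBy(t: Seq[Row], col: Row => Option[Int]): Map[Option[Int], Long] =
    t.groupBy(_.c).map { case (key, rows) =>
      key -> rows.flatMap(r => col(r)).distinct.size.toLong
    }

  // Inner join of the two single-DISTINCT results on the grouping key.
  // Option equality treats None == None as a match, like x.c <=> y.c.
  def rewrite(t: Seq[Row]): Map[Option[Int], (Long, Long)] = {
    val x = countDistinctBy(t, _.a)
    val y = countDistinctBy(t, _.b)
    val keys = x.keySet intersect y.keySet
    keys.map(k => k -> ((x(k), y(k)))).toMap
  }
}
```

Because both sides group the same table by c, their key sets coincide and the inner join keeps every group, including the one whose key is NULL.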