Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/9409#discussion_r44213709
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
---
@@ -54,10 +54,14 @@ object Utils {
mode = aggregate.Complete,
isDistinct = false)
- // We do not support multiple COUNT DISTINCT columns for now.
- case expressions.CountDistinct(children) if children.length == 1 =>
+ case expressions.CountDistinct(children) =>
+ val child = if (children.size > 1) {
+ DropAnyNull(CreateStruct(children))
--- End diff --
@yhuai if we combine this with the distinct rewriting rule. It will add a
```struct``` to the groupBy clause of the first aggregate. This is currently
not allowed in the new UDAF path, so it'll fall back to the old path. For
example:
val data2 = Seq[(Integer, Integer, Integer)](
(1, 10, -10),
(null, -60, 60),
(1, 30, -30),
(1, 30, 30),
(2, 1, 1),
(null, -10, 10),
(2, -1, null),
(2, 1, 1),
(2, null, 1),
(null, 100, -10),
(3, null, 3),
(null, null, null),
(3, null, null)).toDF("key", "value1", "value2")
data2.registerTempTable("agg2")
val q sql(
"""
|SELECT
| key,
| count(distinct value1),
| count(distinct value2),
| count(distinct value1, value2)
|FROM agg2
|GROUP BY key
""".stripMargin)
Will create the following physical plan:
== Physical Plan ==
TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1))
attributereference#45 else null),mode=Final,isDistinct=false),(count(if
((gid#44 = 3)) attributereference#47 else
null),mode=Final,isDistinct=false),(count(if ((gid#44 = 2)) dropanynull#46 else
null),mode=Final,isDistinct=false)], output=[key#3,_c1#32L,_c2#33L,_c3#34L])
TungstenExchange(Shuffle without coordinator)
hashpartitioning(key#3,200), None
TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1))
attributereference#45 else null),mode=Partial,isDistinct=false),(count(if
((gid#44 = 3)) attributereference#47 else
null),mode=Partial,isDistinct=false),(count(if ((gid#44 = 2)) dropanynull#46
else null),mode=Partial,isDistinct=false)],
output=[key#3,count#49L,count#53L,count#51L])
Aggregate false,
[key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44],
[key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
ConvertToSafe
TungstenExchange(Shuffle without coordinator)
hashpartitioning(key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44,200),
None
ConvertToUnsafe
Aggregate true,
[key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44],
[key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
!Expand [List(key#3, value1#4, null, null, 1),List(key#3, null,
dropanynull(struct(value1#4,value2#5)), null, 2),List(key#3, null, null,
value2#5, 3)],
[key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
LocalTableScan [key#3,value1#4,value2#5],
[[1,10,-10],[null,-60,60],[1,30,-30],[1,30,30],[2,1,1],[null,-10,10],[2,-1,null],[2,1,1],[2,null,1],[null,100,-10],[3,null,3],[null,null,null],[3,null,null]]
Is it possible to add support for fixed width structs as group by
expression to the new aggregation path?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]