Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for hash-based aggregation, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

Unfortunately, the list of supported data types is VERY limited.

It doesn't make sense to me that the data type must be mutable for the UDAF to
use hash-based aggregation, but I could be missing something here :). I could
get hash-based aggregation by rewriting this query with the RDD API, but that
is counterintuitive IMO.
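
One plausible reading of the restriction, sketched in plain Python rather than Spark code: a hash-based aggregation map that stores its buffers as raw bytes can overwrite a fixed-width field in place, while a variable-length field such as an array changes its encoded size every time it grows. The helper names and the byte layout below are assumptions made for illustration, not Spark's actual format.

```python
import struct

# Hypothetical aggregation buffer holding one 8-byte long (e.g. a running count).
count_buffer = bytearray(8)


def add_to_long(buffer, offset, delta):
    """Update a fixed-width field in place: same offset, same size."""
    (current,) = struct.unpack_from("<q", buffer, offset)
    struct.pack_into("<q", buffer, offset, current + delta)


def append_to_array(encoded, value):
    """Grow a variable-length field: the result is a new, larger encoding."""
    return encoded + struct.pack("<q", value)


# Three updates to the long never change the buffer's size.
for _ in range(3):
    add_to_long(count_buffer, 0, 1)
count = struct.unpack_from("<q", count_buffer, 0)[0]

# Three appends to the array change the encoded size every time.
encoded_array = b""
sizes = []
for value in (10, 20, 30):
    encoded_array = append_to_array(encoded_array, value)
    sizes.append(len(encoded_array))

print(count, len(count_buffer), sizes)
```

Under that assumption, a UDAF whose buffer is an array of rows falls into the second case, which would explain why it is not accepted by a map built around fixed-width, in-place updates.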

---
Regards,
Andy



Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Takeshi Yamamuro
Hi,

Spark always uses hash-based aggregates if the types of the aggregated data
are supported there; otherwise it cannot use hash-based aggregates and falls
back to sort-based ones.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38

I'm not sure about your exact query, but it seems the types of the aggregated
data in it are not supported for hash-based aggregates.
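
As a rough illustration of that fallback, here is a simplified Python model (not Spark's actual code). The type names, the `MUTABLE_FIXED_WIDTH` set, and the `choose_aggregate` helper are hypothetical stand-ins; the real list of supported types lives in the Spark source and may differ between versions.

```python
# Simplified model of the choice between hash-based and sort-based
# aggregation. All names here are illustrative, not Spark's API.

# Assumption: a few fixed-width types that can be updated in place.
MUTABLE_FIXED_WIDTH = {"boolean", "byte", "short", "int", "long", "float", "double"}


def supports_hash_aggregate(buffer_types):
    """True only if every aggregation buffer field is a supported type."""
    return all(t in MUTABLE_FIXED_WIDTH for t in buffer_types)


def choose_aggregate(buffer_types):
    """Prefer the hash-based operator, fall back to the sort-based one."""
    if supports_hash_aggregate(buffer_types):
        return "HashAggregate"
    return "SortAggregate"


# count(1) keeps a single long in its buffer.
print(choose_aggregate(["long"]))
# A UDAF that buffers an array of rows is not fixed-width.
print(choose_aggregate(["array<struct>"]))
```

In this model a single unsupported buffer field is enough to push the whole aggregate onto the sort-based path.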

// maropu






-- 
---
Takeshi Yamamuro


How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations.

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
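
To make the cost difference concrete, here is a minimal plain-Python sketch (no Spark involved; both function names are made up for illustration). It contrasts what the Sort plus SortAggregate pairs in the plan above do with what a hash-based aggregate would do for an aggregation that collects values into an array per key.

```python
from itertools import groupby
from operator import itemgetter


def sort_aggregate(rows):
    """Sort every row by key first, then fold each run of equal keys."""
    result = {}
    ordered = sorted(rows, key=itemgetter(0))  # the full sort
    for key, group in groupby(ordered, key=itemgetter(0)):
        result[key] = [value for _, value in group]
    return result


def hash_aggregate(rows):
    """Single pass over unsorted rows with one growable buffer per key."""
    result = {}
    for key, value in rows:
        result.setdefault(key, []).append(value)
    return result


rows = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]

# Both strategies produce the same groups; only the work done differs.
assert sort_aggregate(rows) == hash_aggregate(rows)
print(hash_aggregate(rows))
```

The sort-based version pays for ordering all rows before it can aggregate, while the hash-based version only needs a lookup per row, which is the behaviour I am hoping to get for the UDAF.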

How can I make Spark use HashAggregate (like the count(*) expression)
instead of SortAggregate with my UDAF?

Is this behavior intentional? Is there an issue tracking this?

---
Regards,
Andy