Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Liang-Chi Hsieh

Hi Andy,

Because hash-based aggregation uses UnsafeRow for its aggregation states, the
aggregation buffer schema must consist only of types that are mutable in UnsafeRow.
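
For illustration, a minimal sketch of that check (it calls Spark's internal
UnsafeFixedWidthAggregationMap class, so treat it as version-dependent; the buffer
schemas are made up):

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// Fixed-width primitive fields are mutable in UnsafeRow, so this buffer qualifies.
val flatBuffer = StructType(Seq(
  StructField("count", LongType), StructField("sum", DoubleType)))
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(flatBuffer)   // true

// Variable-length fields such as arrays are not mutable in UnsafeRow, so this one does not.
val arrayBuffer = StructType(Seq(StructField("rows", ArrayType(LongType))))
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(arrayBuffer)  // false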

If you can implement your aggregation function with TypedImperativeAggregate,
Spark SQL has ObjectHashAggregateExec, which supports hash-based aggregation
using arbitrary JVM objects as aggregation states.
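
To make that concrete, below is a rough skeleton of a TypedImperativeAggregate
written against the Spark 2.2-era internal API (CollectValues is a made-up example,
and the exact abstract-method signatures shift between versions; for instance,
update/merge returned Unit in Spark 2.1):

import java.io._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._

case class CollectValues(
    child: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends TypedImperativeAggregate[ArrayBuffer[Long]] {

  // The buffer is an ordinary JVM object, so it is not limited to UnsafeRow-mutable types.
  override def createAggregationBuffer(): ArrayBuffer[Long] = ArrayBuffer.empty[Long]

  override def update(buffer: ArrayBuffer[Long], input: InternalRow): ArrayBuffer[Long] = {
    val v = child.eval(input)
    if (v != null) buffer += v.asInstanceOf[Long]
    buffer
  }

  override def merge(buffer: ArrayBuffer[Long], other: ArrayBuffer[Long]): ArrayBuffer[Long] =
    buffer ++= other

  override def eval(buffer: ArrayBuffer[Long]): Any =
    new GenericArrayData(buffer.toArray[Any])

  // Object buffers must round-trip through bytes so Spark can spill and shuffle them.
  override def serialize(buffer: ArrayBuffer[Long]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(buffer)
    oos.close()
    bos.toByteArray
  }

  override def deserialize(bytes: Array[Byte]): ArrayBuffer[Long] =
    new ObjectInputStream(new ByteArrayInputStream(bytes))
      .readObject().asInstanceOf[ArrayBuffer[Long]]

  override def children: Seq[Expression] = Seq(child)
  override def nullable: Boolean = false
  override def dataType: DataType = ArrayType(LongType)
  override def withNewMutableAggBufferOffset(newOffset: Int): CollectValues =
    copy(mutableAggBufferOffset = newOffset)
  override def withNewInputAggBufferOffset(newOffset: Int): CollectValues =
    copy(inputAggBufferOffset = newOffset)
}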



Andy Dang wrote
> Hi Takeshi,
> 
> Thanks for the answer. My UDAF aggregates data into an array of rows.
> 
> Apparently this makes it ineligible for hash-based aggregation, based on
> the logic at:
> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108
> 
> The list of supported data types is VERY limited, unfortunately.
> 
> It doesn't make sense to me that the data types must be mutable for the UDAF
> to use hash-based aggregation, but I could be missing something here :). I
> could achieve hash-based aggregation by rewriting this query in RDD mode, but
> that is counter-intuitive IMO.
> 
> ---
> Regards,
> Andy
> 
> On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@...> wrote:
> 
>> Hi,
>>
>> Spark uses hash-based aggregates whenever the types of the aggregated data
>> are supported there; otherwise, it falls back to sort-based ones.
>> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>>
>> So I'm not sure about your query, but it seems the types of the aggregated
>> data in your query are not supported for hash-based aggregates.
>>
>> // maropu
>>
>>
>>
>> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@...> wrote:
>>
>>> Hi all,
>>>
>>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>>> which is very inefficient for certain aggregations:
>>>
>>> The code is very simple:
>>> - I have a UDAF
>>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>>
>>> The physical plan I got was:
>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>>> +- Exchange SinglePartition
>>>+- *HashAggregate(keys=[], functions=[partial_count(1)],
>>> output=[count#71L])
>>>   +- *Project
>>>  +- Generate explode(internal_col#31), false, false,
>>> [internal_col#42]
>>> +- SortAggregate(key=[key#0],
>>> functions=[aggregatefunction(key#0,
>>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>>+- *Sort [key#0 ASC], false, 0
>>>   +- Exchange hashpartitioning(key#0, 200)
>>>  +- SortAggregate(key=[key#0],
>>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>>> output=[key#0,internal_col#37])
>>> +- *Sort [key#0 ASC], false, 0
>>>+- Scan ExistingRDD[key#0,nested#1,nes
>>> tedArray#2,nestedObjectArray#3,value#4L]
>>>
>>> How can I make Spark use HashAggregate (as it does for the count(*)
>>> expression) instead of SortAggregate with my UDAF?
>>>
>>> Is it intentional? Is there an issue tracking this?
>>>
>>> ---
>>> Regards,
>>> Andy
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for hash-based aggregation, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

The list of supported data types is VERY limited, unfortunately.

It doesn't make sense to me that the data types must be mutable for the UDAF to
use hash-based aggregation, but I could be missing something here :). I could
achieve hash-based aggregation by rewriting this query in RDD mode, but that is
counter-intuitive IMO.
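
(For what it's worth, the RDD-mode version I had in mind looks roughly like the
sketch below; the column name and buffer type are placeholders, not the real query.
aggregateByKey builds per-key buffers through a hash-based shuffle, so no sort is
involved.)

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

val grouped = dataset.rdd
  .map(row => (row.getAs[String]("key"), row))
  .aggregateByKey(ArrayBuffer.empty[Row])(
    (buf, row) => { buf += row; buf },    // fold a row into the per-partition buffer
    (b1, b2)   => { b1 ++= b2; b1 })      // merge partial buffers across partitions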

---
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro wrote:

> Hi,
>
> Spark uses hash-based aggregates whenever the types of the aggregated data
> are supported there; otherwise, it falls back to sort-based ones.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> So I'm not sure about your query, but it seems the types of the aggregated
> data in your query are not supported for hash-based aggregates.
>
> // maropu
>
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang  wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations:
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>+- *HashAggregate(keys=[], functions=[partial_count(1)],
>> output=[count#71L])
>>   +- *Project
>>  +- Generate explode(internal_col#31), false, false,
>> [internal_col#42]
>> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
>> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
>> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>+- *Sort [key#0 ASC], false, 0
>>   +- Exchange hashpartitioning(key#0, 200)
>>  +- SortAggregate(key=[key#0],
>> functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2,
>> nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)],
>> output=[key#0,internal_col#37])
>> +- *Sort [key#0 ASC], false, 0
>>+- Scan ExistingRDD[key#0,nested#1,nes
>> tedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (as it does for the count(*)
>> expression) instead of SortAggregate with my UDAF?
>>
>> Is it intentional? Is there an issue tracking this?
>>
>> ---
>> Regards,
>> Andy
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Takeshi Yamamuro
Hi,

Spark uses hash-based aggregates whenever the types of the aggregated data are
supported there; otherwise, it falls back to sort-based ones.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38

So I'm not sure about your query, but it seems the types of the aggregated
data in your query are not supported for hash-based aggregates.
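
A quick way to check is to look at the physical plan directly (udaf and the column
names below are placeholders):

import org.apache.spark.sql.functions.col

dataset.groupBy(col("key")).agg(udaf(col("value"))).explain()
// The plan shows HashAggregate nodes when the aggregation buffer types are supported,
// and SortAggregate nodes (as in your plan) when they are not.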

// maropu



On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang  wrote:

> Hi all,
>
> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> which is very inefficient for certain aggregations:
>
> The code is very simple:
> - I have a UDAF
> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>
> The physical plan I got was:
> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#71L])
>   +- *Project
>  +- Generate explode(internal_col#31), false, false,
> [internal_col#42]
> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>+- *Sort [key#0 ASC], false, 0
>   +- Exchange hashpartitioning(key#0, 200)
>  +- SortAggregate(key=[key#0], 
> functions=[partial_aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
> +- *Sort [key#0 ASC], false, 0
>+- Scan ExistingRDD[key#0,nested#1,
> nestedArray#2,nestedObjectArray#3,value#4L]
>
> How can I make Spark use HashAggregate (as it does for the count(*)
> expression) instead of SortAggregate with my UDAF?
>
> Is it intentional? Is there an issue tracking this?
>
> ---
> Regards,
> Andy
>



-- 
---
Takeshi Yamamuro


How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations:

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()
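
(For concreteness, a simplified stand-in for the UDAF is below; the real one aggregates
rows rather than longs, but the relevant part is the ArrayType aggregation buffer.)

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CollectToArray extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(Seq(StructField("value", LongType)))
  // The ArrayType buffer field is what rules out the UnsafeRow-based hash aggregate.
  override def bufferSchema: StructType = StructType(Seq(StructField("values", ArrayType(LongType))))
  override def dataType: DataType = ArrayType(LongType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Long]
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getSeq[Long](0) :+ input.getLong(0)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0)
  override def evaluate(buffer: Row): Any = buffer.getSeq[Long](0)
}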

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (as it does for the count(*) expression)
instead of SortAggregate with my UDAF?

Is it intentional? Is there an issue tracking this?

---
Regards,
Andy