Re: top-k function for Window

2017-01-04 Thread Georg Heiler
What about
https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF
Koert Kuipers  wrote on Wed, Jan 4, 2017 at 16:11:

> I assumed top-k of frequencies in one pass. If it's top-k by a known
> sorting/ordering, then use a priority-queue aggregator instead of SpaceSaver.


Re: top-k function for Window

2017-01-04 Thread Koert Kuipers
I assumed top-k of frequencies in one pass. If it's top-k by a known
sorting/ordering, then use a priority-queue aggregator instead of SpaceSaver.
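A minimal sketch of that priority-queue idea as a typed Spark Aggregator
(hypothetical record and class names, not Koert's in-house code). Each group
keeps only a bounded min-heap of the k largest timestamps, so no group is ever
fully sorted or fully materialized:

import scala.collection.mutable
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical record type; adjust to the real schema.
case class Event(key: String, ts: Long)

class TopKByTs(k: Int) extends Aggregator[Event, mutable.PriorityQueue[Event], Seq[Event]] {
  // Reverse ordering turns Scala's max-heap into a min-heap on ts,
  // so dequeue() always evicts the smallest timestamp kept so far.
  private val byTs: Ordering[Event] = Ordering.by[Event, Long](_.ts).reverse

  def zero: mutable.PriorityQueue[Event] = mutable.PriorityQueue.empty[Event](byTs)

  def reduce(heap: mutable.PriorityQueue[Event], e: Event): mutable.PriorityQueue[Event] = {
    heap.enqueue(e)
    if (heap.size > k) heap.dequeue()   // drop the smallest once we hold more than k
    heap
  }

  def merge(a: mutable.PriorityQueue[Event], b: mutable.PriorityQueue[Event]): mutable.PriorityQueue[Event] = {
    b.foreach { e => a.enqueue(e); if (a.size > k) a.dequeue() }
    a
  }

  def finish(heap: mutable.PriorityQueue[Event]): Seq[Event] = heap.toSeq.sortBy(-_.ts)

  def bufferEncoder: Encoder[mutable.PriorityQueue[Event]] = Encoders.kryo[mutable.PriorityQueue[Event]]
  def outputEncoder: Encoder[Seq[Event]] = Encoders.kryo[Seq[Event]]
}

On a typed Dataset[Event] this would be applied with something like
ds.groupByKey(_.key).agg(new TopKByTs(10).toColumn), flattening the resulting
sequences afterwards if individual rows are needed.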



RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
Assume you have a UDAF which looks like this:

-  Input: The value

-  Buffer: K elements

-  Output: An array (which would have the K elements)

-  Init: Initialize all elements to some irrelevant value (e.g. 
int.MinValue)

-  Update: Go over the buffer, find the first slot whose value is smaller 
than the current value, push everything from there forward, and put the current 
value in (i.e. a sorted insert)

-  Merge: “merge sort” the two buffers

-  Evaluate: turn the buffer into an array
Then run the UDAF in the groupBy.

The result would be an array of (up to) K elements per key. To turn it back into 
(up to) K rows, all you need to do is explode it.

Assuming that K is small, the UDAF calculation would be much faster than a full 
sort (it only ever sorts or inserts within a buffer of size K).
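A rough sketch of such a UDAF (hypothetical code, not Assaf's), assuming Long 
values; for brevity it re-sorts the small buffer of at most K elements instead of 
doing the in-place sorted insert described above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class TopKAgg(k: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType  = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("topk", ArrayType(LongType)) :: Nil)
  def dataType: DataType       = ArrayType(LongType)
  def deterministic: Boolean   = true

  // Init: start from an empty buffer (instead of K sentinel values)
  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Long]

  // Update: add the incoming value and keep only the K largest
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val current = buffer.getSeq[Long](0)
      buffer(0) = (current :+ input.getLong(0)).sorted(Ordering[Long].reverse).take(k)
    }

  // Merge: combine two small buffers and keep the K largest again
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val combined = buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0)
    buffer1(0) = combined.sorted(Ordering[Long].reverse).take(k)
  }

  // Evaluate: return the small buffer as the array result
  def evaluate(buffer: Row): Seq[Long] = buffer.getSeq[Long](0)
}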

Assaf.
From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 8:03 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: top-k function for Window

> Furthermore, in your example you don’t even need a window function, you can 
> simply use groupby and explode

Can you clarify? You need to sort somehow (be it map-side sorting or 
reduce-side sorting).



---
Regards,
Andy




Re: top-k function for Window

2017-01-03 Thread Koert Kuipers
I don't know anything about windowing or about not using developer APIs... but a
trivial implementation of top-k requires a total sort per group. This can be
done with Datasets. We do this using spark-sorted
(https://github.com/tresata/spark-sorted), but it's not hard to do it yourself
for Datasets either. For RDDs it's actually a little harder, I think (if you
want to avoid the in-memory assumption, which I assume you do).
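For the Dataset route, a minimal sketch (not the spark-sorted code, and with
hypothetical Record fields) that pushes the sort into each partition and then
streams through it, keeping at most k rows per key:

import org.apache.spark.sql.{Dataset, SparkSession}

case class Record(key: String, value: Long)

def topKPerKey(spark: SparkSession, ds: Dataset[Record], k: Int): Dataset[Record] = {
  import spark.implicits._
  ds.repartition($"key")                           // all rows of a key land in one partition
    .sortWithinPartitions($"key", $"value".desc)   // local sort only, no cluster-wide sort
    .mapPartitions { rows =>
      // the iterator is grouped by key and sorted by value,
      // so just count rows per key and drop everything past k
      var currentKey: String = null
      var kept = 0
      rows.filter { r =>
        if (r.key != currentKey) { currentKey = r.key; kept = 0 }
        kept += 1
        kept <= k
      }
    }
}

Note that this still shuffles by the (possibly skewed) key; it only avoids a
cluster-wide sort.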

A perhaps more efficient implementation uses an aggregator. It is not hard
to adapt Algebird's top-k aggregator (SpaceSaver) for use as a Spark
aggregator; this requires a simple adapter class. We do this in-house as
well, although I have to say I would recommend Spark 2.1.0 for this: Spark
2.0.x aggregator codegen is too buggy in my experience.
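A sketch of what such an adapter class might look like. The Algebird calls
(SpaceSaver(capacity, item), ++, topK) are quoted from memory and should be
checked against the Algebird version in use; Kryo encoders are used because
SpaceSaver is not a case class:

import com.twitter.algebird.SpaceSaver
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class ApproxTopK(k: Int, capacity: Int)
    extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {

  // SpaceSaver has no empty value, so Option models the "no items yet" state
  def zero: Option[SpaceSaver[String]] = None

  def reduce(buf: Option[SpaceSaver[String]], item: String): Option[SpaceSaver[String]] =
    buf match {
      case None     => Some(SpaceSaver(capacity, item))
      case Some(ss) => Some(ss ++ SpaceSaver(capacity, item))
    }

  def merge(a: Option[SpaceSaver[String]], b: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(x ++ y)
      case _                  => a.orElse(b)
    }

  // topK returns approximate counts; only the items themselves are kept here
  def finish(buf: Option[SpaceSaver[String]]): Seq[String] =
    buf.map(_.topK(k).map(_._1)).getOrElse(Seq.empty)

  def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo[Option[SpaceSaver[String]]]
  def outputEncoder: Encoder[Seq[String]] = Encoders.kryo[Seq[String]]
}

It plugs into groupByKey(...).agg(new ApproxTopK(10, 1000).toColumn) the same
way as any typed Aggregator.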



Re: top-k function for Window

2017-01-03 Thread Andy Dang
Hi Austin,

It's trivial to implement top-k in the RDD world - however, I would like to
stay in the Dataset API world instead of flip-flopping between the two APIs
(consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave
me is very similar to what I did - it doesn't work very well with skewed
datasets :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more I write the code, the more
I see myself digging deeper into developer API land - not very ideal, to be
honest. Also, a UDAF doesn't have any concept of sorting, so it gets messy
really fast.

---
Regards,
Andy



Re: top-k function for Window

2017-01-03 Thread HENSLEE, AUSTIN L
Andy,

You might also want to check out the Algebird libraries from Twitter. They have 
topK and a lot of other helpful functions. I’ve used the Algebird topK 
successfully on very large data sets.

You can also use Spark SQL to do a “poor man’s” topK. This depends on how 
scrupulous you are about your TopKs (I can expound on this, if needed).

I obfuscated the field names before pasting this into the email – I think I got 
them all consistently.

Here’s the meat of the TopK part (found on SO, but I don’t have a reference) – 
this one takes the top 4, hence “rowNum <= 4”:

SELECT time_bucket,
   identifier1,
   identifier2,
   incomingCount
  FROM (select time_bucket,
identifier1,
identifier2,
incomingCount,
   ROW_NUMBER() OVER (PARTITION BY time_bucket,
   identifier1
  ORDER BY count DESC) as rowNum
  FROM tablename) tmp
  WHERE rowNum <=4
  ORDER BY time_bucket, identifier1, rowNum

The count and order by:


SELECT time_bucket,
   identifier1,
   identifier2,
   count(identifier2) as myCount
  FROM table
  GROUP BY time_bucket,
   identifier1,
   identifier2
  ORDER BY time_bucket,
   identifier1,
   count(identifier2) DESC




RE: top-k function for Window

2017-01-03 Thread Mendelson, Assaf
You can write a UDAF whose buffer holds the top K and maintain it as you go. This 
means you don’t need to sort at all. Furthermore, in your example you don’t even 
need a window function; you can simply use groupBy and explode (see the usage 
sketch below).
Of course, this is only relevant if K is small…
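As a usage sketch, assuming a UDAF along the lines of the TopKAgg sketched
further up in this archive, and hypothetical column names:

import org.apache.spark.sql.functions.explode
import spark.implicits._                      // assumes a SparkSession named `spark`

val topK = new TopKAgg(10)                    // hypothetical UDAF, keeps at most 10 values
val result = input
  .groupBy($"skewedKey")                      // plain aggregation, no window, no global sort
  .agg(topK($"value").as("topValues"))
  .select($"skewedKey", explode($"topValues").as("value"))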

From: Andy Dang [mailto:nam...@gmail.com]
Sent: Tuesday, January 03, 2017 3:07 PM
To: user
Subject: top-k function for Window

Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with skewed 
keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the data 
locally and take the local rank first. What it ends up doing is performing a sort 
by key using the skewed keys, and this blew up the cluster since the keys are 
heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API, 
especially with Window functions. I guess some UserDefinedAggregateFunction would 
do, but I wonder if there's an obvious way that I missed.

---
Regards,
Andy