What about
https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF
Koert Kuipers <ko...@tresata.com> wrote on Wed, Jan 4, 2017 at 16:11:

> I assumed top-k of frequencies in one pass. If it's top-k by a known
> sort order instead, then use a priority-queue aggregator rather than SpaceSaver.
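>
> For the known-ordering case, a rough sketch of such an aggregator (not our
> in-house code; it uses a small re-trimmed buffer rather than an actual
> priority queue, and all names and types are made up for illustration):
>
>   import org.apache.spark.sql.{Encoder, Encoders}
>   import org.apache.spark.sql.expressions.Aggregator
>
>   // Exact top-k by a known ordering (largest score wins). The buffer is a
>   // bounded list that is re-trimmed on every update, so at most k rows per
>   // group are ever held.
>   class TopKByScore(k: Int)
>       extends Aggregator[(String, Long), Seq[(String, Long)], Seq[(String, Long)]] {
>
>     private def trim(xs: Seq[(String, Long)]): Seq[(String, Long)] =
>       xs.sortBy(-_._2).take(k)
>
>     def zero: Seq[(String, Long)] = Seq.empty
>     def reduce(buf: Seq[(String, Long)], row: (String, Long)): Seq[(String, Long)] = trim(buf :+ row)
>     def merge(b1: Seq[(String, Long)], b2: Seq[(String, Long)]): Seq[(String, Long)] = trim(b1 ++ b2)
>     def finish(buf: Seq[(String, Long)]): Seq[(String, Long)] = buf
>
>     def bufferEncoder: Encoder[Seq[(String, Long)]] = Encoders.kryo[Seq[(String, Long)]]
>     def outputEncoder: Encoder[Seq[(String, Long)]] = Encoders.kryo[Seq[(String, Long)]]
>   }
>
> which you would use as something like
> ds.groupByKey(_._1).agg(new TopKByScore(10).toColumn).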
>
> On Tue, Jan 3, 2017 at 3:11 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> I don't know anything about windowing or about not using developer APIs...
>
> But a trivial implementation of top-k requires a total sort per group.
> This can be done with the Dataset API. We do it using spark-sorted (
> https://github.com/tresata/spark-sorted), but it's not hard to do it
> yourself for Datasets either. For RDDs it's actually a little harder, I
> think (if you want to avoid the in-memory assumption, which I assume you do).
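>
> For illustration, one way to get that per-group sort with just the public
> Dataset API (not what spark-sorted does internally; the Event case class and
> column names are made up) is to repartition by key, sort within partitions,
> and stream each partition once:
>
>   import org.apache.spark.sql.Dataset
>   import org.apache.spark.sql.functions.col
>
>   case class Event(key: String, score: Long)
>
>   def topKPerKey(ds: Dataset[Event], k: Int): Dataset[Event] = {
>     import ds.sparkSession.implicits._
>     ds.repartition(col("key"))
>       // secondary sort: rows for a key are adjacent and ordered by score
>       .sortWithinPartitions(col("key"), col("score").desc)
>       .mapPartitions { it =>
>         var currentKey: String = null
>         var seen = 0
>         it.filter { e =>
>           if (e.key != currentKey) { currentKey = e.key; seen = 0 }
>           seen += 1
>           seen <= k // keep only the first k rows of each key
>         }
>       }
>   }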
>
> A perhaps more efficient implementation uses an aggregator. It is not hard
> to adapt Algebird's top-k aggregator (SpaceSaver) for use as a Spark
> aggregator; it only requires a simple adapter class. We do this in-house as
> well, although I have to say I would recommend Spark 2.1.0 for this: Spark
> 2.0.x aggregator codegen is too buggy in my experience.
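>
> The adapter might look roughly like this (a sketch, not our in-house class;
> it assumes Algebird's SpaceSaver semigroup and Kryo-encoded buffers, and the
> class name is made up):
>
>   import com.twitter.algebird.SpaceSaver
>   import org.apache.spark.sql.{Encoder, Encoders}
>   import org.apache.spark.sql.expressions.Aggregator
>
>   // Approximate top-k by frequency. SpaceSaver is a semigroup with no empty
>   // element, so the buffer is an Option that starts out as None.
>   class SpaceSaverTopK(capacity: Int, k: Int)
>       extends Aggregator[String, Option[SpaceSaver[String]], Seq[String]] {
>
>     def zero: Option[SpaceSaver[String]] = None
>
>     def reduce(buf: Option[SpaceSaver[String]], item: String): Option[SpaceSaver[String]] = {
>       val one = SpaceSaver(capacity, item)
>       Some(buf.map(_ ++ one).getOrElse(one))
>     }
>
>     def merge(b1: Option[SpaceSaver[String]], b2: Option[SpaceSaver[String]]): Option[SpaceSaver[String]] =
>       (b1, b2) match {
>         case (Some(x), Some(y)) => Some(x ++ y)
>         case (x, None)          => x
>         case (None, y)          => y
>       }
>
>     // topK returns (item, approximate count, exact?) triples; keep the items.
>     def finish(buf: Option[SpaceSaver[String]]): Seq[String] =
>       buf.map(_.topK(k).map(_._1)).getOrElse(Seq.empty)
>
>     def bufferEncoder: Encoder[Option[SpaceSaver[String]]] = Encoders.kryo[Option[SpaceSaver[String]]]
>     def outputEncoder: Encoder[Seq[String]] = Encoders.kryo[Seq[String]]
>   }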
>
> On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang <nam...@gmail.com> wrote:
>
> Hi Austin,
>
> It's trivial to implement top-k in the RDD world - however, I would like to
> stay in the Dataset API world instead of flip-flopping between the two APIs
> (consistency, whole-stage codegen, etc.).
>
> The Twitter library appears to support only RDDs, and the solution you gave
> me is very similar to what I did - it doesn't work very well with a skewed
> dataset :) (it has to perform the sort to work out the row number).
>
> I've been toying with the UDAF idea, but the more code I write, the deeper
> I find myself digging into developer-API land - not very ideal, to be honest.
> Also, a UDAF doesn't have any concept of sorting, so it gets messy really
> fast.
>
> -------
> Regards,
> Andy
>
> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:
>
> Andy,
>
>
>
> You might also want to check out the Algebird libraries from Twitter. They
> have topK and a lot of other helpful functions. I’ve used the Algebird topK
> successfully on very large data sets.
>
>
>
> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
> scrupulous you are about your TopKs (I can expound on this, if needed).
>
>
>
> I obfuscated the field names before pasting this into the email – I think I
> got them all consistently.
>
>
>
> Here’s the meat of the TopK part (found on SO, but I don’t have a
> reference) – this one takes the top 4, hence “rowNum <= 4”:
>
>
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        incomingCount
>   FROM (SELECT time_bucket,
>                identifier1,
>                identifier2,
>                incomingCount,
>                ROW_NUMBER() OVER (PARTITION BY time_bucket, identifier1
>                                       ORDER BY incomingCount DESC) AS rowNum
>           FROM tablename) tmp
>  WHERE rowNum <= 4
>  ORDER BY time_bucket, identifier1, rowNum
>
>
>
> The count and order by:
>
>
>
>
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        count(identifier2) AS myCount
>   FROM table
>  GROUP BY time_bucket,
>           identifier1,
>           identifier2
>  ORDER BY time_bucket,
>           identifier1,
>           count(identifier2) DESC
>
>
>
>
>
> From: Andy Dang <nam...@gmail.com>
> Date: Tuesday, January 3, 2017 at 7:06 AM
> To: user <user@spark.apache.org>
> Subject: top-k function for Window
>
>
>
> Hi all,
>
>
>
> What's the best way to do top-k with Windowing in the Dataset world?
>
>
>
> I have a snippet of code that filters the data down to the top-k, but the
> keys are skewed:
>
>
>
> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
>
> val rank = row_number().over(windowSpec)
>
>
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
>
>
> The problem with this code is that Spark doesn't know it can sort the data
> locally and compute the local rank first. What it ends up doing is performing
> a sort by key over the skewed keys, and this blew up the cluster since the
> keys are heavily skewed.
>
>
>
> In the RDD world we can do something like:
>
> rdd.mapPartitions(iterator -> topK(iterator))
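>
> For example, a rough Scala sketch of that idea - here with aggregateByKey
> and a bounded, trimmed buffer rather than raw mapPartitions; the key/value
> types and names are made up:
>
>   import org.apache.spark.rdd.RDD
>
>   // Per-key top-k without a full sort: the buffer per key never grows
>   // beyond k elements, and partial buffers are merged across partitions.
>   def topKPerKey(rdd: RDD[(String, Long)], k: Int): RDD[(String, Seq[Long])] = {
>     def trim(xs: Seq[Long]): Seq[Long] = xs.sorted(Ordering[Long].reverse).take(k)
>     rdd.aggregateByKey(Seq.empty[Long])(
>       (buf, v) => trim(buf :+ v),
>       (b1, b2) => trim(b1 ++ b2)
>     )
>   }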
>
> but I can't really think of an obvious way to do this in the Dataset API,
> especially with the Window function. I guess some UserDefinedAggregateFunction
> would do, but I wonder if there's an obvious way that I missed.
>
>
>
> -------
> Regards,
> Andy
>
>
>
>
>
