Right. That is unavoidable unless, as you say, you repartition into 1 partition,
which may do the trick.
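
For what it's worth, a rough sketch of that single-partition variant, reusing the
Algebird PriorityQueueMonoid from the quoted thread below. The (String, Long) pair
type, the names, and the assumption that the monoid keeps the k smallest items
under the supplied ordering (so the ordering is reversed here to keep the largest
counts) are mine, not from the thread:

    import scala.collection.JavaConverters._
    import com.twitter.algebird.mutable.PriorityQueueMonoid
    import org.apache.spark.rdd.RDD

    def topKInsideCluster(rdd: RDD[(String, Long)], k: Int): Seq[(String, Long)] = {
      // assumption: "smallest" under this reversed ordering = largest count
      val byCountDesc: Ordering[(String, Long)] =
        Ordering.by[(String, Long), Long](_._2).reverse
      val monoid = new PriorityQueueMonoid[(String, Long)](k)(byCountDesc)

      val merged = rdd
        .mapPartitions(items => Iterator.single(monoid.build(items))) // one bounded queue per partition
        .coalesce(1, shuffle = true)   // queues are shuffled between executors, not sent to the driver
        .mapPartitions(qs => Iterator.single(qs.reduce(monoid.plus))) // assumes at least one partition
        .first()                       // only the final (at most k) items reach the driver

      merged.iterator.asScala.toSeq.sorted(byCountDesc)
    }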


When I say send the top k per partition, I don't mean send the pq itself but the
actual values. This may end up being relatively small if k and p are not too big.
(I'm not sure how large a serialized pq is.)
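
For concreteness, a minimal sketch of that idea as well, under the same assumptions
(illustrative (String, Long) pairs, reversed ordering, and the PriorityQueueMonoid
from the quoted thread): each partition ships at most k plain tuples instead of a
serialized queue, and the driver re-tops the k * p candidates.

    import scala.collection.JavaConverters._
    import com.twitter.algebird.mutable.PriorityQueueMonoid
    import org.apache.spark.rdd.RDD

    def topKValuesToDriver(rdd: RDD[(String, Long)], k: Int): Seq[(String, Long)] = {
      // assumption: "smallest" under this reversed ordering = largest count
      val byCountDesc: Ordering[(String, Long)] =
        Ordering.by[(String, Long), Long](_._2).reverse
      val monoid = new PriorityQueueMonoid[(String, Long)](k)(byCountDesc)

      val candidates = rdd.mapPartitions { items =>
        // bound the queue to k items, then emit its contents as plain tuples
        // rather than the queue object itself
        monoid.build(items).iterator.asScala
      }.collect()  // at most k * numPartitions small (object, count) tuples

      // final top k over the k * p candidates, on the driver
      monoid.build(candidates).iterator.asScala.toSeq.sorted(byCountDesc)
    }

Since collect() only brings k * p small tuples to the driver, the transfer avoids
whatever overhead the serialized queue objects carry.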

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers <ko...@tresata.com> wrote:

> hey nick,
> you are right. i didn't explain myself well and my code example was wrong...
> i am keeping a priority-queue with k items per partition (using
> com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
> of the queues).
> but this still means i am sending k items per partition to my driver, so k
> x p items in total, while i only need k.
> thanks! koert
> On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>> To make it efficient in your case you may need to write a bit of custom code
>> to emit the top k per partition and then only send those to the driver. On
>> the driver you can then take the top k of the combined per-partition top-k
>> lists (assuming you have (object, count) pairs for each top-k list).
>>
>>
>>
>> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> my initial approach to taking the top k values of an rdd was using a
>>> priority-queue monoid, along these lines:
>>>
>>>  rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>>> false).reduce(monoid.plus)
>>>
>>> this works fine, but looking at the code for reduce, it first reduces
>>> within each partition (which doesn't help me) and then sends the results to
>>> the driver, where they get reduced again. this means that for every partition
>>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>>
>>> my driver is client-side, not inside the cluster, and i cannot change this,
>>> so shipping all these queues to the driver can be expensive.
>>>
>>> is there a better way to do this? should i try a shuffle first to reduce
>>> the number of partitions to the minimum (since the number of queues shipped
>>> equals the number of partitions)?
>>>
>>> or is there a way to reduce to a single-item RDD, so the queues stay inside
>>> the cluster and i can retrieve the final result with RDD.first?
>>>
>>
>>
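
(For reference, a tiny illustration of the bounding behaviour of
PriorityQueueMonoid.build described in the quoted thread; the sample values, and
the assumption that the monoid keeps the k smallest items under the supplied
ordering, are mine:)

    import com.twitter.algebird.mutable.PriorityQueueMonoid

    // keep the 3 pairs with the largest counts, by reversing the by-count ordering
    val byCountDesc: Ordering[(String, Long)] =
      Ordering.by[(String, Long), Long](_._2).reverse
    val monoid = new PriorityQueueMonoid[(String, Long)](3)(byCountDesc)
    val q = monoid.build(Seq(("a", 5L), ("b", 1L), ("c", 9L), ("d", 4L), ("e", 7L)))
    // q now holds at most 3 elements: the pairs with the largest counts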
