Right. That is unavoidable unless, as you say, you repartition into 1 partition, which may do the trick.
When I say send the top k per partition I don't mean send the pq but the actual values. This may end up being relatively small if k and p are not too big. (I'm not sure how large a serialized pq is.)

— Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers <ko...@tresata.com> wrote:
> hey nick,
> you are right. i didnt explain myself well and my code example was wrong...
> i am keeping a priority-queue with k items per partition (using
> com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
> of the queues).
> but this still means i am sending k items per partition to my driver, so k
> x p, while i only need k.
> thanks! koert
>
> On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>> To make it efficient in your case you may need to do a bit of custom code
>> to emit the top k per partition and then only send those to the driver. On
>> the driver you can just take the top k of the combined top-k lists from each
>> partition (assuming you have (object, count) for each top-k list).
>>
>> — Sent from Mailbox <https://www.dropbox.com/mailbox>
>>
>> On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>> my initial approach to taking the top k values of an rdd was using a
>>> priority-queue monoid, along these lines:
>>>
>>> rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
>>> false).reduce(monoid.plus)
>>>
>>> this works fine, but looking at the code for reduce, it first reduces
>>> within a partition (which doesnt help me) and then sends the results to the
>>> driver, where they again get reduced. this means that for every partition
>>> the (potentially very bulky) priorityqueue gets shipped to the driver.
>>>
>>> my driver is client side, not inside the cluster, and i cannot change this,
>>> so shipping all these queues to the driver can be expensive.
>>>
>>> is there a better way to do this? should i do a shuffle first to
>>> reduce the number of partitions to the minimum (since the number of queues
>>> shipped is equal to the number of partitions)?
>>>
>>> is there a way to reduce to a single-item rdd, so the queues stay inside
>>> the cluster and i can retrieve the final result with RDD.first?
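For what it's worth, Nick's suggestion can be sketched in plain Scala (no Spark dependency, so the data flow is easy to see): keep only the top k *values* per partition, then merge the small per-partition results on the driver side. Here a `Seq[Seq[Int]]` stands in for the partitions of an RDD; in Spark this would be roughly `rdd.mapPartitions(...).collect()` followed by a driver-side merge, and `rdd.top(k)` does something very similar internally with a bounded priority queue per partition. The names below (`TopK`, `topKPerPartition`) are made up for the sketch.

```scala
object TopK {
  // per-partition step: each partition emits at most k values,
  // so only k * p items ever reach the driver
  def topKPerPartition(partition: Seq[Int], k: Int): Seq[Int] =
    partition.sorted(Ordering[Int].reverse).take(k)

  // driver-side step: merge the combined top-k lists and take top k again
  def topK(partitions: Seq[Seq[Int]], k: Int): Seq[Int] = {
    val perPartition = partitions.map(topKPerPartition(_, k))
    perPartition.flatten.sorted(Ordering[Int].reverse).take(k)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(3, 9, 1), Seq(7, 2, 8), Seq(5, 4, 6))
    println(TopK.topK(partitions, 2).mkString(","))  // prints 9,8
  }
}
```

This is correct because any global top-k element must be in its own partition's top k, so nothing is lost by truncating per partition before the merge.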