Here is my understanding:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
  if (num == 0) {
    // If num is 0, return an empty array.
    Array.empty
  } else {
    // Map each partition to a new one whose iterator consists of a
    // single queue holding at most num elements.
    mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }.reduce { (queue1, queue2) =>
      // runJob is called here to collect all the elements of the RDD,
      // which are the queues from each partition, and merge them.
      queue1 ++= queue2
      queue1
    }.toArray.sorted(ord) // Convert to an array and sort.
  }
}
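To make the flow above concrete, here is a rough sketch in plain Scala, with no Spark involved. It is only an illustration under stated assumptions: each "partition" is an ordinary Seq, the names TakeOrderedSketch and smallestInPartition are made up for this example, and a standard mutable.PriorityQueue stands in for Spark's BoundedPriorityQueue. It shows why the result is a local Array[T] and not an RDD: every partition is reduced to a small queue, the queues are merged, and the merged result is sorted on the caller's side.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

object TakeOrderedSketch {
  // Hypothetical helper (not part of Spark): keep the `num` smallest
  // elements of one partition according to `ord`.
  def smallestInPartition[T](items: Iterator[T], num: Int)(
      implicit ord: Ordering[T]): mutable.PriorityQueue[T] = {
    // The queue's head is the largest element under `ord`, i.e. the
    // first candidate to evict when a smaller element arrives.
    val queue = mutable.PriorityQueue.empty[T](ord)
    items.foreach { item =>
      if (queue.size < num) {
        queue.enqueue(item)
      } else if (ord.lt(item, queue.head)) {
        queue.dequeue()
        queue.enqueue(item)
      }
    }
    queue
  }

  // Stand-in for RDD.takeOrdered: partitions are plain Seqs here.
  def takeOrdered[T](partitions: Seq[Seq[T]], num: Int)(
      implicit ord: Ordering[T], ct: ClassTag[T]): Array[T] = {
    if (num == 0 || partitions.isEmpty) {
      Array.empty[T]
    } else {
      partitions
        .map(p => smallestInPartition(p.iterator, num)) // one queue per partition
        .reduce((q1, q2) => smallestInPartition(q1.iterator ++ q2.iterator, num))
        .toArray
        .sorted(ord) // final sort happens on the local array
    }
  }

  // top(num) is the same computation with the ordering reversed.
  def top[T](partitions: Seq[Seq[T]], num: Int)(
      implicit ord: Ordering[T], ct: ClassTag[T]): Array[T] =
    takeOrdered(partitions, num)(ord.reverse, ct)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(9, 4, 7), Seq(1, 8), Seq(6, 2, 5))
    println(takeOrdered(partitions, 3).mkString(", ")) // prints 1, 2, 4
    println(top(partitions, 3).mkString(", "))         // prints 9, 8, 7
  }
}
```

In Spark itself, if you need an RDD again after calling top or takeOrdered, you can pass the returned array to sc.parallelize, assuming sc is your SparkContext.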

On Sep 23, 2014, at 9:33 PM, Deep Pradhan <pradhandeep1...@gmail.com> wrote:

> Hi,
> Is it always possible to get one RDD from another.
> For example, if I do a top(K)(Ordering....), I get an Int right? (In my
> example the type is Int). I do not get an RDD.
> Can anyone explain this to me?
> Thank You