Adding the following code to RDD.scala works for me. PS: in the code below, I merge the smaller queue into the larger one. I wonder whether this helps performance — let me know when you run the benchmark.
def treeTakeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority queue keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.treeReduce { (queue1, queue2) =>
        // Merge the smaller queue into the larger one.
        if (queue1.size > queue2.size) {
          queue1 ++= queue2
          queue1
        } else {
          queue2 ++= queue1
          queue2
        }
      }.toArray.sorted(ord)
    }
  }
}

def treeTop(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  treeTakeOrdered(num)(ord.reverse)
}

Sincerely,

DB Tsai
----------------------------------------------------------
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D
<https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D>

On Tue, Jun 9, 2015 at 10:09 AM, raggy <raghav0110...@gmail.com> wrote:

> I am trying to implement top-k in Scala within Apache Spark. I am aware
> that Spark has a top() action, but top() uses reduce(). Instead, I would
> like to use treeReduce(), so that I can compare the performance of
> reduce() and treeReduce().
>
> The main issue I have is that I cannot use these two lines of code, which
> are used in the top() action, within my Spark application:
>
> val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
> queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
>
> How can I go about implementing top() using treeReduce()?
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-top-using-treeReduce-tp23227.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
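A follow-up note for anyone who wants to try the "merge the smaller queue into the larger one" idea outside of Spark: since `BoundedPriorityQueue` is `private[spark]`, the sketch below stands in for it with a plain `scala.collection.mutable.PriorityQueue`. The names `BoundedQueue`, `TopKDemo`, and `merge` are made up for illustration, and the "partitions" are just in-memory `Seq`s simulating what `mapPartitions` would produce; this is not Spark's implementation, only a minimal model of the same merge step.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's private BoundedPriorityQueue:
// keeps at most `num` elements that are smallest under `ord`, backed
// by a max-heap so the current worst retained element sits at the
// head and is the first to be evicted.
class BoundedQueue[T](num: Int)(implicit ord: Ordering[T]) {
  private val heap = mutable.PriorityQueue.empty[T](ord)

  def +=(elem: T): this.type = {
    if (heap.size < num) {
      heap.enqueue(elem)
    } else if (ord.lt(elem, heap.head)) {
      heap.dequeue() // drop the current largest retained element
      heap.enqueue(elem)
    }
    this
  }

  def ++=(other: BoundedQueue[T]): this.type = {
    other.heap.foreach(this += _)
    this
  }

  def size: Int = heap.size
  def toSortedList: List[T] = heap.toList.sorted(ord)
}

object TopKDemo {
  // Commutative merge, usable as a treeReduce function: fold the
  // smaller queue into the larger one, as in the code above.
  def merge[T](q1: BoundedQueue[T], q2: BoundedQueue[T]): BoundedQueue[T] =
    if (q1.size > q2.size) q1 ++= q2 else q2 ++= q1

  // Fake partition contents simulating mapPartitions output.
  val partitions = Seq(Seq(9, 1, 7, 3), Seq(8, 2, 6), Seq(5, 4))

  val result: List[Int] = {
    // Per-partition step: build one bounded queue per partition.
    val queues = partitions.map { items =>
      val q = new BoundedQueue[Int](3)
      items.foreach(q += _)
      q
    }
    // Reduce step: merge queues pairwise, then sort the survivors.
    queues.reduce(merge(_, _)).toSortedList
  }

  def main(args: Array[String]): Unit =
    println(result) // List(1, 2, 3): the 3 smallest elements overall
}
```

Because `merge` only ever shrinks-or-keeps each queue to at most `num` elements, the amount of data shipped between reduce levels stays bounded, which is the property that makes the tree-shaped reduction attractive for large partition counts.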