Github user cenyuhai commented on a diff in the pull request:
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/Utils.scala 
    @@ -30,10 +34,22 @@ private[spark] object Utils {
        * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
        * and maintains the ordering.
    -  def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    -    val ordering = new GuavaOrdering[T] {
    -      override def compare(l: T, r: T): Int = ord.compare(l, r)
    +  def takeOrdered[T](input: Iterator[T], num: Int,
    +      ser: Serializer = SparkEnv.get.serializer)(implicit ord: Ordering[T]): Iterator[T] = {
    +    val context = TaskContext.get()
    +    if (context == null) {
    +      val ordering = new GuavaOrdering[T] {
    +        override def compare(l: T, r: T): Int = ord.compare(l, r)
    +      }
    +      ordering.leastOf(input.asJava, num).iterator.asScala
    +    } else {
    +      val sorter =
    +        new ExternalSorter[T, Any, Any](context, None, None, Some(ord), 
    +      sorter.insertAll(input.map(x => (x, null)))
    --- End diff --
    1. In my case, a user runs the SQL query "select * from table sort by time
    limit 10000000". Here k is very large, so it is an extreme case. I do not
    need to change RDD.takeOrdered; I will limit the changes to limit.scala.
    2. GuavaOrdering will sort all the data in memory and then take the top k.
    If there is enough memory, ExternalSorter will not spill.
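
To make the concern about a very large k concrete, here is a minimal sketch in plain Scala of a generic in-memory top-k. It is not Spark's or Guava's actual code, and `TopKSketch` and `takeOrderedInMemory` are hypothetical names used only for illustration. The point is that an in-memory approach has to hold up to k elements at once, which is what becomes a problem when k is 10000000.

```scala
import scala.collection.mutable

object TopKSketch {
  // Hypothetical helper, not part of Spark: returns the k smallest elements
  // of `input` in ascending order, keeping at most k elements in memory.
  def takeOrderedInMemory[T](input: Iterator[T], k: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    if (k <= 0) {
      Iterator.empty
    } else {
      // PriorityQueue is a max-heap under `ord`, so `head` is the largest kept element.
      val heap = mutable.PriorityQueue.empty[T]
      input.foreach { x =>
        if (heap.size < k) {
          heap.enqueue(x)
        } else if (ord.lt(x, heap.head)) {
          // x beats the current worst candidate, so swap it in.
          heap.dequeue()
          heap.enqueue(x)
        }
      }
      // The heap never grows beyond k elements; sort them for the final order.
      heap.toList.sorted(ord).iterator
    }
  }
}
```

For example, `TopKSketch.takeOrderedInMemory(Iterator(5, 1, 4, 2, 3), 3).toList` yields `List(1, 2, 3)`. A sorter that can spill to disk avoids the requirement that all k elements fit in memory, at the cost of disk I/O when memory runs short.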
