GitHub user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21252#discussion_r186320836
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -127,13 +127,36 @@ case class TakeOrderedAndProjectExec(
         projectList: Seq[NamedExpression],
         child: SparkPlan) extends UnaryExecNode {
     
    +  val sortInMemThreshold = conf.sortInMemForLimitThreshold
    +
    +  override def requiredChildDistribution: Seq[Distribution] = {
    +    if (limit < sortInMemThreshold) {
    +      super.requiredChildDistribution
    +    } else {
    +      Seq(AllTuples)
    +    }
    +  }
    +
    +  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    +    if (limit < sortInMemThreshold) {
    +      super.requiredChildOrdering
    +    } else {
    +      Seq(sortOrder)
    +    }
    +  }
    --- End diff --
    
    Thanks a lot for the comments :)
    Our users run queries with a very large limit, which hit OOM with the
    current code. So I added a config threshold: when the limit is below it we
    keep the current code path, otherwise we do a global sort.
    Yes, there will be a performance degradation, but that is better than an OOM.
    I think a limited heap sort that spills to disk would be a better idea, but
    I didn't find an existing implementation and don't want to make this too
    complicated here.
    Thanks again for reviewing this, @viirya
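
    To illustrate the trade-off described above, here is a minimal sketch in
    plain Scala. It does not use Spark. The object `LimitSortSketch`, the
    method `takeOrdered`, and the `threshold` parameter are hypothetical names
    chosen for illustration, not the PR's actual code. Below the threshold it
    keeps only `limit` rows in a bounded heap. At or above the threshold it
    falls back to a full sort. That full sort is in-memory here, only as a
    stand-in for a global sort that could spill to disk.

```scala
import scala.collection.mutable

object LimitSortSketch {
  // Hypothetical helper (not a Spark API): returns the `limit` smallest rows, in order.
  def takeOrdered[T](rows: Seq[T], limit: Int, threshold: Int)(implicit ord: Ordering[T]): Seq[T] = {
    if (limit <= 0) {
      Seq.empty
    } else if (limit < threshold) {
      // Small limit: bounded max-heap that never holds more than `limit` rows.
      val heap = mutable.PriorityQueue.empty[T](ord)
      rows.foreach { row =>
        if (heap.size < limit) {
          heap.enqueue(row)
        } else if (ord.lt(row, heap.head)) {
          // The new row beats the largest row kept so far, so swap it in.
          heap.dequeue()
          heap.enqueue(row)
        }
      }
      heap.toList.sorted(ord)
    } else {
      // Large limit: sort everything, then take the prefix.
      // Assumption: stands in for a global sort that is able to spill to disk.
      rows.sorted(ord).take(limit)
    }
  }
}
```

    For example, `LimitSortSketch.takeOrdered(Seq(5, 1, 4, 2, 3), 2, 10)` takes
    the heap path, while the same call with `limit = 3` and `threshold = 2`
    takes the full-sort path. Both paths return the same rows; they differ only
    in how much memory is held at once.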

