Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21252#discussion_r186320836

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -127,13 +127,36 @@ case class TakeOrderedAndProjectExec(
     projectList: Seq[NamedExpression],
     child: SparkPlan) extends UnaryExecNode {

+  val sortInMemThreshold = conf.sortInMemForLimitThreshold
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (limit < sortInMemThreshold) {
+      super.requiredChildDistribution
+    } else {
+      Seq(AllTuples)
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (limit < sortInMemThreshold) {
+      super.requiredChildOrdering
+    } else {
+      Seq(sortOrder)
+    }
+  }
--- End diff --

Thanks a lot for the comments :)

Our users have queries with very large limit values, which hit OOM with the current code. So I added a config: when the limit is below the threshold, we keep the current code path; otherwise we do a global sort. Yes, there will be performance degradation, but it's better than an OOM.

I think a limit-bounded heap sort that spills to disk would be a better approach, but I didn't find an existing implementation and don't want to make it too complicated here.

Thanks again for reviewing this @viirya
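For context, here is a minimal sketch of how such a threshold config could be declared in SQLConf. Only the accessor name `sortInMemForLimitThreshold` appears in the diff above; the config key string, doc text, and default value below are assumptions for illustration, not the PR's actual values:

```scala
// Hypothetical sketch, as it might appear inside object SQLConf
// (sql/catalyst). Key name and default are assumptions, not the PR's.
val SORT_IN_MEM_FOR_LIMIT_THRESHOLD =
  buildConf("spark.sql.execution.sortInMemForLimitThreshold")
    .doc("When the limit of a TakeOrderedAndProject is at or above this " +
      "threshold, require a global sort of the child instead of holding " +
      "the top rows of every partition in memory.")
    .intConf
    .createWithDefault(1000000)

// Accessor referenced by the diff above, e.g. in class SQLConf:
def sortInMemForLimitThreshold: Int = getConf(SORT_IN_MEM_FOR_LIMIT_THRESHOLD)
```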
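The limit-bounded heap idea mentioned above, minus the disk-spilling part, can be sketched in a few lines of plain Scala. This illustrates why memory stays O(limit) regardless of input size; it is not Spark's actual TakeOrderedAndProjectExec implementation:

```scala
import scala.collection.mutable.PriorityQueue

// Sketch of a bounded-heap top-K: keep at most `limit` elements, evicting
// the largest, so the smallest `limit` elements survive. The disk-spilling
// variant wished for in the comment is not shown.
def topK[T](input: Iterator[T], limit: Int)(implicit ord: Ordering[T]): Seq[T] = {
  // Max-heap on `ord`: the head is the largest element currently retained.
  val heap = PriorityQueue.empty[T](ord)
  input.foreach { row =>
    if (heap.size < limit) {
      heap.enqueue(row)
    } else if (ord.lt(row, heap.head)) {
      heap.dequeue()       // evict the current largest
      heap.enqueue(row)
    }
  }
  heap.dequeueAll.reverse  // return in ascending order
}

// Usage: the 3 smallest values of a large input, in O(3) extra memory.
// topK(Iterator(9, 1, 7, 3, 8, 2), 3) == Seq(1, 2, 3)
```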