Github user GraceH commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8128#discussion_r37046803
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
    @@ -224,6 +225,56 @@ case class Limit(limit: Int, child: SparkPlan)
     
     /**
      * :: DeveloperApi ::
    + * Take the first limit elements; limit can be any number less than Integer.MAX_VALUE.
    + * If this operator is terminal and is invoked via executeCollect, it may cause an OOM
    + * when the number of records is large enough. Unlike the Limit operator, this operator
    + * does not change the partitioning of its child operator.
    + */
    +@DeveloperApi
    +case class LargeLimit(limit: Int, child: SparkPlan)
    +  extends UnaryNode {
    +  /** We must copy rows when sort based shuffle is on */
    +  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
    +
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def executeCollect(): Array[Row] = child.executeTake(limit)
    +
    +  protected override def doExecute(): RDD[InternalRow] = {
    +    val rdd = if (sortBasedShuffleOn) {
    +      child.execute().map(_.copy()).persist(StorageLevel.MEMORY_AND_DISK)
    +    } else {
    +      child.execute().persist(StorageLevel.MEMORY_AND_DISK)
    +    }
    +
    +    // We assume the maximum number of records in a partition is less than Integer.MAX_VALUE
    +    val partitionRecordCounts = rdd.mapPartitions({ iterator =>
    +      Iterator(iterator.count(_ => true))
    +    }, true).collect()
    +
    +    var totalSize = 0
    +    // how many records we have to take from each partition
    +    val requiredRecordCounts = partitionRecordCounts.map { count =>
    --- End diff --
    
    Just a minor suggestion.
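
    For reference, since the diff is cut off at this point, here is a rough sketch of how
    the per-partition take counts could be derived from partitionRecordCounts and limit.
    The helper name and shape are only illustrative assumptions, not the PR's actual code:

        // Hypothetical sketch: cap each partition's contribution so the total
        // number of rows taken across all partitions never exceeds `limit`.
        def requiredRecordCounts(partitionRecordCounts: Array[Int], limit: Int): Array[Int] = {
          var remaining = limit
          partitionRecordCounts.map { count =>
            val take = math.min(count, remaining)  // take at most what is still needed
            remaining -= take
            take
          }
        }

        // e.g. partition counts Array(3, 5, 4) with limit = 7 give Array(3, 4, 0)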

