Github user chenghao-intel commented on a diff in the pull request:
https://github.com/apache/spark/pull/8128#discussion_r36866693
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -224,6 +225,56 @@ case class Limit(limit: Int, child: SparkPlan)
/**
* :: DeveloperApi ::
+ * Take the first limit elements; the limit can be any number less than Integer.MAX_VALUE.
+ * If this operator is terminal and is invoked via executeCollect, it may cause an OOM when
+ * the number of records is large enough. Unlike the Limit operator, this operator does not
+ * change the partitioning of its child.
+ */
+@DeveloperApi
+case class LargeLimit(limit: Int, child: SparkPlan)
+  extends UnaryNode {
+  /** We must copy rows when sort-based shuffle is on */
+  private def sortBasedShuffleOn =
+    SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+
+  override def output: Seq[Attribute] = child.output
+
+  override def executeCollect(): Array[Row] = child.executeTake(limit)
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd = if (sortBasedShuffleOn) {
+      child.execute().map(_.copy()).persist(StorageLevel.MEMORY_AND_DISK)
+    } else {
+      child.execute().persist(StorageLevel.MEMORY_AND_DISK)
--- End diff ---
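For context on the `_.copy()` in the sort-based branch above: Spark operators may reuse a single mutable row object across records, so buffering or caching rows without copying can leave every cached entry aliasing the last row. A standalone sketch of the hazard (the `MutableRow` class here is hypothetical, standing in for Spark's mutable InternalRow):

```scala
// Hypothetical stand-in for a mutable row that an operator reuses per record.
class MutableRow(var value: Int) { def copy(): MutableRow = new MutableRow(value) }

val shared = new MutableRow(0)
// Without copying, every buffered reference points at the same object.
val aliased = (1 to 3).map { i => shared.value = i; shared }
// With a defensive copy, each buffered entry is an independent snapshot.
val copied = (1 to 3).map { i => shared.value = i; shared.copy() }

println(aliased.map(_.value)) // Vector(3, 3, 3) -- all aliases of the last row
println(copied.map(_.value))  // Vector(1, 2, 3) -- one snapshot per record
```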
@rxin, is there any way to automatically unpersist the cache?
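(For reference, not an answer from this PR: Spark's ContextCleaner can unpersist an RDD automatically once it becomes unreachable and is garbage-collected, but that timing is nondeterministic. A manual alternative is to scope the persist explicitly; `withCached` below is a hypothetical helper, while `persist`/`unpersist` are the standard RDD APIs:)

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: persist an RDD for the duration of `body`, then
// release the cached blocks even if `body` throws.
def withCached[T, U](rdd: RDD[T], level: StorageLevel)(body: RDD[T] => U): U = {
  rdd.persist(level)
  try body(rdd)
  finally rdd.unpersist(blocking = false) // don't wait for block removal
}
```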