oleg-smith commented on a change in pull request #28488:
URL: https://github.com/apache/spark/pull/28488#discussion_r422668188



##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1031,20 +1032,101 @@ abstract class RDD[T: ClassTag](
     Array.concat(results: _*)
   }
 
+  def toLocalIterator: Iterator[T] = toLocalIterator(false)
+
   /**
    * Return an iterator that contains all of the elements in this RDD.
    *
   * The iterator will consume as much memory as the largest partition in this RDD.
+   * With prefetch enabled, it may consume up to the memory of the two largest partitions.
+   *
+   * @param prefetchPartitions Whether Spark should pre-fetch the next partition before it is needed.
    *
   * @note This results in multiple Spark jobs, and if the input RDD is the result
   * of a wide transformation (e.g. join with different partitioners), to avoid
   * recomputing, the input RDD should be cached first.
    */
-  def toLocalIterator: Iterator[T] = withScope {
-    def collectPartition(p: Int): Array[T] = {
-      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+  def toLocalIterator(prefetchPartitions: Boolean = false): Iterator[T] = withScope {
+
+    if (!prefetchPartitions || partitions.indices.isEmpty) {
+      def collectPartition(p: Int): Array[T] = {
+        sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+      }
+      partitions.indices.iterator.flatMap(i => collectPartition(i))
+

Review comment:
       fixed
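
For reference, a minimal, hypothetical driver-side sketch of how the new overload could be exercised once this lands, assuming the `prefetchPartitions` signature shown in the diff above (the `ToLocalIteratorSketch` object and its data are made up for illustration):

    import org.apache.spark.sql.SparkSession

    object ToLocalIteratorSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("toLocalIterator prefetch sketch")
          .master("local[2]")
          .getOrCreate()
        val sc = spark.sparkContext

        // toLocalIterator runs one job per partition, so cache the input
        // first if it is the result of a wide transformation.
        val rdd = sc.parallelize(1 to 100, numSlices = 4).cache()

        // Default path: partitions are fetched one at a time, bounding
        // driver memory by the largest single partition.
        rdd.toLocalIterator.foreach(println)

        // Prefetch path (this PR): partition i + 1 is collected while
        // partition i is consumed, so up to two partitions may be held
        // in driver memory at once.
        rdd.toLocalIterator(prefetchPartitions = true).foreach(println)

        spark.stop()
      }
    }

Prefetching trades memory for latency: the next partition is fetched while the current one is consumed, which is why the Scaladoc warns that up to the two largest partitions may be held in driver memory.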



