oleg-smith commented on a change in pull request #28488:
URL: https://github.com/apache/spark/pull/28488#discussion_r422698223
##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1031,20 +1033,24 @@ abstract class RDD[T: ClassTag](
     Array.concat(results: _*)
   }
+  def toLocalIterator : Iterator[T] = toLocalIterator(false)
+
   /**
    * Return an iterator that contains all of the elements in this RDD.
    *
    * The iterator will consume as much memory as the largest partition in this RDD.
+   * With prefetch it may consume up to the memory of the 2 largest partitions.
+   *
+   * @param prefetchPartitions If Spark should pre-fetch the next partition before it is needed.
    *
    * @note This results in multiple Spark jobs, and if the input RDD is the result
    * of a wide transformation (e.g. join with different partitioners), to avoid
    * recomputing the input RDD should be cached first.
    */
-  def toLocalIterator: Iterator[T] = withScope {
-    def collectPartition(p: Int): Array[T] = {
-      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
-    }
-    partitions.indices.iterator.flatMap(i => collectPartition(i))
+  def toLocalIterator(prefetchPartitions: Boolean = false): Iterator[T] = withScope {
+    val iterator = new PrefetchingIterator(this, prefetchPartitions)
+    if (prefetchPartitions) iterator.hasNext
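For context, a minimal usage sketch of the overload added in this diff. The SparkContext setup, data, and object name are illustrative and not part of the PR; only the `toLocalIterator(prefetchPartitions = ...)` signature is taken from the change above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative driver program, not part of the PR: exercises both the
// parameterless toLocalIterator and the new prefetching overload.
object ToLocalIteratorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("toLocalIterator-prefetch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)

    // Default path: one job per partition, fetched lazily as the iterator
    // advances, so at most one partition is held on the driver at a time.
    rdd.toLocalIterator.take(5).foreach(println)

    // Prefetching path: the next partition is requested while the current one
    // is being consumed, so up to the two largest partitions may be resident.
    rdd.toLocalIterator(prefetchPartitions = true).foreach(_ => ())

    sc.stop()
  }
}
```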
Review comment:
@holdenk @srowen Should we avoid prefetching the very first partition here?
For a user, it might be better to get an iterator with no side effects first,
and to have jobs submitted (and any errors surfaced) only once iteration has intentionally started.
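A rough, self-contained sketch of the behaviour being suggested: keep iterator construction side-effect free and defer any job submission (stood in for by a println here) until the caller actually starts iterating. This is only an illustration of the idea, not the PR's implementation, and the wrapper name is hypothetical.

```scala
// Generic wrapper that delays building the real iterator (and any side
// effects, such as submitting the first prefetch job) until first use.
object LazyIteratorSketch {
  def lazyIterator[T](build: () => Iterator[T]): Iterator[T] = new Iterator[T] {
    private lazy val underlying: Iterator[T] = build()
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
  }

  def main(args: Array[String]): Unit = {
    val it = lazyIterator { () =>
      // Stand-in for "new PrefetchingIterator(...)" plus the eager hasNext call.
      println("first partition job submitted")
      Iterator(1, 2, 3)
    }
    println("iterator obtained, no jobs submitted yet")
    it.foreach(println) // the side effect only happens here
  }
}
```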