holdenk commented on a change in pull request #28488:
URL: https://github.com/apache/spark/pull/28488#discussion_r422572289
##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1031,20 +1032,101 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}
+ def toLocalIterator : Iterator[T] = toLocalIterator(false)
+
/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this
RDD.
+ * With prefetch it may consume up to the memory of the 2 largest partitions.
+ *
+ * @param prefetchPartitions If Spark should pre-fetch the next partition
before it is needed.
*
* @note This results in multiple Spark jobs, and if the input RDD is the
result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input RDD should be cached first.
*/
- def toLocalIterator: Iterator[T] = withScope {
- def collectPartition(p: Int): Array[T] = {
- sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+ def toLocalIterator(prefetchPartitions: Boolean = false): Iterator[T] =
withScope {
+
+ if (!prefetchPartitions || partitions.indices.isEmpty) {
+ def collectPartition(p: Int): Array[T] = {
+ sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+ }
+ partitions.indices.iterator.flatMap(i => collectPartition(i))
+
+ } else {
+
+ val iterator: Iterator[Array[T]] = prefetchingIterator
+ iterator.hasNext
+ iterator.flatMap(data => data)
+ }
+ }
+
+ private def prefetchingIterator: Iterator[Array[T]] = {
+
+ val partitionIterator = partitions.indices.iterator
+
+ new Iterator[Array[T]] with Serializable {
+
+ private val lock = new ReentrantLock()
+ private val ready = lock.newCondition()
+
+ private var nextResult: Array[T] = _
+ private var fetchInProgress = false
+
+ /**
+ * In addition, it prefetches next element, if it exists
+ */
+ override def hasNext(): Boolean = withLock(() => {
+ if (fetchInProgress) true
Review comment:
What happens if the next partition is empty?
##########
File path: core/src/main/scala/org/apache/spark/rdd/RDD.scala
##########
@@ -1031,20 +1032,101 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}
+ def toLocalIterator : Iterator[T] = toLocalIterator(false)
+
/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this
RDD.
+ * With prefetch it may consume up to the memory of the 2 largest partitions.
+ *
+ * @param prefetchPartitions If Spark should pre-fetch the next partition
before it is needed.
*
* @note This results in multiple Spark jobs, and if the input RDD is the
result
* of a wide transformation (e.g. join with different partitioners), to avoid
* recomputing the input RDD should be cached first.
*/
- def toLocalIterator: Iterator[T] = withScope {
- def collectPartition(p: Int): Array[T] = {
- sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+ def toLocalIterator(prefetchPartitions: Boolean = false): Iterator[T] =
withScope {
+
+ if (!prefetchPartitions || partitions.indices.isEmpty) {
+ def collectPartition(p: Int): Array[T] = {
+ sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
+ }
+ partitions.indices.iterator.flatMap(i => collectPartition(i))
+
Review comment:
Minor style: we don't normally leave a blank line here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]