Repository: spark
Updated Branches:
  refs/heads/master 7012ffafa -> e258e5040
[SPARK-1259] Make RDD locally iterable

Author: Egor Pakhomov <pahomov.e...@gmail.com>

Closes #156 from epahomov/SPARK-1259 and squashes the following commits:

8ec8f24 [Egor Pakhomov] Make to local iterator shorter
34aa300 [Egor Pakhomov] Fix toLocalIterator docs
08363ef [Egor Pakhomov] SPARK-1259 from toLocallyIterable to toLocalIterator
6a994eb [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
8be3dcf [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
33ecb17 [Egor Pakhomov] SPARK-1259 Make RDD locally iterable

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e258e504
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e258e504
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e258e504

Branch: refs/heads/master
Commit: e258e5040fa1905a04efcb7b3ca4a6d33e18fa61
Parents: 7012ffa
Author: Egor Pakhomov <pahomov.e...@gmail.com>
Authored: Sun Apr 6 16:41:23 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Apr 6 16:43:01 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 14 +++++++++++++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    | 12 ++++++++++++
 core/src/test/java/org/apache/spark/JavaAPISuite.java |  9 +++++++++
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala    |  1 +
 4 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index e03b8e7..6e8ec8e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -281,6 +282,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator(): JIterator[T] = {
+    import scala.collection.JavaConversions._
+    rdd.toLocalIterator
+  }
+
+
+  /**
    * Return an array that contains all of the elements in this RDD.
    * @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
    */
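A note on the Java wrapper above: toLocalIterator() returns rdd.toLocalIterator (a Scala Iterator) directly, and the in-scope scala.collection.JavaConversions implicits adapt it to the declared JIterator[T] (java.util.Iterator) return type. A minimal sketch of that conversion in isolation; the values here are illustrative, not part of the patch:

    import scala.collection.JavaConversions._

    // With JavaConversions in scope, a Scala Iterator is implicitly
    // adapted to java.util.Iterator, which is how toLocalIterator()
    // satisfies its JIterator[T] signature without an explicit wrapper.
    val scalaIter: Iterator[Int] = Iterator(1, 2, 3)
    val javaIter: java.util.Iterator[Int] = scalaIter
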
http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 08c42c5..c43823b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -662,6 +662,18 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator: Iterator[T] = {
+    def collectPartition(p: Int): Array[T] = {
+      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    }
+    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
+  }
+
+  /**
    * Return an array that contains all of the elements in this RDD.
    */
   @deprecated("use collect", "1.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 2372f2d..762405b 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -179,6 +180,14 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void lookup() {

http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d6b5fdc..2597334 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4)  // Can distinct and count be called without parentheses?
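
For context on the semantics: unlike collect(), which materializes the whole RDD on the driver at once, toLocalIterator runs one job per partition (via collectPartition in the RDD.scala hunk above) as the iterator is consumed, so the driver only ever holds one partition's elements. A minimal usage sketch; the SparkContext setup and data here are illustrative, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object ToLocalIteratorDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("toLocalIterator-demo"))

        // Four partitions: each is fetched by its own job only when the
        // iterator reaches it, bounding driver memory to one partition.
        val rdd = sc.parallelize(1 to 100, numSlices = 4)
        rdd.toLocalIterator.foreach(println)

        sc.stop()
      }
    }

The trade-off is extra scheduling overhead (one job per partition) in exchange for the bounded driver-memory footprint noted in the doc comment.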