Repository: spark
Updated Branches:
  refs/heads/master 7012ffafa -> e258e5040


[SPARK-1259] Make RDD locally iterable

Author: Egor Pakhomov <pahomov.e...@gmail.com>

Closes #156 from epahomov/SPARK-1259 and squashes the following commits:

8ec8f24 [Egor Pakhomov] Make to local iterator shorter
34aa300 [Egor Pakhomov] Fix toLocalIterator docs
08363ef [Egor Pakhomov] SPARK-1259 from toLocallyIterable to toLocalIterator
6a994eb [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
8be3dcf [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
33ecb17 [Egor Pakhomov] SPARK-1259 Make RDD locally iterable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e258e504
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e258e504
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e258e504

Branch: refs/heads/master
Commit: e258e5040fa1905a04efcb7b3ca4a6d33e18fa61
Parents: 7012ffa
Author: Egor Pakhomov <pahomov.e...@gmail.com>
Authored: Sun Apr 6 16:41:23 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Apr 6 16:43:01 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 14 +++++++++++++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    | 12 ++++++++++++
 core/src/test/java/org/apache/spark/JavaAPISuite.java |  9 +++++++++
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala    |  1 +
 4 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index e03b8e7..6e8ec8e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -281,6 +282,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator(): JIterator[T] = {
+     import scala.collection.JavaConversions._
+     rdd.toLocalIterator
+  }
+
+
+  /**
    * Return an array that contains all of the elements in this RDD.
   * @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 08c42c5..c43823b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -662,6 +662,18 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator: Iterator[T] = {
+    def collectPartition(p: Int): Array[T] = {
+      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    }
+    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
+  }
+
+  /**
    * Return an array that contains all of the elements in this RDD.
    */
   @deprecated("use collect", "1.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 2372f2d..762405b 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -179,6 +180,14 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
+    @Test
+    public void toLocalIterator() {
+        List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+        JavaRDD<Integer> rdd = sc.parallelize(correct);
+        List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+        Assert.assertTrue(correct.equals(result));
+    }
+
   @SuppressWarnings("unchecked")
   @Test
   public void lookup() {

http://git-wip-us.apache.org/repos/asf/spark/blob/e258e504/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d6b5fdc..2597334 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4)  // Can distinct and count be called without parentheses?

Reply via email to