Repository: spark
Updated Branches:
  refs/heads/master ed88db4cb -> c3ad48603


[SPARK-4719][API] Consolidate various narrow dep RDD classes with 
MapPartitionsRDD

MappedRDD, MappedValuesRDD, FlatMappedValuesRDD, FilteredRDD, GlommedRDD, 
FlatMappedRDD are not necessary. They can be implemented trivially using 
MapPartitionsRDD.

Author: Reynold Xin <r...@databricks.com>

Closes #3578 from rxin/SPARK-4719 and squashes the following commits:

eed9853 [Reynold Xin] Preserve partitioning for filter.
eb1a89b [Reynold Xin] [SPARK-4719][API] Consolidate various narrow dep RDD 
classes with MapPartitionsRDD.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3ad4860
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3ad4860
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3ad4860

Branch: refs/heads/master
Commit: c3ad48603632a039a51be3d33e917105149fdd7a
Parents: ed88db4
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Dec 4 00:45:57 2014 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Dec 4 00:45:57 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/BinaryFileRDD.scala    | 12 ++---
 .../org/apache/spark/rdd/FilteredRDD.scala      | 35 --------------
 .../org/apache/spark/rdd/FlatMappedRDD.scala    | 34 -------------
 .../apache/spark/rdd/FlatMappedValuesRDD.scala  | 35 --------------
 .../scala/org/apache/spark/rdd/GlommedRDD.scala | 31 ------------
 .../scala/org/apache/spark/rdd/MappedRDD.scala  | 32 -------------
 .../org/apache/spark/rdd/MappedValuesRDD.scala  | 33 -------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 10 +++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 28 +++++++----
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 50 ++++++++------------
 10 files changed, 55 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6e66ddb..1f755db 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
 import org.apache.spark.{ Partition, SparkContext }
 
 private[spark] class BinaryFileRDD[T](
-  sc: SparkContext,
-  inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
-  keyClass: Class[String],
-  valueClass: Class[T],
-  @transient conf: Configuration,
-  minPartitions: Int)
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, 
conf) {
 
   override def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
deleted file mode 100644
index 9e41b3d..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark] class FilteredRDD[T: ClassTag](
-    prev: RDD[T],
-    f: T => Boolean)
-  extends RDD[T](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override val partitioner = prev.partitioner    // Since filter cannot change 
a partition's keys
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).filter(f)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
deleted file mode 100644
index d8f87d4..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class FlatMappedRDD[U: ClassTag, T: ClassTag](
-    prev: RDD[T],
-    f: T => TraversableOnce[U])
-  extends RDD[U](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).flatMap(f)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
deleted file mode 100644
index 7c9023f..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => 
TraversableOnce[U])
-  extends RDD[(K, U)](prev) {
-
-  override def getPartitions = firstParent[Product2[K, V]].partitions
-
-  override val partitioner = firstParent[Product2[K, V]].partitioner
-
-  override def compute(split: Partition, context: TaskContext) = {
-    firstParent[Product2[K, V]].iterator(split, context).flatMap { case 
Product2(k, v) =>
-      f(v).map(x => (k, x))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
deleted file mode 100644
index f6463fa..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
-  extends RDD[Array[T]](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    Array(firstParent[T].iterator(split, context).toArray).iterator
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
deleted file mode 100644
index 8d7c288..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
-  extends RDD[U](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).map(f)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
deleted file mode 100644
index a60952e..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
-  extends RDD[(K, U)](prev) {
-
-  override def getPartitions = firstParent[Product2[K, U]].partitions
-
-  override val partitioner = firstParent[Product2[K, U]].partitioner
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, 
U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { pair => 
(pair._1, f(pair._2)) }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e78e576..c43e1f2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
-    new MappedValuesRDD(self, cleanF)
+    new MapPartitionsRDD[(K, U), (K, V)](self,
+      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
+      preservesPartitioning = true)
   }
 
   /**
@@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
-    new FlatMappedValuesRDD(self, cleanF)
+    new MapPartitionsRDD[(K, U), (K, V)](self,
+      (context, pid, iter) => iter.flatMap { case (k, v) =>
+        cleanF(v).map(x => (k, x))
+      },
+      preservesPartitioning = true)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8dfd952..0bd616e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.{Properties, Random}
+import java.util.Random
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -36,13 +36,12 @@ import org.apache.spark._
 import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, 
BernoulliCellSampler,
   SamplingUtils}
@@ -270,19 +269,30 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
-  def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+  def map[U: ClassTag](f: T => U): RDD[U] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+  }
 
   /**
    *  Return a new RDD by first applying a function to all elements of this
    *  RDD, and then flattening the results.
    */
-  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
-    new FlatMappedRDD(this, sc.clean(f))
+  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[U, T](this, (context, pid, iter) => 
iter.flatMap(cleanF))
+  }
 
   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
    */
-  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+  def filter(f: T => Boolean): RDD[T] = {
+    val cleanF = sc.clean(f)
+    new MapPartitionsRDD[T, T](
+      this,
+      (context, pid, iter) => iter.filter(cleanF),
+      preservesPartitioning = true)
+  }
 
   /**
    * Return a new RDD containing the distinct elements in this RDD.
@@ -503,7 +513,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD created by coalescing all elements within each partition 
into an array.
    */
-  def glom(): RDD[Array[T]] = new GlommedRDD(this)
+  def glom(): RDD[Array[T]] = {
+    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => 
Iterator(iter.toArray))
+  }
 
   /**
    * Return the Cartesian product of this RDD and another one, that is, the 
RDD of all pairs of

http://git-wip-us.apache.org/repos/asf/spark/blob/c3ad4860/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d9be79..46fcb80 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -24,10 +24,9 @@ import scala.reflect.ClassTag
 import org.scalatest.FunSuite
 
 import org.apache.spark._
-import org.apache.spark.util.Utils
-
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDDSuiteUtils._
+import org.apache.spark.util.Utils
 
 class RDDSuite extends FunSuite with SharedSparkContext {
 
@@ -37,8 +36,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
-    assert(dups.distinct.count === 4)  // Can distinct and count be called 
without parentheses?
-    assert(dups.distinct.collect === dups.distinct().collect)
+    assert(dups.distinct().count === 4)  // Can distinct and count be called 
without parentheses?
+    assert(dups.distinct().collect === dups.distinct().collect)
     assert(dups.distinct(2).collect === dups.distinct().collect)
     assert(nums.reduce(_ + _) === 10)
     assert(nums.fold(0)(_ + _) === 10)
@@ -617,9 +616,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     for(seed <- 1 to 5) {
       val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed)
       assert(splits.size == 3, "wrong number of splits")
-      assert(splits.flatMap(_.collect).sorted.toList == data.collect.toList,
+      assert(splits.flatMap(_.collect()).sorted.toList == 
data.collect().toList,
         "incomplete or wrong split")
-      val s = splits.map(_.count)
+      val s = splits.map(_.count())
       assert(math.abs(s(0) - 100) < 50) // std =  9.13
       assert(math.abs(s(1) - 200) < 50) // std = 11.55
       assert(math.abs(s(2) - 300) < 50) // std = 12.25
@@ -762,8 +761,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val rdd3 = rdd2.map(_ + 1)
     val rdd4 = new UnionRDD(sc, List(rdd1, rdd2, rdd3))
     assert(rdd4.parent(0).isInstanceOf[ParallelCollectionRDD[_]])
-    assert(rdd4.parent(1).isInstanceOf[FilteredRDD[_]])
-    assert(rdd4.parent(2).isInstanceOf[MappedRDD[_, _]])
+    assert(rdd4.parent[Int](1) === rdd2)
+    assert(rdd4.parent[Int](2) === rdd3)
   }
 
   test("getNarrowAncestors") {
@@ -781,20 +780,18 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // Simple dependency tree with a single branch
     assert(ancestors1.size === 0)
     assert(ancestors2.size === 2)
-    assert(ancestors2.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1)
-    assert(ancestors2.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+    assert(ancestors2.count(_ === rdd1) === 1)
+    assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
     assert(ancestors3.size === 5)
-    assert(ancestors3.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1)
-    assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
-    assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
+    assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
 
     // Any ancestors before the shuffle are not considered
     assert(ancestors4.size === 0)
     assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
     assert(ancestors5.size === 3)
     assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
-    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
-    assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
+    assert(ancestors5.count(_ === rdd3) === 0)
+    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
   }
 
   test("getNarrowAncestors with multiple parents") {
@@ -815,16 +812,16 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // Simple dependency tree with multiple branches
     assert(ancestors6.size === 3)
     assert(ancestors6.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 2)
-    assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
+    assert(ancestors6.count(_ === rdd2) === 1)
     assert(ancestors7.size === 5)
     assert(ancestors7.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3)
-    assert(ancestors7.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
-    assert(ancestors7.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+    assert(ancestors7.count(_ === rdd2) === 1)
+    assert(ancestors7.count(_ === rdd3) === 1)
 
     // Dependency tree with duplicate nodes (e.g. rdd1 should not be reported 
twice)
     assert(ancestors8.size === 7)
-    assert(ancestors8.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
-    assert(ancestors8.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+    assert(ancestors8.count(_ === rdd2) === 1)
+    assert(ancestors8.count(_ === rdd3) === 1)
     assert(ancestors8.count(_.isInstanceOf[UnionRDD[_]]) === 2)
     assert(ancestors8.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3)
     assert(ancestors8.count(_ == rdd1) === 1)
@@ -834,7 +831,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // Any ancestors before the shuffle are not considered
     assert(ancestors9.size === 2)
     assert(ancestors9.count(_.isInstanceOf[CoGroupedRDD[_]]) === 1)
-    assert(ancestors9.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 1)
   }
 
   /**
@@ -868,12 +864,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val ancestors3 = rdd3.getNarrowAncestors
     val ancestors4 = rdd4.getNarrowAncestors
     assert(ancestors3.size === 4)
-    assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
-    assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
+    assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
     assert(ancestors3.count(_ == rdd3) === 0)
     assert(ancestors4.size === 4)
-    assert(ancestors4.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
-    assert(ancestors4.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+    assert(ancestors4.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 3)
     assert(ancestors4.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1)
     assert(ancestors4.count(_ == rdd3) === 1)
     assert(ancestors4.count(_ == rdd4) === 0)
@@ -881,8 +875,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // Cycles that do not involve the root
     val ancestors5 = rdd5.getNarrowAncestors
     assert(ancestors5.size === 6)
-    assert(ancestors5.count(_.isInstanceOf[MappedRDD[_, _]]) === 3)
-    assert(ancestors5.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
+    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 5)
     assert(ancestors5.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1)
     assert(ancestors4.count(_ == rdd3) === 1)
 
@@ -890,8 +883,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     val ancestors6 = rdd6.getNarrowAncestors
     assert(ancestors6.size === 12)
     assert(ancestors6.count(_.isInstanceOf[UnionRDD[_]]) === 2)
-    assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 4)
-    assert(ancestors6.count(_.isInstanceOf[FilteredRDD[_]]) === 3)
+    assert(ancestors6.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 7)
     assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to