Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2530#discussion_r18122160
  
    --- Diff: graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala ---
    @@ -32,74 +32,7 @@ import org.apache.spark.graphx.impl.EdgePartitionBuilder
      * edge to provide the triplet view. Shipping of the vertex attributes is 
managed by
      * `impl.ReplicatedVertexView`.
      */
    -class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
    -    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
    -    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    -  extends RDD[Edge[ED]](partitionsRDD.context, List(new 
OneToOneDependency(partitionsRDD))) {
    -
    -  override def setName(_name: String): this.type = {
    -    if (partitionsRDD.name != null) {
    -      partitionsRDD.setName(partitionsRDD.name + ", " + _name)
    -    } else {
    -      partitionsRDD.setName(_name)
    -    }
    -    this
    -  }
    -  setName("EdgeRDD")
    -
    -  override protected def getPartitions: Array[Partition] = 
partitionsRDD.partitions
    -
    -  /**
    -   * If `partitionsRDD` already has a partitioner, use it. Otherwise 
assume that the
    -   * [[PartitionID]]s in `partitionsRDD` correspond to the actual 
partitions and create a new
    -   * partitioner that allows co-partitioning with `partitionsRDD`.
    -   */
    -  override val partitioner =
    -    
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
    -
    -  override def compute(part: Partition, context: TaskContext): 
Iterator[Edge[ED]] = {
    -    val p = firstParent[(PartitionID, EdgePartition[ED, 
VD])].iterator(part, context)
    -    if (p.hasNext) {
    -      p.next._2.iterator.map(_.copy())
    -    } else {
    -      Iterator.empty
    -    }
    -  }
    -
    -  override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
    -
    -  /**
    -   * Persists the edge partitions at the specified storage level, ignoring 
any existing target
    -   * storage level.
    -   */
    -  override def persist(newLevel: StorageLevel): this.type = {
    -    partitionsRDD.persist(newLevel)
    -    this
    -  }
    -
    -  override def unpersist(blocking: Boolean = true): this.type = {
    -    partitionsRDD.unpersist(blocking)
    -    this
    -  }
    -
    -  /** Persists the vertex partitions using `targetStorageLevel`, which 
defaults to MEMORY_ONLY. */
    -  override def cache(): this.type = {
    -    partitionsRDD.persist(targetStorageLevel)
    -    this
    -  }
    -
    -  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
    -      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): 
EdgeRDD[ED2, VD2] = {
    -    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
    -      if (iter.hasNext) {
    -        val (pid, ep) = iter.next()
    -        Iterator(Tuple2(pid, f(pid, ep)))
    -      } else {
    -        Iterator.empty
    -      }
    -    }, preservesPartitioning = true))
    -  }
    -
    +trait EdgeRDD[@specialized ED, VD] extends RDD[Edge[ED]] {
    --- End diff --
    
    I think we should probably remove the `@specialized` annotation and test performance. 
Maybe we can do that in a separate PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to